Merge pull request #1 from blinick/wip_rtltcp_ringbuf

Wip rtltcp ringbuf
master
rtlsdrblog 5 years ago committed by GitHub
commit 775daa821f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 162
      src/rtl_tcp.c

@ -61,15 +61,8 @@ static pthread_t command_thread;
static pthread_cond_t exit_cond; static pthread_cond_t exit_cond;
static pthread_mutex_t exit_cond_lock; static pthread_mutex_t exit_cond_lock;
static pthread_mutex_t ll_mutex;
static pthread_cond_t cond; static pthread_cond_t cond;
struct llist {
char *data;
size_t len;
struct llist *next;
};
typedef struct { /* structure size must be multiple of 2 bytes */ typedef struct { /* structure size must be multiple of 2 bytes */
char magic[4]; char magic[4];
uint32_t tuner_type; uint32_t tuner_type;
@ -79,9 +72,17 @@ typedef struct { /* structure size must be multiple of 2 bytes */
static rtlsdr_dev_t *dev = NULL; static rtlsdr_dev_t *dev = NULL;
static int enable_biastee = 0; static int enable_biastee = 0;
static int global_numq = 0;
static struct llist *ll_buffers = 0; // Ring Buffer declarations
static int llbuf_num = 500; // 8MB appears to cover several seconds at high bitrates -- about as much lag as you'd want
#define RINGBUFSZ_INIT (8*1024*1024)
static int ringbuf_sz = RINGBUFSZ_INIT;
static int ringbuf_trimsz = 512*1024;
static unsigned char *ringbuf = NULL;
static volatile unsigned int ringbuf_head = 0;
static volatile unsigned int ringbuf_tail = 0;
static unsigned int total_radio_bytes = 0;
static unsigned int max_bytes_in_flight = 0;
static volatile int do_exit = 0; static volatile int do_exit = 0;
@ -145,53 +146,60 @@ static void sighandler(int signum)
void rtlsdr_callback(unsigned char *buf, uint32_t len, void *ctx) void rtlsdr_callback(unsigned char *buf, uint32_t len, void *ctx)
{ {
if(!do_exit) { static time_t lasttime = 0;
struct llist *rpt = (struct llist*)malloc(sizeof(struct llist)); static int lastbytes = 0;
rpt->data = (char*)malloc(len); time_t curtime;
memcpy(rpt->data, buf, len);
rpt->len = len;
rpt->next = NULL;
pthread_mutex_lock(&ll_mutex);
if (ll_buffers == NULL) { if(!do_exit) {
ll_buffers = rpt; unsigned int bufferleft;
} else {
struct llist *cur = ll_buffers;
int num_queued = 0;
while (cur->next != NULL) { if (ringbuf == NULL)
cur = cur->next; {
num_queued++; printf("Allocate %d bytes for ringbuf.\n", ringbuf_sz);
ringbuf = (unsigned char*)malloc(ringbuf_sz);
} }
if(llbuf_num && llbuf_num == num_queued-2){ bufferleft = ringbuf_sz - ((ringbuf_head < ringbuf_tail) ? (ringbuf_head - ringbuf_tail + ringbuf_sz) : (ringbuf_head - ringbuf_tail));
struct llist *curelem; if (len < bufferleft)
{
free(ll_buffers->data); if ((ringbuf_head+len) < (unsigned int)ringbuf_sz)
curelem = ll_buffers->next; {
free(ll_buffers); memcpy(((unsigned char*)(ringbuf+ringbuf_head)), buf, len);
ll_buffers = curelem; }
else
{
memcpy(((unsigned char*)ringbuf+ringbuf_head), buf, ringbuf_sz-ringbuf_head);
memcpy((unsigned char*)ringbuf, buf+(ringbuf_sz-ringbuf_head), len-(ringbuf_sz-ringbuf_head));
}
ringbuf_head = (ringbuf_head + len) % ringbuf_sz;
}
else
{
printf("overrun: head=%d tail=%d, Trimming %d bytes from tail of buffer\n", ringbuf_head, ringbuf_tail, ringbuf_trimsz);
ringbuf_tail = (ringbuf_tail + ringbuf_trimsz) % ringbuf_sz;
} }
cur->next = rpt; total_radio_bytes += len;
curtime = time (NULL);
if (num_queued > global_numq) if ((curtime - lasttime) > 30)
printf("ll+, now %d\n", num_queued); {
else if (num_queued < global_numq) int nsecs = curtime - lasttime;
printf("ll-, now %d\n", num_queued); int nbytes = total_radio_bytes - lastbytes;
int bytes_in_flight = (ringbuf_head - ringbuf_tail);
global_numq = num_queued; if (bytes_in_flight < 0)
bytes_in_flight = ringbuf_sz + bytes_in_flight;
lasttime=curtime;
lastbytes=total_radio_bytes;
printf(">> [ %3.2fMB/s ] [ bytes_in_flight(cur/max) = %4dK / %4dK ]\n",
(float)nbytes/(float)nsecs/1000.0/1000.0, bytes_in_flight/1024, max_bytes_in_flight/1024);
max_bytes_in_flight=0;
} }
pthread_cond_signal(&cond);
pthread_mutex_unlock(&ll_mutex);
} }
} }
static void *tcp_worker(void *arg) static void *tcp_worker(void *arg)
{ {
struct llist *curelem,*prev; int bytesleft, bytessent;
int bytesleft,bytessent, index;
struct timeval tv= {1,0}; struct timeval tv= {1,0};
struct timespec ts; struct timespec ts;
struct timeval tp; struct timeval tp;
@ -202,36 +210,27 @@ static void *tcp_worker(void *arg)
if(do_exit) if(do_exit)
pthread_exit(0); pthread_exit(0);
pthread_mutex_lock(&ll_mutex); bytesleft = (ringbuf_head < ringbuf_tail) ?
gettimeofday(&tp, NULL); (ringbuf_head - ringbuf_tail + ringbuf_sz) :
ts.tv_sec = tp.tv_sec+5; (ringbuf_head - ringbuf_tail);
ts.tv_nsec = tp.tv_usec * 1000; while (bytesleft > 0)
r = pthread_cond_timedwait(&cond, &ll_mutex, &ts); {
if(r == ETIMEDOUT) {
pthread_mutex_unlock(&ll_mutex);
printf("worker cond timeout\n");
sighandler(0);
pthread_exit(NULL);
}
curelem = ll_buffers;
ll_buffers = 0;
pthread_mutex_unlock(&ll_mutex);
while(curelem != 0) {
bytesleft = curelem->len;
index = 0;
bytessent = 0;
while(bytesleft > 0) {
FD_ZERO(&writefds); FD_ZERO(&writefds);
FD_SET(s, &writefds); FD_SET(s, &writefds);
tv.tv_sec = 1; tv.tv_sec = 1;
tv.tv_usec = 0; tv.tv_usec = 0;
r = select(s+1, NULL, &writefds, NULL, &tv); r = select(s+1, NULL, &writefds, NULL, &tv);
if(r) { if(r) {
bytessent = send(s, &curelem->data[index], bytesleft, 0); unsigned int sendchunk;
if (ringbuf_tail < ringbuf_head)
sendchunk = ringbuf_head - ringbuf_tail;
else
sendchunk = ringbuf_sz - ringbuf_tail;
if (sendchunk > max_bytes_in_flight)
max_bytes_in_flight = sendchunk;
bytessent = send(s, (unsigned char*)(ringbuf+ringbuf_tail), sendchunk, 0);
bytesleft -= bytessent; bytesleft -= bytessent;
index += bytessent; ringbuf_tail = (ringbuf_tail + bytessent) % ringbuf_sz;
} }
if(bytessent == SOCKET_ERROR || do_exit) { if(bytessent == SOCKET_ERROR || do_exit) {
printf("worker socket bye\n"); printf("worker socket bye\n");
@ -239,11 +238,6 @@ static void *tcp_worker(void *arg)
pthread_exit(NULL); pthread_exit(NULL);
} }
} }
prev = curelem;
curelem = curelem->next;
free(prev->data);
free(prev);
}
} }
} }
@ -397,7 +391,7 @@ int main(int argc, char **argv)
struct sigaction sigact, sigign; struct sigaction sigact, sigign;
#endif #endif
while ((opt = getopt(argc, argv, "a:p:f:g:s:b:n:d:P:T")) != -1) { while ((opt = getopt(argc, argv, "a:p:f:g:s:b:d:P:T")) != -1) {
switch (opt) { switch (opt) {
case 'd': case 'd':
dev_index = verbose_device_search(optarg); dev_index = verbose_device_search(optarg);
@ -421,9 +415,6 @@ int main(int argc, char **argv)
case 'b': case 'b':
buf_num = atoi(optarg); buf_num = atoi(optarg);
break; break;
case 'n':
llbuf_num = atoi(optarg);
break;
case 'P': case 'P':
ppm_error = atoi(optarg); ppm_error = atoi(optarg);
break; break;
@ -510,7 +501,6 @@ int main(int argc, char **argv)
fprintf(stderr, "WARNING: Failed to reset buffers.\n"); fprintf(stderr, "WARNING: Failed to reset buffers.\n");
pthread_mutex_init(&exit_cond_lock, NULL); pthread_mutex_init(&exit_cond_lock, NULL);
pthread_mutex_init(&ll_mutex, NULL);
pthread_mutex_init(&exit_cond_lock, NULL); pthread_mutex_init(&exit_cond_lock, NULL);
pthread_cond_init(&cond, NULL); pthread_cond_init(&cond, NULL);
pthread_cond_init(&exit_cond, NULL); pthread_cond_init(&exit_cond, NULL);
@ -590,24 +580,20 @@ int main(int argc, char **argv)
closesocket(s); closesocket(s);
printf("all threads dead..\n"); printf("all threads dead..\n");
curelem = ll_buffers;
ll_buffers = 0; // Clear stale data for next client
ringbuf_head = ringbuf_tail = 0;
while(curelem != 0) { memset(ringbuf, 0, ringbuf_sz);
prev = curelem;
curelem = curelem->next;
free(prev->data);
free(prev);
}
do_exit = 0; do_exit = 0;
global_numq = 0;
} }
out: out:
rtlsdr_close(dev); rtlsdr_close(dev);
closesocket(listensocket); closesocket(listensocket);
closesocket(s); closesocket(s);
if (ringbuf)
free(ringbuf);
#ifdef _WIN32 #ifdef _WIN32
WSACleanup(); WSACleanup();
#endif #endif

Loading…
Cancel
Save