diff --git a/src/rtl_tcp.c b/src/rtl_tcp.c index da6057b..7f34fdb 100644 --- a/src/rtl_tcp.c +++ b/src/rtl_tcp.c @@ -61,15 +61,8 @@ static pthread_t command_thread; static pthread_cond_t exit_cond; static pthread_mutex_t exit_cond_lock; -static pthread_mutex_t ll_mutex; static pthread_cond_t cond; -struct llist { - char *data; - size_t len; - struct llist *next; -}; - typedef struct { /* structure size must be multiple of 2 bytes */ char magic[4]; uint32_t tuner_type; @@ -79,9 +72,17 @@ typedef struct { /* structure size must be multiple of 2 bytes */ static rtlsdr_dev_t *dev = NULL; static int enable_biastee = 0; -static int global_numq = 0; -static struct llist *ll_buffers = 0; -static int llbuf_num = 500; + +// Ring Buffer declarations +// 8MB appears to cover several seconds at high bitrates -- about as much lag as you'd want +#define RINGBUFSZ_INIT (8*1024*1024) +static int ringbuf_sz = RINGBUFSZ_INIT; +static int ringbuf_trimsz = 512*1024; +static unsigned char *ringbuf = NULL; +static volatile unsigned int ringbuf_head = 0; +static volatile unsigned int ringbuf_tail = 0; +static unsigned int total_radio_bytes = 0; +static unsigned int max_bytes_in_flight = 0; static volatile int do_exit = 0; @@ -145,53 +146,60 @@ static void sighandler(int signum) void rtlsdr_callback(unsigned char *buf, uint32_t len, void *ctx) { + static time_t lasttime = 0; + static int lastbytes = 0; + time_t curtime; + if(!do_exit) { - struct llist *rpt = (struct llist*)malloc(sizeof(struct llist)); - rpt->data = (char*)malloc(len); - memcpy(rpt->data, buf, len); - rpt->len = len; - rpt->next = NULL; - - pthread_mutex_lock(&ll_mutex); - - if (ll_buffers == NULL) { - ll_buffers = rpt; - } else { - struct llist *cur = ll_buffers; - int num_queued = 0; - - while (cur->next != NULL) { - cur = cur->next; - num_queued++; - } + unsigned int bufferleft; - if(llbuf_num && llbuf_num == num_queued-2){ - struct llist *curelem; + if (ringbuf == NULL) + { + printf("Allocate %d bytes for ringbuf.\n", ringbuf_sz); + ringbuf = (unsigned char*)malloc(ringbuf_sz); + } - free(ll_buffers->data); - curelem = ll_buffers->next; - free(ll_buffers); - ll_buffers = curelem; + bufferleft = ringbuf_sz - ((ringbuf_head < ringbuf_tail) ? (ringbuf_head - ringbuf_tail + ringbuf_sz) : (ringbuf_head - ringbuf_tail)); + if (len < bufferleft) + { + if ((ringbuf_head+len) < (unsigned int)ringbuf_sz) + { + memcpy(((unsigned char*)(ringbuf+ringbuf_head)), buf, len); } + else + { + memcpy(((unsigned char*)ringbuf+ringbuf_head), buf, ringbuf_sz-ringbuf_head); + memcpy((unsigned char*)ringbuf, buf+(ringbuf_sz-ringbuf_head), len-(ringbuf_sz-ringbuf_head)); + } + ringbuf_head = (ringbuf_head + len) % ringbuf_sz; + } + else + { + printf("overrun: head=%d tail=%d, Trimming %d bytes from tail of buffer\n", ringbuf_head, ringbuf_tail, ringbuf_trimsz); + ringbuf_tail = (ringbuf_tail + ringbuf_trimsz) % ringbuf_sz; + } - cur->next = rpt; - - if (num_queued > global_numq) - printf("ll+, now %d\n", num_queued); - else if (num_queued < global_numq) - printf("ll-, now %d\n", num_queued); - - global_numq = num_queued; + total_radio_bytes += len; + curtime = time (NULL); + if ((curtime - lasttime) > 30) + { + int nsecs = curtime - lasttime; + int nbytes = total_radio_bytes - lastbytes; + int bytes_in_flight = (ringbuf_head - ringbuf_tail); + if (bytes_in_flight < 0) + bytes_in_flight = ringbuf_sz + bytes_in_flight; + lasttime=curtime; + lastbytes=total_radio_bytes; + printf(">> [ %3.2fMB/s ] [ bytes_in_flight(cur/max) = %4dK / %4dK ]\n", + (float)nbytes/(float)nsecs/1000.0/1000.0, bytes_in_flight/1024, max_bytes_in_flight/1024); + max_bytes_in_flight=0; } - pthread_cond_signal(&cond); - pthread_mutex_unlock(&ll_mutex); } } static void *tcp_worker(void *arg) { - struct llist *curelem,*prev; - int bytesleft,bytessent, index; + int bytesleft, bytessent; struct timeval tv= {1,0}; struct timespec ts; struct timeval tp; @@ -202,47 +210,33 @@ static void *tcp_worker(void *arg) if(do_exit) pthread_exit(0); - pthread_mutex_lock(&ll_mutex); - gettimeofday(&tp, NULL); - ts.tv_sec = tp.tv_sec+5; - ts.tv_nsec = tp.tv_usec * 1000; - r = pthread_cond_timedwait(&cond, &ll_mutex, &ts); - if(r == ETIMEDOUT) { - pthread_mutex_unlock(&ll_mutex); - printf("worker cond timeout\n"); - sighandler(0); - pthread_exit(NULL); - } - - curelem = ll_buffers; - ll_buffers = 0; - pthread_mutex_unlock(&ll_mutex); - - while(curelem != 0) { - bytesleft = curelem->len; - index = 0; - bytessent = 0; - while(bytesleft > 0) { - FD_ZERO(&writefds); - FD_SET(s, &writefds); - tv.tv_sec = 1; - tv.tv_usec = 0; - r = select(s+1, NULL, &writefds, NULL, &tv); - if(r) { - bytessent = send(s, &curelem->data[index], bytesleft, 0); - bytesleft -= bytessent; - index += bytessent; - } - if(bytessent == SOCKET_ERROR || do_exit) { - printf("worker socket bye\n"); - sighandler(0); - pthread_exit(NULL); - } - } - prev = curelem; - curelem = curelem->next; - free(prev->data); - free(prev); + bytesleft = (ringbuf_head < ringbuf_tail) ? + (ringbuf_head - ringbuf_tail + ringbuf_sz) : + (ringbuf_head - ringbuf_tail); + while (bytesleft > 0) + { + FD_ZERO(&writefds); + FD_SET(s, &writefds); + tv.tv_sec = 1; + tv.tv_usec = 0; + r = select(s+1, NULL, &writefds, NULL, &tv); + if(r) { + unsigned int sendchunk; + if (ringbuf_tail < ringbuf_head) + sendchunk = ringbuf_head - ringbuf_tail; + else + sendchunk = ringbuf_sz - ringbuf_tail; + if (sendchunk > max_bytes_in_flight) + max_bytes_in_flight = sendchunk; + bytessent = send(s, (unsigned char*)(ringbuf+ringbuf_tail), sendchunk, 0); + bytesleft -= bytessent; + ringbuf_tail = (ringbuf_tail + bytessent) % ringbuf_sz; + } + if(bytessent == SOCKET_ERROR || do_exit) { + printf("worker socket bye\n"); + sighandler(0); + pthread_exit(NULL); + } } } } @@ -397,7 +391,7 @@ int main(int argc, char **argv) struct sigaction sigact, sigign; #endif - while ((opt = getopt(argc, argv, "a:p:f:g:s:b:n:d:P:T")) != -1) { + while ((opt = getopt(argc, argv, "a:p:f:g:s:b:d:P:T")) != -1) { switch (opt) { case 'd': dev_index = verbose_device_search(optarg); @@ -421,9 +415,6 @@ int main(int argc, char **argv) case 'b': buf_num = atoi(optarg); break; - case 'n': - llbuf_num = atoi(optarg); - break; case 'P': ppm_error = atoi(optarg); break; @@ -444,7 +435,7 @@ int main(int argc, char **argv) } if (dev_index < 0) { - exit(1); + exit(1); } rtlsdr_open(&dev, (uint32_t)dev_index); @@ -510,7 +501,6 @@ int main(int argc, char **argv) fprintf(stderr, "WARNING: Failed to reset buffers.\n"); pthread_mutex_init(&exit_cond_lock, NULL); - pthread_mutex_init(&ll_mutex, NULL); pthread_mutex_init(&exit_cond_lock, NULL); pthread_cond_init(&cond, NULL); pthread_cond_init(&exit_cond, NULL); @@ -536,10 +526,10 @@ int main(int argc, char **argv) while(1) { printf("listening...\n"); printf("Use the device argument 'rtl_tcp=%s:%d' in OsmoSDR " - "(gr-osmosdr) source\n" - "to receive samples in GRC and control " - "rtl_tcp parameters (frequency, gain, ...).\n", - addr, port); + "(gr-osmosdr) source\n" + "to receive samples in GRC and control " + "rtl_tcp parameters (frequency, gain, ...).\n", + addr, port); listen(listensocket,1); while(1) { @@ -590,24 +580,20 @@ int main(int argc, char **argv) closesocket(s); printf("all threads dead..\n"); - curelem = ll_buffers; - ll_buffers = 0; - - while(curelem != 0) { - prev = curelem; - curelem = curelem->next; - free(prev->data); - free(prev); - } + + // Clear stale data for next client + ringbuf_head = ringbuf_tail = 0; + memset(ringbuf, 0, ringbuf_sz); do_exit = 0; - global_numq = 0; } out: rtlsdr_close(dev); closesocket(listensocket); closesocket(s); + if (ringbuf) + free(ringbuf); #ifdef _WIN32 WSACleanup(); #endif