From 46de49f35825825288b2165d6e102c0d51396682 Mon Sep 17 00:00:00 2001
From: "Christian W. Zuckschwerdt" <zany@triq.net>
Date: Tue, 7 Mar 2023 12:36:35 +0100
Subject: [PATCH] Change to async SDR acquire thread (#1978)

---
 include/compat_alarm.h |  30 ----
 include/rtl_433.h      |   1 +
 include/sdr.h          |   8 +-
 src/CMakeLists.txt     |   1 -
 src/compat_alarm.c     |  85 ------------
 src/rtl_433.c          | 143 ++++++++++++-------
 src/sdr.c              | 308 ++++++++++++++++++++++++++---------------
 7 files changed, 294 insertions(+), 282 deletions(-)
 delete mode 100644 include/compat_alarm.h
 delete mode 100644 src/compat_alarm.c

diff --git a/include/compat_alarm.h b/include/compat_alarm.h
deleted file mode 100644
index 3157c151..00000000
--- a/include/compat_alarm.h
+++ /dev/null
@@ -1,30 +0,0 @@
-/** @file
-    @brief compat_alarm adds an alarm() function for Windows.
-
-    Except for MinGW-w64 when `_POSIX` and/or `__USE_MINGW_ALARM`
-    is defined
- */
-
-#ifndef INCLUDE_COMPAT_ALARM_H_
-#define INCLUDE_COMPAT_ALARM_H_
-
-#ifdef _WIN32
-#include <windows.h>
-#include <signal.h>
-#include <io.h>    /* alarm() for MinGW is possibly here */
-
-#if !defined(_POSIX) && !defined(__USE_MINGW_ALARM)
-int win_alarm(unsigned seconds);
-#define alarm(sec)  win_alarm(sec)
-#define HAVE_win_alarm
-#endif
-
-/* No SIGUSRx on Windows. Use this unless MinGW-w64
- * has support for it (untested by me).
- */
-#if !defined(__USE_MINGW_ALARM)
-#define SIGALRM  SIGBREAK
-#endif
-
-#endif  /* _WIN32 */
-#endif  /* INCLUDE_COMPAT_ALARM_H_ */
diff --git a/include/rtl_433.h b/include/rtl_433.h
index 2195678a..e8275c75 100644
--- a/include/rtl_433.h
+++ b/include/rtl_433.h
@@ -99,6 +99,7 @@ typedef struct r_cfg {
     struct dm_state *demod;
     char const *sr_filename;
     int sr_execopen;
+    int watchdog; ///< SDR acquire stall watchdog
     /* stats*/
     time_t frames_since; ///< stats start time
     unsigned frames_count; ///< stats counter for interval
diff --git a/include/sdr.h b/include/sdr.h
index 46d61539..241d2bd2 100644
--- a/include/sdr.h
+++ b/include/sdr.h
@@ -186,13 +186,14 @@ int sdr_reset(sdr_dev_t *dev, int verbose);
     Make sure none are in use anymore.
 
     @param dev the device handle
-    @param cb a callback for sdr_event_t messages
-    @param ctx a user context to be passed to @p cb
+    @param async_cb a callback for sdr_event_t messages
+    @param async_ctx a user context to be passed to @p async_cb
     @param buf_num the number of buffers to keep
     @param buf_len the size in bytes of each buffer
     @return 0 on success
 */
-int sdr_start(sdr_dev_t *dev, sdr_event_cb_t cb, void *ctx, uint32_t buf_num, uint32_t buf_len);
+int sdr_start(sdr_dev_t *dev, sdr_event_cb_t async_cb, void *async_ctx, uint32_t buf_num, uint32_t buf_len);
+int sdr_start_sync(sdr_dev_t *dev, sdr_event_cb_t cb, void *ctx, uint32_t buf_num, uint32_t buf_len);
 
 /** Stop the SDR data acquisition.
 
@@ -203,6 +204,7 @@ int sdr_start(sdr_dev_t *dev, sdr_event_cb_t cb, void *ctx, uint32_t buf_num, ui
     @return 0 on success
 */
 int sdr_stop(sdr_dev_t *dev);
+int sdr_stop_sync(sdr_dev_t *dev);
 
 /** Redirect SoapySDR library logging.
 */
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 6903405c..86b56133 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -9,7 +9,6 @@ add_library(r_433 STATIC
     am_analyze.c
     baseband.c
     bitbuffer.c
-    compat_alarm.c
     compat_paths.c
     compat_time.c
     confparse.c
diff --git a/src/compat_alarm.c b/src/compat_alarm.c
deleted file mode 100644
index f8a47176..00000000
--- a/src/compat_alarm.c
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Emulation of the Posix function `alarm()`
- * using `CreateTimerQueueTimer()`.
- *
- * \ref https://docs.microsoft.com/en-us/windows/win32/api/threadpoollegacyapiset/nf-threadpoollegacyapiset-createtimerqueuetimer
- */
-#include <stdio.h>
-#include "compat_alarm.h"
-
-#ifdef HAVE_win_alarm /* rest of file */
-
-static HANDLE alarm_hnd = INVALID_HANDLE_VALUE;
-static int    alarm_countdown;
-
-/**
- * The timer-callback that performs the countdown.
- */
-void CALLBACK alarm_handler(PVOID param, BOOLEAN timer_fired)
-{
-    if (alarm_countdown > 0)  {
-       alarm_countdown--;
-       if (alarm_countdown == 0)
-          raise(SIGALRM);
-    }
-    (void) timer_fired;
-    (void) param;
-}
-
-/**
- * Destroy the timer.<br>
- * Called as an `atexit()` function.
- */
-static void alarm_delete(void)
-{
-    if (!alarm_hnd || alarm_hnd == INVALID_HANDLE_VALUE)
-       return;
-    signal(SIGALRM, SIG_IGN);
-    DeleteTimerQueueTimer(NULL, alarm_hnd, NULL);
-    alarm_hnd = INVALID_HANDLE_VALUE;
-}
-
-/**
- * Create a kernel32 timer once.
- */
-static void alarm_create(void)
-{
-    if (alarm_hnd && alarm_hnd != INVALID_HANDLE_VALUE)
-       return;
-
-    if (!CreateTimerQueueTimer(&alarm_hnd, NULL, alarm_handler,
-                               NULL,
-                               1000,  /* call alarm_handler() after 1 sec */
-                               1000,  /* an do it periodically every seconds */
-                               WT_EXECUTEDEFAULT | WT_EXECUTEINTIMERTHREAD)) {
-        fprintf(stderr, "CreateTimerQueueTimer() failed %lu\n", GetLastError());
-        alarm_hnd = NULL;
-    }
-    else
-        atexit(alarm_delete);
-}
-
-/**
- * Emulate an `alarm(sec)` function.
- *
- *  @param[in] seconds  the number of seconds to countdown before a `raise(SIGALRM)` is done.<br>
- *                      if `seconds == 0` the `alarm_handler()` will do nothing.
- */
-int win_alarm(unsigned seconds)
-{
-  alarm_countdown = seconds;
-  alarm_create();
-  return (0);
-}
-#else
-
-/*
- * Just so this compilation unit isn't empty.
- */
-int win_alarm(unsigned seconds);
-int win_alarm(unsigned seconds)
-{
-   (void) seconds;
-   return (0);
-}
-#endif /* HAVE_win_alarm */
diff --git a/src/rtl_433.c b/src/rtl_433.c
index d423d398..a28535a9 100644
--- a/src/rtl_433.c
+++ b/src/rtl_433.c
@@ -49,7 +49,6 @@
 #include "am_analyze.h"
 #include "confparse.h"
 #include "term_ctl.h"
-#include "compat_alarm.h"
 #include "compat_paths.h"
 #include "logger.h"
 #include "fatal.h"
@@ -361,6 +360,7 @@ static void help_write(void)
 
 static void sdr_callback(unsigned char *iq_buf, uint32_t len, void *ctx)
 {
+    //fprintf(stderr, "sdr_callback... %u\n", len);
     r_cfg_t *cfg = ctx;
     struct dm_state *demod = cfg->demod;
     char time_str[LOCAL_TIME_BUFLEN];
@@ -396,7 +396,7 @@ static void sdr_callback(unsigned char *iq_buf, uint32_t len, void *ctx)
     if (demod->frame_end_ago)
         demod->frame_end_ago += n_samples;
 
-    alarm(3); // require callback to run every 3 second, abort otherwise
+    cfg->watchdog++; // reset the frame acquire watchdog
 
     if (demod->samp_grab) {
         samp_grab_push(demod->samp_grab, iq_buf, len);
@@ -692,7 +692,6 @@ static void sdr_callback(unsigned char *iq_buf, uint32_t len, void *ctx)
         cfg->bytes_to_read -= len;
 
     if (cfg->after_successful_events_flag && (d_events > 0)) {
-        alarm(0); // cancel the watchdog timer
         if (cfg->after_successful_events_flag == 1) {
             cfg->exit_async = 1;
         }
@@ -707,11 +706,9 @@ static void sdr_callback(unsigned char *iq_buf, uint32_t len, void *ctx)
     int hop_index = cfg->hop_times > cfg->frequency_index ? cfg->frequency_index : cfg->hop_times - 1;
     if (cfg->hop_times > 0 && cfg->frequencies > 1
             && difftime(rawtime, cfg->hop_start_time) >= cfg->hop_time[hop_index]) {
-        alarm(0); // cancel the watchdog timer
         cfg->hop_now = 1;
     }
     if (cfg->duration > 0 && rawtime >= cfg->stop_time) {
-        alarm(0); // cancel the watchdog timer
         cfg->exit_async = 1;
         print_log(LOG_CRITICAL, __func__, "Time expired, exiting!");
     }
@@ -1275,7 +1272,6 @@ console_handler(int signum)
     if (CTRL_C_EVENT == signum) {
         write_err("Signal caught, exiting!\n");
         g_cfg.exit_async = 1;
-        sdr_stop(g_cfg.dev);
         // Uninstall handler, next Ctrl-C is a hard abort
         SetConsoleCtrlHandler((PHANDLER_ROUTINE)console_handler, FALSE);
         return TRUE;
@@ -1285,23 +1281,9 @@ console_handler(int signum)
         g_cfg.hop_now = 1;
         return TRUE;
     }
-    else if (signum == SIGALRM) {
-        write_err("Async read stalled, exiting!\n");
-        g_cfg.exit_code = 3;
-        g_cfg.exit_async = 1;
-        sdr_stop(g_cfg.dev);
-        return TRUE;
-    }
     return FALSE;
 }
 
-/* Only called for SIGALRM
- */
-static void sighandler(int signum)
-{
-  console_handler(signum);
-}
-
 #else
 static void sighandler(int signum)
 {
@@ -1316,15 +1298,10 @@ static void sighandler(int signum)
         g_cfg.hop_now = 1;
         return;
     }
-    else if (signum == SIGALRM) {
-        write_err("Async read stalled, exiting!\n");
-        g_cfg.exit_code = 3;
-    }
     else {
         write_err("Signal caught, exiting!\n");
     }
     g_cfg.exit_async = 1;
-    sdr_stop(g_cfg.dev);
 
     // Uninstall handler, next Ctrl-C is a hard abort
     struct sigaction sigact;
@@ -1338,25 +1315,31 @@ static void sighandler(int signum)
 }
 #endif
 
-static void sdr_handler(sdr_event_t *ev, void *ctx)
+static void sdr_handler(struct mg_connection *nc, int ev_type, void *ev_data)
 {
-    r_cfg_t *cfg = ctx;
+    //fprintf(stderr, "%s: %d, %d, %p, %p\n", __func__, nc->sock, ev_type, nc->user_data, ev_data);
+    // only process for the dummy nc
+    if (nc->sock != INVALID_SOCKET || ev_type != MG_EV_POLL)
+        return;
+    r_cfg_t *cfg     = nc->user_data;
+    sdr_event_t *ev = ev_data;
+    //fprintf(stderr, "sdr_handler...\n");
 
     data_t *data = NULL;
     if (ev->ev & SDR_EV_RATE) {
-        cfg->samp_rate = ev->sample_rate;
+        // cfg->samp_rate = ev->sample_rate;
         data = data_append(data,
                 "sample_rate", "", DATA_INT, ev->sample_rate,
                 NULL);
     }
     if (ev->ev & SDR_EV_CORR) {
-        cfg->ppm_error = ev->freq_correction;
+        // cfg->ppm_error = ev->freq_correction;
         data = data_append(data,
                 "freq_correction", "", DATA_INT, ev->freq_correction,
                 NULL);
     }
     if (ev->ev & SDR_EV_FREQ) {
-        cfg->center_frequency = ev->center_frequency;
+        // cfg->center_frequency = ev->center_frequency;
         data = data_append(data,
                 "center_frequency", "", DATA_INT, ev->center_frequency,
                 "frequencies", "", DATA_COND, cfg->frequencies > 1, DATA_ARRAY, data_array(cfg->frequencies, DATA_INT, cfg->frequency),
@@ -1373,24 +1356,61 @@ static void sdr_handler(sdr_event_t *ev, void *ctx)
     }
 
     if (ev->ev == SDR_EV_DATA) {
-        if (cfg->mgr) {
-            int max_polls = 16;
-            while (max_polls-- && mg_mgr_poll(cfg->mgr, 0));
-        }
-
-        if (!cfg->exit_async)
-            sdr_callback((unsigned char *)ev->buf, ev->len, ctx);
+        sdr_callback((unsigned char *)ev->buf, ev->len, cfg);
     }
 
-    if (cfg->exit_async)
+    if (cfg->exit_async) {
+        if (cfg->verbosity >= 2)
+            print_log(LOG_INFO, "Input", "sdr_handler exit");
         sdr_stop(cfg->dev);
+        cfg->exit_async++;
+    }
+}
+
+// note that this function is called in a different thread
+static void acquire_callback(sdr_event_t *ev, void *ctx)
+{
+    //struct timeval now;
+    //get_time_now(&now);
+    //fprintf(stderr, "%ld.%06ld acquire_callback...\n", (long)now.tv_sec, (long)now.tv_usec);
+
+    struct mg_mgr *mgr = ctx;
+
+    // TODO: We should run the demod here to unblock the event loop
+
+    // thread-safe dispatch, ev_data is the iq buffer pointer and length
+    //fprintf(stderr, "acquire_callback bc send...\n");
+    mg_broadcast(mgr, sdr_handler, (void *)ev, sizeof(*ev));
+    //fprintf(stderr, "acquire_callback bc done...\n");
+}
+
+static void timer_handler(struct mg_connection *nc, int ev, void *ev_data)
+{
+    //fprintf(stderr, "%s: %d, %d, %p, %p\n", __func__, nc->sock, ev, nc->user_data, ev_data);
+    r_cfg_t *cfg = (r_cfg_t *)nc->user_data;
+    switch (ev) {
+    case MG_EV_TIMER: {
+        double now  = *(double *)ev_data;
+        (void) now; // unused
+        double next = mg_time() + 1.5;
+        //fprintf(stderr, "timer event, current time: %.2lf, next timer: %.2lf\n", now, next);
+        mg_set_timer(nc, next); // Send us timer event again after 1.5 seconds
+
+        if (cfg->watchdog == 0) {
+            // We expect a frame at least every 250 ms
+            write_err("Async read stalled, exiting!\n");
+            cfg->exit_code  = 3;
+            cfg->exit_async = 1;
+            sdr_stop(cfg->dev);
+        }
+        cfg->watchdog = 0;
+
+        break;
+    }
+    }
 }
 
 int main(int argc, char **argv) {
-#ifndef _WIN32
-    struct sigaction sigact;
-#endif
-    FILE *in_file;
     int r = 0;
     struct dm_state *demod;
     r_cfg_t *cfg = &g_cfg;
@@ -1652,6 +1672,7 @@ int main(int argc, char **argv) {
             cfg->samp_rate        = demod->load_info.sample_rate ? demod->load_info.sample_rate : sample_rate_0;
             cfg->center_frequency = demod->load_info.center_frequency ? demod->load_info.center_frequency : cfg->frequency[0];
 
+            FILE *in_file;
             if (strcmp(demod->load_info.path, "-") == 0) { // read samples from stdin
                 in_file = stdin;
                 cfg->in_filename = "<stdin>";
@@ -1775,7 +1796,6 @@ int main(int argc, char **argv) {
             }
             demod->sample_file_pos = ((float)n_blocks + 1) * DEFAULT_BUF_LENGTH / cfg->samp_rate / demod->sample_size;
             sdr_callback(test_mode_buf, DEFAULT_BUF_LENGTH, cfg);
-            alarm(0); // cancel the watchdog timer
 
             //Always classify a signal at the end of the file
             if (demod->am_analyze)
@@ -1810,6 +1830,7 @@ int main(int argc, char **argv) {
     //demod->sample_signed = sdr_get_sample_signed(cfg->dev);
 
 #ifndef _WIN32
+    struct sigaction sigact;
     sigact.sa_handler = sighandler;
     sigemptyset(&sigact.sa_mask);
     sigact.sa_flags = 0;
@@ -1845,6 +1866,10 @@ int main(int argc, char **argv) {
     if (cfg->verbosity >= LOG_NOTICE) {
         print_log(LOG_NOTICE, "Input", "Reading samples in async mode...");
     }
+
+    // TODO: remove this before next release
+    print_log(LOG_NOTICE, "Input", "The internals of input handling changed, read about and report problems on PR #1978");
+
     if (cfg->duration > 0) {
         time(&cfg->stop_time);
         cfg->stop_time += cfg->duration;
@@ -1852,17 +1877,31 @@ int main(int argc, char **argv) {
 
     r = sdr_set_center_freq(cfg->dev, cfg->center_frequency, 1); // always verbose
 
-        time(&cfg->hop_start_time);
-        signal(SIGALRM, sighandler);
-        alarm(3); // require callback to run every 3 second, abort otherwise
+    time(&cfg->hop_start_time);
 
-        r = sdr_start(cfg->dev, sdr_handler, (void *)cfg,
-                DEFAULT_ASYNC_BUF_NUMBER, cfg->out_block_size);
-        if (r < 0) {
-            print_logf(LOG_ERROR, "Input", "async read failed (%i).", r);
-        }
+    // add dummy socket to receive broadcasts
+    struct mg_add_sock_opts opts = {.user_data = cfg};
+    struct mg_connection *nc = mg_add_sock_opt(get_mgr(cfg), INVALID_SOCKET, timer_handler, opts);
+    // Send us MG_EV_TIMER event after 2.5 seconds
+    mg_set_timer(nc, mg_time() + 2.5);
 
-        alarm(0); // cancel the watchdog timer
+    r = sdr_start(cfg->dev, acquire_callback, (void *)get_mgr(cfg),
+            DEFAULT_ASYNC_BUF_NUMBER, cfg->out_block_size);
+    if (r < 0) {
+        print_logf(LOG_ERROR, "Input", "async start failed (%i).", r);
+    }
+
+    while (!cfg->exit_async) {
+        mg_mgr_poll(cfg->mgr, 500);
+    }
+    if (cfg->verbosity >= LOG_INFO)
+        print_log(LOG_INFO, "rtl_433", "stopping...");
+    // final polls to drain the broadcast
+    //while (cfg->exit_async < 2) {
+    //    mg_mgr_poll(cfg->mgr, 100);
+    //}
+    sdr_stop(cfg->dev);
+    //print_log(LOG_INFO, "rtl_433", "stopped.");
 
     if (cfg->report_stats > 0) {
         event_occurred_handler(cfg, create_report_data(cfg, cfg->report_stats));
diff --git a/src/sdr.c b/src/sdr.c
index b4ce7d94..fa3a4a87 100644
--- a/src/sdr.c
+++ b/src/sdr.c
@@ -16,11 +16,13 @@
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
+#include <signal.h>
 #include "sdr.h"
 #include "r_util.h"
 #include "optparse.h"
 #include "logger.h"
 #include "fatal.h"
+#include "compat_pthread.h"
 #ifdef RTLSDR
 #include <rtl-sdr.h>
 #if defined(__linux__) && (defined(__GNUC__) || defined(__clang__))
@@ -95,7 +97,6 @@ struct sdr_dev {
     char *dev_info;
 
     int running;
-    int polling;
     uint8_t *buffer; ///< sdr data buffer current and past frames
     size_t buffer_size; ///< sdr data buffer overall size (num * len)
     size_t buffer_pos; ///< sdr data buffer next write position
@@ -103,61 +104,35 @@ struct sdr_dev {
     int sample_size;
     int sample_signed;
 
-    int apply_rate;
-    int apply_freq;
-    int apply_corr;
-    int apply_gain;
     uint32_t sample_rate;
-    int freq_correction;
     uint32_t center_frequency;
-    char *gain_str;
+
+#ifdef THREADS
+    pthread_t thread;
+    pthread_mutex_t lock; ///< lock for exit_acquire
+    int exit_acquire;
+
+    // acquire thread args
+    sdr_event_cb_t async_cb;
+    void *async_ctx;
+    uint32_t buf_num;
+    uint32_t buf_len;
+#endif
 };
 
 /* internal helpers */
 
-static int apply_changes(sdr_dev_t *dev, sdr_event_cb_t cb, void *ctx)
-{
-    int r = 0;
-    sdr_event_flags_t flags = 0;
-    if (dev->apply_rate) {
-        r = sdr_set_sample_rate(dev, dev->sample_rate, 1); // always verbose
-        dev->apply_rate = 0;
-        flags |= SDR_EV_RATE;
-    }
-
-    if (dev->apply_corr) {
-        r = sdr_set_freq_correction(dev, dev->freq_correction, 1); // always verbose
-        dev->apply_corr = 0;
-        flags |= SDR_EV_CORR;
-    }
-
-    if (dev->apply_freq) {
-        r = sdr_set_center_freq(dev, dev->center_frequency, 1); // always verbose
-        dev->apply_freq = 0;
-        flags |= SDR_EV_FREQ;
-    }
-
-    char *gain_str = dev->gain_str;
-    dev->gain_str = NULL;
-    if (dev->apply_gain) {
-        r = sdr_set_tuner_gain(dev, gain_str, 1); // always verbose
-        dev->apply_gain = 0;
-        flags |= SDR_EV_GAIN;
-    }
-    if (flags) {
+/*
+        pthread_mutex_lock(&dev->lock);
         sdr_event_t ev = {
                 .ev               = flags,
                 .sample_rate      = dev->sample_rate,
-                .freq_correction  = dev->freq_correction,
                 .center_frequency = dev->center_frequency,
-                .gain_str         = gain_str,
         };
+        pthread_mutex_unlock(&dev->lock);
         if (cb)
             cb(&ev, ctx);
-        free(gain_str);
-    }
-    return r;
-}
+*/
 
 /* rtl_tcp helpers */
 
@@ -258,6 +233,9 @@ static int rtltcp_open(sdr_dev_t **out_dev, char const *dev_query, int verbose)
         WARN_CALLOC("rtltcp_open()");
         return -1; // NOTE: returns error on alloc failure.
     }
+#ifdef THREADS
+    pthread_mutex_init(&dev->lock, NULL);
+#endif
 
     dev->rtl_tcp = sock;
     dev->sample_size = sizeof(uint8_t) * 2; // CU8
@@ -286,7 +264,7 @@ static int rtltcp_close(SOCKET sock)
 
 static int rtltcp_read_loop(sdr_dev_t *dev, sdr_event_cb_t cb, void *ctx, uint32_t buf_num, uint32_t buf_len)
 {
-    size_t buffer_size = buf_num * buf_len;
+    size_t buffer_size = (size_t)buf_num * buf_len;
     if (dev->buffer_size != buffer_size) {
         free(dev->buffer);
         dev->buffer = malloc(buffer_size);
@@ -327,14 +305,13 @@ static int rtltcp_read_loop(sdr_dev_t *dev, sdr_event_cb_t cb, void *ctx, uint32
 
         sdr_event_t ev = {
                 .ev  = SDR_EV_DATA,
+//                .sample_rate = dev->sample_rate,
+//                .center_frequency = dev->center_frequency,
                 .buf = buffer,
                 .len = n_read,
         };
-        dev->polling = 1;
         if (n_read > 0) // prevent a crash in callback
             cb(&ev, ctx);
-        dev->polling = 0;
-        apply_changes(dev, cb, ctx);
 
     } while (dev->running);
 
@@ -418,6 +395,9 @@ static int sdr_open_rtl(sdr_dev_t **out_dev, char const *dev_query, int verbose)
         WARN_CALLOC("sdr_open_rtl()");
         return -1; // NOTE: returns error on alloc failure.
     }
+#ifdef THREADS
+    pthread_mutex_init(&dev->lock, NULL);
+#endif
 
     for (uint32_t i = dev_query ? dev_index : 0;
             //cast quiets -Wsign-compare; if dev_index were < 0, would have returned -1 above
@@ -501,6 +481,19 @@ static void rtlsdr_read_cb(unsigned char *iq_buf, uint32_t len, void *ctx)
 {
     sdr_dev_t *dev = ctx;
 
+    //fprintf(stderr, "rtlsdr_read_cb enter...\n");
+#ifdef THREADS
+    pthread_mutex_lock(&dev->lock);
+    int exit_acquire = dev->exit_acquire;
+    pthread_mutex_unlock(&dev->lock);
+    if (exit_acquire) {
+        // we get one more call after rtlsdr_cancel_async(),
+        // it then takes a full second until rtlsdr_read_async() ends.
+        //fprintf(stderr, "rtlsdr_read_cb stopping...\n");
+        return; // do not deliver any more events
+    }
+#endif
+
     if (dev->buffer_pos + len > dev->buffer_size)
         dev->buffer_pos = 0;
     uint8_t *buffer = &dev->buffer[dev->buffer_pos];
@@ -511,16 +504,21 @@ static void rtlsdr_read_cb(unsigned char *iq_buf, uint32_t len, void *ctx)
 
     sdr_event_t ev = {
             .ev  = SDR_EV_DATA,
+//            .sample_rate = dev->sample_rate,
+//            .center_frequency = dev->center_frequency,
             .buf = buffer,
             .len = len,
     };
+    //fprintf(stderr, "rtlsdr_read_cb cb...\n");
     if (len > 0) // prevent a crash in callback
         dev->rtlsdr_cb(&ev, dev->rtlsdr_cb_ctx);
+    //fprintf(stderr, "rtlsdr_read_cb cb done.\n");
+    // NOTE: we actually need to copy the buffer to prevent it going away on cancel_async
 }
 
 static int rtlsdr_read_loop(sdr_dev_t *dev, sdr_event_cb_t cb, void *ctx, uint32_t buf_num, uint32_t buf_len)
 {
-    size_t buffer_size = buf_num * buf_len;
+    size_t buffer_size = (size_t)buf_num * buf_len;
     if (dev->buffer_size != buffer_size) {
         free(dev->buffer);
         dev->buffer = malloc(buffer_size);
@@ -538,8 +536,6 @@ static int rtlsdr_read_loop(sdr_dev_t *dev, sdr_event_cb_t cb, void *ctx, uint32
     dev->rtlsdr_cb_ctx = ctx;
 
     dev->running = 1;
-    do {
-        dev->polling = 1;
 
         r = rtlsdr_read_async(dev->rtlsdr_dev, rtlsdr_read_cb, dev, buf_num, buf_len);
         // rtlsdr_read_async() returns possible error codes from:
@@ -561,10 +557,7 @@ static int rtlsdr_read_loop(sdr_dev_t *dev, sdr_event_cb_t cb, void *ctx, uint32
 #endif
             dev->running = 0;
         }
-        dev->polling = 0;
-        apply_changes(dev, cb, ctx);
-
-    } while (dev->running);
+    print_log(LOG_DEBUG, __func__, "rtlsdr_read_async done");
 
     return r;
 }
@@ -867,6 +860,9 @@ static int sdr_open_soapy(sdr_dev_t **out_dev, char const *dev_query, int verbos
         WARN_CALLOC("sdr_open_soapy()");
         return -1; // NOTE: returns error on alloc failure.
     }
+#ifdef THREADS
+    pthread_mutex_init(&dev->lock, NULL);
+#endif
 
     dev->soapy_dev = SoapySDRDevice_makeStrArgs(dev_query);
     if (!dev->soapy_dev) {
@@ -950,7 +946,7 @@ static int sdr_open_soapy(sdr_dev_t **out_dev, char const *dev_query, int verbos
 
 static int soapysdr_read_loop(sdr_dev_t *dev, sdr_event_cb_t cb, void *ctx, uint32_t buf_num, uint32_t buf_len)
 {
-    size_t buffer_size = buf_num * buf_len;
+    size_t buffer_size = (size_t)buf_num * buf_len;
     if (dev->buffer_size != buffer_size) {
         free(dev->buffer);
         dev->buffer = malloc(buffer_size);
@@ -1015,14 +1011,13 @@ static int soapysdr_read_loop(sdr_dev_t *dev, sdr_event_cb_t cb, void *ctx, uint
 
         sdr_event_t ev = {
                 .ev  = SDR_EV_DATA,
+//                .sample_rate = dev->sample_rate,
+//                .center_frequency = dev->center_frequency,
                 .buf = buffer,
                 .len = n_read * dev->sample_size,
         };
-        dev->polling = 1;
         if (n_read > 0) // prevent a crash in callback
             cb(&ev, ctx);
-        dev->polling = 0;
-        apply_changes(dev, cb, ctx);
 
     } while (dev->running);
 
@@ -1072,7 +1067,7 @@ int sdr_close(sdr_dev_t *dev)
     if (!dev)
         return -1;
 
-    int ret = -1;
+    int ret = sdr_stop(dev);
 
     if (dev->rtl_tcp)
         ret = rtltcp_close(dev->rtl_tcp);
@@ -1087,6 +1082,10 @@ int sdr_close(sdr_dev_t *dev)
         ret = rtlsdr_close(dev->rtlsdr_dev);
 #endif
 
+#ifdef THREADS
+    pthread_mutex_destroy(&dev->lock);
+#endif
+
     free(dev->dev_info);
     free(dev->buffer);
     free(dev);
@@ -1122,15 +1121,12 @@ int sdr_set_center_freq(sdr_dev_t *dev, uint32_t freq, int verbose)
     if (!dev)
         return -1;
 
-    if (dev->polling) {
-        dev->center_frequency = freq;
-        dev->apply_freq       = 1;
-#ifdef RTLSDR
-        if (dev->rtlsdr_dev)
-            rtlsdr_cancel_async(dev->rtlsdr_dev);
-#endif
-        return 0;
+#ifdef THREADS
+    if (pthread_equal(dev->thread, pthread_self())) {
+        fprintf(stderr, "%s: must not be called from acquire callback!\n", __func__);
+        return -1;
     }
+#endif
 
     int r = -1;
 
@@ -1147,8 +1143,10 @@ int sdr_set_center_freq(sdr_dev_t *dev, uint32_t freq, int verbose)
 #endif
 
 #ifdef RTLSDR
-    if (dev->rtlsdr_dev)
+    if (dev->rtlsdr_dev) {
         r = rtlsdr_set_center_freq(dev->rtlsdr_dev, freq);
+        print_logf(LOG_DEBUG, "SDR", "rtlsdr_set_center_freq %u = %d", freq, r);
+    }
 #endif
 
     if (verbose) {
@@ -1157,6 +1155,13 @@ int sdr_set_center_freq(sdr_dev_t *dev, uint32_t freq, int verbose)
         else
             print_logf(LOG_NOTICE, "SDR", "Tuned to %s.", nice_freq(sdr_get_center_freq(dev)));
     }
+
+#ifdef THREADS
+    pthread_mutex_lock(&dev->lock);
+    dev->center_frequency = freq;
+    pthread_mutex_unlock(&dev->lock);
+#endif
+
     return r;
 }
 
@@ -1186,15 +1191,12 @@ int sdr_set_freq_correction(sdr_dev_t *dev, int ppm, int verbose)
     if (!dev)
         return -1;
 
-    if (dev->polling) {
-        dev->freq_correction = ppm;
-        dev->apply_corr      = 1;
-#ifdef RTLSDR
-        if (dev->rtlsdr_dev)
-            rtlsdr_cancel_async(dev->rtlsdr_dev);
-#endif
-        return 0;
+#ifdef THREADS
+    if (pthread_equal(dev->thread, pthread_self())) {
+        fprintf(stderr, "%s: must not be called from acquire callback!\n", __func__);
+        return -1;
     }
+#endif
 
     int r = -1;
 
@@ -1228,16 +1230,12 @@ int sdr_set_auto_gain(sdr_dev_t *dev, int verbose)
     if (!dev)
         return -1;
 
-    if (dev->polling) {
-        free(dev->gain_str);
-        dev->gain_str   = NULL; // auto gain
-        dev->apply_gain = 1;
-#ifdef RTLSDR
-        if (dev->rtlsdr_dev)
-            rtlsdr_cancel_async(dev->rtlsdr_dev);
-#endif
-        return 0;
+#ifdef THREADS
+    if (pthread_equal(dev->thread, pthread_self())) {
+        fprintf(stderr, "%s: must not be called from acquire callback!\n", __func__);
+        return -1;
     }
+#endif
 
     int r = -1;
 
@@ -1268,23 +1266,12 @@ int sdr_set_tuner_gain(sdr_dev_t *dev, char const *gain_str, int verbose)
     if (!dev)
         return -1;
 
-    if (dev->polling) {
-        free(dev->gain_str);
-        if (!gain_str) {
-            dev->gain_str = NULL; // auto gain
-        }
-        else {
-            dev->gain_str = strdup(gain_str);
-            if (!dev->gain_str)
-                WARN_STRDUP("set_gain_str()");
-        }
-        dev->apply_gain = 1;
-#ifdef RTLSDR
-        if (dev->rtlsdr_dev)
-            rtlsdr_cancel_async(dev->rtlsdr_dev);
-#endif
-        return 0;
+#ifdef THREADS
+    if (pthread_equal(dev->thread, pthread_self())) {
+        fprintf(stderr, "%s: must not be called from acquire callback!\n", __func__);
+        return -1;
     }
+#endif
 
     int r = -1;
 
@@ -1370,15 +1357,12 @@ int sdr_set_sample_rate(sdr_dev_t *dev, uint32_t rate, int verbose)
     if (!dev)
         return -1;
 
-    if (dev->polling) {
-        dev->sample_rate = rate;
-        dev->apply_rate  = 1;
-#ifdef RTLSDR
-        if (dev->rtlsdr_dev)
-            rtlsdr_cancel_async(dev->rtlsdr_dev);
-#endif
-        return 0;
+#ifdef THREADS
+    if (pthread_equal(dev->thread, pthread_self())) {
+        fprintf(stderr, "%s: must not be called from acquire callback!\n", __func__);
+        return -1;
     }
+#endif
 
     int r = -1;
 
@@ -1403,6 +1387,13 @@ int sdr_set_sample_rate(sdr_dev_t *dev, uint32_t rate, int verbose)
         else
             print_logf(LOG_NOTICE, "SDR", "Sample rate set to %u S/s.", sdr_get_sample_rate(dev)); // Unfortunately, doesn't return real rate
     }
+
+#ifdef THREADS
+    pthread_mutex_lock(&dev->lock);
+    dev->sample_rate = rate;
+    pthread_mutex_unlock(&dev->lock);
+#endif
+
     return r;
 }
 
@@ -1594,7 +1585,7 @@ int sdr_reset(sdr_dev_t *dev, int verbose)
     return r;
 }
 
-int sdr_start(sdr_dev_t *dev, sdr_event_cb_t cb, void *ctx, uint32_t buf_num, uint32_t buf_len)
+int sdr_start_sync(sdr_dev_t *dev, sdr_event_cb_t cb, void *ctx, uint32_t buf_num, uint32_t buf_len)
 {
     if (!dev)
         return -1;
@@ -1620,7 +1611,7 @@ int sdr_start(sdr_dev_t *dev, sdr_event_cb_t cb, void *ctx, uint32_t buf_num, ui
     return -1;
 }
 
-int sdr_stop(sdr_dev_t *dev)
+int sdr_stop_sync(sdr_dev_t *dev)
 {
     if (!dev)
         return -1;
@@ -1661,3 +1652,98 @@ void sdr_redirect_logging(void)
     SoapySDR_registerLogHandler(soapysdr_log_handler);
 #endif
 }
+
+/* threading */
+
+#ifdef THREADS
+static THREAD_RETURN THREAD_CALL acquire_thread(void *arg)
+{
+    sdr_dev_t *dev = arg;
+    print_log(LOG_DEBUG, __func__, "acquire_thread enter...");
+
+    int r = sdr_start_sync(dev, dev->async_cb, dev->async_ctx, dev->buf_num, dev->buf_len);
+    // if (cfg->verbosity > 1)
+    print_log(LOG_DEBUG, __func__, "acquire_thread async stop...");
+
+    if (r < 0) {
+        print_logf(LOG_ERROR, "SDR", "async read failed (%i).", r);
+    }
+
+//    sdr_event_t ev = {
+//            .ev  = SDR_EV_QUIT,
+//    };
+//    dev->async_cb(&ev, dev->async_ctx);
+
+    print_log(LOG_DEBUG, __func__, "acquire_thread done...");
+    return (THREAD_RETURN)(intptr_t)r;
+}
+
+int sdr_start(sdr_dev_t *dev, sdr_event_cb_t async_cb, void *async_ctx, uint32_t buf_num, uint32_t buf_len)
+{
+    if (!dev)
+        return -1;
+
+    dev->async_cb = async_cb;
+    dev->async_ctx = async_ctx;
+    dev->buf_num = buf_num;
+    dev->buf_len = buf_len;
+
+#ifndef _WIN32
+    // Block all signals from the worker thread
+    sigset_t sigset;
+    sigset_t oldset;
+    sigfillset(&sigset);
+    pthread_sigmask(SIG_SETMASK, &sigset, &oldset);
+#endif
+    int r = pthread_create(&dev->thread, NULL, acquire_thread, dev);
+#ifndef _WIN32
+    pthread_sigmask(SIG_SETMASK, &oldset, NULL);
+#endif
+    if (r) {
+        fprintf(stderr, "%s: error in pthread_create, rc: %d\n", __func__, r);
+    }
+    return r;
+}
+
+int sdr_stop(sdr_dev_t *dev)
+{
+    if (!dev)
+        return -1;
+
+    if (pthread_equal(dev->thread, pthread_self())) {
+        fprintf(stderr, "%s: must not be called from acquire callback!\n", __func__);
+        return -1;
+    }
+
+    print_log(LOG_DEBUG, __func__, "EXITING...");
+    pthread_mutex_lock(&dev->lock);
+    if (dev->exit_acquire) {
+        pthread_mutex_unlock(&dev->lock);
+        print_log(LOG_DEBUG, __func__, "Already exiting.");
+        return 0;
+    }
+    dev->exit_acquire = 1; // for rtl_tcp and SoapySDR
+    sdr_stop_sync(dev); // for rtlsdr
+    pthread_mutex_unlock(&dev->lock);
+
+    print_log(LOG_DEBUG, __func__, "JOINING...");
+    int r = pthread_join(dev->thread, NULL);
+    if (r) {
+        fprintf(stderr, "%s: error in pthread_join, rc: %d\n", __func__, r);
+    }
+
+    print_log(LOG_DEBUG, __func__, "EXITED.");
+    return r;
+}
+#else
+int sdr_start(sdr_dev_t *dev, sdr_event_cb_t cb, void *ctx, uint32_t buf_num, uint32_t buf_len)
+{
+    UNUSED(dev);
+    return -1;
+}
+int sdr_stop(sdr_dev_t *dev)
+{
+    UNUSED(dev);
+    return -1;
+}
+#endif