Change to async SDR acquire thread ()

This commit is contained in:
Christian W. Zuckschwerdt 2023-03-07 12:36:35 +01:00 committed by GitHub
parent fa13eb69ce
commit 46de49f358
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 294 additions and 282 deletions

View file

@ -1,30 +0,0 @@
/** @file
@brief compat_alarm adds an alarm() function for Windows.
Except for MinGW-w64 when `_POSIX` and/or `__USE_MINGW_ALARM`
is defined
*/
#ifndef INCLUDE_COMPAT_ALARM_H_
#define INCLUDE_COMPAT_ALARM_H_
#ifdef _WIN32
#include <windows.h>
#include <signal.h>
#include <io.h> /* alarm() for MinGW is possibly here */
#if !defined(_POSIX) && !defined(__USE_MINGW_ALARM)
int win_alarm(unsigned seconds);
#define alarm(sec) win_alarm(sec)
#define HAVE_win_alarm
#endif
/* No SIGUSRx on Windows. Use this unless MinGW-w64
* has support for it (untested by me).
*/
#if !defined(__USE_MINGW_ALARM)
#define SIGALRM SIGBREAK
#endif
#endif /* _WIN32 */
#endif /* INCLUDE_COMPAT_ALARM_H_ */

View file

@ -99,6 +99,7 @@ typedef struct r_cfg {
struct dm_state *demod;
char const *sr_filename;
int sr_execopen;
int watchdog; ///< SDR acquire stall watchdog
/* stats*/
time_t frames_since; ///< stats start time
unsigned frames_count; ///< stats counter for interval

View file

@ -186,13 +186,14 @@ int sdr_reset(sdr_dev_t *dev, int verbose);
Make sure none are in use anymore.
@param dev the device handle
@param cb a callback for sdr_event_t messages
@param ctx a user context to be passed to @p cb
@param async_cb a callback for sdr_event_t messages
@param async_ctx a user context to be passed to @p async_cb
@param buf_num the number of buffers to keep
@param buf_len the size in bytes of each buffer
@return 0 on success
*/
int sdr_start(sdr_dev_t *dev, sdr_event_cb_t cb, void *ctx, uint32_t buf_num, uint32_t buf_len);
int sdr_start(sdr_dev_t *dev, sdr_event_cb_t async_cb, void *async_ctx, uint32_t buf_num, uint32_t buf_len);
int sdr_start_sync(sdr_dev_t *dev, sdr_event_cb_t cb, void *ctx, uint32_t buf_num, uint32_t buf_len);
/** Stop the SDR data acquisition.
@ -203,6 +204,7 @@ int sdr_start(sdr_dev_t *dev, sdr_event_cb_t cb, void *ctx, uint32_t buf_num, ui
@return 0 on success
*/
int sdr_stop(sdr_dev_t *dev);
int sdr_stop_sync(sdr_dev_t *dev);
/** Redirect SoapySDR library logging.
*/

View file

@ -9,7 +9,6 @@ add_library(r_433 STATIC
am_analyze.c
baseband.c
bitbuffer.c
compat_alarm.c
compat_paths.c
compat_time.c
confparse.c

View file

@ -1,85 +0,0 @@
/**
* Emulation of the Posix function `alarm()`
* using `CreateTimerQueueTimer()`.
*
* \ref https://docs.microsoft.com/en-us/windows/win32/api/threadpoollegacyapiset/nf-threadpoollegacyapiset-createtimerqueuetimer
*/
#include <stdio.h>
#include "compat_alarm.h"
#ifdef HAVE_win_alarm /* rest of file */
static HANDLE alarm_hnd = INVALID_HANDLE_VALUE;
static int alarm_countdown;
/**
* The timer-callback that performs the countdown.
*/
void CALLBACK alarm_handler(PVOID param, BOOLEAN timer_fired)
{
if (alarm_countdown > 0) {
alarm_countdown--;
if (alarm_countdown == 0)
raise(SIGALRM);
}
(void) timer_fired;
(void) param;
}
/**
* Destroy the timer.<br>
* Called as an `atexit()` function.
*/
static void alarm_delete(void)
{
if (!alarm_hnd || alarm_hnd == INVALID_HANDLE_VALUE)
return;
signal(SIGALRM, SIG_IGN);
DeleteTimerQueueTimer(NULL, alarm_hnd, NULL);
alarm_hnd = INVALID_HANDLE_VALUE;
}
/**
* Create a kernel32 timer once.
*/
static void alarm_create(void)
{
if (alarm_hnd && alarm_hnd != INVALID_HANDLE_VALUE)
return;
if (!CreateTimerQueueTimer(&alarm_hnd, NULL, alarm_handler,
NULL,
1000, /* call alarm_handler() after 1 sec */
1000, /* an do it periodically every seconds */
WT_EXECUTEDEFAULT | WT_EXECUTEINTIMERTHREAD)) {
fprintf(stderr, "CreateTimerQueueTimer() failed %lu\n", GetLastError());
alarm_hnd = NULL;
}
else
atexit(alarm_delete);
}
/**
* Emulate an `alarm(sec)` function.
*
* @param[in] seconds the number of seconds to countdown before a `raise(SIGALRM)` is done.<br>
* if `seconds == 0` the `alarm_handler()` will do nothing.
*/
int win_alarm(unsigned seconds)
{
alarm_countdown = seconds;
alarm_create();
return (0);
}
#else
/*
* Just so this compilation unit isn't empty.
*/
int win_alarm(unsigned seconds);
int win_alarm(unsigned seconds)
{
(void) seconds;
return (0);
}
#endif /* HAVE_win_alarm */

View file

@ -49,7 +49,6 @@
#include "am_analyze.h"
#include "confparse.h"
#include "term_ctl.h"
#include "compat_alarm.h"
#include "compat_paths.h"
#include "logger.h"
#include "fatal.h"
@ -361,6 +360,7 @@ static void help_write(void)
static void sdr_callback(unsigned char *iq_buf, uint32_t len, void *ctx)
{
//fprintf(stderr, "sdr_callback... %u\n", len);
r_cfg_t *cfg = ctx;
struct dm_state *demod = cfg->demod;
char time_str[LOCAL_TIME_BUFLEN];
@ -396,7 +396,7 @@ static void sdr_callback(unsigned char *iq_buf, uint32_t len, void *ctx)
if (demod->frame_end_ago)
demod->frame_end_ago += n_samples;
alarm(3); // require callback to run every 3 second, abort otherwise
cfg->watchdog++; // reset the frame acquire watchdog
if (demod->samp_grab) {
samp_grab_push(demod->samp_grab, iq_buf, len);
@ -692,7 +692,6 @@ static void sdr_callback(unsigned char *iq_buf, uint32_t len, void *ctx)
cfg->bytes_to_read -= len;
if (cfg->after_successful_events_flag && (d_events > 0)) {
alarm(0); // cancel the watchdog timer
if (cfg->after_successful_events_flag == 1) {
cfg->exit_async = 1;
}
@ -707,11 +706,9 @@ static void sdr_callback(unsigned char *iq_buf, uint32_t len, void *ctx)
int hop_index = cfg->hop_times > cfg->frequency_index ? cfg->frequency_index : cfg->hop_times - 1;
if (cfg->hop_times > 0 && cfg->frequencies > 1
&& difftime(rawtime, cfg->hop_start_time) >= cfg->hop_time[hop_index]) {
alarm(0); // cancel the watchdog timer
cfg->hop_now = 1;
}
if (cfg->duration > 0 && rawtime >= cfg->stop_time) {
alarm(0); // cancel the watchdog timer
cfg->exit_async = 1;
print_log(LOG_CRITICAL, __func__, "Time expired, exiting!");
}
@ -1275,7 +1272,6 @@ console_handler(int signum)
if (CTRL_C_EVENT == signum) {
write_err("Signal caught, exiting!\n");
g_cfg.exit_async = 1;
sdr_stop(g_cfg.dev);
// Uninstall handler, next Ctrl-C is a hard abort
SetConsoleCtrlHandler((PHANDLER_ROUTINE)console_handler, FALSE);
return TRUE;
@ -1285,23 +1281,9 @@ console_handler(int signum)
g_cfg.hop_now = 1;
return TRUE;
}
else if (signum == SIGALRM) {
write_err("Async read stalled, exiting!\n");
g_cfg.exit_code = 3;
g_cfg.exit_async = 1;
sdr_stop(g_cfg.dev);
return TRUE;
}
return FALSE;
}
/* Only called for SIGALRM
*/
static void sighandler(int signum)
{
console_handler(signum);
}
#else
static void sighandler(int signum)
{
@ -1316,15 +1298,10 @@ static void sighandler(int signum)
g_cfg.hop_now = 1;
return;
}
else if (signum == SIGALRM) {
write_err("Async read stalled, exiting!\n");
g_cfg.exit_code = 3;
}
else {
write_err("Signal caught, exiting!\n");
}
g_cfg.exit_async = 1;
sdr_stop(g_cfg.dev);
// Uninstall handler, next Ctrl-C is a hard abort
struct sigaction sigact;
@ -1338,25 +1315,31 @@ static void sighandler(int signum)
}
#endif
static void sdr_handler(sdr_event_t *ev, void *ctx)
static void sdr_handler(struct mg_connection *nc, int ev_type, void *ev_data)
{
r_cfg_t *cfg = ctx;
//fprintf(stderr, "%s: %d, %d, %p, %p\n", __func__, nc->sock, ev_type, nc->user_data, ev_data);
// only process for the dummy nc
if (nc->sock != INVALID_SOCKET || ev_type != MG_EV_POLL)
return;
r_cfg_t *cfg = nc->user_data;
sdr_event_t *ev = ev_data;
//fprintf(stderr, "sdr_handler...\n");
data_t *data = NULL;
if (ev->ev & SDR_EV_RATE) {
cfg->samp_rate = ev->sample_rate;
// cfg->samp_rate = ev->sample_rate;
data = data_append(data,
"sample_rate", "", DATA_INT, ev->sample_rate,
NULL);
}
if (ev->ev & SDR_EV_CORR) {
cfg->ppm_error = ev->freq_correction;
// cfg->ppm_error = ev->freq_correction;
data = data_append(data,
"freq_correction", "", DATA_INT, ev->freq_correction,
NULL);
}
if (ev->ev & SDR_EV_FREQ) {
cfg->center_frequency = ev->center_frequency;
// cfg->center_frequency = ev->center_frequency;
data = data_append(data,
"center_frequency", "", DATA_INT, ev->center_frequency,
"frequencies", "", DATA_COND, cfg->frequencies > 1, DATA_ARRAY, data_array(cfg->frequencies, DATA_INT, cfg->frequency),
@ -1373,24 +1356,61 @@ static void sdr_handler(sdr_event_t *ev, void *ctx)
}
if (ev->ev == SDR_EV_DATA) {
if (cfg->mgr) {
int max_polls = 16;
while (max_polls-- && mg_mgr_poll(cfg->mgr, 0));
}
if (!cfg->exit_async)
sdr_callback((unsigned char *)ev->buf, ev->len, ctx);
sdr_callback((unsigned char *)ev->buf, ev->len, cfg);
}
if (cfg->exit_async)
if (cfg->exit_async) {
if (cfg->verbosity >= 2)
print_log(LOG_INFO, "Input", "sdr_handler exit");
sdr_stop(cfg->dev);
cfg->exit_async++;
}
}
// note that this function is called in a different thread
static void acquire_callback(sdr_event_t *ev, void *ctx)
{
//struct timeval now;
//get_time_now(&now);
//fprintf(stderr, "%ld.%06ld acquire_callback...\n", (long)now.tv_sec, (long)now.tv_usec);
struct mg_mgr *mgr = ctx;
// TODO: We should run the demod here to unblock the event loop
// thread-safe dispatch, ev_data is the iq buffer pointer and length
//fprintf(stderr, "acquire_callback bc send...\n");
mg_broadcast(mgr, sdr_handler, (void *)ev, sizeof(*ev));
//fprintf(stderr, "acquire_callback bc done...\n");
}
static void timer_handler(struct mg_connection *nc, int ev, void *ev_data)
{
//fprintf(stderr, "%s: %d, %d, %p, %p\n", __func__, nc->sock, ev, nc->user_data, ev_data);
r_cfg_t *cfg = (r_cfg_t *)nc->user_data;
switch (ev) {
case MG_EV_TIMER: {
double now = *(double *)ev_data;
(void) now; // unused
double next = mg_time() + 1.5;
//fprintf(stderr, "timer event, current time: %.2lf, next timer: %.2lf\n", now, next);
mg_set_timer(nc, next); // Send us timer event again after 1.5 seconds
if (cfg->watchdog == 0) {
// We expect a frame at least every 250 ms
write_err("Async read stalled, exiting!\n");
cfg->exit_code = 3;
cfg->exit_async = 1;
sdr_stop(cfg->dev);
}
cfg->watchdog = 0;
break;
}
}
}
int main(int argc, char **argv) {
#ifndef _WIN32
struct sigaction sigact;
#endif
FILE *in_file;
int r = 0;
struct dm_state *demod;
r_cfg_t *cfg = &g_cfg;
@ -1652,6 +1672,7 @@ int main(int argc, char **argv) {
cfg->samp_rate = demod->load_info.sample_rate ? demod->load_info.sample_rate : sample_rate_0;
cfg->center_frequency = demod->load_info.center_frequency ? demod->load_info.center_frequency : cfg->frequency[0];
FILE *in_file;
if (strcmp(demod->load_info.path, "-") == 0) { // read samples from stdin
in_file = stdin;
cfg->in_filename = "<stdin>";
@ -1775,7 +1796,6 @@ int main(int argc, char **argv) {
}
demod->sample_file_pos = ((float)n_blocks + 1) * DEFAULT_BUF_LENGTH / cfg->samp_rate / demod->sample_size;
sdr_callback(test_mode_buf, DEFAULT_BUF_LENGTH, cfg);
alarm(0); // cancel the watchdog timer
//Always classify a signal at the end of the file
if (demod->am_analyze)
@ -1810,6 +1830,7 @@ int main(int argc, char **argv) {
//demod->sample_signed = sdr_get_sample_signed(cfg->dev);
#ifndef _WIN32
struct sigaction sigact;
sigact.sa_handler = sighandler;
sigemptyset(&sigact.sa_mask);
sigact.sa_flags = 0;
@ -1845,6 +1866,10 @@ int main(int argc, char **argv) {
if (cfg->verbosity >= LOG_NOTICE) {
print_log(LOG_NOTICE, "Input", "Reading samples in async mode...");
}
// TODO: remove this before next release
print_log(LOG_NOTICE, "Input", "The internals of input handling changed, read about and report problems on PR #1978");
if (cfg->duration > 0) {
time(&cfg->stop_time);
cfg->stop_time += cfg->duration;
@ -1852,17 +1877,31 @@ int main(int argc, char **argv) {
r = sdr_set_center_freq(cfg->dev, cfg->center_frequency, 1); // always verbose
time(&cfg->hop_start_time);
signal(SIGALRM, sighandler);
alarm(3); // require callback to run every 3 second, abort otherwise
time(&cfg->hop_start_time);
r = sdr_start(cfg->dev, sdr_handler, (void *)cfg,
DEFAULT_ASYNC_BUF_NUMBER, cfg->out_block_size);
if (r < 0) {
print_logf(LOG_ERROR, "Input", "async read failed (%i).", r);
}
// add dummy socket to receive broadcasts
struct mg_add_sock_opts opts = {.user_data = cfg};
struct mg_connection *nc = mg_add_sock_opt(get_mgr(cfg), INVALID_SOCKET, timer_handler, opts);
// Send us MG_EV_TIMER event after 2.5 seconds
mg_set_timer(nc, mg_time() + 2.5);
alarm(0); // cancel the watchdog timer
r = sdr_start(cfg->dev, acquire_callback, (void *)get_mgr(cfg),
DEFAULT_ASYNC_BUF_NUMBER, cfg->out_block_size);
if (r < 0) {
print_logf(LOG_ERROR, "Input", "async start failed (%i).", r);
}
while (!cfg->exit_async) {
mg_mgr_poll(cfg->mgr, 500);
}
if (cfg->verbosity >= LOG_INFO)
print_log(LOG_INFO, "rtl_433", "stopping...");
// final polls to drain the broadcast
//while (cfg->exit_async < 2) {
// mg_mgr_poll(cfg->mgr, 100);
//}
sdr_stop(cfg->dev);
//print_log(LOG_INFO, "rtl_433", "stopped.");
if (cfg->report_stats > 0) {
event_occurred_handler(cfg, create_report_data(cfg, cfg->report_stats));

308
src/sdr.c
View file

@ -16,11 +16,13 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include "sdr.h"
#include "r_util.h"
#include "optparse.h"
#include "logger.h"
#include "fatal.h"
#include "compat_pthread.h"
#ifdef RTLSDR
#include <rtl-sdr.h>
#if defined(__linux__) && (defined(__GNUC__) || defined(__clang__))
@ -95,7 +97,6 @@ struct sdr_dev {
char *dev_info;
int running;
int polling;
uint8_t *buffer; ///< sdr data buffer current and past frames
size_t buffer_size; ///< sdr data buffer overall size (num * len)
size_t buffer_pos; ///< sdr data buffer next write position
@ -103,61 +104,35 @@ struct sdr_dev {
int sample_size;
int sample_signed;
int apply_rate;
int apply_freq;
int apply_corr;
int apply_gain;
uint32_t sample_rate;
int freq_correction;
uint32_t center_frequency;
char *gain_str;
#ifdef THREADS
pthread_t thread;
pthread_mutex_t lock; ///< lock for exit_acquire
int exit_acquire;
// acquire thread args
sdr_event_cb_t async_cb;
void *async_ctx;
uint32_t buf_num;
uint32_t buf_len;
#endif
};
/* internal helpers */
static int apply_changes(sdr_dev_t *dev, sdr_event_cb_t cb, void *ctx)
{
int r = 0;
sdr_event_flags_t flags = 0;
if (dev->apply_rate) {
r = sdr_set_sample_rate(dev, dev->sample_rate, 1); // always verbose
dev->apply_rate = 0;
flags |= SDR_EV_RATE;
}
if (dev->apply_corr) {
r = sdr_set_freq_correction(dev, dev->freq_correction, 1); // always verbose
dev->apply_corr = 0;
flags |= SDR_EV_CORR;
}
if (dev->apply_freq) {
r = sdr_set_center_freq(dev, dev->center_frequency, 1); // always verbose
dev->apply_freq = 0;
flags |= SDR_EV_FREQ;
}
char *gain_str = dev->gain_str;
dev->gain_str = NULL;
if (dev->apply_gain) {
r = sdr_set_tuner_gain(dev, gain_str, 1); // always verbose
dev->apply_gain = 0;
flags |= SDR_EV_GAIN;
}
if (flags) {
/*
pthread_mutex_lock(&dev->lock);
sdr_event_t ev = {
.ev = flags,
.sample_rate = dev->sample_rate,
.freq_correction = dev->freq_correction,
.center_frequency = dev->center_frequency,
.gain_str = gain_str,
};
pthread_mutex_unlock(&dev->lock);
if (cb)
cb(&ev, ctx);
free(gain_str);
}
return r;
}
*/
/* rtl_tcp helpers */
@ -258,6 +233,9 @@ static int rtltcp_open(sdr_dev_t **out_dev, char const *dev_query, int verbose)
WARN_CALLOC("rtltcp_open()");
return -1; // NOTE: returns error on alloc failure.
}
#ifdef THREADS
pthread_mutex_init(&dev->lock, NULL);
#endif
dev->rtl_tcp = sock;
dev->sample_size = sizeof(uint8_t) * 2; // CU8
@ -286,7 +264,7 @@ static int rtltcp_close(SOCKET sock)
static int rtltcp_read_loop(sdr_dev_t *dev, sdr_event_cb_t cb, void *ctx, uint32_t buf_num, uint32_t buf_len)
{
size_t buffer_size = buf_num * buf_len;
size_t buffer_size = (size_t)buf_num * buf_len;
if (dev->buffer_size != buffer_size) {
free(dev->buffer);
dev->buffer = malloc(buffer_size);
@ -327,14 +305,13 @@ static int rtltcp_read_loop(sdr_dev_t *dev, sdr_event_cb_t cb, void *ctx, uint32
sdr_event_t ev = {
.ev = SDR_EV_DATA,
// .sample_rate = dev->sample_rate,
// .center_frequency = dev->center_frequency,
.buf = buffer,
.len = n_read,
};
dev->polling = 1;
if (n_read > 0) // prevent a crash in callback
cb(&ev, ctx);
dev->polling = 0;
apply_changes(dev, cb, ctx);
} while (dev->running);
@ -418,6 +395,9 @@ static int sdr_open_rtl(sdr_dev_t **out_dev, char const *dev_query, int verbose)
WARN_CALLOC("sdr_open_rtl()");
return -1; // NOTE: returns error on alloc failure.
}
#ifdef THREADS
pthread_mutex_init(&dev->lock, NULL);
#endif
for (uint32_t i = dev_query ? dev_index : 0;
//cast quiets -Wsign-compare; if dev_index were < 0, would have returned -1 above
@ -501,6 +481,19 @@ static void rtlsdr_read_cb(unsigned char *iq_buf, uint32_t len, void *ctx)
{
sdr_dev_t *dev = ctx;
//fprintf(stderr, "rtlsdr_read_cb enter...\n");
#ifdef THREADS
pthread_mutex_lock(&dev->lock);
int exit_acquire = dev->exit_acquire;
pthread_mutex_unlock(&dev->lock);
if (exit_acquire) {
// we get one more call after rtlsdr_cancel_async(),
// it then takes a full second until rtlsdr_read_async() ends.
//fprintf(stderr, "rtlsdr_read_cb stopping...\n");
return; // do not deliver any more events
}
#endif
if (dev->buffer_pos + len > dev->buffer_size)
dev->buffer_pos = 0;
uint8_t *buffer = &dev->buffer[dev->buffer_pos];
@ -511,16 +504,21 @@ static void rtlsdr_read_cb(unsigned char *iq_buf, uint32_t len, void *ctx)
sdr_event_t ev = {
.ev = SDR_EV_DATA,
// .sample_rate = dev->sample_rate,
// .center_frequency = dev->center_frequency,
.buf = buffer,
.len = len,
};
//fprintf(stderr, "rtlsdr_read_cb cb...\n");
if (len > 0) // prevent a crash in callback
dev->rtlsdr_cb(&ev, dev->rtlsdr_cb_ctx);
//fprintf(stderr, "rtlsdr_read_cb cb done.\n");
// NOTE: we actually need to copy the buffer to prevent it going away on cancel_async
}
static int rtlsdr_read_loop(sdr_dev_t *dev, sdr_event_cb_t cb, void *ctx, uint32_t buf_num, uint32_t buf_len)
{
size_t buffer_size = buf_num * buf_len;
size_t buffer_size = (size_t)buf_num * buf_len;
if (dev->buffer_size != buffer_size) {
free(dev->buffer);
dev->buffer = malloc(buffer_size);
@ -538,8 +536,6 @@ static int rtlsdr_read_loop(sdr_dev_t *dev, sdr_event_cb_t cb, void *ctx, uint32
dev->rtlsdr_cb_ctx = ctx;
dev->running = 1;
do {
dev->polling = 1;
r = rtlsdr_read_async(dev->rtlsdr_dev, rtlsdr_read_cb, dev, buf_num, buf_len);
// rtlsdr_read_async() returns possible error codes from:
@ -561,10 +557,7 @@ static int rtlsdr_read_loop(sdr_dev_t *dev, sdr_event_cb_t cb, void *ctx, uint32
#endif
dev->running = 0;
}
dev->polling = 0;
apply_changes(dev, cb, ctx);
} while (dev->running);
print_log(LOG_DEBUG, __func__, "rtlsdr_read_async done");
return r;
}
@ -867,6 +860,9 @@ static int sdr_open_soapy(sdr_dev_t **out_dev, char const *dev_query, int verbos
WARN_CALLOC("sdr_open_soapy()");
return -1; // NOTE: returns error on alloc failure.
}
#ifdef THREADS
pthread_mutex_init(&dev->lock, NULL);
#endif
dev->soapy_dev = SoapySDRDevice_makeStrArgs(dev_query);
if (!dev->soapy_dev) {
@ -950,7 +946,7 @@ static int sdr_open_soapy(sdr_dev_t **out_dev, char const *dev_query, int verbos
static int soapysdr_read_loop(sdr_dev_t *dev, sdr_event_cb_t cb, void *ctx, uint32_t buf_num, uint32_t buf_len)
{
size_t buffer_size = buf_num * buf_len;
size_t buffer_size = (size_t)buf_num * buf_len;
if (dev->buffer_size != buffer_size) {
free(dev->buffer);
dev->buffer = malloc(buffer_size);
@ -1015,14 +1011,13 @@ static int soapysdr_read_loop(sdr_dev_t *dev, sdr_event_cb_t cb, void *ctx, uint
sdr_event_t ev = {
.ev = SDR_EV_DATA,
// .sample_rate = dev->sample_rate,
// .center_frequency = dev->center_frequency,
.buf = buffer,
.len = n_read * dev->sample_size,
};
dev->polling = 1;
if (n_read > 0) // prevent a crash in callback
cb(&ev, ctx);
dev->polling = 0;
apply_changes(dev, cb, ctx);
} while (dev->running);
@ -1072,7 +1067,7 @@ int sdr_close(sdr_dev_t *dev)
if (!dev)
return -1;
int ret = -1;
int ret = sdr_stop(dev);
if (dev->rtl_tcp)
ret = rtltcp_close(dev->rtl_tcp);
@ -1087,6 +1082,10 @@ int sdr_close(sdr_dev_t *dev)
ret = rtlsdr_close(dev->rtlsdr_dev);
#endif
#ifdef THREADS
pthread_mutex_destroy(&dev->lock);
#endif
free(dev->dev_info);
free(dev->buffer);
free(dev);
@ -1122,15 +1121,12 @@ int sdr_set_center_freq(sdr_dev_t *dev, uint32_t freq, int verbose)
if (!dev)
return -1;
if (dev->polling) {
dev->center_frequency = freq;
dev->apply_freq = 1;
#ifdef RTLSDR
if (dev->rtlsdr_dev)
rtlsdr_cancel_async(dev->rtlsdr_dev);
#endif
return 0;
#ifdef THREADS
if (pthread_equal(dev->thread, pthread_self())) {
fprintf(stderr, "%s: must not be called from acquire callback!\n", __func__);
return -1;
}
#endif
int r = -1;
@ -1147,8 +1143,10 @@ int sdr_set_center_freq(sdr_dev_t *dev, uint32_t freq, int verbose)
#endif
#ifdef RTLSDR
if (dev->rtlsdr_dev)
if (dev->rtlsdr_dev) {
r = rtlsdr_set_center_freq(dev->rtlsdr_dev, freq);
print_logf(LOG_DEBUG, "SDR", "rtlsdr_set_center_freq %u = %d", freq, r);
}
#endif
if (verbose) {
@ -1157,6 +1155,13 @@ int sdr_set_center_freq(sdr_dev_t *dev, uint32_t freq, int verbose)
else
print_logf(LOG_NOTICE, "SDR", "Tuned to %s.", nice_freq(sdr_get_center_freq(dev)));
}
#ifdef THREADS
pthread_mutex_lock(&dev->lock);
dev->center_frequency = freq;
pthread_mutex_unlock(&dev->lock);
#endif
return r;
}
@ -1186,15 +1191,12 @@ int sdr_set_freq_correction(sdr_dev_t *dev, int ppm, int verbose)
if (!dev)
return -1;
if (dev->polling) {
dev->freq_correction = ppm;
dev->apply_corr = 1;
#ifdef RTLSDR
if (dev->rtlsdr_dev)
rtlsdr_cancel_async(dev->rtlsdr_dev);
#endif
return 0;
#ifdef THREADS
if (pthread_equal(dev->thread, pthread_self())) {
fprintf(stderr, "%s: must not be called from acquire callback!\n", __func__);
return -1;
}
#endif
int r = -1;
@ -1228,16 +1230,12 @@ int sdr_set_auto_gain(sdr_dev_t *dev, int verbose)
if (!dev)
return -1;
if (dev->polling) {
free(dev->gain_str);
dev->gain_str = NULL; // auto gain
dev->apply_gain = 1;
#ifdef RTLSDR
if (dev->rtlsdr_dev)
rtlsdr_cancel_async(dev->rtlsdr_dev);
#endif
return 0;
#ifdef THREADS
if (pthread_equal(dev->thread, pthread_self())) {
fprintf(stderr, "%s: must not be called from acquire callback!\n", __func__);
return -1;
}
#endif
int r = -1;
@ -1268,23 +1266,12 @@ int sdr_set_tuner_gain(sdr_dev_t *dev, char const *gain_str, int verbose)
if (!dev)
return -1;
if (dev->polling) {
free(dev->gain_str);
if (!gain_str) {
dev->gain_str = NULL; // auto gain
}
else {
dev->gain_str = strdup(gain_str);
if (!dev->gain_str)
WARN_STRDUP("set_gain_str()");
}
dev->apply_gain = 1;
#ifdef RTLSDR
if (dev->rtlsdr_dev)
rtlsdr_cancel_async(dev->rtlsdr_dev);
#endif
return 0;
#ifdef THREADS
if (pthread_equal(dev->thread, pthread_self())) {
fprintf(stderr, "%s: must not be called from acquire callback!\n", __func__);
return -1;
}
#endif
int r = -1;
@ -1370,15 +1357,12 @@ int sdr_set_sample_rate(sdr_dev_t *dev, uint32_t rate, int verbose)
if (!dev)
return -1;
if (dev->polling) {
dev->sample_rate = rate;
dev->apply_rate = 1;
#ifdef RTLSDR
if (dev->rtlsdr_dev)
rtlsdr_cancel_async(dev->rtlsdr_dev);
#endif
return 0;
#ifdef THREADS
if (pthread_equal(dev->thread, pthread_self())) {
fprintf(stderr, "%s: must not be called from acquire callback!\n", __func__);
return -1;
}
#endif
int r = -1;
@ -1403,6 +1387,13 @@ int sdr_set_sample_rate(sdr_dev_t *dev, uint32_t rate, int verbose)
else
print_logf(LOG_NOTICE, "SDR", "Sample rate set to %u S/s.", sdr_get_sample_rate(dev)); // Unfortunately, doesn't return real rate
}
#ifdef THREADS
pthread_mutex_lock(&dev->lock);
dev->sample_rate = rate;
pthread_mutex_unlock(&dev->lock);
#endif
return r;
}
@ -1594,7 +1585,7 @@ int sdr_reset(sdr_dev_t *dev, int verbose)
return r;
}
int sdr_start(sdr_dev_t *dev, sdr_event_cb_t cb, void *ctx, uint32_t buf_num, uint32_t buf_len)
int sdr_start_sync(sdr_dev_t *dev, sdr_event_cb_t cb, void *ctx, uint32_t buf_num, uint32_t buf_len)
{
if (!dev)
return -1;
@ -1620,7 +1611,7 @@ int sdr_start(sdr_dev_t *dev, sdr_event_cb_t cb, void *ctx, uint32_t buf_num, ui
return -1;
}
int sdr_stop(sdr_dev_t *dev)
int sdr_stop_sync(sdr_dev_t *dev)
{
if (!dev)
return -1;
@ -1661,3 +1652,98 @@ void sdr_redirect_logging(void)
SoapySDR_registerLogHandler(soapysdr_log_handler);
#endif
}
/* threading */
#ifdef THREADS
static THREAD_RETURN THREAD_CALL acquire_thread(void *arg)
{
sdr_dev_t *dev = arg;
print_log(LOG_DEBUG, __func__, "acquire_thread enter...");
int r = sdr_start_sync(dev, dev->async_cb, dev->async_ctx, dev->buf_num, dev->buf_len);
// if (cfg->verbosity > 1)
print_log(LOG_DEBUG, __func__, "acquire_thread async stop...");
if (r < 0) {
print_logf(LOG_ERROR, "SDR", "async read failed (%i).", r);
}
// sdr_event_t ev = {
// .ev = SDR_EV_QUIT,
// };
// dev->async_cb(&ev, dev->async_ctx);
print_log(LOG_DEBUG, __func__, "acquire_thread done...");
return (THREAD_RETURN)(intptr_t)r;
}
int sdr_start(sdr_dev_t *dev, sdr_event_cb_t async_cb, void *async_ctx, uint32_t buf_num, uint32_t buf_len)
{
if (!dev)
return -1;
dev->async_cb = async_cb;
dev->async_ctx = async_ctx;
dev->buf_num = buf_num;
dev->buf_len = buf_len;
#ifndef _WIN32
// Block all signals from the worker thread
sigset_t sigset;
sigset_t oldset;
sigfillset(&sigset);
pthread_sigmask(SIG_SETMASK, &sigset, &oldset);
#endif
int r = pthread_create(&dev->thread, NULL, acquire_thread, dev);
#ifndef _WIN32
pthread_sigmask(SIG_SETMASK, &oldset, NULL);
#endif
if (r) {
fprintf(stderr, "%s: error in pthread_create, rc: %d\n", __func__, r);
}
return r;
}
int sdr_stop(sdr_dev_t *dev)
{
if (!dev)
return -1;
if (pthread_equal(dev->thread, pthread_self())) {
fprintf(stderr, "%s: must not be called from acquire callback!\n", __func__);
return -1;
}
print_log(LOG_DEBUG, __func__, "EXITING...");
pthread_mutex_lock(&dev->lock);
if (dev->exit_acquire) {
pthread_mutex_unlock(&dev->lock);
print_log(LOG_DEBUG, __func__, "Already exiting.");
return 0;
}
dev->exit_acquire = 1; // for rtl_tcp and SoapySDR
sdr_stop_sync(dev); // for rtlsdr
pthread_mutex_unlock(&dev->lock);
print_log(LOG_DEBUG, __func__, "JOINING...");
int r = pthread_join(dev->thread, NULL);
if (r) {
fprintf(stderr, "%s: error in pthread_join, rc: %d\n", __func__, r);
}
print_log(LOG_DEBUG, __func__, "EXITED.");
return r;
}
#else
int sdr_start(sdr_dev_t *dev, sdr_event_cb_t cb, void *ctx, uint32_t buf_num, uint32_t buf_len)
{
UNUSED(dev);
return -1;
}
int sdr_stop(sdr_dev_t *dev)
{
UNUSED(dev);
return -1;
}
#endif