Refactor output poll to mg_mgr

This commit is contained in:
Christian W. Zuckschwerdt 2020-10-25 11:50:46 +01:00
parent 46d0ca4e75
commit c33ee8ff29
9 changed files with 42 additions and 59 deletions

View file

@ -137,7 +137,6 @@ typedef struct data_output {
void (*print_double)(struct data_output *output, double data, char const *format);
void (*print_int)(struct data_output *output, int data, char const *format);
void (*output_start)(struct data_output *output, const char **fields, int num_fields);
void (*output_poll)(struct data_output *output);
void (*output_free)(struct data_output *output);
FILE *file;
} data_output_t;
@ -168,9 +167,6 @@ void data_output_start(struct data_output *output, const char **fields, int num_
/** Prints a structured data object. */
void data_output_print(struct data_output *output, data_t *data);
/** Allows to polls an event loop, if necessary. */
void data_output_poll(struct data_output *output);
void data_output_free(struct data_output *output);
/* data output helpers */

View file

@ -16,6 +16,8 @@
#include "data.h"
struct data_output *data_output_influx_create(char *opts);
struct mg_mgr;
struct data_output *data_output_influx_create(struct mg_mgr *mgr, char *opts);
#endif /* INCLUDE_OUTPUT_INFLUX_H_ */

View file

@ -14,6 +14,8 @@
#include "data.h"
struct data_output *data_output_mqtt_create(char const *host, char const *port, char *opts, char const *dev_hint);
struct mg_mgr;
struct data_output *data_output_mqtt_create(struct mg_mgr *mgr, char const *host, char const *port, char *opts, char const *dev_hint);
#endif /* INCLUDE_OUTPUT_MQTT_H_ */

View file

@ -25,6 +25,7 @@
struct sdr_dev;
struct r_device;
struct mg_mgr;
typedef enum {
CONVERT_NATIVE,
@ -97,6 +98,7 @@ typedef struct r_cfg {
unsigned frames_count; ///< stats counter for interval
unsigned frames_fsk; ///< stats counter for interval
unsigned frames_events; ///< stats counter for interval
struct mg_mgr *mgr;
} r_cfg_t;
#endif /* INCLUDE_RTL_433_H_ */

View file

@ -411,13 +411,6 @@ void data_output_start(struct data_output *output, const char **fields, int num_
output->output_start(output, fields, num_fields);
}
void data_output_poll(struct data_output *output)
{
if (!output || !output->output_poll)
return;
output->output_poll(output);
}
void data_output_free(data_output_t *output)
{
if (!output)

View file

@ -363,16 +363,6 @@ static void print_influx_int(data_output_t *output, int data, char const *format
mbuf_snprintf(buf, "%d", data);
}
static void data_output_influx_poll(data_output_t *output)
{
influx_client_t *influx = (influx_client_t *)output;
if (!influx)
return;
mg_mgr_poll(influx->mgr, 0);
}
static void data_output_influx_free(data_output_t *output)
{
influx_client_t *influx = (influx_client_t *)output;
@ -380,13 +370,16 @@ static void data_output_influx_free(data_output_t *output)
if (!influx)
return;
mg_mgr_free(influx->mgr);
free(influx->mgr);
// remove ctx from our connections
for (struct mg_connection *c = mg_next(influx->mgr, NULL); c != NULL; c = mg_next(influx->mgr, c)) {
if (c->user_data == influx)
c->user_data = NULL;
}
free(influx);
}
struct data_output *data_output_influx_create(char *opts)
struct data_output *data_output_influx_create(struct mg_mgr *mgr, char *opts)
{
influx_client_t *influx = calloc(1, sizeof(influx_client_t));
if (!influx) {
@ -448,17 +441,11 @@ struct data_output *data_output_influx_create(char *opts)
influx->output.print_string = print_influx_string;
influx->output.print_double = print_influx_double;
influx->output.print_int = print_influx_int;
influx->output.output_poll = data_output_influx_poll;
influx->output.output_free = data_output_influx_free;
fprintf(stderr, "Publishing data to InfluxDB (%s)\n", url);
// TODO: this could be a global mgr
influx->mgr = calloc(1, sizeof(*influx->mgr));
if (!influx->mgr)
FATAL_CALLOC("data_output_influx_create()");
mg_mgr_init(influx->mgr, NULL);
influx->mgr = mgr;
influx_client_init(influx, url, token);
return &influx->output;

View file

@ -138,6 +138,8 @@ static void mqtt_client_publish(mqtt_client_t *ctx, char const *topic, char cons
static void mqtt_client_free(mqtt_client_t *ctx)
{
if (ctx && ctx->conn)
ctx->conn->user_data = NULL;
free(ctx);
}
@ -157,7 +159,6 @@ static char *mqtt_sanitize_topic(char *topic)
typedef struct {
struct data_output output;
struct mg_mgr *mgr;
mqtt_client_t *mqc;
char topic[256];
char hostname[64];
@ -400,16 +401,6 @@ static void print_mqtt_int(data_output_t *output, int data, char const *format)
print_mqtt_string(output, str, format);
}
static void data_output_mqtt_poll(data_output_t *output)
{
data_output_mqtt_t *mqtt = (data_output_mqtt_t *)output;
if (!mqtt)
return;
mg_mgr_poll(mqtt->mgr, 0);
}
static void data_output_mqtt_free(data_output_t *output)
{
data_output_mqtt_t *mqtt = (data_output_mqtt_t *)output;
@ -425,8 +416,6 @@ static void data_output_mqtt_free(data_output_t *output)
mqtt_client_free(mqtt->mqc);
mg_mgr_free(mqtt->mgr);
free(mqtt->mgr);
free(mqtt);
}
@ -451,7 +440,7 @@ static char *mqtt_topic_default(char const *topic, char const *base, char const
return ret;
}
struct data_output *data_output_mqtt_create(char const *host, char const *port, char *opts, char const *dev_hint)
struct data_output *data_output_mqtt_create(struct mg_mgr *mgr, char const *host, char const *port, char *opts, char const *dev_hint)
{
data_output_mqtt_t *mqtt = calloc(1, sizeof(data_output_mqtt_t));
if (!mqtt)
@ -545,16 +534,9 @@ struct data_output *data_output_mqtt_create(char const *host, char const *port,
mqtt->output.print_string = print_mqtt_string;
mqtt->output.print_double = print_mqtt_double;
mqtt->output.print_int = print_mqtt_int;
mqtt->output.output_poll = data_output_mqtt_poll;
mqtt->output.output_free = data_output_mqtt_free;
// TODO: this could be a global mgr
mqtt->mgr = calloc(1, sizeof(*mqtt->mgr));
if (!mqtt->mgr)
FATAL_CALLOC("data_output_mqtt_create()");
mg_mgr_init(mqtt->mgr, NULL);
mqtt->mqc = mqtt_client_init(mqtt->mgr, host, port, user, pass, client_id, retain);
mqtt->mqc = mqtt_client_init(mgr, host, port, user, pass, client_id, retain);
return &mqtt->output;
}

View file

@ -30,6 +30,7 @@
#include "output_mqtt.h"
#include "output_influx.h"
#include "write_sigrok.h"
#include "mongoose.h"
#include "compat_time.h"
#include "fatal.h"
@ -74,6 +75,20 @@ char const *version_string(void)
;
}
/* helper */
static struct mg_mgr *get_mgr(r_cfg_t *cfg)
{
if (!cfg->mgr) {
cfg->mgr = calloc(1, sizeof(*cfg->mgr));
if (!cfg->mgr)
FATAL_CALLOC("get_mgr()");
mg_mgr_init(cfg->mgr, NULL);
}
return cfg->mgr;
}
/* general */
void r_init_cfg(r_cfg_t *cfg)
@ -136,6 +151,9 @@ void r_free_cfg(r_cfg_t *cfg)
list_free_elems(&cfg->in_files, NULL);
mg_mgr_free(cfg->mgr);
free(cfg->mgr);
//free(cfg);
}
@ -841,12 +859,12 @@ void add_mqtt_output(r_cfg_t *cfg, char *param)
char *opts = hostport_param(param, &host, &port);
fprintf(stderr, "Publishing MQTT data to %s port %s\n", host, port);
list_push(&cfg->output_handler, data_output_mqtt_create(host, port, opts, cfg->dev_query));
list_push(&cfg->output_handler, data_output_mqtt_create(get_mgr(cfg), host, port, opts, cfg->dev_query));
}
void add_influx_output(r_cfg_t *cfg, char *param)
{
list_push(&cfg->output_handler, data_output_influx_create(param));
list_push(&cfg->output_handler, data_output_influx_create(get_mgr(cfg), param));
}
void add_syslog_output(r_cfg_t *cfg, char *param)

View file

@ -51,6 +51,7 @@
#include "compat_paths.h"
#include "fatal.h"
#include "write_sigrok.h"
#include "mongoose.h"
#ifdef _WIN32
#include <io.h>
@ -296,8 +297,8 @@ static void sdr_callback(unsigned char *iq_buf, uint32_t len, void *ctx)
char time_str[LOCAL_TIME_BUFLEN];
unsigned long n_samples;
for (size_t i = 0; i < cfg->output_handler.len; ++i) { // list might contain NULLs
data_output_poll(cfg->output_handler.elems[i]);
if (cfg->mgr) {
mg_mgr_poll(cfg->mgr, 0);
}
if (cfg->do_exit || cfg->do_exit_async)