Refactor MQTT and Influx output context from mgr to conn

This commit is contained in:
Christian W. Zuckschwerdt 2020-10-24 16:19:32 +02:00
parent b6b93870cf
commit 46d0ca4e75
2 changed files with 47 additions and 52 deletions

View file

@ -45,7 +45,7 @@ static void influx_client_send(influx_client_t *ctx);
static void influx_client_event(struct mg_connection *nc, int ev, void *ev_data)
{
// note that while shutting down the ctx is NULL
influx_client_t *ctx = (influx_client_t *)nc->mgr->user_data;
influx_client_t *ctx = (influx_client_t *)nc->user_data;
struct http_message *hm = (struct http_message *)ev_data;
switch (ev) {
@ -86,24 +86,12 @@ static void influx_client_event(struct mg_connection *nc, int ev, void *ev_data)
}
}
static struct mg_mgr *influx_client_init(influx_client_t *ctx, char const *url, char const *token)
static influx_client_t *influx_client_init(influx_client_t *ctx, char const *url, char const *token)
{
struct mg_mgr *mgr = calloc(1, sizeof(*mgr));
if (!mgr) {
FATAL_CALLOC("influx_client_init()");
}
strncpy(ctx->url, url, sizeof(ctx->url) - 1);
snprintf(ctx->extra_headers, sizeof (ctx->extra_headers), "Authorization: Token %s\r\n", token);
mg_mgr_init(mgr, ctx);
return mgr;
}
static int influx_client_poll(struct mg_mgr *mgr)
{
return mg_mgr_poll(mgr, 0);
return ctx;
}
static void influx_client_send(influx_client_t *ctx)
@ -117,7 +105,8 @@ static void influx_client_send(influx_client_t *ctx)
if (ctx->transfer_running || !buf->len)
return;
if (mg_connect_http(ctx->mgr, influx_client_event, ctx->url, ctx->extra_headers, buf->buf) == NULL) {
struct mg_connect_opts opts = {.user_data = ctx};
if (mg_connect_http_opt(ctx->mgr, influx_client_event, opts, ctx->url, ctx->extra_headers, buf->buf) == NULL) {
fprintf(stderr, "Connect to InfluxDB (%s) failed\n", ctx->url);
}
else {
@ -381,7 +370,7 @@ static void data_output_influx_poll(data_output_t *output)
if (!influx)
return;
influx_client_poll(influx->mgr);
mg_mgr_poll(influx->mgr, 0);
}
static void data_output_influx_free(data_output_t *output)
@ -391,8 +380,9 @@ static void data_output_influx_free(data_output_t *output)
if (!influx)
return;
influx->mgr->user_data = NULL;
mg_mgr_free(influx->mgr);
free(influx->mgr);
free(influx);
}
@ -463,7 +453,13 @@ struct data_output *data_output_influx_create(char *opts)
fprintf(stderr, "Publishing data to InfluxDB (%s)\n", url);
influx->mgr = influx_client_init(influx, url, token);
// TODO: this could be a global mgr
influx->mgr = calloc(1, sizeof(*influx->mgr));
if (!influx->mgr)
FATAL_CALLOC("data_output_influx_create()");
mg_mgr_init(influx->mgr, NULL);
influx_client_init(influx, url, token);
return &influx->output;
}

View file

@ -25,6 +25,7 @@
typedef struct mqtt_client {
struct mg_send_mqtt_handshake_opts opts;
struct mg_connection *conn;
int prev_status;
char address[253 + 6 + 1]; // dns max + port
char client_id[256];
@ -35,7 +36,7 @@ typedef struct mqtt_client {
static void mqtt_client_event(struct mg_connection *nc, int ev, void *ev_data)
{
// note that while shutting down the ctx is NULL
mqtt_client_t *ctx = (mqtt_client_t *)nc->mgr->user_data;
mqtt_client_t *ctx = (mqtt_client_t *)nc->user_data;
// only valid in MG_EV_MQTT_ events
struct mg_mqtt_message *msg = (struct mg_mqtt_message *)ev_data;
@ -86,19 +87,17 @@ static void mqtt_client_event(struct mg_connection *nc, int ev, void *ev_data)
if (ctx->prev_status == 0)
fprintf(stderr, "MQTT Connection failed...\n");
// reconnect
if (mg_connect(nc->mgr, ctx->address, mqtt_client_event) == NULL) {
struct mg_connect_opts opts = {.user_data = ctx};
ctx->conn = mg_connect_opt(nc->mgr, ctx->address, mqtt_client_event, opts);
if (!ctx->conn) {
fprintf(stderr, "MQTT connect(%s) failed\n", ctx->address);
}
break;
}
}
static struct mg_mgr *mqtt_client_init(char const *host, char const *port, char const *user, char const *pass, char const *client_id, int retain)
static mqtt_client_t *mqtt_client_init(struct mg_mgr *mgr, char const *host, char const *port, char const *user, char const *pass, char const *client_id, int retain)
{
struct mg_mgr *mgr = calloc(1, sizeof(*mgr));
if (!mgr)
FATAL_CALLOC("mqtt_client_init()");
mqtt_client_t *ctx = calloc(1, sizeof(*ctx));
if (!ctx)
FATAL_CALLOC("mqtt_client_init()");
@ -112,44 +111,34 @@ static struct mg_mgr *mqtt_client_init(char const *host, char const *port, char
//ctx->cleansession = 1;
strncpy(ctx->client_id, client_id, sizeof(ctx->client_id));
mg_mgr_init(mgr, ctx);
// if the host is an IPv6 address it needs quoting
if (strchr(host, ':'))
snprintf(ctx->address, sizeof(ctx->address), "[%s]:%s", host, port);
else
snprintf(ctx->address, sizeof(ctx->address), "%s:%s", host, port);
if (mg_connect(mgr, ctx->address, mqtt_client_event) == NULL) {
struct mg_connect_opts opts = {.user_data = ctx};
ctx->conn = mg_connect_opt(mgr, ctx->address, mqtt_client_event, opts);
if (!ctx->conn) {
fprintf(stderr, "MQTT connect(%s) failed\n", ctx->address);
exit(1);
}
return mgr;
return ctx;
}
static int mqtt_client_poll(struct mg_mgr *mgr)
static void mqtt_client_publish(mqtt_client_t *ctx, char const *topic, char const *str)
{
return mg_mgr_poll(mgr, 0);
}
if (!ctx->conn || !ctx->conn->proto_handler)
return;
static void mqtt_client_publish(struct mg_mgr *mgr, char const *topic, char const *str)
{
mqtt_client_t *ctx = (mqtt_client_t *)mgr->user_data;
ctx->message_id++;
for (struct mg_connection *c = mg_next(mgr, NULL); c != NULL; c = mg_next(mgr, c)) {
if (c->proto_handler)
mg_mqtt_publish(c, topic, ctx->message_id, ctx->publish_flags, str, strlen(str));
}
mg_mqtt_publish(ctx->conn, topic, ctx->message_id, ctx->publish_flags, str, strlen(str));
}
static void mqtt_client_free(struct mg_mgr *mgr)
static void mqtt_client_free(mqtt_client_t *ctx)
{
free(mgr->user_data);
mgr->user_data = NULL;
mg_mgr_free(mgr);
free(mgr);
free(ctx);
}
/* Helper */
@ -169,6 +158,7 @@ static char *mqtt_sanitize_topic(char *topic)
typedef struct {
struct data_output output;
struct mg_mgr *mgr;
mqtt_client_t *mqc;
char topic[256];
char hostname[64];
char *devices;
@ -335,7 +325,7 @@ static void print_mqtt_data(data_output_t *output, data_t *data, char const *for
}
data_print_jsons(data, message, message_size);
expand_topic(mqtt->topic, mqtt->states, data, mqtt->hostname);
mqtt_client_publish(mqtt->mgr, mqtt->topic, message);
mqtt_client_publish(mqtt->mqc, mqtt->topic, message);
*mqtt->topic = '\0'; // clear topic
free(message);
}
@ -347,7 +337,7 @@ static void print_mqtt_data(data_output_t *output, data_t *data, char const *for
char message[1024]; // we expect the biggest strings to be around 500 bytes.
data_print_jsons(data, message, sizeof(message));
expand_topic(mqtt->topic, mqtt->events, data, mqtt->hostname);
mqtt_client_publish(mqtt->mgr, mqtt->topic, message);
mqtt_client_publish(mqtt->mqc, mqtt->topic, message);
*mqtt->topic = '\0'; // clear topic
}
@ -381,7 +371,7 @@ static void print_mqtt_data(data_output_t *output, data_t *data, char const *for
static void print_mqtt_string(data_output_t *output, char const *str, char const *format)
{
data_output_mqtt_t *mqtt = (data_output_mqtt_t *)output;
mqtt_client_publish(mqtt->mgr, mqtt->topic, str);
mqtt_client_publish(mqtt->mqc, mqtt->topic, str);
}
static void print_mqtt_double(data_output_t *output, double data, char const *format)
@ -417,7 +407,7 @@ static void data_output_mqtt_poll(data_output_t *output)
if (!mqtt)
return;
mqtt_client_poll(mqtt->mgr);
mg_mgr_poll(mqtt->mgr, 0);
}
static void data_output_mqtt_free(data_output_t *output)
@ -433,7 +423,10 @@ static void data_output_mqtt_free(data_output_t *output)
//free(mqtt->homie);
//free(mqtt->hass);
mqtt_client_free(mqtt->mgr);
mqtt_client_free(mqtt->mqc);
mg_mgr_free(mqtt->mgr);
free(mqtt->mgr);
free(mqtt);
}
@ -555,7 +548,13 @@ struct data_output *data_output_mqtt_create(char const *host, char const *port,
mqtt->output.output_poll = data_output_mqtt_poll;
mqtt->output.output_free = data_output_mqtt_free;
mqtt->mgr = mqtt_client_init(host, port, user, pass, client_id, retain);
// TODO: this could be a global mgr
mqtt->mgr = calloc(1, sizeof(*mqtt->mgr));
if (!mqtt->mgr)
FATAL_CALLOC("data_output_mqtt_create()");
mg_mgr_init(mqtt->mgr, NULL);
mqtt->mqc = mqtt_client_init(mqtt->mgr, host, port, user, pass, client_id, retain);
return &mqtt->output;
}