diff --git a/include/data_tag.h b/include/data_tag.h index 5a467043..9a7c1505 100644 --- a/include/data_tag.h +++ b/include/data_tag.h @@ -12,16 +12,19 @@ #ifndef INCLUDE_TAGS_H_ #define INCLUDE_TAGS_H_ +struct gpsd_client; +struct mg_mgr; struct data; typedef struct data_tag { - char *key; - char *val; - int prepend; + char const *key; + char const *val; + char const **includes; + struct gpsd_client *gpsd_client; } data_tag_t; /// Create a data tag. Might fail and return NULL. -data_tag_t *data_tag_create(char *params); +data_tag_t *data_tag_create(char *params, struct mg_mgr *mgr); /// Free a data tag. void data_tag_free(data_tag_t *tag); diff --git a/include/optparse.h b/include/optparse.h index d313e62f..d0b0e144 100644 --- a/include/optparse.h +++ b/include/optparse.h @@ -116,6 +116,14 @@ int atoi_time(char const *str, char const *error_hint); /// @return the original value of *stringp char *asepc(char **stringp, char delim); +/// Similar to strsep but bounded by a stop character. +/// +/// @param[in,out] stringp String to parse inplace +/// @param delim the delimiter character +/// @param stop the bounding character at which to stop +/// @return the original value of *stringp +char *asepcb(char **stringp, char delim, char stop); + /// Parse a comma-separated list of key/value pairs into kwargs. /// /// The input string will be modified and the pointer advanced. diff --git a/src/data.c b/src/data.c index b87b08d7..4b1ccdd3 100644 --- a/src/data.c +++ b/src/data.c @@ -493,6 +493,14 @@ static void print_json_data(data_output_t *output, data_t *data, char const *for static void print_json_string(data_output_t *output, const char *str, char const *format) { UNUSED(format); + + size_t str_len = strlen(str); + if (str[0] == '{' && str[str_len - 1] == '}') { + // Print embedded JSON object verbatim + fprintf(output->file, "%s", str); + return; + } + fprintf(output->file, "\""); while (*str) { if (*str == '"' || *str == '\\') @@ -972,7 +980,14 @@ static void format_jsons_string(data_output_t *output, const char *str, char con char *buf = jsons->msg.tail; size_t size = jsons->msg.left; - if (size < strlen(str) + 3) { + size_t str_len = strlen(str); + if (size < str_len + 3) { + return; + } + + if (str[0] == '{' && str[str_len - 1] == '}') { + // Print embedded JSON object verbatim + abuf_cat(&jsons->msg, str); return; } diff --git a/src/data_tag.c b/src/data_tag.c index 25d797c6..bff8263e 100644 --- a/src/data_tag.c +++ b/src/data_tag.c @@ -14,12 +14,144 @@ #include <string.h> #include "data_tag.h" +#include "mongoose.h" +#include "jsmn.h" #include "data.h" +#include "list.h" #include "optparse.h" #include "fileformat.h" #include "fatal.h" -data_tag_t *data_tag_create(char *param) +typedef struct gpsd_client { + struct mg_connect_opts connect_opts; + struct mg_connection *conn; + int prev_status; + char address[253 + 6 + 1]; // dns max + port + char const *init_str; + char const *filter_str; + char msg[1024]; // GPSd TPV should about 600 bytes +} gpsd_client_t; + +// GPSd JSON mode +char const watch_json[] = "?WATCH={\"enable\":true,\"json\":true}\n"; +char const filter_json[] = "{\"class\":\"TPV\","; +// GPSd NMEA mode +char const watch_nmea[] = "?WATCH={\"enable\":true,\"nmea\":true}\n"; +char const filter_nmea[] = "$GPGGA,"; + +static void gpsd_client_line(gpsd_client_t *ctx, char *line) +{ + if (!ctx->filter_str || strncmp(line, ctx->filter_str, strlen(ctx->filter_str)) == 0) { + strncpy(ctx->msg, line, sizeof(ctx->msg) - 1); + } +} + +static struct mg_connection *gpsd_client_connect(gpsd_client_t *ctx, struct mg_mgr *mgr); + +static void gpsd_client_event(struct mg_connection *nc, int ev, void *ev_data) +{ + // note that while shutting down the ctx is NULL + gpsd_client_t *ctx = (gpsd_client_t *)nc->user_data; + + //if (ev != MG_EV_POLL) + // fprintf(stderr, "GPSd user handler got event %d\n", ev); + + switch (ev) { + case MG_EV_CONNECT: { + int connect_status = *(int *)ev_data; + if (connect_status == 0) { + // Success + fprintf(stderr, "GPSd Connected...\n"); + if (ctx->init_str && *ctx->init_str) { + mg_send(nc, ctx->init_str, strlen(ctx->init_str)); + } + } + else { + // Error, print only once + if (ctx && ctx->prev_status != connect_status) + fprintf(stderr, "GPSd connect error: %s\n", strerror(connect_status)); + } + if (ctx) + ctx->prev_status = connect_status; + break; + } + case MG_EV_RECV: { + // Require a newline + struct mbuf *io = &nc->recv_mbuf; + // note: we could scan only the last *(int *)ev_data bytes... + char *eol = memchr(io->buf, '\n', io->len); + if (eol) { + size_t len = eol - io->buf + 1; + // strip [\r]\n + io->buf[len - 1] = '\0'; + if (len >= 2 && io->buf[len - 2] == '\r') { + io->buf[len - 2] = '\0'; + } + gpsd_client_line(ctx, io->buf); + mbuf_remove(io, len); // Discard line from recv buffer + } + break; + } + case MG_EV_CLOSE: + if (!ctx) + break; // shutting down + if (ctx->prev_status == 0) + fprintf(stderr, "GPSd Connection failed...\n"); + // reconnect + gpsd_client_connect(ctx, nc->mgr); + break; + } +} + +static struct mg_connection *gpsd_client_connect(gpsd_client_t *ctx, struct mg_mgr *mgr) +{ + char const *error_string = NULL; + ctx->connect_opts.error_string = &error_string; + ctx->conn = mg_connect_opt(mgr, ctx->address, gpsd_client_event, ctx->connect_opts); + ctx->connect_opts.error_string = NULL; + if (!ctx->conn) { + fprintf(stderr, "GPSd connect (%s) failed%s%s\n", ctx->address, + error_string ? ": " : "", error_string ? error_string : ""); + } + return ctx->conn; +} + +static gpsd_client_t *gpsd_client_init(char const *host, char const *port, char const *init_str, char const *filter_str, struct mg_mgr *mgr) +{ + gpsd_client_t *ctx; + ctx = calloc(1, sizeof(gpsd_client_t)); + if (!ctx) { + WARN_CALLOC("gpsd_client_init()"); + return NULL; + } + + // if the host is an IPv6 address it needs quoting + if (strchr(host, ':')) + snprintf(ctx->address, sizeof(ctx->address), "[%s]:%s", host, port); + else + snprintf(ctx->address, sizeof(ctx->address), "%s:%s", host, port); + + ctx->init_str = init_str; + ctx->filter_str = filter_str; + ctx->connect_opts.user_data = ctx; + + if (!gpsd_client_connect(ctx, mgr)) { + exit(1); + } + + return ctx; +} + +static void gpsd_client_free(gpsd_client_t *ctx) +{ + if (ctx && ctx->conn) { + ctx->conn->user_data = NULL; + ctx->conn->flags |= MG_F_CLOSE_IMMEDIATELY; + } + free(ctx); +} + +data_tag_t *data_tag_create(char *param, struct mg_mgr *mgr) { data_tag_t *tag; tag = calloc(1, sizeof(data_tag_t)); @@ -28,12 +160,70 @@ data_tag_t *data_tag_create(char *param) return NULL; } - tag->prepend = 1; // always prepend tag - tag->val = param; - tag->key = asepc(&tag->val, '='); - if (!tag->val) { - tag->val = tag->key; - tag->key = "tag"; + char *p = param; + asepcb(&p, '=', ','); // look for '=' but stop at ',' + if (p) { + tag->key = param; + tag->val = p; + } else { + tag->val = param; + } + + int gpsd_mode = strncmp(tag->val, "gpsd", 4) == 0; + if (gpsd_mode || strncmp(tag->val, "tcp:", 4) == 0) { + p = arg_param(tag->val); // strip scheme + char *host = gpsd_mode ? "localhost" : NULL; + char *port = gpsd_mode ? "2947" : NULL; + char *opts = hostport_param(p, &host, &port); + list_t includes = {0}; + + // default to GPSd JSON + char const *mode = gpsd_mode ? "GPSd JSON" : "TCP custom"; + char const *init_str = gpsd_mode ? watch_json : NULL; + char const *filter_str = gpsd_mode ? filter_json : NULL; + // parse format options + char *key, *val; + while (getkwargs(&opts, &key, &val)) { + key = remove_ws(key); + val = trim_ws(val); + if (!key || !*key) + continue; + else if (!strcasecmp(key, "nmea")) { + mode = "GPSd NMEA"; + init_str = watch_nmea; + filter_str = filter_nmea; + } + else if (!strcasecmp(key, "init")) { + init_str = val; + } + else if (!strcasecmp(key, "filter")) { + filter_str = val; + } + else if (val) { + fprintf(stderr, "Invalid key \"%s\" option.\n", key); + exit(1); + } + else { + list_push(&includes, key); + } + } + + tag->includes = (char const **)includes.elems; + if (!tag->key && !tag->includes) + tag->key = gpsd_mode ? "gps" : "tag"; + + if (!host || !port) { + fprintf(stderr, "Host or port for tag client missing!\n"); + exit(1); + } + + fprintf(stderr, "Getting %s data from %s port %s\n", mode, host, port); + + tag->gpsd_client = gpsd_client_init(host, port, init_str, filter_str, mgr); + } + else { + if (!tag->key) + tag->key = "tag"; } return tag; // NOTE: returns NULL on alloc failure. @@ -41,18 +231,96 @@ data_tag_t *data_tag_create(char *param) void data_tag_free(data_tag_t *tag) { + free(tag->includes); + gpsd_client_free(tag->gpsd_client); + free(tag); } +static char const *find_list_strncmp(char const **list, char const *key, size_t len) +{ + for (; list && *list; ++list) { + char const *elem = *list; + if (elem && strncmp(elem, key, len) == 0) { + return elem; + } + } + return NULL; +} + +#define MAX_JSON_TOKENS 128 + +static data_t *append_filtered_json(data_t *data, char const *json, char const **includes) +{ + jsmn_parser parser = {0}; + jsmn_init(&parser); + jsmntok_t tok[MAX_JSON_TOKENS] = {0}; + + int toks = jsmn_parse(&parser, json, strlen(json), tok, MAX_JSON_TOKENS); + if (toks < 1 || tok[0].type != JSMN_OBJECT) { + fprintf(stderr, "invalid json (%d): %s\n", toks, json); + return data; // invalid json + } + + // check all tokens + char buf[1024] = {0}; + for (int i = 1; i < toks - 1; ++i) { + jsmntok_t *k = tok + i; + jsmntok_t *v = tok + i + 1; + i += k->size + v->size; + //fprintf(stderr, "TOK (%d %d) %.*s : (%d %d) %.*s\n", + // k->type, k->size, k->end - k->start, json + k->start, + // v->type, v->size, v->end - v->start, json + v->start); + + // check all includes + char const *key = find_list_strncmp(includes, json + k->start, k->end - k->start); + if (key) { + // append json tag + strncpy(buf, json + v->start, v->end - v->start); + data = data_append(data, + key, "", DATA_STRING, buf, + NULL); + } + } + + return data; +} + data_t *data_tag_apply(data_tag_t *tag, data_t *data, char const *filename) { char const *val = tag->val; - if (filename && !strcmp("PATH", tag->val)) { + if (tag->gpsd_client) { + val = tag->gpsd_client->msg; + if (tag->includes) { + if (tag->key) { + // wrap tag includes + data_t *obj = append_filtered_json(NULL, val, tag->includes); + // append tag wrapper + data = data_append(data, + tag->key, "", DATA_DATA, obj, + NULL); + } + else { + // append tag includes + data = append_filtered_json(data, val, tag->includes); + } + } + else { + // append tag string + data = data_append(data, + tag->key, "", DATA_STRING, val, + NULL); + } + return data; + } + else if (filename && !strcmp("PATH", tag->val)) { val = filename; } else if (filename && !strcmp("FILE", tag->val)) { val = file_basename(filename); } + + // prepend simple tags data = data_prepend(data, tag->key, "", DATA_STRING, val, NULL); diff --git a/src/optparse.c b/src/optparse.c index 0b39fd4b..6f75d3a3 100644 --- a/src/optparse.c +++ b/src/optparse.c @@ -306,6 +306,23 @@ char *asepc(char **stringp, char delim) return p; } +static char *achrb(char const *s, int c, int b) +{ + for (; s && *s && *s != b; ++s) + if (*s == c) return (char *)s; + return NULL; +} + +char *asepcb(char **stringp, char delim, char stop) +{ + if (!stringp || !*stringp) return NULL; + char *s = achrb(*stringp, delim, stop); + if (s) *s++ = '\0'; + char *p = *stringp; + *stringp = s; + return p; +} + char *getkwargs(char **s, char **key, char **val) { char *v = asepc(s, ','); diff --git a/src/output_mqtt.c b/src/output_mqtt.c index 1a0f909a..2b1d6417 100644 --- a/src/output_mqtt.c +++ b/src/output_mqtt.c @@ -362,7 +362,7 @@ static void print_mqtt_data(data_output_t *output, data_t *data, char const *for // "events" topic if (mqtt->events) { - char message[1024]; // we expect the biggest strings to be around 500 bytes. + char message[2048]; // we expect the biggest strings to be around 500 bytes. data_print_jsons(data, message, sizeof(message)); expand_topic(mqtt->topic, mqtt->events, data, mqtt->hostname); mqtt_client_publish(mqtt->mqc, mqtt->topic, message); diff --git a/src/r_api.c b/src/r_api.c index 137bc337..64a60f9b 100644 --- a/src/r_api.c +++ b/src/r_api.c @@ -342,7 +342,12 @@ char const **well_known_output_fields(r_cfg_t *cfg) for (void **iter = cfg->data_tags.elems; iter && *iter; ++iter) { data_tag_t *tag = *iter; - list_push(&field_list, tag->key); + if (tag->key) { + list_push(&field_list, (void *)tag->key); + } + else { + list_push_all(&field_list, (void **)tag->includes); + } } if (cfg->report_protocol) @@ -1027,5 +1032,5 @@ void add_infile(r_cfg_t *cfg, char *in_file) void add_data_tag(struct r_cfg *cfg, char *param) { - list_push(&cfg->data_tags, data_tag_create(param)); + list_push(&cfg->data_tags, data_tag_create(param, get_mgr(cfg))); } diff --git a/src/rtl_433.c b/src/rtl_433.c index c9ea56e7..6dc87556 100644 --- a/src/rtl_433.c +++ b/src/rtl_433.c @@ -134,7 +134,7 @@ static void usage(int exit_code) " Append output to file with :<filename> (e.g. -F csv:log.csv), defaults to stdout.\n" " Specify host/port for syslog with e.g. -F syslog:127.0.0.1:1514\n" " [-M time[:<options>] | protocol | level | stats | bits | help] Add various meta data to each output.\n" - " [-K FILE | PATH | <tag> | <key>=<value>] Add an expanded token or fixed tag to every output line.\n" + " [-K FILE | PATH | <tag> | <key>=<tag>] Add an expanded token or fixed tag to every output line.\n" " [-C native | si | customary] Convert units in decoded output.\n" " [-T <seconds>] Specify number of seconds to run, also 12:34 or 1h23m45s\n" " [-E hop | quit] Hop/Quit after outputting successful event(s)\n" @@ -231,8 +231,19 @@ static void help_tags(void) { term_help_printf( "\t\t= Data tags option =\n" - " [-K FILE | PATH | <tag> | <key>=<value>] Add an expanded token or fixed tag to every output line.\n" - "\tIf <tag> or <value> is \"FILE\" or \"PATH\" an expanded token will be added.\n"); + " [-K FILE | PATH | <tag> | <key>=<tag>] Add an expanded token or fixed tag to every output line.\n" + "\tIf <tag> is \"FILE\" or \"PATH\" an expanded token will be added.\n" + "\tThe <tag> can also be a GPSd URL, e.g.\n" + "\t\t\"-K gpsd,lat,lon\" (report lat and lon keys from local gpsd)\n" + "\t\t\"-K loc=gpsd,lat,lon\" (report lat and lon in loc object)\n" + "\t\t\"-K gpsd\" (full json TPV report, in default \"gps\" object)\n" + "\t\t\"-K foo=gpsd://127.0.0.1:2947\" (with key and address)\n" + "\t\t\"-K bar=gpsd,nmea\" (NMEA deault GPGGA report)\n" + "\t\t\"-K rmc=gpsd,nmea,filter='$GPRMC'\" (NMEA GPRMC report)\n" + "\tAlso <tag> can be a generic tcp address, e.g.\n" + "\t\t\"-K foo=tcp:localhost:4000\" (read lines as TCP client)\n" + "\t\t\"-K bar=tcp://127.0.0.1:3000,init='subscribe tags\\r\\n'\"\n" + "\t\t\"-K baz=tcp://127.0.0.1:5000,filter='a prefix to match'\"\n"); exit(0); }