From 8a203a75afe1cfb4d8ff5a9d37d266186f4c7c83 Mon Sep 17 00:00:00 2001
From: "Christian W. Zuckschwerdt" <zany@triq.net>
Date: Mon, 1 Feb 2021 10:40:28 +0100
Subject: [PATCH] Add GPSd tags option (#1636)

---
 include/data_tag.h |  11 +-
 include/optparse.h |   8 ++
 src/data.c         |  17 ++-
 src/data_tag.c     | 284 +++++++++++++++++++++++++++++++++++++++++++--
 src/optparse.c     |  17 +++
 src/output_mqtt.c  |   2 +-
 src/r_api.c        |   9 +-
 src/rtl_433.c      |  17 ++-
 8 files changed, 346 insertions(+), 19 deletions(-)

diff --git a/include/data_tag.h b/include/data_tag.h
index 5a467043..9a7c1505 100644
--- a/include/data_tag.h
+++ b/include/data_tag.h
@@ -12,16 +12,19 @@
 #ifndef INCLUDE_TAGS_H_
 #define INCLUDE_TAGS_H_
 
+struct gpsd_client;
+struct mg_mgr;
 struct data;
 
 typedef struct data_tag {
-    char *key;
-    char *val;
-    int prepend;
+    char const *key;
+    char const *val;
+    char const **includes;
+    struct gpsd_client *gpsd_client;
 } data_tag_t;
 
 /// Create a data tag. Might fail and return NULL.
-data_tag_t *data_tag_create(char *params);
+data_tag_t *data_tag_create(char *params, struct mg_mgr *mgr);
 
 /// Free a data tag.
 void data_tag_free(data_tag_t *tag);
diff --git a/include/optparse.h b/include/optparse.h
index d313e62f..d0b0e144 100644
--- a/include/optparse.h
+++ b/include/optparse.h
@@ -116,6 +116,14 @@ int atoi_time(char const *str, char const *error_hint);
 /// @return the original value of *stringp
 char *asepc(char **stringp, char delim);
 
+/// Similar to strsep but bounded by a stop character.
+///
+/// @param[in,out] stringp String to parse inplace
+/// @param delim the delimiter character
+/// @param stop the bounding character at which to stop
+/// @return the original value of *stringp
+char *asepcb(char **stringp, char delim, char stop);
+
 /// Parse a comma-separated list of key/value pairs into kwargs.
 ///
 /// The input string will be modified and the pointer advanced.
diff --git a/src/data.c b/src/data.c
index b87b08d7..4b1ccdd3 100644
--- a/src/data.c
+++ b/src/data.c
@@ -493,6 +493,14 @@ static void print_json_data(data_output_t *output, data_t *data, char const *for
 static void print_json_string(data_output_t *output, const char *str, char const *format)
 {
     UNUSED(format);
+
+    size_t str_len = strlen(str);
+    if (str[0] == '{' && str[str_len - 1] == '}') {
+        // Print embedded JSON object verbatim
+        fprintf(output->file, "%s", str);
+        return;
+    }
+
     fprintf(output->file, "\"");
     while (*str) {
         if (*str == '"' || *str == '\\')
@@ -972,7 +980,14 @@ static void format_jsons_string(data_output_t *output, const char *str, char con
     char *buf   = jsons->msg.tail;
     size_t size = jsons->msg.left;
 
-    if (size < strlen(str) + 3) {
+    size_t str_len = strlen(str);
+    if (size < str_len + 3) {
+        return;
+    }
+
+    if (str[0] == '{' && str[str_len - 1] == '}') {
+        // Print embedded JSON object verbatim
+        abuf_cat(&jsons->msg, str);
         return;
     }
 
diff --git a/src/data_tag.c b/src/data_tag.c
index 25d797c6..bff8263e 100644
--- a/src/data_tag.c
+++ b/src/data_tag.c
@@ -14,12 +14,144 @@
 #include <string.h>
 
 #include "data_tag.h"
+#include "mongoose.h"
+#include "jsmn.h"
 #include "data.h"
+#include "list.h"
 #include "optparse.h"
 #include "fileformat.h"
 #include "fatal.h"
 
-data_tag_t *data_tag_create(char *param)
+typedef struct gpsd_client {
+    struct mg_connect_opts connect_opts;
+    struct mg_connection *conn;
+    int prev_status;
+    char address[253 + 6 + 1]; // dns max + port
+    char const *init_str;
+    char const *filter_str;
+    char msg[1024]; // GPSd TPV should about 600 bytes
+} gpsd_client_t;
+
+// GPSd JSON mode
+char const watch_json[] = "?WATCH={\"enable\":true,\"json\":true}\n";
+char const filter_json[] = "{\"class\":\"TPV\",";
+// GPSd NMEA mode
+char const watch_nmea[] = "?WATCH={\"enable\":true,\"nmea\":true}\n";
+char const filter_nmea[] = "$GPGGA,";
+
+static void gpsd_client_line(gpsd_client_t *ctx, char *line)
+{
+    if (!ctx->filter_str || strncmp(line, ctx->filter_str, strlen(ctx->filter_str)) == 0) {
+        strncpy(ctx->msg, line, sizeof(ctx->msg) - 1);
+    }
+}
+
+static struct mg_connection *gpsd_client_connect(gpsd_client_t *ctx, struct mg_mgr *mgr);
+
+static void gpsd_client_event(struct mg_connection *nc, int ev, void *ev_data)
+{
+    // note that while shutting down the ctx is NULL
+    gpsd_client_t *ctx = (gpsd_client_t *)nc->user_data;
+
+    //if (ev != MG_EV_POLL)
+    //    fprintf(stderr, "GPSd user handler got event %d\n", ev);
+
+    switch (ev) {
+    case MG_EV_CONNECT: {
+        int connect_status = *(int *)ev_data;
+        if (connect_status == 0) {
+            // Success
+            fprintf(stderr, "GPSd Connected...\n");
+            if (ctx->init_str && *ctx->init_str) {
+                mg_send(nc, ctx->init_str, strlen(ctx->init_str));
+            }
+        }
+        else {
+            // Error, print only once
+            if (ctx && ctx->prev_status != connect_status)
+                fprintf(stderr, "GPSd connect error: %s\n", strerror(connect_status));
+        }
+        if (ctx)
+            ctx->prev_status = connect_status;
+        break;
+    }
+    case MG_EV_RECV: {
+        // Require a newline
+        struct mbuf *io = &nc->recv_mbuf;
+        // note: we could scan only the last *(int *)ev_data bytes...
+        char *eol = memchr(io->buf, '\n', io->len);
+        if (eol) {
+            size_t len = eol - io->buf + 1;
+            // strip [\r]\n
+            io->buf[len - 1] = '\0';
+            if (len >= 2 && io->buf[len - 2] == '\r') {
+                io->buf[len - 2] = '\0';
+            }
+            gpsd_client_line(ctx, io->buf);
+            mbuf_remove(io, len); // Discard line from recv buffer
+        }
+        break;
+    }
+    case MG_EV_CLOSE:
+        if (!ctx)
+            break; // shutting down
+        if (ctx->prev_status == 0)
+            fprintf(stderr, "GPSd Connection failed...\n");
+        // reconnect
+        gpsd_client_connect(ctx, nc->mgr);
+        break;
+    }
+}
+
+static struct mg_connection *gpsd_client_connect(gpsd_client_t *ctx, struct mg_mgr *mgr)
+{
+    char const *error_string       = NULL;
+    ctx->connect_opts.error_string = &error_string;
+    ctx->conn                      = mg_connect_opt(mgr, ctx->address, gpsd_client_event, ctx->connect_opts);
+    ctx->connect_opts.error_string = NULL;
+    if (!ctx->conn) {
+        fprintf(stderr, "GPSd connect (%s) failed%s%s\n", ctx->address,
+                error_string ? ": " : "", error_string ? error_string : "");
+    }
+    return ctx->conn;
+}
+
+static gpsd_client_t *gpsd_client_init(char const *host, char const *port, char const *init_str, char const *filter_str, struct mg_mgr *mgr)
+{
+    gpsd_client_t *ctx;
+    ctx = calloc(1, sizeof(gpsd_client_t));
+    if (!ctx) {
+        WARN_CALLOC("gpsd_client_init()");
+        return NULL;
+    }
+
+    // if the host is an IPv6 address it needs quoting
+    if (strchr(host, ':'))
+        snprintf(ctx->address, sizeof(ctx->address), "[%s]:%s", host, port);
+    else
+        snprintf(ctx->address, sizeof(ctx->address), "%s:%s", host, port);
+
+    ctx->init_str = init_str;
+    ctx->filter_str = filter_str;
+    ctx->connect_opts.user_data = ctx;
+
+    if (!gpsd_client_connect(ctx, mgr)) {
+        exit(1);
+    }
+
+    return ctx;
+}
+
+static void gpsd_client_free(gpsd_client_t *ctx)
+{
+    if (ctx && ctx->conn) {
+        ctx->conn->user_data = NULL;
+        ctx->conn->flags |= MG_F_CLOSE_IMMEDIATELY;
+    }
+    free(ctx);
+}
+
+data_tag_t *data_tag_create(char *param, struct mg_mgr *mgr)
 {
     data_tag_t *tag;
     tag = calloc(1, sizeof(data_tag_t));
@@ -28,12 +160,70 @@ data_tag_t *data_tag_create(char *param)
         return NULL;
     }
 
-    tag->prepend = 1; // always prepend tag
-    tag->val = param;
-    tag->key = asepc(&tag->val, '=');
-    if (!tag->val) {
-        tag->val = tag->key;
-        tag->key = "tag";
+    char *p = param;
+    asepcb(&p, '=', ','); // look for '=' but stop at ','
+    if (p) {
+        tag->key = param;
+        tag->val = p;
+    } else {
+        tag->val = param;
+    }
+
+    int gpsd_mode = strncmp(tag->val, "gpsd", 4) == 0;
+    if (gpsd_mode || strncmp(tag->val, "tcp:", 4) == 0) {
+        p          = arg_param(tag->val); // strip scheme
+        char *host = gpsd_mode ? "localhost" : NULL;
+        char *port = gpsd_mode ? "2947" : NULL;
+        char *opts = hostport_param(p, &host, &port);
+        list_t includes = {0};
+
+        // default to GPSd JSON
+        char const *mode = gpsd_mode ? "GPSd JSON" : "TCP custom";
+        char const *init_str = gpsd_mode ? watch_json : NULL;
+        char const *filter_str = gpsd_mode ? filter_json : NULL;
+        // parse format options
+        char *key, *val;
+        while (getkwargs(&opts, &key, &val)) {
+            key = remove_ws(key);
+            val = trim_ws(val);
+            if (!key || !*key)
+                continue;
+            else if (!strcasecmp(key, "nmea")) {
+                mode       = "GPSd NMEA";
+                init_str   = watch_nmea;
+                filter_str = filter_nmea;
+            }
+            else if (!strcasecmp(key, "init")) {
+                init_str = val;
+            }
+            else if (!strcasecmp(key, "filter")) {
+                filter_str = val;
+            }
+            else if (val) {
+                fprintf(stderr, "Invalid key \"%s\" option.\n", key);
+                exit(1);
+            }
+            else {
+                list_push(&includes, key);
+            }
+        }
+
+        tag->includes = (char const **)includes.elems;
+        if (!tag->key && !tag->includes)
+            tag->key = gpsd_mode ? "gps" : "tag";
+
+        if (!host || !port) {
+            fprintf(stderr, "Host or port for tag client missing!\n");
+            exit(1);
+        }
+
+        fprintf(stderr, "Getting %s data from %s port %s\n", mode, host, port);
+
+        tag->gpsd_client = gpsd_client_init(host, port, init_str, filter_str, mgr);
+    }
+    else {
+        if (!tag->key)
+            tag->key = "tag";
     }
 
     return tag; // NOTE: returns NULL on alloc failure.
@@ -41,18 +231,96 @@ data_tag_t *data_tag_create(char *param)
 
 void data_tag_free(data_tag_t *tag)
 {
+    free(tag->includes);
+    gpsd_client_free(tag->gpsd_client);
+
     free(tag);
 }
 
+static char const *find_list_strncmp(char const **list, char const *key, size_t len)
+{
+    for (; list && *list; ++list) {
+        char const *elem = *list;
+        if (elem && strncmp(elem, key, len) == 0) {
+            return elem;
+        }
+    }
+    return NULL;
+}
+
+#define MAX_JSON_TOKENS 128
+
+static data_t *append_filtered_json(data_t *data, char const *json, char const **includes)
+{
+    jsmn_parser parser = {0};
+    jsmn_init(&parser);
+    jsmntok_t tok[MAX_JSON_TOKENS] = {0};
+
+    int toks = jsmn_parse(&parser, json, strlen(json), tok, MAX_JSON_TOKENS);
+    if (toks < 1 || tok[0].type != JSMN_OBJECT) {
+        fprintf(stderr, "invalid json (%d): %s\n", toks, json);
+        return data; // invalid json
+    }
+
+    // check all tokens
+    char buf[1024] = {0};
+    for (int i = 1; i < toks - 1; ++i) {
+        jsmntok_t *k = tok + i;
+        jsmntok_t *v = tok + i + 1;
+        i += k->size + v->size;
+        //fprintf(stderr, "TOK (%d %d) %.*s : (%d %d) %.*s\n",
+        //        k->type, k->size, k->end - k->start, json + k->start,
+        //        v->type, v->size, v->end - v->start, json + v->start);
+
+        // check all includes
+        char const *key = find_list_strncmp(includes, json + k->start, k->end - k->start);
+        if (key) {
+            // append json tag
+            strncpy(buf, json + v->start, v->end - v->start);
+            data = data_append(data,
+                    key, "", DATA_STRING, buf,
+                    NULL);
+        }
+    }
+
+    return data;
+}
+
 data_t *data_tag_apply(data_tag_t *tag, data_t *data, char const *filename)
 {
     char const *val = tag->val;
-    if (filename && !strcmp("PATH", tag->val)) {
+    if (tag->gpsd_client) {
+        val = tag->gpsd_client->msg;
+        if (tag->includes) {
+            if (tag->key) {
+                // wrap tag includes
+                data_t *obj = append_filtered_json(NULL, val, tag->includes);
+                // append tag wrapper
+                data = data_append(data,
+                        tag->key, "", DATA_DATA, obj,
+                        NULL);
+            }
+            else {
+                // append tag includes
+                data = append_filtered_json(data, val, tag->includes);
+            }
+        }
+        else {
+            // append tag string
+            data = data_append(data,
+                    tag->key, "", DATA_STRING, val,
+                    NULL);
+        }
+        return data;
+    }
+    else if (filename && !strcmp("PATH", tag->val)) {
         val = filename;
     }
     else if (filename && !strcmp("FILE", tag->val)) {
         val = file_basename(filename);
     }
+
+    // prepend simple tags
     data = data_prepend(data,
             tag->key, "", DATA_STRING, val,
             NULL);
diff --git a/src/optparse.c b/src/optparse.c
index 0b39fd4b..6f75d3a3 100644
--- a/src/optparse.c
+++ b/src/optparse.c
@@ -306,6 +306,23 @@ char *asepc(char **stringp, char delim)
     return p;
 }
 
+static char *achrb(char const *s, int c, int b)
+{
+    for (; s && *s && *s != b; ++s)
+        if (*s == c) return (char *)s;
+    return NULL;
+}
+
+char *asepcb(char **stringp, char delim, char stop)
+{
+    if (!stringp || !*stringp) return NULL;
+    char *s = achrb(*stringp, delim, stop);
+    if (s) *s++ = '\0';
+    char *p = *stringp;
+    *stringp = s;
+    return p;
+}
+
 char *getkwargs(char **s, char **key, char **val)
 {
     char *v = asepc(s, ',');
diff --git a/src/output_mqtt.c b/src/output_mqtt.c
index 1a0f909a..2b1d6417 100644
--- a/src/output_mqtt.c
+++ b/src/output_mqtt.c
@@ -362,7 +362,7 @@ static void print_mqtt_data(data_output_t *output, data_t *data, char const *for
 
         // "events" topic
         if (mqtt->events) {
-            char message[1024]; // we expect the biggest strings to be around 500 bytes.
+            char message[2048]; // we expect the biggest strings to be around 500 bytes.
             data_print_jsons(data, message, sizeof(message));
             expand_topic(mqtt->topic, mqtt->events, data, mqtt->hostname);
             mqtt_client_publish(mqtt->mqc, mqtt->topic, message);
diff --git a/src/r_api.c b/src/r_api.c
index 137bc337..64a60f9b 100644
--- a/src/r_api.c
+++ b/src/r_api.c
@@ -342,7 +342,12 @@ char const **well_known_output_fields(r_cfg_t *cfg)
 
     for (void **iter = cfg->data_tags.elems; iter && *iter; ++iter) {
         data_tag_t *tag = *iter;
-        list_push(&field_list, tag->key);
+        if (tag->key) {
+            list_push(&field_list, (void *)tag->key);
+        }
+        else {
+            list_push_all(&field_list, (void **)tag->includes);
+        }
     }
 
     if (cfg->report_protocol)
@@ -1027,5 +1032,5 @@ void add_infile(r_cfg_t *cfg, char *in_file)
 
 void add_data_tag(struct r_cfg *cfg, char *param)
 {
-    list_push(&cfg->data_tags, data_tag_create(param));
+    list_push(&cfg->data_tags, data_tag_create(param, get_mgr(cfg)));
 }
diff --git a/src/rtl_433.c b/src/rtl_433.c
index c9ea56e7..6dc87556 100644
--- a/src/rtl_433.c
+++ b/src/rtl_433.c
@@ -134,7 +134,7 @@ static void usage(int exit_code)
             "       Append output to file with :<filename> (e.g. -F csv:log.csv), defaults to stdout.\n"
             "       Specify host/port for syslog with e.g. -F syslog:127.0.0.1:1514\n"
             "  [-M time[:<options>] | protocol | level | stats | bits | help] Add various meta data to each output.\n"
-            "  [-K FILE | PATH | <tag> | <key>=<value>] Add an expanded token or fixed tag to every output line.\n"
+            "  [-K FILE | PATH | <tag> | <key>=<tag>] Add an expanded token or fixed tag to every output line.\n"
             "  [-C native | si | customary] Convert units in decoded output.\n"
             "  [-T <seconds>] Specify number of seconds to run, also 12:34 or 1h23m45s\n"
             "  [-E hop | quit] Hop/Quit after outputting successful event(s)\n"
@@ -231,8 +231,19 @@ static void help_tags(void)
 {
     term_help_printf(
             "\t\t= Data tags option =\n"
-            "  [-K FILE | PATH | <tag> | <key>=<value>] Add an expanded token or fixed tag to every output line.\n"
-            "\tIf <tag> or <value> is \"FILE\" or \"PATH\" an expanded token will be added.\n");
+            "  [-K FILE | PATH | <tag> | <key>=<tag>] Add an expanded token or fixed tag to every output line.\n"
+            "\tIf <tag> is \"FILE\" or \"PATH\" an expanded token will be added.\n"
+            "\tThe <tag> can also be a GPSd URL, e.g.\n"
+            "\t\t\"-K gpsd,lat,lon\" (report lat and lon keys from local gpsd)\n"
+            "\t\t\"-K loc=gpsd,lat,lon\" (report lat and lon in loc object)\n"
+            "\t\t\"-K gpsd\" (full json TPV report, in default \"gps\" object)\n"
+            "\t\t\"-K foo=gpsd://127.0.0.1:2947\" (with key and address)\n"
+            "\t\t\"-K bar=gpsd,nmea\" (NMEA deault GPGGA report)\n"
+            "\t\t\"-K rmc=gpsd,nmea,filter='$GPRMC'\" (NMEA GPRMC report)\n"
+            "\tAlso <tag> can be a generic tcp address, e.g.\n"
+            "\t\t\"-K foo=tcp:localhost:4000\" (read lines as TCP client)\n"
+            "\t\t\"-K bar=tcp://127.0.0.1:3000,init='subscribe tags\\r\\n'\"\n"
+            "\t\t\"-K baz=tcp://127.0.0.1:5000,filter='a prefix to match'\"\n");
     exit(0);
 }