Add GPSd tags option ()

This commit is contained in:
Christian W. Zuckschwerdt 2021-02-01 10:40:28 +01:00 committed by GitHub
parent 71e21c43c1
commit 8a203a75af
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 346 additions and 19 deletions

View file

@ -12,16 +12,19 @@
#ifndef INCLUDE_TAGS_H_
#define INCLUDE_TAGS_H_
struct gpsd_client;
struct mg_mgr;
struct data;
typedef struct data_tag {
char *key;
char *val;
int prepend;
char const *key;
char const *val;
char const **includes;
struct gpsd_client *gpsd_client;
} data_tag_t;
/// Create a data tag. Might fail and return NULL.
data_tag_t *data_tag_create(char *params);
data_tag_t *data_tag_create(char *params, struct mg_mgr *mgr);
/// Free a data tag.
void data_tag_free(data_tag_t *tag);

View file

@ -116,6 +116,14 @@ int atoi_time(char const *str, char const *error_hint);
/// @return the original value of *stringp
char *asepc(char **stringp, char delim);
/// Similar to strsep but bounded by a stop character.
///
/// @param[in,out] stringp String to parse inplace
/// @param delim the delimiter character
/// @param stop the bounding character at which to stop
/// @return the original value of *stringp
char *asepcb(char **stringp, char delim, char stop);
/// Parse a comma-separated list of key/value pairs into kwargs.
///
/// The input string will be modified and the pointer advanced.

View file

@ -493,6 +493,14 @@ static void print_json_data(data_output_t *output, data_t *data, char const *for
static void print_json_string(data_output_t *output, const char *str, char const *format)
{
UNUSED(format);
size_t str_len = strlen(str);
if (str[0] == '{' && str[str_len - 1] == '}') {
// Print embedded JSON object verbatim
fprintf(output->file, "%s", str);
return;
}
fprintf(output->file, "\"");
while (*str) {
if (*str == '"' || *str == '\\')
@ -972,7 +980,14 @@ static void format_jsons_string(data_output_t *output, const char *str, char con
char *buf = jsons->msg.tail;
size_t size = jsons->msg.left;
if (size < strlen(str) + 3) {
size_t str_len = strlen(str);
if (size < str_len + 3) {
return;
}
if (str[0] == '{' && str[str_len - 1] == '}') {
// Print embedded JSON object verbatim
abuf_cat(&jsons->msg, str);
return;
}

View file

@ -14,12 +14,144 @@
#include <string.h>
#include "data_tag.h"
#include "mongoose.h"
#include "jsmn.h"
#include "data.h"
#include "list.h"
#include "optparse.h"
#include "fileformat.h"
#include "fatal.h"
data_tag_t *data_tag_create(char *param)
typedef struct gpsd_client {
struct mg_connect_opts connect_opts;
struct mg_connection *conn;
int prev_status;
char address[253 + 6 + 1]; // dns max + port
char const *init_str;
char const *filter_str;
char msg[1024]; // GPSd TPV should about 600 bytes
} gpsd_client_t;
// GPSd JSON mode
char const watch_json[] = "?WATCH={\"enable\":true,\"json\":true}\n";
char const filter_json[] = "{\"class\":\"TPV\",";
// GPSd NMEA mode
char const watch_nmea[] = "?WATCH={\"enable\":true,\"nmea\":true}\n";
char const filter_nmea[] = "$GPGGA,";
static void gpsd_client_line(gpsd_client_t *ctx, char *line)
{
if (!ctx->filter_str || strncmp(line, ctx->filter_str, strlen(ctx->filter_str)) == 0) {
strncpy(ctx->msg, line, sizeof(ctx->msg) - 1);
}
}
static struct mg_connection *gpsd_client_connect(gpsd_client_t *ctx, struct mg_mgr *mgr);
static void gpsd_client_event(struct mg_connection *nc, int ev, void *ev_data)
{
// note that while shutting down the ctx is NULL
gpsd_client_t *ctx = (gpsd_client_t *)nc->user_data;
//if (ev != MG_EV_POLL)
// fprintf(stderr, "GPSd user handler got event %d\n", ev);
switch (ev) {
case MG_EV_CONNECT: {
int connect_status = *(int *)ev_data;
if (connect_status == 0) {
// Success
fprintf(stderr, "GPSd Connected...\n");
if (ctx->init_str && *ctx->init_str) {
mg_send(nc, ctx->init_str, strlen(ctx->init_str));
}
}
else {
// Error, print only once
if (ctx && ctx->prev_status != connect_status)
fprintf(stderr, "GPSd connect error: %s\n", strerror(connect_status));
}
if (ctx)
ctx->prev_status = connect_status;
break;
}
case MG_EV_RECV: {
// Require a newline
struct mbuf *io = &nc->recv_mbuf;
// note: we could scan only the last *(int *)ev_data bytes...
char *eol = memchr(io->buf, '\n', io->len);
if (eol) {
size_t len = eol - io->buf + 1;
// strip [\r]\n
io->buf[len - 1] = '\0';
if (len >= 2 && io->buf[len - 2] == '\r') {
io->buf[len - 2] = '\0';
}
gpsd_client_line(ctx, io->buf);
mbuf_remove(io, len); // Discard line from recv buffer
}
break;
}
case MG_EV_CLOSE:
if (!ctx)
break; // shutting down
if (ctx->prev_status == 0)
fprintf(stderr, "GPSd Connection failed...\n");
// reconnect
gpsd_client_connect(ctx, nc->mgr);
break;
}
}
static struct mg_connection *gpsd_client_connect(gpsd_client_t *ctx, struct mg_mgr *mgr)
{
char const *error_string = NULL;
ctx->connect_opts.error_string = &error_string;
ctx->conn = mg_connect_opt(mgr, ctx->address, gpsd_client_event, ctx->connect_opts);
ctx->connect_opts.error_string = NULL;
if (!ctx->conn) {
fprintf(stderr, "GPSd connect (%s) failed%s%s\n", ctx->address,
error_string ? ": " : "", error_string ? error_string : "");
}
return ctx->conn;
}
static gpsd_client_t *gpsd_client_init(char const *host, char const *port, char const *init_str, char const *filter_str, struct mg_mgr *mgr)
{
gpsd_client_t *ctx;
ctx = calloc(1, sizeof(gpsd_client_t));
if (!ctx) {
WARN_CALLOC("gpsd_client_init()");
return NULL;
}
// if the host is an IPv6 address it needs quoting
if (strchr(host, ':'))
snprintf(ctx->address, sizeof(ctx->address), "[%s]:%s", host, port);
else
snprintf(ctx->address, sizeof(ctx->address), "%s:%s", host, port);
ctx->init_str = init_str;
ctx->filter_str = filter_str;
ctx->connect_opts.user_data = ctx;
if (!gpsd_client_connect(ctx, mgr)) {
exit(1);
}
return ctx;
}
static void gpsd_client_free(gpsd_client_t *ctx)
{
if (ctx && ctx->conn) {
ctx->conn->user_data = NULL;
ctx->conn->flags |= MG_F_CLOSE_IMMEDIATELY;
}
free(ctx);
}
data_tag_t *data_tag_create(char *param, struct mg_mgr *mgr)
{
data_tag_t *tag;
tag = calloc(1, sizeof(data_tag_t));
@ -28,12 +160,70 @@ data_tag_t *data_tag_create(char *param)
return NULL;
}
tag->prepend = 1; // always prepend tag
tag->val = param;
tag->key = asepc(&tag->val, '=');
if (!tag->val) {
tag->val = tag->key;
tag->key = "tag";
char *p = param;
asepcb(&p, '=', ','); // look for '=' but stop at ','
if (p) {
tag->key = param;
tag->val = p;
} else {
tag->val = param;
}
int gpsd_mode = strncmp(tag->val, "gpsd", 4) == 0;
if (gpsd_mode || strncmp(tag->val, "tcp:", 4) == 0) {
p = arg_param(tag->val); // strip scheme
char *host = gpsd_mode ? "localhost" : NULL;
char *port = gpsd_mode ? "2947" : NULL;
char *opts = hostport_param(p, &host, &port);
list_t includes = {0};
// default to GPSd JSON
char const *mode = gpsd_mode ? "GPSd JSON" : "TCP custom";
char const *init_str = gpsd_mode ? watch_json : NULL;
char const *filter_str = gpsd_mode ? filter_json : NULL;
// parse format options
char *key, *val;
while (getkwargs(&opts, &key, &val)) {
key = remove_ws(key);
val = trim_ws(val);
if (!key || !*key)
continue;
else if (!strcasecmp(key, "nmea")) {
mode = "GPSd NMEA";
init_str = watch_nmea;
filter_str = filter_nmea;
}
else if (!strcasecmp(key, "init")) {
init_str = val;
}
else if (!strcasecmp(key, "filter")) {
filter_str = val;
}
else if (val) {
fprintf(stderr, "Invalid key \"%s\" option.\n", key);
exit(1);
}
else {
list_push(&includes, key);
}
}
tag->includes = (char const **)includes.elems;
if (!tag->key && !tag->includes)
tag->key = gpsd_mode ? "gps" : "tag";
if (!host || !port) {
fprintf(stderr, "Host or port for tag client missing!\n");
exit(1);
}
fprintf(stderr, "Getting %s data from %s port %s\n", mode, host, port);
tag->gpsd_client = gpsd_client_init(host, port, init_str, filter_str, mgr);
}
else {
if (!tag->key)
tag->key = "tag";
}
return tag; // NOTE: returns NULL on alloc failure.
@ -41,18 +231,96 @@ data_tag_t *data_tag_create(char *param)
void data_tag_free(data_tag_t *tag)
{
free(tag->includes);
gpsd_client_free(tag->gpsd_client);
free(tag);
}
static char const *find_list_strncmp(char const **list, char const *key, size_t len)
{
for (; list && *list; ++list) {
char const *elem = *list;
if (elem && strncmp(elem, key, len) == 0) {
return elem;
}
}
return NULL;
}
#define MAX_JSON_TOKENS 128
static data_t *append_filtered_json(data_t *data, char const *json, char const **includes)
{
jsmn_parser parser = {0};
jsmn_init(&parser);
jsmntok_t tok[MAX_JSON_TOKENS] = {0};
int toks = jsmn_parse(&parser, json, strlen(json), tok, MAX_JSON_TOKENS);
if (toks < 1 || tok[0].type != JSMN_OBJECT) {
fprintf(stderr, "invalid json (%d): %s\n", toks, json);
return data; // invalid json
}
// check all tokens
char buf[1024] = {0};
for (int i = 1; i < toks - 1; ++i) {
jsmntok_t *k = tok + i;
jsmntok_t *v = tok + i + 1;
i += k->size + v->size;
//fprintf(stderr, "TOK (%d %d) %.*s : (%d %d) %.*s\n",
// k->type, k->size, k->end - k->start, json + k->start,
// v->type, v->size, v->end - v->start, json + v->start);
// check all includes
char const *key = find_list_strncmp(includes, json + k->start, k->end - k->start);
if (key) {
// append json tag
strncpy(buf, json + v->start, v->end - v->start);
data = data_append(data,
key, "", DATA_STRING, buf,
NULL);
}
}
return data;
}
data_t *data_tag_apply(data_tag_t *tag, data_t *data, char const *filename)
{
char const *val = tag->val;
if (filename && !strcmp("PATH", tag->val)) {
if (tag->gpsd_client) {
val = tag->gpsd_client->msg;
if (tag->includes) {
if (tag->key) {
// wrap tag includes
data_t *obj = append_filtered_json(NULL, val, tag->includes);
// append tag wrapper
data = data_append(data,
tag->key, "", DATA_DATA, obj,
NULL);
}
else {
// append tag includes
data = append_filtered_json(data, val, tag->includes);
}
}
else {
// append tag string
data = data_append(data,
tag->key, "", DATA_STRING, val,
NULL);
}
return data;
}
else if (filename && !strcmp("PATH", tag->val)) {
val = filename;
}
else if (filename && !strcmp("FILE", tag->val)) {
val = file_basename(filename);
}
// prepend simple tags
data = data_prepend(data,
tag->key, "", DATA_STRING, val,
NULL);

View file

@ -306,6 +306,23 @@ char *asepc(char **stringp, char delim)
return p;
}
static char *achrb(char const *s, int c, int b)
{
for (; s && *s && *s != b; ++s)
if (*s == c) return (char *)s;
return NULL;
}
char *asepcb(char **stringp, char delim, char stop)
{
if (!stringp || !*stringp) return NULL;
char *s = achrb(*stringp, delim, stop);
if (s) *s++ = '\0';
char *p = *stringp;
*stringp = s;
return p;
}
char *getkwargs(char **s, char **key, char **val)
{
char *v = asepc(s, ',');

View file

@ -362,7 +362,7 @@ static void print_mqtt_data(data_output_t *output, data_t *data, char const *for
// "events" topic
if (mqtt->events) {
char message[1024]; // we expect the biggest strings to be around 500 bytes.
char message[2048]; // we expect the biggest strings to be around 500 bytes.
data_print_jsons(data, message, sizeof(message));
expand_topic(mqtt->topic, mqtt->events, data, mqtt->hostname);
mqtt_client_publish(mqtt->mqc, mqtt->topic, message);

View file

@ -342,7 +342,12 @@ char const **well_known_output_fields(r_cfg_t *cfg)
for (void **iter = cfg->data_tags.elems; iter && *iter; ++iter) {
data_tag_t *tag = *iter;
list_push(&field_list, tag->key);
if (tag->key) {
list_push(&field_list, (void *)tag->key);
}
else {
list_push_all(&field_list, (void **)tag->includes);
}
}
if (cfg->report_protocol)
@ -1027,5 +1032,5 @@ void add_infile(r_cfg_t *cfg, char *in_file)
void add_data_tag(struct r_cfg *cfg, char *param)
{
list_push(&cfg->data_tags, data_tag_create(param));
list_push(&cfg->data_tags, data_tag_create(param, get_mgr(cfg)));
}

View file

@ -134,7 +134,7 @@ static void usage(int exit_code)
" Append output to file with :<filename> (e.g. -F csv:log.csv), defaults to stdout.\n"
" Specify host/port for syslog with e.g. -F syslog:127.0.0.1:1514\n"
" [-M time[:<options>] | protocol | level | stats | bits | help] Add various meta data to each output.\n"
" [-K FILE | PATH | <tag> | <key>=<value>] Add an expanded token or fixed tag to every output line.\n"
" [-K FILE | PATH | <tag> | <key>=<tag>] Add an expanded token or fixed tag to every output line.\n"
" [-C native | si | customary] Convert units in decoded output.\n"
" [-T <seconds>] Specify number of seconds to run, also 12:34 or 1h23m45s\n"
" [-E hop | quit] Hop/Quit after outputting successful event(s)\n"
@ -231,8 +231,19 @@ static void help_tags(void)
{
term_help_printf(
"\t\t= Data tags option =\n"
" [-K FILE | PATH | <tag> | <key>=<value>] Add an expanded token or fixed tag to every output line.\n"
"\tIf <tag> or <value> is \"FILE\" or \"PATH\" an expanded token will be added.\n");
" [-K FILE | PATH | <tag> | <key>=<tag>] Add an expanded token or fixed tag to every output line.\n"
"\tIf <tag> is \"FILE\" or \"PATH\" an expanded token will be added.\n"
"\tThe <tag> can also be a GPSd URL, e.g.\n"
"\t\t\"-K gpsd,lat,lon\" (report lat and lon keys from local gpsd)\n"
"\t\t\"-K loc=gpsd,lat,lon\" (report lat and lon in loc object)\n"
"\t\t\"-K gpsd\" (full json TPV report, in default \"gps\" object)\n"
"\t\t\"-K foo=gpsd://127.0.0.1:2947\" (with key and address)\n"
"\t\t\"-K bar=gpsd,nmea\" (NMEA deault GPGGA report)\n"
"\t\t\"-K rmc=gpsd,nmea,filter='$GPRMC'\" (NMEA GPRMC report)\n"
"\tAlso <tag> can be a generic tcp address, e.g.\n"
"\t\t\"-K foo=tcp:localhost:4000\" (read lines as TCP client)\n"
"\t\t\"-K bar=tcp://127.0.0.1:3000,init='subscribe tags\\r\\n'\"\n"
"\t\t\"-K baz=tcp://127.0.0.1:5000,filter='a prefix to match'\"\n");
exit(0);
}