diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c index 2d0788d804..c0dcedb67e 100644 --- a/collectors/plugins.d/pluginsd_parser.c +++ b/collectors/plugins.d/pluginsd_parser.c @@ -152,6 +152,24 @@ PARSER_RC pluginsd_label_action(void *user, char *key, char *value, LABEL_SOURCE return PARSER_RC_OK; } +PARSER_RC pluginsd_clabel_action(void *user, char *key, char *value, LABEL_SOURCE source) +{ + ((PARSER_USER_OBJECT *) user)->chart_labels = add_label_to_list(((PARSER_USER_OBJECT *) user)->chart_labels, key, value, source); + + return PARSER_RC_OK; +} + +PARSER_RC pluginsd_clabel_commit_action(void *user, RRDHOST *host, struct label *new_labels) +{ + RRDSET *st = ((PARSER_USER_OBJECT *)user)->st; + if (unlikely(!st)) { + error("requested CLABEL_COMMIT on host '%s', without a BEGIN, ignoring it.", host->hostname); + return PARSER_RC_OK; + } + + rrdset_update_labels(st, new_labels); + return PARSER_RC_OK; +} PARSER_RC pluginsd_overwrite_action(void *user, RRDHOST *host, struct label *new_labels) { @@ -560,6 +578,38 @@ PARSER_RC pluginsd_label(char **words, void *user, PLUGINSD_ACTION *plugins_act return PARSER_RC_OK; } +PARSER_RC pluginsd_clabel(char **words, void *user, PLUGINSD_ACTION *plugins_action) +{ + if (!words[1] || !words[2] || !words[3]) { + error("Ignoring malformed or empty CHART LABEL command."); + return PARSER_RC_OK; + } + + if (plugins_action->clabel_action) { + PARSER_RC rc = plugins_action->clabel_action(user, words[1], words[2], strtol(words[3], NULL, 10)); + return rc; + } + + return PARSER_RC_OK; +} + +PARSER_RC pluginsd_clabel_commit(char **words, void *user, PLUGINSD_ACTION *plugins_action) +{ + UNUSED(words); + + RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host; + debug(D_PLUGINSD, "requested to commit chart labels"); + + struct label *chart_labels = ((PARSER_USER_OBJECT *)user)->chart_labels; + ((PARSER_USER_OBJECT *)user)->chart_labels = NULL; + + if (plugins_action->clabel_commit_action) { + return plugins_action->clabel_commit_action(user, host, chart_labels); + } + + return PARSER_RC_OK; +} + PARSER_RC pluginsd_overwrite(char **words, void *user, PLUGINSD_ACTION *plugins_action) { UNUSED(words); diff --git a/collectors/plugins.d/pluginsd_parser.h b/collectors/plugins.d/pluginsd_parser.h index 4b94c53aca..fb4a45b7a7 100644 --- a/collectors/plugins.d/pluginsd_parser.h +++ b/collectors/plugins.d/pluginsd_parser.h @@ -14,6 +14,7 @@ typedef struct parser_user_object { struct plugind *cd; int trust_durations; struct label *new_labels; + struct label *chart_labels; size_t count; int enabled; uint8_t st_exists; @@ -35,6 +36,8 @@ extern PARSER_RC pluginsd_dimension_action(void *user, RRDSET *st, char *id, cha long multiplier, long divisor, char *options, RRD_ALGORITHM algorithm_type); extern PARSER_RC pluginsd_label_action(void *user, char *key, char *value, LABEL_SOURCE source); extern PARSER_RC pluginsd_overwrite_action(void *user, RRDHOST *host, struct label *new_labels); +extern PARSER_RC pluginsd_clabel_commit_action(void *user, RRDHOST *host, struct label *new_labels); +extern PARSER_RC pluginsd_clabel_action(void *user, char *key, char *value, LABEL_SOURCE source); #endif //NETDATA_PLUGINSD_PARSER_H diff --git a/parser/parser.c b/parser/parser.c index 21d7fb3fc1..5fc601cea0 100644 --- a/parser/parser.c +++ b/parser/parser.c @@ -61,6 +61,8 @@ PARSER *parser_init(RRDHOST *host, void *user, void *input, PARSER_INPUT_TYPE fl rc += parser_add_keyword(parser, PLUGINSD_KEYWORD_LABEL, pluginsd_label); rc += parser_add_keyword(parser, PLUGINSD_KEYWORD_OVERWRITE, pluginsd_overwrite); rc += parser_add_keyword(parser, PLUGINSD_KEYWORD_END, pluginsd_end); + rc += parser_add_keyword(parser, "CLABEL_COMMIT", pluginsd_clabel_commit); + rc += parser_add_keyword(parser, "CLABEL", pluginsd_clabel); rc += parser_add_keyword(parser, PLUGINSD_KEYWORD_BEGIN, pluginsd_begin); rc += parser_add_keyword(parser, "SET", pluginsd_set); } diff --git a/parser/parser.h b/parser/parser.h index 0b78775e1e..8d11a90074 100644 --- a/parser/parser.h +++ b/parser/parser.h @@ -31,6 +31,8 @@ typedef struct pluginsd_action { PARSER_RC (*variable_action)(void *user, RRDHOST *host, RRDSET *st, char *name, int global, calculated_number value); PARSER_RC (*label_action)(void *user, char *key, char *value, LABEL_SOURCE source); PARSER_RC (*overwrite_action)(void *user, RRDHOST *host, struct label *new_labels); + PARSER_RC (*clabel_action)(void *user, char *key, char *value, LABEL_SOURCE source); + PARSER_RC (*clabel_commit_action)(void *user, RRDHOST *host, struct label *new_labels); PARSER_RC (*guid_action)(void *user, uuid_t *uuid); PARSER_RC (*context_action)(void *user, uuid_t *uuid); @@ -110,5 +112,7 @@ extern PARSER_RC pluginsd_overwrite(char **words, void *user, PLUGINSD_ACTION * extern PARSER_RC pluginsd_guid(char **words, void *user, PLUGINSD_ACTION *plugins_action); extern PARSER_RC pluginsd_context(char **words, void *user, PLUGINSD_ACTION *plugins_action); extern PARSER_RC pluginsd_tombstone(char **words, void *user, PLUGINSD_ACTION *plugins_action); +extern PARSER_RC pluginsd_clabel_commit(char **words, void *user, PLUGINSD_ACTION *plugins_action); +extern PARSER_RC pluginsd_clabel(char **words, void *user, PLUGINSD_ACTION *plugins_action); #endif diff --git a/streaming/receiver.c b/streaming/receiver.c index 6771387920..bb7dca77c1 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -221,6 +221,8 @@ size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp parser->plugins_action->overwrite_action = &pluginsd_overwrite_action; parser->plugins_action->chart_action = &pluginsd_chart_action; parser->plugins_action->set_action = &pluginsd_set_action; + parser->plugins_action->clabel_commit_action = &pluginsd_clabel_commit_action; + parser->plugins_action->clabel_action = &pluginsd_clabel_action; user->parser = parser; diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index 038383b55b..53a8976999 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -183,6 +183,24 @@ static inline int need_to_send_chart_definition(RRDSET *st) { return 0; } +// chart labels +void rrdpush_send_clabels(RRDHOST *host, RRDSET *st) { + struct label_index *labels_c = &st->state->labels; + if (labels_c) { + netdata_rwlock_rdlock(&host->labels.labels_rwlock); + struct label *lbl = labels_c->head; + while(lbl) { + buffer_sprintf(host->sender->build, + "CLABEL \"%s\" \"%s\" %d\n", lbl->key, lbl->value, (int)lbl->label_source); + + lbl = lbl->next; + } + if (labels_c->head) + buffer_sprintf(host->sender->build,"CLABEL_COMMIT\n"); + netdata_rwlock_unlock(&host->labels.labels_rwlock); + } +} + // Send the current chart definition. // Assumes that collector thread has already called sender_start for mutex / buffer state. static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) { @@ -224,6 +242,10 @@ static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) { , (st->module_name)?st->module_name:"" ); + // send the chart labels + if (host->sender->version >= STREAM_VERSION_CLABELS) + rrdpush_send_clabels(host, st); + // send the dimensions RRDDIM *rd; rrddim_foreach_read(rd, st) { diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h index 7e07c8916f..027ccd102a 100644 --- a/streaming/rrdpush.h +++ b/streaming/rrdpush.h @@ -10,10 +10,10 @@ #define CONNECTED_TO_SIZE 100 -// #define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)4 Gap-filling -#define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)3 -#define VERSION_GAP_FILLING 4 +#define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)4 #define STREAM_VERSION_CLAIM 3 +#define STREAM_VERSION_CLABELS 4 +#define VERSION_GAP_FILLING 5 #define STREAMING_PROTOCOL_VERSION "1.1" #define START_STREAMING_PROMPT "Hit me baby, push them over..."