0
0
Fork 0
mirror of https://github.com/netdata/netdata.git synced 2025-04-13 17:19:11 +00:00

Stream chart labels ()

* stream chart labels

* update stream protocol to 4

* only send CLABEL_COMMIT when there are labels

* mark host as UNUSED

* log error for stray CLABEL_COMMIT

* remove commented define
This commit is contained in:
Emmanuel Vasilakis 2021-10-25 11:32:22 +03:00 committed by GitHub
parent 6f54cd3116
commit 7e9a2cbb0b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 86 additions and 3 deletions

View file

@ -152,6 +152,24 @@ PARSER_RC pluginsd_label_action(void *user, char *key, char *value, LABEL_SOURCE
return PARSER_RC_OK;
}
PARSER_RC pluginsd_clabel_action(void *user, char *key, char *value, LABEL_SOURCE source)
{
((PARSER_USER_OBJECT *) user)->chart_labels = add_label_to_list(((PARSER_USER_OBJECT *) user)->chart_labels, key, value, source);
return PARSER_RC_OK;
}
PARSER_RC pluginsd_clabel_commit_action(void *user, RRDHOST *host, struct label *new_labels)
{
RRDSET *st = ((PARSER_USER_OBJECT *)user)->st;
if (unlikely(!st)) {
error("requested CLABEL_COMMIT on host '%s', without a BEGIN, ignoring it.", host->hostname);
return PARSER_RC_OK;
}
rrdset_update_labels(st, new_labels);
return PARSER_RC_OK;
}
PARSER_RC pluginsd_overwrite_action(void *user, RRDHOST *host, struct label *new_labels)
{
@ -560,6 +578,38 @@ PARSER_RC pluginsd_label(char **words, void *user, PLUGINSD_ACTION *plugins_act
return PARSER_RC_OK;
}
PARSER_RC pluginsd_clabel(char **words, void *user, PLUGINSD_ACTION *plugins_action)
{
if (!words[1] || !words[2] || !words[3]) {
error("Ignoring malformed or empty CHART LABEL command.");
return PARSER_RC_OK;
}
if (plugins_action->clabel_action) {
PARSER_RC rc = plugins_action->clabel_action(user, words[1], words[2], strtol(words[3], NULL, 10));
return rc;
}
return PARSER_RC_OK;
}
PARSER_RC pluginsd_clabel_commit(char **words, void *user, PLUGINSD_ACTION *plugins_action)
{
UNUSED(words);
RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
debug(D_PLUGINSD, "requested to commit chart labels");
struct label *chart_labels = ((PARSER_USER_OBJECT *)user)->chart_labels;
((PARSER_USER_OBJECT *)user)->chart_labels = NULL;
if (plugins_action->clabel_commit_action) {
return plugins_action->clabel_commit_action(user, host, chart_labels);
}
return PARSER_RC_OK;
}
PARSER_RC pluginsd_overwrite(char **words, void *user, PLUGINSD_ACTION *plugins_action)
{
UNUSED(words);

View file

@ -14,6 +14,7 @@ typedef struct parser_user_object {
struct plugind *cd;
int trust_durations;
struct label *new_labels;
struct label *chart_labels;
size_t count;
int enabled;
uint8_t st_exists;
@ -35,6 +36,8 @@ extern PARSER_RC pluginsd_dimension_action(void *user, RRDSET *st, char *id, cha
long multiplier, long divisor, char *options, RRD_ALGORITHM algorithm_type);
extern PARSER_RC pluginsd_label_action(void *user, char *key, char *value, LABEL_SOURCE source);
extern PARSER_RC pluginsd_overwrite_action(void *user, RRDHOST *host, struct label *new_labels);
extern PARSER_RC pluginsd_clabel_commit_action(void *user, RRDHOST *host, struct label *new_labels);
extern PARSER_RC pluginsd_clabel_action(void *user, char *key, char *value, LABEL_SOURCE source);
#endif //NETDATA_PLUGINSD_PARSER_H

View file

@ -61,6 +61,8 @@ PARSER *parser_init(RRDHOST *host, void *user, void *input, PARSER_INPUT_TYPE fl
rc += parser_add_keyword(parser, PLUGINSD_KEYWORD_LABEL, pluginsd_label);
rc += parser_add_keyword(parser, PLUGINSD_KEYWORD_OVERWRITE, pluginsd_overwrite);
rc += parser_add_keyword(parser, PLUGINSD_KEYWORD_END, pluginsd_end);
rc += parser_add_keyword(parser, "CLABEL_COMMIT", pluginsd_clabel_commit);
rc += parser_add_keyword(parser, "CLABEL", pluginsd_clabel);
rc += parser_add_keyword(parser, PLUGINSD_KEYWORD_BEGIN, pluginsd_begin);
rc += parser_add_keyword(parser, "SET", pluginsd_set);
}

View file

@ -31,6 +31,8 @@ typedef struct pluginsd_action {
PARSER_RC (*variable_action)(void *user, RRDHOST *host, RRDSET *st, char *name, int global, calculated_number value);
PARSER_RC (*label_action)(void *user, char *key, char *value, LABEL_SOURCE source);
PARSER_RC (*overwrite_action)(void *user, RRDHOST *host, struct label *new_labels);
PARSER_RC (*clabel_action)(void *user, char *key, char *value, LABEL_SOURCE source);
PARSER_RC (*clabel_commit_action)(void *user, RRDHOST *host, struct label *new_labels);
PARSER_RC (*guid_action)(void *user, uuid_t *uuid);
PARSER_RC (*context_action)(void *user, uuid_t *uuid);
@ -110,5 +112,7 @@ extern PARSER_RC pluginsd_overwrite(char **words, void *user, PLUGINSD_ACTION *
extern PARSER_RC pluginsd_guid(char **words, void *user, PLUGINSD_ACTION *plugins_action);
extern PARSER_RC pluginsd_context(char **words, void *user, PLUGINSD_ACTION *plugins_action);
extern PARSER_RC pluginsd_tombstone(char **words, void *user, PLUGINSD_ACTION *plugins_action);
extern PARSER_RC pluginsd_clabel_commit(char **words, void *user, PLUGINSD_ACTION *plugins_action);
extern PARSER_RC pluginsd_clabel(char **words, void *user, PLUGINSD_ACTION *plugins_action);
#endif

View file

@ -221,6 +221,8 @@ size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp
parser->plugins_action->overwrite_action = &pluginsd_overwrite_action;
parser->plugins_action->chart_action = &pluginsd_chart_action;
parser->plugins_action->set_action = &pluginsd_set_action;
parser->plugins_action->clabel_commit_action = &pluginsd_clabel_commit_action;
parser->plugins_action->clabel_action = &pluginsd_clabel_action;
user->parser = parser;

View file

@ -183,6 +183,24 @@ static inline int need_to_send_chart_definition(RRDSET *st) {
return 0;
}
// chart labels
void rrdpush_send_clabels(RRDHOST *host, RRDSET *st) {
struct label_index *labels_c = &st->state->labels;
if (labels_c) {
netdata_rwlock_rdlock(&host->labels.labels_rwlock);
struct label *lbl = labels_c->head;
while(lbl) {
buffer_sprintf(host->sender->build,
"CLABEL \"%s\" \"%s\" %d\n", lbl->key, lbl->value, (int)lbl->label_source);
lbl = lbl->next;
}
if (labels_c->head)
buffer_sprintf(host->sender->build,"CLABEL_COMMIT\n");
netdata_rwlock_unlock(&host->labels.labels_rwlock);
}
}
// Send the current chart definition.
// Assumes that collector thread has already called sender_start for mutex / buffer state.
static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) {
@ -224,6 +242,10 @@ static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) {
, (st->module_name)?st->module_name:""
);
// send the chart labels
if (host->sender->version >= STREAM_VERSION_CLABELS)
rrdpush_send_clabels(host, st);
// send the dimensions
RRDDIM *rd;
rrddim_foreach_read(rd, st) {

View file

@ -10,10 +10,10 @@
#define CONNECTED_TO_SIZE 100
// #define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)4 Gap-filling
#define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)3
#define VERSION_GAP_FILLING 4
#define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)4
#define STREAM_VERSION_CLAIM 3
#define STREAM_VERSION_CLABELS 4
#define VERSION_GAP_FILLING 5
#define STREAMING_PROTOCOL_VERSION "1.1"
#define START_STREAMING_PROMPT "Hit me baby, push them over..."