0
0
Fork 0
mirror of https://github.com/netdata/netdata.git synced 2025-04-22 20:42:33 +00:00

fix missing labels from parents ()

* maintain in /tmp/stream-receiver-X.txt a copy of metadata received

* stream log metadata to /tmp/stream-sender-localhost.txt

* log the stream of all senders

* cleanup use of X_update_metadata() functions

* fix for last commit

* rrdlabel unmark/mark/delete unmarked restored
This commit is contained in:
Costa Tsaousis 2023-10-28 23:32:47 +01:00 committed by GitHub
parent 48af9dc2e0
commit 532e9b3d8d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 164 additions and 111 deletions

View file

@ -12,47 +12,47 @@ PARSER_KEYWORD;
#
# Plugins Only Keywords
#
FLUSH, 97, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 1
DISABLE, 98, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 2
EXIT, 99, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 3
HOST, 71, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 4
HOST_DEFINE, 72, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 5
HOST_DEFINE_END, 73, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 6
HOST_LABEL, 74, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 7
FLUSH, 97, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 1
DISABLE, 98, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 2
EXIT, 99, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 3
HOST, 71, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 4
HOST_DEFINE, 72, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 5
HOST_DEFINE_END, 73, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 6
HOST_LABEL, 74, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 7
#
# Common keywords
#
BEGIN, 12, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 8
CHART, 32, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 9
CLABEL, 34, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 10
CLABEL_COMMIT, 35, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 11
DIMENSION, 31, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 12
END, 13, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 13
FUNCTION, 41, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 14
FUNCTION_RESULT_BEGIN, 42, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 15
LABEL, 51, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 16
OVERWRITE, 52, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 17
SET, 11, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 18
VARIABLE, 53, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 19
DYNCFG_ENABLE, 101, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 20
DYNCFG_REGISTER_MODULE, 102, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 21
DYNCFG_REGISTER_JOB, 103, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 22
DYNCFG_RESET, 104, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 23
REPORT_JOB_STATUS, 110, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 24
DELETE_JOB, 111, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 25
BEGIN, 12, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 8
CHART, 32, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 9
CLABEL, 34, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 10
CLABEL_COMMIT, 35, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 11
DIMENSION, 31, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 12
END, 13, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 13
FUNCTION, 41, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 14
FUNCTION_RESULT_BEGIN, 42, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 15
LABEL, 51, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 16
OVERWRITE, 52, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 17
SET, 11, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 18
VARIABLE, 53, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 19
DYNCFG_ENABLE, 101, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 20
DYNCFG_REGISTER_MODULE, 102, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 21
DYNCFG_REGISTER_JOB, 103, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 22
DYNCFG_RESET, 104, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 23
REPORT_JOB_STATUS, 110, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 24
DELETE_JOB, 111, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 25
#
# Streaming only keywords
#
CLAIMED_ID, 61, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 26
BEGIN2, 2, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 27
SET2, 1, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 28
END2, 3, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 29
CLAIMED_ID, 61, PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 26
BEGIN2, 2, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 27
SET2, 1, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 28
END2, 3, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 29
#
# Streaming Replication keywords
#
CHART_DEFINITION_END, 33, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 30
RBEGIN, 22, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 31
RDSTATE, 23, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 32
REND, 25, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 33
RSET, 21, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 34
RSSTATE, 24, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 35
CHART_DEFINITION_END, 33, PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 30
RBEGIN, 22, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 31
RDSTATE, 23, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 32
REND, 25, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 33
RSET, 21, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 34
RSSTATE, 24, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 35

View file

@ -83,79 +83,79 @@ static PARSER_KEYWORD gperf_keywords[] =
{
{(char*)0}, {(char*)0}, {(char*)0},
#line 30 "gperf-config.txt"
{"END", 13, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 13},
{"END", 13, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 13},
#line 49 "gperf-config.txt"
{"END2", 3, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 29},
{"END2", 3, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 29},
#line 56 "gperf-config.txt"
{"REND", 25, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 33},
{"REND", 25, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 33},
#line 17 "gperf-config.txt"
{"EXIT", 99, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 3},
{"EXIT", 99, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 3},
#line 16 "gperf-config.txt"
{"DISABLE", 98, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 2},
{"DISABLE", 98, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 2},
#line 55 "gperf-config.txt"
{"RDSTATE", 23, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 32},
{"RDSTATE", 23, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 32},
#line 29 "gperf-config.txt"
{"DIMENSION", 31, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 12},
{"DIMENSION", 31, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 12},
#line 42 "gperf-config.txt"
{"DELETE_JOB", 111, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 25},
{"DELETE_JOB", 111, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 25},
{(char*)0},
#line 40 "gperf-config.txt"
{"DYNCFG_RESET", 104, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 23},
{"DYNCFG_RESET", 104, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 23},
#line 37 "gperf-config.txt"
{"DYNCFG_ENABLE", 101, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 20},
{"DYNCFG_ENABLE", 101, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 20},
#line 26 "gperf-config.txt"
{"CHART", 32, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 9},
{"CHART", 32, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 9},
#line 35 "gperf-config.txt"
{"SET", 11, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 18},
{"SET", 11, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 18},
#line 48 "gperf-config.txt"
{"SET2", 1, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 28},
{"SET2", 1, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 28},
#line 57 "gperf-config.txt"
{"RSET", 21, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 34},
{"RSET", 21, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 34},
#line 41 "gperf-config.txt"
{"REPORT_JOB_STATUS", 110, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 24},
{"REPORT_JOB_STATUS", 110, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 24},
#line 39 "gperf-config.txt"
{"DYNCFG_REGISTER_JOB", 103, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 22},
{"DYNCFG_REGISTER_JOB", 103, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 22},
#line 58 "gperf-config.txt"
{"RSSTATE", 24, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 35},
{"RSSTATE", 24, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 35},
#line 18 "gperf-config.txt"
{"HOST", 71, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 4},
{"HOST", 71, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 4},
#line 38 "gperf-config.txt"
{"DYNCFG_REGISTER_MODULE", 102, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 21},
{"DYNCFG_REGISTER_MODULE", 102, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 21},
#line 25 "gperf-config.txt"
{"BEGIN", 12, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 8},
{"BEGIN", 12, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 8},
#line 47 "gperf-config.txt"
{"BEGIN2", 2, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 27},
{"BEGIN2", 2, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 27},
#line 54 "gperf-config.txt"
{"RBEGIN", 22, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 31},
{"RBEGIN", 22, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 31},
#line 27 "gperf-config.txt"
{"CLABEL", 34, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 10},
{"CLABEL", 34, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 10},
#line 21 "gperf-config.txt"
{"HOST_LABEL", 74, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 7},
{"HOST_LABEL", 74, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 7},
#line 19 "gperf-config.txt"
{"HOST_DEFINE", 72, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 5},
{"HOST_DEFINE", 72, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 5},
#line 53 "gperf-config.txt"
{"CHART_DEFINITION_END", 33, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 30},
{"CHART_DEFINITION_END", 33, PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 30},
#line 46 "gperf-config.txt"
{"CLAIMED_ID", 61, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 26},
{"CLAIMED_ID", 61, PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 26},
#line 15 "gperf-config.txt"
{"FLUSH", 97, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 1},
{"FLUSH", 97, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 1},
#line 20 "gperf-config.txt"
{"HOST_DEFINE_END", 73, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 6},
{"HOST_DEFINE_END", 73, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 6},
#line 28 "gperf-config.txt"
{"CLABEL_COMMIT", 35, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 11},
{"CLABEL_COMMIT", 35, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 11},
#line 31 "gperf-config.txt"
{"FUNCTION", 41, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 14},
{"FUNCTION", 41, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 14},
#line 34 "gperf-config.txt"
{"OVERWRITE", 52, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 17},
{"OVERWRITE", 52, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 17},
#line 33 "gperf-config.txt"
{"LABEL", 51, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 16},
{"LABEL", 51, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 16},
#line 36 "gperf-config.txt"
{"VARIABLE", 53, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 19},
{"VARIABLE", 53, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 19},
{(char*)0}, {(char*)0}, {(char*)0}, {(char*)0},
{(char*)0}, {(char*)0}, {(char*)0}, {(char*)0},
{(char*)0},
#line 32 "gperf-config.txt"
{"FUNCTION_RESULT_BEGIN", 42, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 15}
{"FUNCTION_RESULT_BEGIN", 42, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 15}
};
PARSER_KEYWORD *

View file

@ -2890,6 +2890,13 @@ void pluginsd_process_thread_cleanup(void *ptr) {
rrd_collector_finished();
#ifdef NETDATA_LOG_STREAM_RECEIVE
if(parser->user.stream_log_fp) {
fclose(parser->user.stream_log_fp);
parser->user.stream_log_fp = NULL;
}
#endif
parser_destroy(parser);
}

View file

@ -30,6 +30,7 @@ typedef enum __attribute__ ((__packed__)) parser_input_type {
typedef enum __attribute__ ((__packed__)) {
PARSER_INIT_PLUGINSD = (1 << 1),
PARSER_INIT_STREAMING = (1 << 2),
PARSER_REP_METADATA = (1 << 3),
} PARSER_REPERTOIRE;
struct parser;
@ -54,6 +55,11 @@ typedef struct parser_user_object {
size_t data_collections_count;
int enabled;
#ifdef NETDATA_LOG_STREAM_RECEIVE
FILE *stream_log_fp;
PARSER_REPERTOIRE stream_log_repertoire;
#endif
STREAM_CAPABILITIES capabilities; // receiver capabilities
struct {
@ -154,6 +160,11 @@ static inline PARSER_KEYWORD *parser_find_keyword(PARSER *parser, const char *co
}
static inline int parser_action(PARSER *parser, char *input) {
#ifdef NETDATA_LOG_STREAM_RECEIVE
static __thread char line[PLUGINSD_LINE_MAX + 1];
strncpyz(line, input, sizeof(line) - 1);
#endif
parser->line++;
if(unlikely(parser->flags & PARSER_DEFER_UNTIL_KEYWORD)) {
@ -197,6 +208,12 @@ static inline int parser_action(PARSER *parser, char *input) {
PARSER_KEYWORD *t = parser_find_keyword(parser, command);
if(likely(t)) {
worker_is_busy(t->worker_job_id);
#ifdef NETDATA_LOG_STREAM_RECEIVE
if(parser->user.stream_log_fp && t->repertoire & parser->user.stream_log_repertoire)
fprintf(parser->user.stream_log_fp, "%s", line);
#endif
rc = parser_execute(parser, t, words, num_words);
// rc = (*t->func)(words, num_words, parser);
worker_is_idle();

View file

@ -901,9 +901,7 @@ static inline uint32_t rrdset_metadata_upstream_version(RRDSET *st) {
return __atomic_load_n(&st->rrdpush.sender.sent_version, __ATOMIC_RELAXED);
}
static inline void rrdset_metadata_updated(RRDSET *st) {
__atomic_add_fetch(&st->version, 1, __ATOMIC_RELAXED);
}
void rrdset_metadata_updated(RRDSET *st);
static inline void rrdset_metadata_exposed_upstream(RRDSET *st, uint32_t version) {
__atomic_store_n(&st->rrdpush.sender.sent_version, version, __ATOMIC_RELAXED);
@ -922,9 +920,7 @@ static inline uint32_t rrddim_metadata_upstream_version(RRDDIM *rd) {
return __atomic_load_n(&rd->rrdpush.sender.sent_version, __ATOMIC_RELAXED);
}
static inline void rrddim_metadata_updated(RRDDIM *rd) {
rrdset_metadata_updated(rd->rrdset);
}
void rrddim_metadata_updated(RRDDIM *rd);
static inline void rrddim_metadata_exposed_upstream(RRDDIM *rd, uint32_t version) {
__atomic_store_n(&rd->rrdpush.sender.sent_version, version, __ATOMIC_RELAXED);

View file

@ -4,6 +4,11 @@
#include "rrd.h"
#include "storage_engine.h"
void rrddim_metadata_updated(RRDDIM *rd) {
rrdcontext_updated_rrddim(rd);
rrdset_metadata_updated(rd->rrdset);
}
// ----------------------------------------------------------------------------
// RRDDIM index
@ -157,7 +162,6 @@ static void rrddim_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, v
// let the chart resync
rrdset_flag_set(st, RRDSET_FLAG_SYNC_CLOCK);
rrdset_metadata_updated(st);
ml_dimension_new(rd);
@ -285,10 +289,9 @@ static void rrddim_react_callback(const DICTIONARY_ITEM *item __maybe_unused, vo
if(ctr->react_action == RRDDIM_REACT_UPDATED) {
// the chart needs to be updated to the parent
rrdset_flag_set(st, RRDSET_FLAG_SYNC_CLOCK);
rrdset_metadata_updated(st);
}
rrdcontext_updated_rrddim(rd);
rrddim_metadata_updated(rd);
}
size_t rrddim_size(void) {

View file

@ -683,7 +683,7 @@ static RRDLABEL *rrdlabels_find_label_with_key_unsafe(RRDLABELS *labels, RRDLABE
RRDLABEL *found = NULL;
while ((PValue = JudyLFirstThenNext(labels->JudyL, &Index, &first_then_next))) {
RRDLABEL *lb = (RRDLABEL *)Index;
if (lb->index.key == label->index.key) {
if (lb->index.key == label->index.key && lb != label) {
found = (RRDLABEL *)Index;
break;
}
@ -700,13 +700,7 @@ static void labels_add_already_sanitized(RRDLABELS *labels, const char *key, con
spinlock_lock(&labels->spinlock);
RRDLABEL *old_label_with_key = rrdlabels_find_label_with_key_unsafe(labels, new_label);
if (old_label_with_key == new_label) {
spinlock_unlock(&labels->spinlock);
delete_label(new_label);
return;
}
RRDLABEL_SRC new_ls = (ls & ~(RRDLABEL_FLAG_NEW | RRDLABEL_FLAG_OLD));
size_t mem_before_judyl = JudyLMemUsed(labels->JudyL);
@ -714,24 +708,27 @@ static void labels_add_already_sanitized(RRDLABELS *labels, const char *key, con
if (!PValue || PValue == PJERR)
fatal("RRDLABELS: corrupted labels JudyL array");
RRDLABEL_SRC new_ls = (ls & ~(RRDLABEL_FLAG_NEW | RRDLABEL_FLAG_OLD));
labels->version++;
if (old_label_with_key) {
(void)JudyLDel(&labels->JudyL, (Word_t)old_label_with_key, PJE0);
if(*PValue) {
new_ls |= RRDLABEL_FLAG_OLD;
} else
delete_label(new_label);
}
else {
new_ls |= RRDLABEL_FLAG_NEW;
RRDLABEL *old_label_with_same_key = rrdlabels_find_label_with_key_unsafe(labels, new_label);
if (old_label_with_same_key) {
(void) JudyLDel(&labels->JudyL, (Word_t) old_label_with_same_key, PJE0);
delete_label(old_label_with_same_key);
}
}
labels->version++;
*((RRDLABEL_SRC *)PValue) = new_ls;
size_t mem_after_judyl = JudyLMemUsed(labels->JudyL);
STATS_PLUS_MEMORY(&dictionary_stats_category_rrdlabels, 0, mem_after_judyl - mem_before_judyl, 0);
spinlock_unlock(&labels->spinlock);
if (old_label_with_key)
delete_label((RRDLABEL *)old_label_with_key);
}
void rrdlabels_add(RRDLABELS *labels, const char *name, const char *value, RRDLABEL_SRC ls)
@ -1043,7 +1040,7 @@ void rrdlabels_copy(RRDLABELS *dst, RRDLABELS *src)
lfe_start_nolock(src, label, ls)
{
RRDLABEL *old_label_with_key = rrdlabels_find_label_with_key_unsafe(dst, label);
if (old_label_with_key && old_label_with_key == label)
if (old_label_with_key)
continue;
Pvoid_t *PValue = JudyLIns(&dst->JudyL, (Word_t)label, PJE0);

View file

@ -5,6 +5,12 @@
#include <sched.h>
#include "storage_engine.h"
void rrdset_metadata_updated(RRDSET *st) {
__atomic_add_fetch(&st->version, 1, __ATOMIC_RELAXED);
rrdcontext_updated_rrdset(st);
}
// ----------------------------------------------------------------------------
// RRDSET rrdpush send chart_slots
@ -484,7 +490,6 @@ static bool rrdset_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused,
rrdset_update_permanent_labels(st);
rrdset_flag_set(st, RRDSET_FLAG_SYNC_CLOCK);
rrdset_metadata_updated(st);
return ctr->react_action != RRDSET_REACT_NONE;
}
@ -510,10 +515,9 @@ static void rrdset_react_callback(const DICTIONARY_ITEM *item __maybe_unused, vo
}
rrdset_flag_set(st, RRDSET_FLAG_METADATA_UPDATE);
rrdhost_flag_set(st->rrdhost, RRDHOST_FLAG_METADATA_UPDATE);
rrdset_metadata_updated(st);
}
rrdcontext_updated_rrdset(st);
rrdset_metadata_updated(st);
}
void rrdset_index_init(RRDHOST *host) {

View file

@ -353,6 +353,15 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i
buffered_reader_init(&rpt->reader);
#ifdef NETDATA_LOG_STREAM_RECEIVE
{
char filename[FILENAME_MAX + 1];
snprintfz(filename, FILENAME_MAX, "/tmp/stream-receiver-%s.txt", rpt->host ? rrdhost_hostname(rpt->host) : "unknown");
parser->user.stream_log_fp = fopen(filename, "w");
parser->user.stream_log_repertoire = PARSER_REP_METADATA;
}
#endif
BUFFER *buffer = buffer_create(sizeof(rpt->reader.read_buffer), NULL);
while(!receiver_should_stop(rpt)) {

View file

@ -374,7 +374,7 @@ static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_sta
internal_error(true, "STREAM: 'host:%s/chart:%s/dim:%s' flag 'exposed' is updated but not exposed",
rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(rd));
// we will include it in the next iteration
rrdset_metadata_updated(st);
rrddim_metadata_updated(rd);
}
}
rrddim_foreach_done(rd);

View file

@ -215,6 +215,10 @@ struct sender_state {
struct compressor_state compressor;
#ifdef NETDATA_LOG_STREAM_SENDER
FILE *stream_log_fp;
#endif
#ifdef ENABLE_HTTPS
NETDATA_SSL ssl; // structure used to encrypt the connection
#endif

View file

@ -90,15 +90,22 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type)
sender_lock(s);
// if(s->host == localhost && type == STREAM_TRAFFIC_TYPE_METADATA) {
// FILE *fp = fopen("/tmp/stream.txt", "a");
// fprintf(fp, "\n--- SEND MESSAGE START: %s ----\n"
// "%s"
// "--- SEND MESSAGE END ----------------------------------------\n"
// , rrdhost_hostname(s->host), src
// );
// fclose(fp);
// }
#ifdef NETDATA_LOG_STREAM_SENDER
if(type == STREAM_TRAFFIC_TYPE_METADATA) {
if(!s->stream_log_fp) {
char filename[FILENAME_MAX + 1];
snprintfz(filename, FILENAME_MAX, "/tmp/stream-sender-%s.txt", s->host ? rrdhost_hostname(s->host) : "unknown");
s->stream_log_fp = fopen(filename, "w");
}
fprintf(s->stream_log_fp, "\n--- SEND MESSAGE START: %s ----\n"
"%s"
"--- SEND MESSAGE END ----------------------------------------\n"
, rrdhost_hostname(s->host), src
);
}
#endif
if(unlikely(s->buffer->max_size < (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE)) {
netdata_log_info("STREAM %s [send to %s]: max buffer size of %zu is too small for a data message of size %zu. Increasing the max buffer size to %d times the max data message size.",
@ -255,12 +262,13 @@ static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) {
rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
st->rrdpush.sender.resync_time_s = 0;
rrdset_metadata_updated(st);
RRDDIM *rd;
rrddim_foreach_read(rd, st)
rrddim_metadata_exposed_upstream_clear(rd);
rrddim_foreach_done(rd);
rrdset_metadata_updated(st);
}
rrdset_foreach_done(st);
@ -1235,6 +1243,14 @@ static void rrdpush_sender_thread_cleanup_callback(void *ptr) {
rrdpush_sender_pipe_close(host, host->sender->rrdpush_sender_pipe, false);
rrdhost_clear_sender___while_having_sender_mutex(host);
#ifdef NETDATA_LOG_STREAM_SENDER
if(host->sender->stream_log_fp) {
fclose(host->sender->stream_log_fp);
host->sender->stream_log_fp = NULL;
}
#endif
sender_unlock(host->sender);
freez(s->pipe_buffer);