diff --git a/collectors/plugins.d/gperf-config.txt b/collectors/plugins.d/gperf-config.txt index 96ba145a10..bad51367ce 100644 --- a/collectors/plugins.d/gperf-config.txt +++ b/collectors/plugins.d/gperf-config.txt @@ -12,47 +12,47 @@ PARSER_KEYWORD; # # Plugins Only Keywords # -FLUSH, 97, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 1 -DISABLE, 98, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 2 -EXIT, 99, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 3 -HOST, 71, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 4 -HOST_DEFINE, 72, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 5 -HOST_DEFINE_END, 73, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 6 -HOST_LABEL, 74, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 7 +FLUSH, 97, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 1 +DISABLE, 98, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 2 +EXIT, 99, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 3 +HOST, 71, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 4 +HOST_DEFINE, 72, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 5 +HOST_DEFINE_END, 73, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 6 +HOST_LABEL, 74, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 7 # # Common keywords # -BEGIN, 12, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 8 -CHART, 32, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 9 -CLABEL, 34, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 10 -CLABEL_COMMIT, 35, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 11 -DIMENSION, 31, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 12 -END, 13, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 13 -FUNCTION, 41, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 14 -FUNCTION_RESULT_BEGIN, 42, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 15 -LABEL, 51, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 16 -OVERWRITE, 52, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 17 -SET, 11, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 18 -VARIABLE, 53, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 19 -DYNCFG_ENABLE, 101, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 20 -DYNCFG_REGISTER_MODULE, 102, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 21 -DYNCFG_REGISTER_JOB, 103, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 22 -DYNCFG_RESET, 104, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 23 -REPORT_JOB_STATUS, 110, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 24 -DELETE_JOB, 111, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 25 +BEGIN, 12, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 8 +CHART, 32, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 9 +CLABEL, 34, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 10 +CLABEL_COMMIT, 35, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 11 +DIMENSION, 31, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 12 +END, 13, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 13 +FUNCTION, 41, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 14 +FUNCTION_RESULT_BEGIN, 42, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 15 +LABEL, 51, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 16 +OVERWRITE, 52, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 17 +SET, 11, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 18 +VARIABLE, 53, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 19 +DYNCFG_ENABLE, 101, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 20 +DYNCFG_REGISTER_MODULE, 102, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 21 +DYNCFG_REGISTER_JOB, 103, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 22 +DYNCFG_RESET, 104, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 23 +REPORT_JOB_STATUS, 110, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 24 +DELETE_JOB, 111, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 25 # # Streaming only keywords # -CLAIMED_ID, 61, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 26 -BEGIN2, 2, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 27 -SET2, 1, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 28 -END2, 3, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 29 +CLAIMED_ID, 61, PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 26 +BEGIN2, 2, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 27 +SET2, 1, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 28 +END2, 3, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 29 # # Streaming Replication keywords # -CHART_DEFINITION_END, 33, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 30 -RBEGIN, 22, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 31 -RDSTATE, 23, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 32 -REND, 25, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 33 -RSET, 21, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 34 -RSSTATE, 24, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 35 +CHART_DEFINITION_END, 33, PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 30 +RBEGIN, 22, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 31 +RDSTATE, 23, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 32 +REND, 25, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 33 +RSET, 21, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 34 +RSSTATE, 24, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 35 diff --git a/collectors/plugins.d/gperf-hashtable.h b/collectors/plugins.d/gperf-hashtable.h index 6e8bbc5f12..b327d8d6d3 100644 --- a/collectors/plugins.d/gperf-hashtable.h +++ b/collectors/plugins.d/gperf-hashtable.h @@ -83,79 +83,79 @@ static PARSER_KEYWORD gperf_keywords[] = { {(char*)0}, {(char*)0}, {(char*)0}, #line 30 "gperf-config.txt" - {"END", 13, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 13}, + {"END", 13, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 13}, #line 49 "gperf-config.txt" - {"END2", 3, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 29}, + {"END2", 3, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 29}, #line 56 "gperf-config.txt" - {"REND", 25, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 33}, + {"REND", 25, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 33}, #line 17 "gperf-config.txt" - {"EXIT", 99, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 3}, + {"EXIT", 99, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 3}, #line 16 "gperf-config.txt" - {"DISABLE", 98, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 2}, + {"DISABLE", 98, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 2}, #line 55 "gperf-config.txt" - {"RDSTATE", 23, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 32}, + {"RDSTATE", 23, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 32}, #line 29 "gperf-config.txt" - {"DIMENSION", 31, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 12}, + {"DIMENSION", 31, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 12}, #line 42 "gperf-config.txt" - {"DELETE_JOB", 111, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 25}, + {"DELETE_JOB", 111, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 25}, {(char*)0}, #line 40 "gperf-config.txt" - {"DYNCFG_RESET", 104, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 23}, + {"DYNCFG_RESET", 104, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 23}, #line 37 "gperf-config.txt" - {"DYNCFG_ENABLE", 101, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 20}, + {"DYNCFG_ENABLE", 101, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 20}, #line 26 "gperf-config.txt" - {"CHART", 32, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 9}, + {"CHART", 32, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 9}, #line 35 "gperf-config.txt" - {"SET", 11, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 18}, + {"SET", 11, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 18}, #line 48 "gperf-config.txt" - {"SET2", 1, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 28}, + {"SET2", 1, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 28}, #line 57 "gperf-config.txt" - {"RSET", 21, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 34}, + {"RSET", 21, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 34}, #line 41 "gperf-config.txt" - {"REPORT_JOB_STATUS", 110, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 24}, + {"REPORT_JOB_STATUS", 110, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 24}, #line 39 "gperf-config.txt" - {"DYNCFG_REGISTER_JOB", 103, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 22}, + {"DYNCFG_REGISTER_JOB", 103, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 22}, #line 58 "gperf-config.txt" - {"RSSTATE", 24, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 35}, + {"RSSTATE", 24, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 35}, #line 18 "gperf-config.txt" - {"HOST", 71, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 4}, + {"HOST", 71, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 4}, #line 38 "gperf-config.txt" - {"DYNCFG_REGISTER_MODULE", 102, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 21}, + {"DYNCFG_REGISTER_MODULE", 102, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 21}, #line 25 "gperf-config.txt" - {"BEGIN", 12, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 8}, + {"BEGIN", 12, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 8}, #line 47 "gperf-config.txt" - {"BEGIN2", 2, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 27}, + {"BEGIN2", 2, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 27}, #line 54 "gperf-config.txt" - {"RBEGIN", 22, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 31}, + {"RBEGIN", 22, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 31}, #line 27 "gperf-config.txt" - {"CLABEL", 34, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 10}, + {"CLABEL", 34, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 10}, #line 21 "gperf-config.txt" - {"HOST_LABEL", 74, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 7}, + {"HOST_LABEL", 74, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 7}, #line 19 "gperf-config.txt" - {"HOST_DEFINE", 72, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 5}, + {"HOST_DEFINE", 72, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 5}, #line 53 "gperf-config.txt" - {"CHART_DEFINITION_END", 33, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 30}, + {"CHART_DEFINITION_END", 33, PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 30}, #line 46 "gperf-config.txt" - {"CLAIMED_ID", 61, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 26}, + {"CLAIMED_ID", 61, PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 26}, #line 15 "gperf-config.txt" - {"FLUSH", 97, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 1}, + {"FLUSH", 97, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 1}, #line 20 "gperf-config.txt" - {"HOST_DEFINE_END", 73, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 6}, + {"HOST_DEFINE_END", 73, PARSER_INIT_PLUGINSD|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 6}, #line 28 "gperf-config.txt" - {"CLABEL_COMMIT", 35, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 11}, + {"CLABEL_COMMIT", 35, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 11}, #line 31 "gperf-config.txt" - {"FUNCTION", 41, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 14}, + {"FUNCTION", 41, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 14}, #line 34 "gperf-config.txt" - {"OVERWRITE", 52, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 17}, + {"OVERWRITE", 52, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 17}, #line 33 "gperf-config.txt" - {"LABEL", 51, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 16}, + {"LABEL", 51, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 16}, #line 36 "gperf-config.txt" - {"VARIABLE", 53, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 19}, + {"VARIABLE", 53, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING|PARSER_REP_METADATA, WORKER_PARSER_FIRST_JOB + 19}, {(char*)0}, {(char*)0}, {(char*)0}, {(char*)0}, {(char*)0}, {(char*)0}, {(char*)0}, {(char*)0}, {(char*)0}, #line 32 "gperf-config.txt" - {"FUNCTION_RESULT_BEGIN", 42, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 15} + {"FUNCTION_RESULT_BEGIN", 42, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 15} }; PARSER_KEYWORD * diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c index 48ad5ce587..dc44904a95 100644 --- a/collectors/plugins.d/pluginsd_parser.c +++ b/collectors/plugins.d/pluginsd_parser.c @@ -2890,6 +2890,13 @@ void pluginsd_process_thread_cleanup(void *ptr) { rrd_collector_finished(); +#ifdef NETDATA_LOG_STREAM_RECEIVE + if(parser->user.stream_log_fp) { + fclose(parser->user.stream_log_fp); + parser->user.stream_log_fp = NULL; + } +#endif + parser_destroy(parser); } diff --git a/collectors/plugins.d/pluginsd_parser.h b/collectors/plugins.d/pluginsd_parser.h index 937877cc01..39f39637e4 100644 --- a/collectors/plugins.d/pluginsd_parser.h +++ b/collectors/plugins.d/pluginsd_parser.h @@ -30,6 +30,7 @@ typedef enum __attribute__ ((__packed__)) parser_input_type { typedef enum __attribute__ ((__packed__)) { PARSER_INIT_PLUGINSD = (1 << 1), PARSER_INIT_STREAMING = (1 << 2), + PARSER_REP_METADATA = (1 << 3), } PARSER_REPERTOIRE; struct parser; @@ -54,6 +55,11 @@ typedef struct parser_user_object { size_t data_collections_count; int enabled; +#ifdef NETDATA_LOG_STREAM_RECEIVE + FILE *stream_log_fp; + PARSER_REPERTOIRE stream_log_repertoire; +#endif + STREAM_CAPABILITIES capabilities; // receiver capabilities struct { @@ -154,6 +160,11 @@ static inline PARSER_KEYWORD *parser_find_keyword(PARSER *parser, const char *co } static inline int parser_action(PARSER *parser, char *input) { +#ifdef NETDATA_LOG_STREAM_RECEIVE + static __thread char line[PLUGINSD_LINE_MAX + 1]; + strncpyz(line, input, sizeof(line) - 1); +#endif + parser->line++; if(unlikely(parser->flags & PARSER_DEFER_UNTIL_KEYWORD)) { @@ -197,6 +208,12 @@ static inline int parser_action(PARSER *parser, char *input) { PARSER_KEYWORD *t = parser_find_keyword(parser, command); if(likely(t)) { worker_is_busy(t->worker_job_id); + +#ifdef NETDATA_LOG_STREAM_RECEIVE + if(parser->user.stream_log_fp && t->repertoire & parser->user.stream_log_repertoire) + fprintf(parser->user.stream_log_fp, "%s", line); +#endif + rc = parser_execute(parser, t, words, num_words); // rc = (*t->func)(words, num_words, parser); worker_is_idle(); diff --git a/database/rrd.h b/database/rrd.h index d0a04795d4..81adf2e52f 100644 --- a/database/rrd.h +++ b/database/rrd.h @@ -901,9 +901,7 @@ static inline uint32_t rrdset_metadata_upstream_version(RRDSET *st) { return __atomic_load_n(&st->rrdpush.sender.sent_version, __ATOMIC_RELAXED); } -static inline void rrdset_metadata_updated(RRDSET *st) { - __atomic_add_fetch(&st->version, 1, __ATOMIC_RELAXED); -} +void rrdset_metadata_updated(RRDSET *st); static inline void rrdset_metadata_exposed_upstream(RRDSET *st, uint32_t version) { __atomic_store_n(&st->rrdpush.sender.sent_version, version, __ATOMIC_RELAXED); @@ -922,9 +920,7 @@ static inline uint32_t rrddim_metadata_upstream_version(RRDDIM *rd) { return __atomic_load_n(&rd->rrdpush.sender.sent_version, __ATOMIC_RELAXED); } -static inline void rrddim_metadata_updated(RRDDIM *rd) { - rrdset_metadata_updated(rd->rrdset); -} +void rrddim_metadata_updated(RRDDIM *rd); static inline void rrddim_metadata_exposed_upstream(RRDDIM *rd, uint32_t version) { __atomic_store_n(&rd->rrdpush.sender.sent_version, version, __ATOMIC_RELAXED); diff --git a/database/rrddim.c b/database/rrddim.c index d87cea6364..46226a548a 100644 --- a/database/rrddim.c +++ b/database/rrddim.c @@ -4,6 +4,11 @@ #include "rrd.h" #include "storage_engine.h" +void rrddim_metadata_updated(RRDDIM *rd) { + rrdcontext_updated_rrddim(rd); + rrdset_metadata_updated(rd->rrdset); +} + // ---------------------------------------------------------------------------- // RRDDIM index @@ -157,7 +162,6 @@ static void rrddim_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, v // let the chart resync rrdset_flag_set(st, RRDSET_FLAG_SYNC_CLOCK); - rrdset_metadata_updated(st); ml_dimension_new(rd); @@ -285,10 +289,9 @@ static void rrddim_react_callback(const DICTIONARY_ITEM *item __maybe_unused, vo if(ctr->react_action == RRDDIM_REACT_UPDATED) { // the chart needs to be updated to the parent rrdset_flag_set(st, RRDSET_FLAG_SYNC_CLOCK); - rrdset_metadata_updated(st); } - rrdcontext_updated_rrddim(rd); + rrddim_metadata_updated(rd); } size_t rrddim_size(void) { diff --git a/database/rrdlabels.c b/database/rrdlabels.c index 7718b94946..e4820baed7 100644 --- a/database/rrdlabels.c +++ b/database/rrdlabels.c @@ -683,7 +683,7 @@ static RRDLABEL *rrdlabels_find_label_with_key_unsafe(RRDLABELS *labels, RRDLABE RRDLABEL *found = NULL; while ((PValue = JudyLFirstThenNext(labels->JudyL, &Index, &first_then_next))) { RRDLABEL *lb = (RRDLABEL *)Index; - if (lb->index.key == label->index.key) { + if (lb->index.key == label->index.key && lb != label) { found = (RRDLABEL *)Index; break; } @@ -700,13 +700,7 @@ static void labels_add_already_sanitized(RRDLABELS *labels, const char *key, con spinlock_lock(&labels->spinlock); - RRDLABEL *old_label_with_key = rrdlabels_find_label_with_key_unsafe(labels, new_label); - - if (old_label_with_key == new_label) { - spinlock_unlock(&labels->spinlock); - delete_label(new_label); - return; - } + RRDLABEL_SRC new_ls = (ls & ~(RRDLABEL_FLAG_NEW | RRDLABEL_FLAG_OLD)); size_t mem_before_judyl = JudyLMemUsed(labels->JudyL); @@ -714,24 +708,27 @@ static void labels_add_already_sanitized(RRDLABELS *labels, const char *key, con if (!PValue || PValue == PJERR) fatal("RRDLABELS: corrupted labels JudyL array"); - RRDLABEL_SRC new_ls = (ls & ~(RRDLABEL_FLAG_NEW | RRDLABEL_FLAG_OLD)); - labels->version++; - - if (old_label_with_key) { - (void)JudyLDel(&labels->JudyL, (Word_t)old_label_with_key, PJE0); + if(*PValue) { new_ls |= RRDLABEL_FLAG_OLD; - } else + delete_label(new_label); + } + else { new_ls |= RRDLABEL_FLAG_NEW; + RRDLABEL *old_label_with_same_key = rrdlabels_find_label_with_key_unsafe(labels, new_label); + if (old_label_with_same_key) { + (void) JudyLDel(&labels->JudyL, (Word_t) old_label_with_same_key, PJE0); + delete_label(old_label_with_same_key); + } + } + + labels->version++; *((RRDLABEL_SRC *)PValue) = new_ls; size_t mem_after_judyl = JudyLMemUsed(labels->JudyL); STATS_PLUS_MEMORY(&dictionary_stats_category_rrdlabels, 0, mem_after_judyl - mem_before_judyl, 0); spinlock_unlock(&labels->spinlock); - - if (old_label_with_key) - delete_label((RRDLABEL *)old_label_with_key); } void rrdlabels_add(RRDLABELS *labels, const char *name, const char *value, RRDLABEL_SRC ls) @@ -1043,7 +1040,7 @@ void rrdlabels_copy(RRDLABELS *dst, RRDLABELS *src) lfe_start_nolock(src, label, ls) { RRDLABEL *old_label_with_key = rrdlabels_find_label_with_key_unsafe(dst, label); - if (old_label_with_key && old_label_with_key == label) + if (old_label_with_key) continue; Pvoid_t *PValue = JudyLIns(&dst->JudyL, (Word_t)label, PJE0); diff --git a/database/rrdset.c b/database/rrdset.c index 3b637db5c1..8ed1389758 100644 --- a/database/rrdset.c +++ b/database/rrdset.c @@ -5,6 +5,12 @@ #include <sched.h> #include "storage_engine.h" + +void rrdset_metadata_updated(RRDSET *st) { + __atomic_add_fetch(&st->version, 1, __ATOMIC_RELAXED); + rrdcontext_updated_rrdset(st); +} + // ---------------------------------------------------------------------------- // RRDSET rrdpush send chart_slots @@ -484,7 +490,6 @@ static bool rrdset_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, rrdset_update_permanent_labels(st); rrdset_flag_set(st, RRDSET_FLAG_SYNC_CLOCK); - rrdset_metadata_updated(st); return ctr->react_action != RRDSET_REACT_NONE; } @@ -510,10 +515,9 @@ static void rrdset_react_callback(const DICTIONARY_ITEM *item __maybe_unused, vo } rrdset_flag_set(st, RRDSET_FLAG_METADATA_UPDATE); rrdhost_flag_set(st->rrdhost, RRDHOST_FLAG_METADATA_UPDATE); - rrdset_metadata_updated(st); } - rrdcontext_updated_rrdset(st); + rrdset_metadata_updated(st); } void rrdset_index_init(RRDHOST *host) { diff --git a/streaming/receiver.c b/streaming/receiver.c index 02b2e83a0c..5e6453e4aa 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -353,6 +353,15 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i buffered_reader_init(&rpt->reader); +#ifdef NETDATA_LOG_STREAM_RECEIVE + { + char filename[FILENAME_MAX + 1]; + snprintfz(filename, FILENAME_MAX, "/tmp/stream-receiver-%s.txt", rpt->host ? rrdhost_hostname(rpt->host) : "unknown"); + parser->user.stream_log_fp = fopen(filename, "w"); + parser->user.stream_log_repertoire = PARSER_REP_METADATA; + } +#endif + BUFFER *buffer = buffer_create(sizeof(rpt->reader.read_buffer), NULL); while(!receiver_should_stop(rpt)) { diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index 0899ae3025..ee00a1f781 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -374,7 +374,7 @@ static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_sta internal_error(true, "STREAM: 'host:%s/chart:%s/dim:%s' flag 'exposed' is updated but not exposed", rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(rd)); // we will include it in the next iteration - rrdset_metadata_updated(st); + rrddim_metadata_updated(rd); } } rrddim_foreach_done(rd); diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h index 4eab0ccea4..14e0d31ba5 100644 --- a/streaming/rrdpush.h +++ b/streaming/rrdpush.h @@ -215,6 +215,10 @@ struct sender_state { struct compressor_state compressor; +#ifdef NETDATA_LOG_STREAM_SENDER + FILE *stream_log_fp; +#endif + #ifdef ENABLE_HTTPS NETDATA_SSL ssl; // structure used to encrypt the connection #endif diff --git a/streaming/sender.c b/streaming/sender.c index 1e89a57a73..8103df4f00 100644 --- a/streaming/sender.c +++ b/streaming/sender.c @@ -90,15 +90,22 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type) sender_lock(s); -// if(s->host == localhost && type == STREAM_TRAFFIC_TYPE_METADATA) { -// FILE *fp = fopen("/tmp/stream.txt", "a"); -// fprintf(fp, "\n--- SEND MESSAGE START: %s ----\n" -// "%s" -// "--- SEND MESSAGE END ----------------------------------------\n" -// , rrdhost_hostname(s->host), src -// ); -// fclose(fp); -// } +#ifdef NETDATA_LOG_STREAM_SENDER + if(type == STREAM_TRAFFIC_TYPE_METADATA) { + if(!s->stream_log_fp) { + char filename[FILENAME_MAX + 1]; + snprintfz(filename, FILENAME_MAX, "/tmp/stream-sender-%s.txt", s->host ? rrdhost_hostname(s->host) : "unknown"); + + s->stream_log_fp = fopen(filename, "w"); + } + + fprintf(s->stream_log_fp, "\n--- SEND MESSAGE START: %s ----\n" + "%s" + "--- SEND MESSAGE END ----------------------------------------\n" + , rrdhost_hostname(s->host), src + ); + } +#endif if(unlikely(s->buffer->max_size < (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE)) { netdata_log_info("STREAM %s [send to %s]: max buffer size of %zu is too small for a data message of size %zu. Increasing the max buffer size to %d times the max data message size.", @@ -255,12 +262,13 @@ static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) { rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED); st->rrdpush.sender.resync_time_s = 0; - rrdset_metadata_updated(st); RRDDIM *rd; rrddim_foreach_read(rd, st) rrddim_metadata_exposed_upstream_clear(rd); rrddim_foreach_done(rd); + + rrdset_metadata_updated(st); } rrdset_foreach_done(st); @@ -1235,6 +1243,14 @@ static void rrdpush_sender_thread_cleanup_callback(void *ptr) { rrdpush_sender_pipe_close(host, host->sender->rrdpush_sender_pipe, false); rrdhost_clear_sender___while_having_sender_mutex(host); + +#ifdef NETDATA_LOG_STREAM_SENDER + if(host->sender->stream_log_fp) { + fclose(host->sender->stream_log_fp); + host->sender->stream_log_fp = NULL; + } +#endif + sender_unlock(host->sender); freez(s->pipe_buffer);