diff --git a/CMakeLists.txt b/CMakeLists.txt index a3d998b0e1..40b160f32f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1598,15 +1598,15 @@ set(RRD_PLUGIN_FILES src/database/contexts/api_v2_contexts_alerts.h src/database/contexts/api_v2_contexts_alert_transitions.c src/database/contexts/api_v2_contexts_alert_config.c - src/database/contexts/context.c - src/database/contexts/instance.c - src/database/contexts/internal.h - src/database/contexts/metric.c + src/database/contexts/rrdcontext-context.c + src/database/contexts/rrdcontext-instance.c + src/database/contexts/rrdcontext-internal.h + src/database/contexts/rrdcontext-metric.c src/database/contexts/query_scope.c src/database/contexts/query_target.c src/database/contexts/rrdcontext.c src/database/contexts/rrdcontext.h - src/database/contexts/worker.c + src/database/contexts/rrdcontext-worker.c src/database/rrdcollector.c src/database/rrdcollector.h src/database/rrddim.c @@ -1658,7 +1658,7 @@ set(RRD_PLUGIN_FILES src/database/rrd-database-mode.c src/database/rrdhost-system-info.c src/database/rrdhost-system-info.h - src/database/contexts/contexts-loading.c + src/database/contexts/rrdcontext-loading.c src/database/rrdset-index-id.c src/database/rrdset-index-id.h src/database/rrdset-index-name.c @@ -1688,6 +1688,7 @@ set(RRD_PLUGIN_FILES src/database/rrdhost-collection.h src/database/pattern-array.c src/database/pattern-array.h + src/database/contexts/rrdcontext-queues.c ) if(ENABLE_DBENGINE) @@ -2134,7 +2135,7 @@ set(ACLK_FILES src/aclk/schema-wrappers/context_stream.cc src/aclk/schema-wrappers/context_stream.h src/aclk/schema-wrappers/context.cc - src/aclk/schema-wrappers/context.h + src/aclk/schema-wrappers/rrdcontext-context.h src/aclk/schema-wrappers/schema_wrappers.h src/aclk/schema-wrappers/schema_wrapper_utils.cc src/aclk/schema-wrappers/schema_wrapper_utils.h diff --git a/src/aclk/schema-wrappers/context.cc b/src/aclk/schema-wrappers/context.cc index b04c9d20cc..6a73a8268c 100644 --- a/src/aclk/schema-wrappers/context.cc +++ b/src/aclk/schema-wrappers/context.cc @@ -6,7 +6,7 @@ #include "schema_wrapper_utils.h" -#include "context.h" +#include "rrdcontext-context.h" using namespace context::v1; diff --git a/src/aclk/schema-wrappers/context.h b/src/aclk/schema-wrappers/rrdcontext-context.h similarity index 100% rename from src/aclk/schema-wrappers/context.h rename to src/aclk/schema-wrappers/rrdcontext-context.h diff --git a/src/aclk/schema-wrappers/schema_wrappers.h b/src/aclk/schema-wrappers/schema_wrappers.h index b651b88457..e698bae4d9 100644 --- a/src/aclk/schema-wrappers/schema_wrappers.h +++ b/src/aclk/schema-wrappers/schema_wrappers.h @@ -13,7 +13,7 @@ #include "node_info.h" #include "capability.h" #include "context_stream.h" -#include "context.h" +#include "rrdcontext-context.h" #include "agent_cmds.h" #endif /* SCHEMA_WRAPPERS_H */ diff --git a/src/database/contexts/api_v1_contexts.c b/src/database/contexts/api_v1_contexts.c index 1d53a68792..bcd1d1f234 100644 --- a/src/database/contexts/api_v1_contexts.c +++ b/src/database/contexts/api_v1_contexts.c @@ -1,6 +1,6 @@ // SPDX-License-Identifier: GPL-3.0-or-later -#include "internal.h" +#include "rrdcontext-internal.h" static void rrd_flags_to_buffer_json_array_items(RRD_FLAGS flags, BUFFER *wb) { if(flags & RRD_FLAG_QUEUED_FOR_HUB) diff --git a/src/database/contexts/api_v2_contexts.h b/src/database/contexts/api_v2_contexts.h index 3fb5354b9e..5a5b72d1be 100644 --- a/src/database/contexts/api_v2_contexts.h +++ b/src/database/contexts/api_v2_contexts.h @@ -3,7 +3,7 @@ #ifndef NETDATA_API_V2_CONTEXTS_H #define NETDATA_API_V2_CONTEXTS_H -#include "internal.h" +#include "rrdcontext-internal.h" typedef enum __attribute__ ((__packed__)) { FTS_MATCHED_NONE = 0, diff --git a/src/database/contexts/api_v2_contexts_alerts.h b/src/database/contexts/api_v2_contexts_alerts.h index b7be3f4d93..f7ec1c8922 100644 --- a/src/database/contexts/api_v2_contexts_alerts.h +++ b/src/database/contexts/api_v2_contexts_alerts.h @@ -3,7 +3,7 @@ #ifndef NETDATA_API_V2_CONTEXTS_ALERTS_H #define NETDATA_API_V2_CONTEXTS_ALERTS_H -#include "internal.h" +#include "rrdcontext-internal.h" #include "api_v2_contexts.h" struct alert_transitions_callback_data { diff --git a/src/database/contexts/query_scope.c b/src/database/contexts/query_scope.c index 7485ef3e6a..559867f4b0 100644 --- a/src/database/contexts/query_scope.c +++ b/src/database/contexts/query_scope.c @@ -1,6 +1,6 @@ // SPDX-License-Identifier: GPL-3.0-or-later -#include "internal.h" +#include "rrdcontext-internal.h" ssize_t query_scope_foreach_host(SIMPLE_PATTERN *scope_hosts_sp, SIMPLE_PATTERN *hosts_sp, foreach_host_cb_t cb, void *data, @@ -50,7 +50,7 @@ ssize_t query_scope_foreach_host(SIMPLE_PATTERN *scope_hosts_sp, SIMPLE_PATTERN bool queryable_host = (match == SP_MATCHED_POSITIVE); v_hash += dictionary_version(host->rrdctx.contexts); - h_hash += dictionary_version(host->rrdctx.hub_queue); + h_hash += rrdcontext_queue_version(&host->rrdctx.hub_queue); a_hash += dictionary_version(host->rrdcalc_root_index); t_hash += __atomic_load_n(&host->health_transitions, __ATOMIC_RELAXED); ssize_t ret = cb(data, host, queryable_host); diff --git a/src/database/contexts/query_target.c b/src/database/contexts/query_target.c index 5df93085e6..4e557b6396 100644 --- a/src/database/contexts/query_target.c +++ b/src/database/contexts/query_target.c @@ -1,6 +1,6 @@ // SPDX-License-Identifier: GPL-3.0-or-later -#include "internal.h" +#include "rrdcontext-internal.h" #define QUERY_TARGET_MAX_REALLOC_INCREASE 500 #define query_target_realloc_size(size, start) \ @@ -1128,7 +1128,7 @@ QUERY_TARGET *query_target_create(QUERY_TARGET_REQUEST *qtr) { // single host query qt->versions.contexts_hard_hash = dictionary_version(host->rrdctx.contexts); - qt->versions.contexts_soft_hash = dictionary_version(host->rrdctx.hub_queue); + qt->versions.contexts_soft_hash = rrdcontext_queue_version(&host->rrdctx.hub_queue); qt->versions.alerts_hard_hash = dictionary_version(host->rrdcalc_root_index); qt->versions.alerts_soft_hash = __atomic_load_n(&host->health_transitions, __ATOMIC_RELAXED); query_node_add(&qtl, host, true); diff --git a/src/database/contexts/context.c b/src/database/contexts/rrdcontext-context.c similarity index 67% rename from src/database/contexts/context.c rename to src/database/contexts/rrdcontext-context.c index d11ccf6752..be126a4253 100644 --- a/src/database/contexts/context.c +++ b/src/database/contexts/rrdcontext-context.c @@ -1,6 +1,6 @@ // SPDX-License-Identifier: GPL-3.0-or-later -#include "internal.h" +#include "rrdcontext-internal.h" inline const char *rrdcontext_acquired_id(RRDCONTEXT_ACQUIRED *rca) { RRDCONTEXT *rc = rrdcontext_acquired_value(rca); @@ -96,11 +96,8 @@ static void rrdcontext_delete_callback(const DICTIONARY_ITEM *item __maybe_unuse // update the count of contexts __atomic_sub_fetch(&rc->rrdhost->rrdctx.contexts_count, 1, __ATOMIC_RELAXED); - if(rc->rrdhost->rrdctx.hub_queue) - dictionary_del(rc->rrdhost->rrdctx.hub_queue, string2str(rc->id)); - - if(rc->rrdhost->rrdctx.pp_queue) - dictionary_del(rc->rrdhost->rrdctx.pp_queue, string2str(rc->id)); + rrdcontext_del_from_hub_queue(rc, false); + rrdcontext_del_from_pp_queue(rc, false); rrdinstances_destroy_from_rrdcontext(rc); rrdcontext_freez(rc); @@ -235,64 +232,6 @@ void rrdcontext_trigger_updates(RRDCONTEXT *rc, const char *function) { rrdcontext_queue_for_post_processing(rc, function, rc->flags); } -static void rrdcontext_hub_queue_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *nothing __maybe_unused) { - RRDCONTEXT *rc = context; - rrd_flag_set(rc, RRD_FLAG_QUEUED_FOR_HUB); - rc->queue.queued_ut = now_realtime_usec(); - rc->queue.queued_flags = rrd_flags_get(rc); -} - -static void rrdcontext_hub_queue_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *nothing __maybe_unused) { - RRDCONTEXT *rc = context; - rrd_flag_clear(rc, RRD_FLAG_QUEUED_FOR_HUB); -} - -static bool rrdcontext_hub_queue_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *new_context __maybe_unused, void *nothing __maybe_unused) { - // context and new_context are the same - // we just need to update the timings - RRDCONTEXT *rc = context; - rrd_flag_set(rc, RRD_FLAG_QUEUED_FOR_HUB); - rc->queue.queued_ut = now_realtime_usec(); - rc->queue.queued_flags |= rrd_flags_get(rc); - - return true; -} - -static void rrdcontext_post_processing_queue_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *nothing __maybe_unused) { - RRDCONTEXT *rc = context; - rrd_flag_set(rc, RRD_FLAG_QUEUED_FOR_PP); - rc->pp.queued_flags = rc->flags; - rc->pp.queued_ut = now_realtime_usec(); -} - -static void rrdcontext_post_processing_queue_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *nothing __maybe_unused) { - RRDCONTEXT *rc = context; - - // IMPORTANT: - // Do not rely on this flag being absent, because the dictionaries have delayed deletions (garbage collect) - // so, this flag may not be deleted immediately from the context. - rrd_flag_clear(rc, RRD_FLAG_QUEUED_FOR_PP); - rc->pp.dequeued_ut = now_realtime_usec(); -} - -static bool rrdcontext_post_processing_queue_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *new_context __maybe_unused, void *nothing __maybe_unused) { - RRDCONTEXT *rc = context; - bool changed = false; - - if(!(rc->flags & RRD_FLAG_QUEUED_FOR_PP)) { - rrd_flag_set(rc, RRD_FLAG_QUEUED_FOR_PP); - changed = true; - } - - if(rc->pp.queued_flags != rc->flags) { - rc->pp.queued_flags |= rc->flags; - changed = true; - } - - return changed; -} - - void rrdhost_create_rrdcontexts(RRDHOST *host) { if(unlikely(!host)) return; if(likely(host->rrdctx.contexts)) return; @@ -306,42 +245,19 @@ void rrdhost_create_rrdcontexts(RRDHOST *host) { dictionary_register_conflict_callback(host->rrdctx.contexts, rrdcontext_conflict_callback, host); dictionary_register_react_callback(host->rrdctx.contexts, rrdcontext_react_callback, host); - host->rrdctx.hub_queue = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_VALUE_LINK_DONT_CLONE, &dictionary_stats_category_rrdcontext, 0); - dictionary_register_insert_callback(host->rrdctx.hub_queue, rrdcontext_hub_queue_insert_callback, NULL); - dictionary_register_delete_callback(host->rrdctx.hub_queue, rrdcontext_hub_queue_delete_callback, NULL); - dictionary_register_conflict_callback(host->rrdctx.hub_queue, rrdcontext_hub_queue_conflict_callback, NULL); + memset(&host->rrdctx.pp_queue, 0, sizeof(host->rrdctx.pp_queue)); + RRDCONTEXT_QUEUE_INIT(&host->rrdctx.pp_queue); + spinlock_init(&host->rrdctx.pp_queue.spinlock); - host->rrdctx.pp_queue = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_VALUE_LINK_DONT_CLONE, &dictionary_stats_category_rrdcontext, 0); - dictionary_register_insert_callback(host->rrdctx.pp_queue, rrdcontext_post_processing_queue_insert_callback, NULL); - dictionary_register_delete_callback(host->rrdctx.pp_queue, rrdcontext_post_processing_queue_delete_callback, NULL); - dictionary_register_conflict_callback(host->rrdctx.pp_queue, rrdcontext_post_processing_queue_conflict_callback, NULL); + memset(&host->rrdctx.hub_queue, 0, sizeof(host->rrdctx.hub_queue)); + RRDCONTEXT_QUEUE_INIT(&host->rrdctx.hub_queue); + spinlock_init(&host->rrdctx.hub_queue.spinlock); } void rrdhost_destroy_rrdcontexts(RRDHOST *host) { if(unlikely(!host)) return; if(unlikely(!host->rrdctx.contexts)) return; - if(host->rrdctx.hub_queue) { - RRDCONTEXT *rc; - dfe_start_write(host->rrdctx.hub_queue, rc) { - dictionary_del(host->rrdctx.hub_queue, string2str(rc->id)); - } - dfe_done(rc); - dictionary_destroy(host->rrdctx.hub_queue); - host->rrdctx.hub_queue = NULL; - } - - if(host->rrdctx.pp_queue) { - RRDCONTEXT *rc; - dfe_start_write(host->rrdctx.pp_queue, rc) { - dictionary_del(host->rrdctx.pp_queue, string2str(rc->id)); - } - dfe_done(rc); - dictionary_destroy(host->rrdctx.pp_queue); - host->rrdctx.pp_queue = NULL; - } - dictionary_destroy(host->rrdctx.contexts); host->rrdctx.contexts = NULL; } - diff --git a/src/database/contexts/instance.c b/src/database/contexts/rrdcontext-instance.c similarity index 99% rename from src/database/contexts/instance.c rename to src/database/contexts/rrdcontext-instance.c index 1ce99a66f6..64ab6e6491 100644 --- a/src/database/contexts/instance.c +++ b/src/database/contexts/rrdcontext-instance.c @@ -1,6 +1,6 @@ // SPDX-License-Identifier: GPL-3.0-or-later -#include "internal.h" +#include "rrdcontext-internal.h" // ---------------------------------------------------------------------------- // helper one-liners for RRDINSTANCE diff --git a/src/database/contexts/internal.h b/src/database/contexts/rrdcontext-internal.h similarity index 95% rename from src/database/contexts/internal.h rename to src/database/contexts/rrdcontext-internal.h index 5e799cfca2..53a47d3646 100644 --- a/src/database/contexts/internal.h +++ b/src/database/contexts/rrdcontext-internal.h @@ -5,7 +5,7 @@ #include "rrdcontext.h" #include "../sqlite/sqlite_context.h" -#include "../../aclk/schema-wrappers/context.h" +#include "../../aclk/schema-wrappers/rrdcontext-context.h" #include "../../aclk/aclk_contexts_api.h" #include "../../aclk/aclk.h" #include "../storage-engine.h" @@ -279,6 +279,7 @@ typedef struct rrdcontext { RRDHOST *rrdhost; struct { + Word_t idx; RRD_FLAGS queued_flags; // the last flags that triggered the post-processing size_t executions; // how many times this context has been processed usec_t queued_ut; // the last time this was queued @@ -286,6 +287,7 @@ typedef struct rrdcontext { } pp; struct { + Word_t idx; RRD_FLAGS queued_flags; // the last flags that triggered the queueing size_t dispatches; // the number of times this has been dispatched to hub usec_t queued_ut; // the last time this was queued @@ -295,6 +297,11 @@ typedef struct rrdcontext { } queue; } RRDCONTEXT; +void rrdcontext_add_to_pp_queue(RRDCONTEXT *rc); +void rrdcontext_add_to_hub_queue(RRDCONTEXT *rc); +void rrdcontext_del_from_hub_queue(RRDCONTEXT *rc, bool having_lock); +void rrdcontext_del_from_pp_queue(RRDCONTEXT *rc, bool having_lock); + // ---------------------------------------------------------------------------- // helpers for counting collected metrics, instances and contexts @@ -484,4 +491,15 @@ void rrdcontext_initial_processing_after_loading(RRDCONTEXT *rc); RRDLABELS *rrdinstance_labels(RRDINSTANCE *ri); +bool rrdcontext_post_process_updates(RRDCONTEXT *rc, bool force, RRD_FLAGS reason, bool worker_jobs); +void rrdcontext_post_process_queued_contexts(RRDHOST *host); +void rrdcontext_dispatch_queued_contexts_to_hub(RRDHOST *host, usec_t now_ut); +usec_t rrdcontext_calculate_queued_dispatch_time_ut(RRDCONTEXT *rc, usec_t now_ut); +bool check_if_cloud_version_changed_unsafe(RRDCONTEXT *rc, bool sending); +bool rrdcontext_should_be_deleted(RRDCONTEXT *rc); +void rrdcontext_delete_from_sql_unsafe(RRDCONTEXT *rc); +static inline void rrdcontext_dequeue_from_post_processing(RRDCONTEXT *rc) { + rrdcontext_del_from_pp_queue(rc, false); +} + #endif //NETDATA_RRDCONTEXT_INTERNAL_H diff --git a/src/database/contexts/contexts-loading.c b/src/database/contexts/rrdcontext-loading.c similarity index 99% rename from src/database/contexts/contexts-loading.c rename to src/database/contexts/rrdcontext-loading.c index a6b5aeb286..1dbf110cca 100644 --- a/src/database/contexts/contexts-loading.c +++ b/src/database/contexts/rrdcontext-loading.c @@ -1,6 +1,6 @@ // SPDX-License-Identifier: GPL-3.0-or-later -#include "internal.h" +#include "rrdcontext-internal.h" static __thread size_t th_ignored_metrics = 0, th_ignored_instances = 0, th_zero_retention_metrics = 0; diff --git a/src/database/contexts/metric.c b/src/database/contexts/rrdcontext-metric.c similarity index 99% rename from src/database/contexts/metric.c rename to src/database/contexts/rrdcontext-metric.c index c417a2191e..a6851b487e 100644 --- a/src/database/contexts/metric.c +++ b/src/database/contexts/rrdcontext-metric.c @@ -1,6 +1,6 @@ // SPDX-License-Identifier: GPL-3.0-or-later -#include "internal.h" +#include "rrdcontext-internal.h" void rrdmetric_trigger_updates(RRDMETRIC *rm, const char *function); diff --git a/src/database/contexts/rrdcontext-queues.c b/src/database/contexts/rrdcontext-queues.c new file mode 100644 index 0000000000..9c0172e313 --- /dev/null +++ b/src/database/contexts/rrdcontext-queues.c @@ -0,0 +1,272 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "rrdcontext-internal.h" + +typedef enum { + RRDCONTEXT_QUEUE_INVALID = 0, + RRDCONTEXT_QUEUE_ADDED, + RRDCONTEXT_QUEUE_FOUND, +} RRDCONTEXT_QUEUE_STATUS; + +static inline RRDCONTEXT_QUEUE_STATUS rrdcontext_queue_add(RRDCONTEXT_QUEUE_JudyLSet *queue, RRDCONTEXT *rc, Word_t *idx, bool having_lock) { + RRDCONTEXT_QUEUE_STATUS ret = RRDCONTEXT_QUEUE_INVALID; + if(!queue || !rc || !idx) return ret; + + if(!having_lock) + spinlock_lock(&queue->spinlock); + + if(*idx) { + fatal_assert(RRDCONTEXT_QUEUE_GET(queue, *idx) == rc); + ret = RRDCONTEXT_QUEUE_FOUND; + } + else { + *idx = queue->id++; + RRDCONTEXT_QUEUE_SET(queue, *idx, rc); + __atomic_add_fetch(&queue->version, 1, __ATOMIC_RELAXED); + __atomic_add_fetch(&queue->entries, 1, __ATOMIC_RELAXED); + ret = RRDCONTEXT_QUEUE_ADDED; + } + + if(!having_lock) + spinlock_unlock(&queue->spinlock); + + return ret; +} + +void rrdcontext_add_to_hub_queue(RRDCONTEXT *rc) { + if(!rc || !rc->rrdhost) return; + + spinlock_lock(&rc->rrdhost->rrdctx.hub_queue.spinlock); + + RRDCONTEXT_QUEUE_STATUS ret = rrdcontext_queue_add(&rc->rrdhost->rrdctx.hub_queue, rc, &rc->queue.idx, true); + + if(ret == RRDCONTEXT_QUEUE_ADDED) { + rrd_flag_set(rc, RRD_FLAG_QUEUED_FOR_HUB); + rc->queue.queued_ut = now_realtime_usec(); + rc->queue.queued_flags = rrd_flags_get(rc); + } + else if(ret == RRDCONTEXT_QUEUE_FOUND) { + rrd_flag_set(rc, RRD_FLAG_QUEUED_FOR_HUB); + rc->queue.queued_ut = now_realtime_usec(); + rc->queue.queued_flags |= rrd_flags_get(rc); + } + + spinlock_unlock(&rc->rrdhost->rrdctx.hub_queue.spinlock); +} + +void rrdcontext_add_to_pp_queue(RRDCONTEXT *rc) { + if(!rc || !rc->rrdhost) return; + + spinlock_lock(&rc->rrdhost->rrdctx.pp_queue.spinlock); + + RRDCONTEXT_QUEUE_STATUS ret = rrdcontext_queue_add(&rc->rrdhost->rrdctx.pp_queue, rc, &rc->pp.idx, true); + + if(ret == RRDCONTEXT_QUEUE_ADDED) { + rrd_flag_set(rc, RRD_FLAG_QUEUED_FOR_PP); + rc->pp.queued_flags = rc->flags; + rc->pp.queued_ut = now_realtime_usec(); + } + else if(ret == RRDCONTEXT_QUEUE_FOUND) { + rrd_flag_set(rc, RRD_FLAG_QUEUED_FOR_PP); + rc->pp.queued_flags |= rc->flags; + } + + spinlock_unlock(&rc->rrdhost->rrdctx.pp_queue.spinlock); +} + +static inline bool rrdcontext_queue_del(RRDCONTEXT_QUEUE_JudyLSet *queue, RRDCONTEXT *rc, Word_t *idx, bool having_lock) { + bool ret = false; + if(!queue || !rc || !idx) return ret; + + if(!having_lock) + spinlock_lock(&queue->spinlock); + + RRDCONTEXT *rc_found = RRDCONTEXT_QUEUE_GET(queue, *idx); + + if(rc_found == rc) { + RRDCONTEXT_QUEUE_DEL(queue, *idx); + __atomic_add_fetch(&queue->version, 1, __ATOMIC_RELAXED); + __atomic_sub_fetch(&queue->entries, 1, __ATOMIC_RELAXED); + ret = true; + } + *idx = 0; + + if(!having_lock) + spinlock_unlock(&queue->spinlock); + + return ret; +} + +void rrdcontext_del_from_hub_queue(RRDCONTEXT *rc, bool having_lock) { + if(!rc || !rc->rrdhost) return; + if(!having_lock) + spinlock_lock(&rc->rrdhost->rrdctx.hub_queue.spinlock); + + if(rrdcontext_queue_del(&rc->rrdhost->rrdctx.hub_queue, rc, &rc->queue.idx, true)) { + rrd_flag_clear(rc, RRD_FLAG_QUEUED_FOR_HUB); + } + + if(!having_lock) + spinlock_unlock(&rc->rrdhost->rrdctx.hub_queue.spinlock); +} + +void rrdcontext_del_from_pp_queue(RRDCONTEXT *rc, bool having_lock) { + if(!rc || !rc->rrdhost) return; + + if(!having_lock) + spinlock_lock(&rc->rrdhost->rrdctx.pp_queue.spinlock); + + if(rrdcontext_queue_del(&rc->rrdhost->rrdctx.pp_queue, rc, &rc->pp.idx, true)) { + rrd_flag_clear(rc, RRD_FLAG_QUEUED_FOR_PP); + rc->pp.dequeued_ut = now_realtime_usec(); + } + + if(!having_lock) + spinlock_unlock(&rc->rrdhost->rrdctx.pp_queue.spinlock); +} + + +uint32_t rrdcontext_queue_version(RRDCONTEXT_QUEUE_JudyLSet *queue) { + return __atomic_load_n(&queue->version, __ATOMIC_RELAXED); +} + +int32_t rrdcontext_queue_entries(RRDCONTEXT_QUEUE_JudyLSet *queue) { + return __atomic_load_n(&queue->entries, __ATOMIC_RELAXED); +} + +void rrdcontext_post_process_queued_contexts(RRDHOST *host) { + + spinlock_lock(&host->rrdctx.pp_queue.spinlock); + Word_t idx = 0; + for(RRDCONTEXT *rc = RRDCONTEXT_QUEUE_FIRST(&host->rrdctx.pp_queue, &idx); + rc; + rc = RRDCONTEXT_QUEUE_NEXT(&host->rrdctx.pp_queue, &idx)) { + if(unlikely(!service_running(SERVICE_CONTEXT))) break; + + const DICTIONARY_ITEM *item = dictionary_get_and_acquire_item(host->rrdctx.contexts, string2str(rc->id)); + bool do_it = dictionary_acquired_item_value(item) == rc; + + if(do_it) + rrdcontext_del_from_pp_queue(rc, true); + + spinlock_unlock(&host->rrdctx.pp_queue.spinlock); + + if(item) { + if (do_it) + rrdcontext_post_process_updates(rc, false, RRD_FLAG_NONE, true); + + dictionary_acquired_item_release(host->rrdctx.contexts, item); + } + + spinlock_lock(&host->rrdctx.pp_queue.spinlock); + } + + spinlock_unlock(&host->rrdctx.pp_queue.spinlock); +} + +void rrdcontext_dispatch_queued_contexts_to_hub(RRDHOST *host, usec_t now_ut) { + // check if we have received a streaming command for this host + if(UUIDiszero(host->node_id) || !rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_CONTEXTS) || !aclk_online_for_contexts()) + return; + + // check if there are queued items to send + if(!rrdcontext_queue_entries(&host->rrdctx.hub_queue)) + return; + + size_t messages_added = 0; + contexts_updated_t bundle = NULL; + + spinlock_lock(&host->rrdctx.hub_queue.spinlock); + Word_t idx = 0; + for(RRDCONTEXT *rc = RRDCONTEXT_QUEUE_FIRST(&host->rrdctx.hub_queue, &idx); + rc; + rc = RRDCONTEXT_QUEUE_NEXT(&host->rrdctx.hub_queue, &idx)) { + if(unlikely(!service_running(SERVICE_CONTEXT))) break; + + if(unlikely(messages_added >= MESSAGES_PER_BUNDLE_TO_SEND_TO_HUB_PER_HOST)) + break; + + const DICTIONARY_ITEM *item = dictionary_get_and_acquire_item(host->rrdctx.contexts, string2str(rc->id)); + bool do_it = dictionary_acquired_item_value(item) == rc; + + if(do_it) { + worker_is_busy(WORKER_JOB_DEQUEUE); + rrdcontext_del_from_hub_queue(rc, true); + } + + spinlock_unlock(&host->rrdctx.hub_queue.spinlock); + + if(item) { + if (do_it) { + worker_is_busy(WORKER_JOB_QUEUED); + usec_t dispatch_ut = rrdcontext_calculate_queued_dispatch_time_ut(rc, now_ut); + CLAIM_ID claim_id = claim_id_get(); + + if(unlikely(now_ut >= dispatch_ut) && claim_id_is_set(claim_id)) { + worker_is_busy(WORKER_JOB_CHECK); + + rrdcontext_lock(rc); + + if(check_if_cloud_version_changed_unsafe(rc, true)) { + worker_is_busy(WORKER_JOB_SEND); + + if(!bundle) { + // prepare the bundle to send the messages + char uuid_str[UUID_STR_LEN]; + uuid_unparse_lower(host->node_id.uuid, uuid_str); + + bundle = contexts_updated_new(claim_id.str, uuid_str, 0, now_ut); + } + // update the hub data of the context, give a new version, pack the message + // and save an update to SQL + rrdcontext_message_send_unsafe(rc, false, bundle); + messages_added++; + + rc->queue.dispatches++; + rc->queue.dequeued_ut = now_ut; + } + else + rc->version = rc->hub.version; + + if(unlikely(rrdcontext_should_be_deleted(rc))) { + // this is a deleted context - delete it forever... + + worker_is_busy(WORKER_JOB_CLEANUP_DELETE); + + rrdcontext_dequeue_from_post_processing(rc); + rrdcontext_delete_from_sql_unsafe(rc); + + STRING *id = string_dup(rc->id); + rrdcontext_unlock(rc); + + // delete it from the master dictionary + if(!dictionary_del(host->rrdctx.contexts, string2str(rc->id))) + netdata_log_error("RRDCONTEXT: '%s' of host '%s' failed to be deleted from rrdcontext dictionary.", + string2str(id), rrdhost_hostname(host)); + + string_freez(id); + } + else + rrdcontext_unlock(rc); + } + } + + dictionary_acquired_item_release(host->rrdctx.contexts, item); + } + + spinlock_lock(&host->rrdctx.hub_queue.spinlock); + } + spinlock_unlock(&host->rrdctx.hub_queue.spinlock); + + if(service_running(SERVICE_CONTEXT) && bundle) { + // we have a bundle to send messages + + // update the version hash + contexts_updated_update_version_hash(bundle, rrdcontext_version_hash(host)); + + // send it + aclk_send_contexts_updated(bundle); + } + else if(bundle) + contexts_updated_delete(bundle); +} diff --git a/src/database/contexts/worker.c b/src/database/contexts/rrdcontext-worker.c similarity index 86% rename from src/database/contexts/worker.c rename to src/database/contexts/rrdcontext-worker.c index 2ad083af49..e9d741ccc2 100644 --- a/src/database/contexts/worker.c +++ b/src/database/contexts/rrdcontext-worker.c @@ -1,6 +1,6 @@ // SPDX-License-Identifier: GPL-3.0-or-later -#include "internal.h" +#include "rrdcontext-internal.h" static struct { bool enabled; @@ -14,17 +14,8 @@ static struct { .active_vs_archived_percentage = 50, }; -static void rrdcontext_dequeue_from_hub_queue(RRDCONTEXT *rc); - static uint64_t rrdcontext_get_next_version(RRDCONTEXT *rc); -static bool check_if_cloud_version_changed_unsafe(RRDCONTEXT *rc, bool sending __maybe_unused); - -static void rrdcontext_delete_from_sql_unsafe(RRDCONTEXT *rc); - -static void rrdcontext_dequeue_from_post_processing(RRDCONTEXT *rc); -static bool rrdcontext_post_process_updates(RRDCONTEXT *rc, bool force, RRD_FLAGS reason, bool worker_jobs); - static void rrdcontext_garbage_collect_for_all_hosts(void); extern usec_t rrdcontext_next_db_rotation_ut; @@ -260,7 +251,7 @@ static inline bool rrdinstance_should_be_deleted(RRDINSTANCE *ri) { return true; } -static inline bool rrdcontext_should_be_deleted(RRDCONTEXT *rc) { +bool rrdcontext_should_be_deleted(RRDCONTEXT *rc) { if(likely(!rrd_flag_check(rc, RRD_FLAGS_REQUIRED_FOR_DELETIONS))) return false; @@ -620,7 +611,7 @@ static bool rrdinstance_forcefully_clear_retention(RRDCONTEXT *rc, size_t count, return false; } -static bool rrdcontext_post_process_updates(RRDCONTEXT *rc, bool force, RRD_FLAGS reason, bool worker_jobs) { +bool rrdcontext_post_process_updates(RRDCONTEXT *rc, bool force, RRD_FLAGS reason, bool worker_jobs) { bool ret = false; if(reason != RRD_FLAG_NONE) @@ -804,11 +795,10 @@ static bool rrdcontext_post_process_updates(RRDCONTEXT *rc, bool force, RRD_FLAG } } - if(unlikely(rrd_flag_is_updated(rc) && rc->rrdhost->rrdctx.hub_queue)) { + if(unlikely(rrd_flag_is_updated(rc))) { if(check_if_cloud_version_changed_unsafe(rc, false)) { rc->version = rrdcontext_get_next_version(rc); - dictionary_set((DICTIONARY *)rc->rrdhost->rrdctx.hub_queue, - string2str(rc->id), rc, sizeof(*rc)); + rrdcontext_add_to_hub_queue(rc); } } @@ -819,8 +809,6 @@ static bool rrdcontext_post_process_updates(RRDCONTEXT *rc, bool force, RRD_FLAG } void rrdcontext_queue_for_post_processing(RRDCONTEXT *rc, const char *function __maybe_unused, RRD_FLAGS flags __maybe_unused) { - if(unlikely(!rc->rrdhost->rrdctx.pp_queue)) return; - #if 0 if(string_strcmp(rc->id, "system.cpu") == 0) { CLEAN_BUFFER *wb = buffer_create(0, NULL); @@ -837,15 +825,7 @@ void rrdcontext_queue_for_post_processing(RRDCONTEXT *rc, const char *function _ } #endif - dictionary_set((DICTIONARY *)rc->rrdhost->rrdctx.pp_queue, - string2str(rc->id), - rc, - sizeof(*rc)); -} - -static void rrdcontext_dequeue_from_post_processing(RRDCONTEXT *rc) { - if(unlikely(!rc->rrdhost->rrdctx.pp_queue)) return; - dictionary_del(rc->rrdhost->rrdctx.pp_queue, string2str(rc->id)); + rrdcontext_add_to_pp_queue(rc); } void rrdcontext_initial_processing_after_loading(RRDCONTEXT *rc) { @@ -854,24 +834,11 @@ void rrdcontext_initial_processing_after_loading(RRDCONTEXT *rc) { } void rrdcontext_delete_after_loading(RRDHOST *host, RRDCONTEXT *rc) { - rrdcontext_dequeue_from_hub_queue(rc); + rrdcontext_del_from_hub_queue(rc, false); rrdcontext_dequeue_from_post_processing(rc); dictionary_del(host->rrdctx.contexts, string2str(rc->id)); } -static void rrdcontext_post_process_queued_contexts(RRDHOST *host) { - if(unlikely(!host->rrdctx.pp_queue)) return; - - RRDCONTEXT *rc; - dfe_start_reentrant(host->rrdctx.pp_queue, rc) { - if(unlikely(!service_running(SERVICE_CONTEXT))) break; - - rrdcontext_dequeue_from_post_processing(rc); - rrdcontext_post_process_updates(rc, false, RRD_FLAG_NONE, true); - } - dfe_done(rc); -} - // ---------------------------------------------------------------------------- // dispatching contexts to cloud @@ -933,7 +900,7 @@ void rrdcontext_message_send_unsafe(RRDCONTEXT *rc, bool snapshot __maybe_unused } } -static bool check_if_cloud_version_changed_unsafe(RRDCONTEXT *rc, bool sending __maybe_unused) { +bool check_if_cloud_version_changed_unsafe(RRDCONTEXT *rc, bool sending __maybe_unused) { bool id_changed = false, title_changed = false, units_changed = false, @@ -1004,7 +971,7 @@ static bool check_if_cloud_version_changed_unsafe(RRDCONTEXT *rc, bool sending _ return false; } -static inline usec_t rrdcontext_calculate_queued_dispatch_time_ut(RRDCONTEXT *rc, usec_t now_ut) { +usec_t rrdcontext_calculate_queued_dispatch_time_ut(RRDCONTEXT *rc, usec_t now_ut) { if(likely(rc->queue.delay_calc_ut >= rc->queue.queued_ut)) return rc->queue.scheduled_dispatch_ut; @@ -1031,102 +998,6 @@ static inline usec_t rrdcontext_calculate_queued_dispatch_time_ut(RRDCONTEXT *rc return dispatch_ut; } -static void rrdcontext_dequeue_from_hub_queue(RRDCONTEXT *rc) { - dictionary_del(rc->rrdhost->rrdctx.hub_queue, string2str(rc->id)); -} - -static void rrdcontext_dispatch_queued_contexts_to_hub(RRDHOST *host, usec_t now_ut) { - - // check if we have received a streaming command for this host - if(UUIDiszero(host->node_id) || !rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_CONTEXTS) || !aclk_online_for_contexts() || !host->rrdctx.hub_queue) - return; - - // check if there are queued items to send - if(!dictionary_entries(host->rrdctx.hub_queue)) - return; - - size_t messages_added = 0; - contexts_updated_t bundle = NULL; - - RRDCONTEXT *rc; - dfe_start_reentrant(host->rrdctx.hub_queue, rc) { - if(unlikely(!service_running(SERVICE_CONTEXT))) break; - - if(unlikely(messages_added >= MESSAGES_PER_BUNDLE_TO_SEND_TO_HUB_PER_HOST)) - break; - - worker_is_busy(WORKER_JOB_QUEUED); - usec_t dispatch_ut = rrdcontext_calculate_queued_dispatch_time_ut(rc, now_ut); - CLAIM_ID claim_id = claim_id_get(); - - if(unlikely(now_ut >= dispatch_ut) && claim_id_is_set(claim_id)) { - worker_is_busy(WORKER_JOB_CHECK); - - rrdcontext_lock(rc); - - if(check_if_cloud_version_changed_unsafe(rc, true)) { - worker_is_busy(WORKER_JOB_SEND); - - if(!bundle) { - // prepare the bundle to send the messages - char uuid_str[UUID_STR_LEN]; - uuid_unparse_lower(host->node_id.uuid, uuid_str); - - bundle = contexts_updated_new(claim_id.str, uuid_str, 0, now_ut); - } - // update the hub data of the context, give a new version, pack the message - // and save an update to SQL - rrdcontext_message_send_unsafe(rc, false, bundle); - messages_added++; - - rc->queue.dispatches++; - rc->queue.dequeued_ut = now_ut; - } - else - rc->version = rc->hub.version; - - // remove it from the queue - worker_is_busy(WORKER_JOB_DEQUEUE); - rrdcontext_dequeue_from_hub_queue(rc); - - if(unlikely(rrdcontext_should_be_deleted(rc))) { - // this is a deleted context - delete it forever... - - worker_is_busy(WORKER_JOB_CLEANUP_DELETE); - - rrdcontext_dequeue_from_post_processing(rc); - rrdcontext_delete_from_sql_unsafe(rc); - - STRING *id = string_dup(rc->id); - rrdcontext_unlock(rc); - - // delete it from the master dictionary - if(!dictionary_del(host->rrdctx.contexts, string2str(rc->id))) - netdata_log_error("RRDCONTEXT: '%s' of host '%s' failed to be deleted from rrdcontext dictionary.", - string2str(id), rrdhost_hostname(host)); - - string_freez(id); - } - else - rrdcontext_unlock(rc); - } - } - dfe_done(rc); - - if(service_running(SERVICE_CONTEXT) && bundle) { - // we have a bundle to send messages - - // update the version hash - contexts_updated_update_version_hash(bundle, rrdcontext_version_hash(host)); - - // send it - aclk_send_contexts_updated(bundle); - } - else if(bundle) - contexts_updated_delete(bundle); - -} - // ---------------------------------------------------------------------------- // worker thread @@ -1209,17 +1080,11 @@ void *rrdcontext_main(void *ptr) { worker_is_busy(WORKER_JOB_HOSTS); - if(host->rrdctx.pp_queue) { - pp_queued_contexts_for_all_hosts += dictionary_entries(host->rrdctx.pp_queue); - rrdcontext_post_process_queued_contexts(host); - dictionary_garbage_collect(host->rrdctx.pp_queue); - } + pp_queued_contexts_for_all_hosts += rrdcontext_queue_entries(&host->rrdctx.pp_queue); + rrdcontext_post_process_queued_contexts(host); - if(host->rrdctx.hub_queue) { - hub_queued_contexts_for_all_hosts += dictionary_entries(host->rrdctx.hub_queue); - rrdcontext_dispatch_queued_contexts_to_hub(host, now_ut); - dictionary_garbage_collect(host->rrdctx.hub_queue); - } + hub_queued_contexts_for_all_hosts += rrdcontext_queue_entries(&host->rrdctx.hub_queue); + rrdcontext_dispatch_queued_contexts_to_hub(host, now_ut); if (host->rrdctx.contexts) dictionary_garbage_collect(host->rrdctx.contexts); diff --git a/src/database/contexts/rrdcontext.c b/src/database/contexts/rrdcontext.c index 0e4c8cf830..31d9ef6413 100644 --- a/src/database/contexts/rrdcontext.c +++ b/src/database/contexts/rrdcontext.c @@ -1,6 +1,6 @@ // SPDX-License-Identifier: GPL-3.0-or-later -#include "internal.h" +#include "rrdcontext-internal.h" // ---------------------------------------------------------------------------- // visualizing flags diff --git a/src/database/contexts/rrdcontext.h b/src/database/contexts/rrdcontext.h index 7befee48ff..a1a0ab37ce 100644 --- a/src/database/contexts/rrdcontext.h +++ b/src/database/contexts/rrdcontext.h @@ -757,5 +757,8 @@ static inline bool query_target_has_percentage_units(QUERY_TARGET *qt) { return query_target_has_percentage_of_group(qt); } +uint32_t rrdcontext_queue_version(RRDCONTEXT_QUEUE_JudyLSet *queue); +int32_t rrdcontext_queue_entries(RRDCONTEXT_QUEUE_JudyLSet *queue); + #endif // NETDATA_RRDCONTEXT_H diff --git a/src/database/rrdhost.h b/src/database/rrdhost.h index 1ff288c188..daa5838049 100644 --- a/src/database/rrdhost.h +++ b/src/database/rrdhost.h @@ -25,6 +25,14 @@ typedef struct rrdhost_acquired RRDHOST_ACQUIRED; #include "rrdlabels.h" #include "health/health-alert-log.h" +struct rrdcontext; +DEFINE_JUDYL_TYPED_ADVANCED(RRDCONTEXT_QUEUE, struct rrdcontext *, JUDYL_TYPED_NO_CONVERSION, JUDYL_TYPED_NO_CONVERSION, \ + SPINLOCK spinlock; \ + Word_t id; \ + uint32_t version; \ + int32_t entries; \ + ); + // ---------------------------------------------------------------------------- // RRDHOST flags // use this for configuration flags, not for state control @@ -304,8 +312,8 @@ struct rrdhost { struct { DICTIONARY *contexts; - DICTIONARY *hub_queue; - DICTIONARY *pp_queue; + RRDCONTEXT_QUEUE_JudyLSet pp_queue; + RRDCONTEXT_QUEUE_JudyLSet hub_queue; uint32_t metrics_count; // atomic uint32_t instances_count; // atomic uint32_t contexts_count; // atomic diff --git a/src/database/sqlite/sqlite_aclk_node.c b/src/database/sqlite/sqlite_aclk_node.c index 9333c581eb..9b9120156b 100644 --- a/src/database/sqlite/sqlite_aclk_node.c +++ b/src/database/sqlite/sqlite_aclk_node.c @@ -154,7 +154,7 @@ void aclk_check_node_info_and_collectors(void) if(replicating_rcv) continue; - bool pp_queue_empty = !(host->rrdctx.pp_queue && dictionary_entries(host->rrdctx.pp_queue)); + bool pp_queue_empty = !rrdcontext_queue_entries(&host->rrdctx.pp_queue); if (!pp_queue_empty && (aclk_host_config->node_info_send_time || aclk_host_config->node_collectors_send)) { context_pp++; diff --git a/src/database/sqlite/sqlite_context.c b/src/database/sqlite/sqlite_context.c index 8f42811be7..210c4eed16 100644 --- a/src/database/sqlite/sqlite_context.c +++ b/src/database/sqlite/sqlite_context.c @@ -3,7 +3,7 @@ #include "sqlite_functions.h" #include "sqlite_context.h" #include "sqlite_db_migration.h" -#include "database/contexts/internal.h" +#include "database/contexts/rrdcontext-internal.h" #define DB_CONTEXT_METADATA_VERSION 1 diff --git a/src/libnetdata/libjudy/judyl-typed.h b/src/libnetdata/libjudy/judyl-typed.h index e6b5087112..7d3fe1bf3c 100644 --- a/src/libnetdata/libjudy/judyl-typed.h +++ b/src/libnetdata/libjudy/judyl-typed.h @@ -5,80 +5,87 @@ #include <Judy.h> +#ifdef __cplusplus +#define DEFINED_JUDYL_CHECK_SIZE(TYPE, NAME) +#else +#define DEFINED_JUDYL_CHECK_SIZE(TYPE, NAME) _Static_assert(sizeof(TYPE) <= sizeof(Word_t), #NAME "_type_must_have_same_size_as_Word_t") +#endif + + // Advanced macro for types requiring conversion -#define DEFINE_JUDYL_TYPED_ADVANCED(NAME, TYPE, PACK_MACRO, UNPACK_MACRO) \ - _Static_assert(sizeof(TYPE) <= sizeof(Word_t), \ - #NAME "_type_must_have_same_size_as_Word_t"); \ - typedef struct { \ - Pvoid_t judyl; \ - } NAME##_JudyLSet; \ - \ - ALWAYS_INLINE \ - static __attribute__((unused)) void NAME##_INIT(NAME##_JudyLSet *set) { \ - set->judyl = NULL; \ - } \ - \ - ALWAYS_INLINE \ - static bool __attribute__((unused)) NAME##_SET(NAME##_JudyLSet *set, Word_t index, TYPE value) { \ - Pvoid_t *pValue = JudyLIns(&set->judyl, index, PJE0); \ - if (pValue == PJERR) return false; \ - *pValue = (void *)PACK_MACRO(value); \ - return true; \ - } \ - \ - ALWAYS_INLINE \ - static TYPE __attribute__((unused)) NAME##_GET(NAME##_JudyLSet *set, Word_t index) { \ - Pvoid_t *pValue = JudyLGet(set->judyl, index, PJE0); \ - return (pValue != NULL) ? (TYPE)UNPACK_MACRO(*pValue) : (TYPE){0}; \ - } \ - \ - ALWAYS_INLINE \ - static bool __attribute__((unused)) NAME##_DEL(NAME##_JudyLSet *set, Word_t index) { \ - return JudyLDel(&set->judyl, index, PJE0) == 1; \ - } \ - \ - ALWAYS_INLINE \ - static TYPE __attribute__((unused)) NAME##_FIRST(NAME##_JudyLSet *set, Word_t *index) { \ - Pvoid_t *pValue = JudyLFirst(set->judyl, index, PJE0); \ - return (pValue != NULL) ? (TYPE)UNPACK_MACRO(*pValue) : (TYPE){0}; \ - } \ - \ - ALWAYS_INLINE \ - static TYPE __attribute__((unused)) NAME##_NEXT(NAME##_JudyLSet *set, Word_t *index) { \ - Pvoid_t *pValue = JudyLNext(set->judyl, index, PJE0); \ - return (pValue != NULL) ? (TYPE)UNPACK_MACRO(*pValue) : (TYPE){0}; \ - } \ - \ - ALWAYS_INLINE \ - static TYPE __attribute__((unused)) NAME##_LAST(NAME##_JudyLSet *set, Word_t *index) { \ - Pvoid_t *pValue = JudyLLast(set->judyl, index, PJE0); \ - return (pValue != NULL) ? (TYPE)UNPACK_MACRO(*pValue) : (TYPE){0}; \ - } \ - \ - ALWAYS_INLINE \ - static TYPE __attribute__((unused)) NAME##_PREV(NAME##_JudyLSet *set, Word_t *index) { \ - Pvoid_t *pValue = JudyLPrev(set->judyl, index, PJE0); \ - return (pValue != NULL) ? (TYPE)UNPACK_MACRO(*pValue) : (TYPE){0}; \ - } \ - \ - ALWAYS_INLINE \ +#define DEFINE_JUDYL_TYPED_ADVANCED(NAME, TYPE, PACK_MACRO, UNPACK_MACRO, EXTRA_MEMBERS) \ + DEFINED_JUDYL_CHECK_SIZE(TYPE, NAME); \ + \ + typedef struct { \ + Pvoid_t judyl; \ + EXTRA_MEMBERS \ + } NAME##_JudyLSet; \ + ALWAYS_INLINE \ + static __attribute__((unused)) void NAME##_INIT(NAME##_JudyLSet *set) { \ + set->judyl = NULL; \ + } \ + \ + ALWAYS_INLINE \ + static bool __attribute__((unused)) NAME##_SET(NAME##_JudyLSet *set, Word_t index, TYPE value) { \ + Pvoid_t *pValue = JudyLIns(&set->judyl, index, PJE0); \ + if (pValue == PJERR) return false; \ + *pValue = (void *)PACK_MACRO(value); \ + return true; \ + } \ + \ + ALWAYS_INLINE \ + static TYPE __attribute__((unused)) NAME##_GET(NAME##_JudyLSet *set, Word_t index) { \ + Pvoid_t *pValue = JudyLGet(set->judyl, index, PJE0); \ + return (pValue != NULL) ? (TYPE)UNPACK_MACRO(*pValue) : (TYPE){0}; \ + } \ + \ + ALWAYS_INLINE \ + static bool __attribute__((unused)) NAME##_DEL(NAME##_JudyLSet *set, Word_t index) { \ + return JudyLDel(&set->judyl, index, PJE0) == 1; \ + } \ + \ + ALWAYS_INLINE \ + static TYPE __attribute__((unused)) NAME##_FIRST(NAME##_JudyLSet *set, Word_t *index) { \ + Pvoid_t *pValue = JudyLFirst(set->judyl, index, PJE0); \ + return (pValue != NULL) ? (TYPE)UNPACK_MACRO(*pValue) : (TYPE){0}; \ + } \ + \ + ALWAYS_INLINE \ + static TYPE __attribute__((unused)) NAME##_NEXT(NAME##_JudyLSet *set, Word_t *index) { \ + Pvoid_t *pValue = JudyLNext(set->judyl, index, PJE0); \ + return (pValue != NULL) ? (TYPE)UNPACK_MACRO(*pValue) : (TYPE){0}; \ + } \ + \ + ALWAYS_INLINE \ + static TYPE __attribute__((unused)) NAME##_LAST(NAME##_JudyLSet *set, Word_t *index) { \ + Pvoid_t *pValue = JudyLLast(set->judyl, index, PJE0); \ + return (pValue != NULL) ? (TYPE)UNPACK_MACRO(*pValue) : (TYPE){0}; \ + } \ + \ + ALWAYS_INLINE \ + static TYPE __attribute__((unused)) NAME##_PREV(NAME##_JudyLSet *set, Word_t *index) { \ + Pvoid_t *pValue = JudyLPrev(set->judyl, index, PJE0); \ + return (pValue != NULL) ? (TYPE)UNPACK_MACRO(*pValue) : (TYPE){0}; \ + } \ + \ + ALWAYS_INLINE \ static void __attribute__((unused)) NAME##_FREE(NAME##_JudyLSet *set, void (*callback)(Word_t, TYPE, void *), void *data) { \ - Word_t index = 0; \ - Pvoid_t *pValue; \ - if (callback) { \ - for (pValue = JudyLFirst(set->judyl, &index, PJE0); \ - pValue != NULL; \ - pValue = JudyLNext(set->judyl, &index, PJE0)) { \ - callback(index, (TYPE)UNPACK_MACRO(*pValue), data); \ - } \ - } \ - JudyLFreeArray(&set->judyl, PJE0); \ + Word_t index = 0; \ + Pvoid_t *pValue; \ + if (callback) { \ + for (pValue = JudyLFirst(set->judyl, &index, PJE0); \ + pValue != NULL; \ + pValue = JudyLNext(set->judyl, &index, PJE0)) { \ + callback(index, (TYPE)UNPACK_MACRO(*pValue), data); \ + } \ + } \ + JudyLFreeArray(&set->judyl, PJE0); \ } // Basic macro for types with no conversion #define JUDYL_TYPED_NO_CONVERSION(value) (uintptr_t)(value) -#define DEFINE_JUDYL_TYPED(NAME, TYPE) \ - DEFINE_JUDYL_TYPED_ADVANCED(NAME, TYPE, JUDYL_TYPED_NO_CONVERSION, JUDYL_TYPED_NO_CONVERSION) +#define DEFINE_JUDYL_TYPED(NAME, TYPE) \ + DEFINE_JUDYL_TYPED_ADVANCED(NAME, TYPE, JUDYL_TYPED_NO_CONVERSION, JUDYL_TYPED_NO_CONVERSION,) #endif //NETDATA_JUDYL_TYPED_H diff --git a/src/web/api/functions/function-metrics-cardinality.c b/src/web/api/functions/function-metrics-cardinality.c index 7ae243737c..27031113aa 100644 --- a/src/web/api/functions/function-metrics-cardinality.c +++ b/src/web/api/functions/function-metrics-cardinality.c @@ -1,7 +1,7 @@ // SPDX-License-Identifier: GPL-3.0-or-later #include "function-metrics-cardinality.h" -#include "database/contexts/internal.h" +#include "database/contexts/rrdcontext-internal.h" struct counts { size_t nodes;