0
0
Fork 0
mirror of https://github.com/netdata/netdata.git synced 2025-05-11 04:10:55 +00:00

use judy queues instead of dictionary queues

This commit is contained in:
Costa Tsaousis 2025-04-22 19:47:10 +03:00
parent 999133d69f
commit c7cf10df82
No known key found for this signature in database
GPG key ID: 3A4F28E963D28A1F
23 changed files with 425 additions and 335 deletions

View file

@ -1598,15 +1598,15 @@ set(RRD_PLUGIN_FILES
src/database/contexts/api_v2_contexts_alerts.h
src/database/contexts/api_v2_contexts_alert_transitions.c
src/database/contexts/api_v2_contexts_alert_config.c
src/database/contexts/context.c
src/database/contexts/instance.c
src/database/contexts/internal.h
src/database/contexts/metric.c
src/database/contexts/rrdcontext-context.c
src/database/contexts/rrdcontext-instance.c
src/database/contexts/rrdcontext-internal.h
src/database/contexts/rrdcontext-metric.c
src/database/contexts/query_scope.c
src/database/contexts/query_target.c
src/database/contexts/rrdcontext.c
src/database/contexts/rrdcontext.h
src/database/contexts/worker.c
src/database/contexts/rrdcontext-worker.c
src/database/rrdcollector.c
src/database/rrdcollector.h
src/database/rrddim.c
@ -1658,7 +1658,7 @@ set(RRD_PLUGIN_FILES
src/database/rrd-database-mode.c
src/database/rrdhost-system-info.c
src/database/rrdhost-system-info.h
src/database/contexts/contexts-loading.c
src/database/contexts/rrdcontext-loading.c
src/database/rrdset-index-id.c
src/database/rrdset-index-id.h
src/database/rrdset-index-name.c
@ -1688,6 +1688,7 @@ set(RRD_PLUGIN_FILES
src/database/rrdhost-collection.h
src/database/pattern-array.c
src/database/pattern-array.h
src/database/contexts/rrdcontext-queues.c
)
if(ENABLE_DBENGINE)
@ -2134,7 +2135,7 @@ set(ACLK_FILES
src/aclk/schema-wrappers/context_stream.cc
src/aclk/schema-wrappers/context_stream.h
src/aclk/schema-wrappers/context.cc
src/aclk/schema-wrappers/context.h
src/aclk/schema-wrappers/rrdcontext-context.h
src/aclk/schema-wrappers/schema_wrappers.h
src/aclk/schema-wrappers/schema_wrapper_utils.cc
src/aclk/schema-wrappers/schema_wrapper_utils.h

View file

@ -6,7 +6,7 @@
#include "schema_wrapper_utils.h"
#include "context.h"
#include "rrdcontext-context.h"
using namespace context::v1;

View file

@ -13,7 +13,7 @@
#include "node_info.h"
#include "capability.h"
#include "context_stream.h"
#include "context.h"
#include "rrdcontext-context.h"
#include "agent_cmds.h"
#endif /* SCHEMA_WRAPPERS_H */

View file

@ -1,6 +1,6 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "internal.h"
#include "rrdcontext-internal.h"
static void rrd_flags_to_buffer_json_array_items(RRD_FLAGS flags, BUFFER *wb) {
if(flags & RRD_FLAG_QUEUED_FOR_HUB)

View file

@ -3,7 +3,7 @@
#ifndef NETDATA_API_V2_CONTEXTS_H
#define NETDATA_API_V2_CONTEXTS_H
#include "internal.h"
#include "rrdcontext-internal.h"
typedef enum __attribute__ ((__packed__)) {
FTS_MATCHED_NONE = 0,

View file

@ -3,7 +3,7 @@
#ifndef NETDATA_API_V2_CONTEXTS_ALERTS_H
#define NETDATA_API_V2_CONTEXTS_ALERTS_H
#include "internal.h"
#include "rrdcontext-internal.h"
#include "api_v2_contexts.h"
struct alert_transitions_callback_data {

View file

@ -1,6 +1,6 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "internal.h"
#include "rrdcontext-internal.h"
ssize_t query_scope_foreach_host(SIMPLE_PATTERN *scope_hosts_sp, SIMPLE_PATTERN *hosts_sp,
foreach_host_cb_t cb, void *data,
@ -50,7 +50,7 @@ ssize_t query_scope_foreach_host(SIMPLE_PATTERN *scope_hosts_sp, SIMPLE_PATTERN
bool queryable_host = (match == SP_MATCHED_POSITIVE);
v_hash += dictionary_version(host->rrdctx.contexts);
h_hash += dictionary_version(host->rrdctx.hub_queue);
h_hash += rrdcontext_queue_version(&host->rrdctx.hub_queue);
a_hash += dictionary_version(host->rrdcalc_root_index);
t_hash += __atomic_load_n(&host->health_transitions, __ATOMIC_RELAXED);
ssize_t ret = cb(data, host, queryable_host);

View file

@ -1,6 +1,6 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "internal.h"
#include "rrdcontext-internal.h"
#define QUERY_TARGET_MAX_REALLOC_INCREASE 500
#define query_target_realloc_size(size, start) \
@ -1128,7 +1128,7 @@ QUERY_TARGET *query_target_create(QUERY_TARGET_REQUEST *qtr) {
// single host query
qt->versions.contexts_hard_hash = dictionary_version(host->rrdctx.contexts);
qt->versions.contexts_soft_hash = dictionary_version(host->rrdctx.hub_queue);
qt->versions.contexts_soft_hash = rrdcontext_queue_version(&host->rrdctx.hub_queue);
qt->versions.alerts_hard_hash = dictionary_version(host->rrdcalc_root_index);
qt->versions.alerts_soft_hash = __atomic_load_n(&host->health_transitions, __ATOMIC_RELAXED);
query_node_add(&qtl, host, true);

View file

@ -1,6 +1,6 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "internal.h"
#include "rrdcontext-internal.h"
inline const char *rrdcontext_acquired_id(RRDCONTEXT_ACQUIRED *rca) {
RRDCONTEXT *rc = rrdcontext_acquired_value(rca);
@ -96,11 +96,8 @@ static void rrdcontext_delete_callback(const DICTIONARY_ITEM *item __maybe_unuse
// update the count of contexts
__atomic_sub_fetch(&rc->rrdhost->rrdctx.contexts_count, 1, __ATOMIC_RELAXED);
if(rc->rrdhost->rrdctx.hub_queue)
dictionary_del(rc->rrdhost->rrdctx.hub_queue, string2str(rc->id));
if(rc->rrdhost->rrdctx.pp_queue)
dictionary_del(rc->rrdhost->rrdctx.pp_queue, string2str(rc->id));
rrdcontext_del_from_hub_queue(rc, false);
rrdcontext_del_from_pp_queue(rc, false);
rrdinstances_destroy_from_rrdcontext(rc);
rrdcontext_freez(rc);
@ -235,64 +232,6 @@ void rrdcontext_trigger_updates(RRDCONTEXT *rc, const char *function) {
rrdcontext_queue_for_post_processing(rc, function, rc->flags);
}
static void rrdcontext_hub_queue_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *nothing __maybe_unused) {
RRDCONTEXT *rc = context;
rrd_flag_set(rc, RRD_FLAG_QUEUED_FOR_HUB);
rc->queue.queued_ut = now_realtime_usec();
rc->queue.queued_flags = rrd_flags_get(rc);
}
static void rrdcontext_hub_queue_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *nothing __maybe_unused) {
RRDCONTEXT *rc = context;
rrd_flag_clear(rc, RRD_FLAG_QUEUED_FOR_HUB);
}
static bool rrdcontext_hub_queue_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *new_context __maybe_unused, void *nothing __maybe_unused) {
// context and new_context are the same
// we just need to update the timings
RRDCONTEXT *rc = context;
rrd_flag_set(rc, RRD_FLAG_QUEUED_FOR_HUB);
rc->queue.queued_ut = now_realtime_usec();
rc->queue.queued_flags |= rrd_flags_get(rc);
return true;
}
static void rrdcontext_post_processing_queue_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *nothing __maybe_unused) {
RRDCONTEXT *rc = context;
rrd_flag_set(rc, RRD_FLAG_QUEUED_FOR_PP);
rc->pp.queued_flags = rc->flags;
rc->pp.queued_ut = now_realtime_usec();
}
static void rrdcontext_post_processing_queue_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *nothing __maybe_unused) {
RRDCONTEXT *rc = context;
// IMPORTANT:
// Do not rely on this flag being absent, because the dictionaries have delayed deletions (garbage collect)
// so, this flag may not be deleted immediately from the context.
rrd_flag_clear(rc, RRD_FLAG_QUEUED_FOR_PP);
rc->pp.dequeued_ut = now_realtime_usec();
}
static bool rrdcontext_post_processing_queue_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *new_context __maybe_unused, void *nothing __maybe_unused) {
RRDCONTEXT *rc = context;
bool changed = false;
if(!(rc->flags & RRD_FLAG_QUEUED_FOR_PP)) {
rrd_flag_set(rc, RRD_FLAG_QUEUED_FOR_PP);
changed = true;
}
if(rc->pp.queued_flags != rc->flags) {
rc->pp.queued_flags |= rc->flags;
changed = true;
}
return changed;
}
void rrdhost_create_rrdcontexts(RRDHOST *host) {
if(unlikely(!host)) return;
if(likely(host->rrdctx.contexts)) return;
@ -306,42 +245,19 @@ void rrdhost_create_rrdcontexts(RRDHOST *host) {
dictionary_register_conflict_callback(host->rrdctx.contexts, rrdcontext_conflict_callback, host);
dictionary_register_react_callback(host->rrdctx.contexts, rrdcontext_react_callback, host);
host->rrdctx.hub_queue = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_VALUE_LINK_DONT_CLONE, &dictionary_stats_category_rrdcontext, 0);
dictionary_register_insert_callback(host->rrdctx.hub_queue, rrdcontext_hub_queue_insert_callback, NULL);
dictionary_register_delete_callback(host->rrdctx.hub_queue, rrdcontext_hub_queue_delete_callback, NULL);
dictionary_register_conflict_callback(host->rrdctx.hub_queue, rrdcontext_hub_queue_conflict_callback, NULL);
memset(&host->rrdctx.pp_queue, 0, sizeof(host->rrdctx.pp_queue));
RRDCONTEXT_QUEUE_INIT(&host->rrdctx.pp_queue);
spinlock_init(&host->rrdctx.pp_queue.spinlock);
host->rrdctx.pp_queue = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_VALUE_LINK_DONT_CLONE, &dictionary_stats_category_rrdcontext, 0);
dictionary_register_insert_callback(host->rrdctx.pp_queue, rrdcontext_post_processing_queue_insert_callback, NULL);
dictionary_register_delete_callback(host->rrdctx.pp_queue, rrdcontext_post_processing_queue_delete_callback, NULL);
dictionary_register_conflict_callback(host->rrdctx.pp_queue, rrdcontext_post_processing_queue_conflict_callback, NULL);
memset(&host->rrdctx.hub_queue, 0, sizeof(host->rrdctx.hub_queue));
RRDCONTEXT_QUEUE_INIT(&host->rrdctx.hub_queue);
spinlock_init(&host->rrdctx.hub_queue.spinlock);
}
void rrdhost_destroy_rrdcontexts(RRDHOST *host) {
if(unlikely(!host)) return;
if(unlikely(!host->rrdctx.contexts)) return;
if(host->rrdctx.hub_queue) {
RRDCONTEXT *rc;
dfe_start_write(host->rrdctx.hub_queue, rc) {
dictionary_del(host->rrdctx.hub_queue, string2str(rc->id));
}
dfe_done(rc);
dictionary_destroy(host->rrdctx.hub_queue);
host->rrdctx.hub_queue = NULL;
}
if(host->rrdctx.pp_queue) {
RRDCONTEXT *rc;
dfe_start_write(host->rrdctx.pp_queue, rc) {
dictionary_del(host->rrdctx.pp_queue, string2str(rc->id));
}
dfe_done(rc);
dictionary_destroy(host->rrdctx.pp_queue);
host->rrdctx.pp_queue = NULL;
}
dictionary_destroy(host->rrdctx.contexts);
host->rrdctx.contexts = NULL;
}

View file

@ -1,6 +1,6 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "internal.h"
#include "rrdcontext-internal.h"
// ----------------------------------------------------------------------------
// helper one-liners for RRDINSTANCE

View file

@ -5,7 +5,7 @@
#include "rrdcontext.h"
#include "../sqlite/sqlite_context.h"
#include "../../aclk/schema-wrappers/context.h"
#include "../../aclk/schema-wrappers/rrdcontext-context.h"
#include "../../aclk/aclk_contexts_api.h"
#include "../../aclk/aclk.h"
#include "../storage-engine.h"
@ -279,6 +279,7 @@ typedef struct rrdcontext {
RRDHOST *rrdhost;
struct {
Word_t idx;
RRD_FLAGS queued_flags; // the last flags that triggered the post-processing
size_t executions; // how many times this context has been processed
usec_t queued_ut; // the last time this was queued
@ -286,6 +287,7 @@ typedef struct rrdcontext {
} pp;
struct {
Word_t idx;
RRD_FLAGS queued_flags; // the last flags that triggered the queueing
size_t dispatches; // the number of times this has been dispatched to hub
usec_t queued_ut; // the last time this was queued
@ -295,6 +297,11 @@ typedef struct rrdcontext {
} queue;
} RRDCONTEXT;
void rrdcontext_add_to_pp_queue(RRDCONTEXT *rc);
void rrdcontext_add_to_hub_queue(RRDCONTEXT *rc);
void rrdcontext_del_from_hub_queue(RRDCONTEXT *rc, bool having_lock);
void rrdcontext_del_from_pp_queue(RRDCONTEXT *rc, bool having_lock);
// ----------------------------------------------------------------------------
// helpers for counting collected metrics, instances and contexts
@ -484,4 +491,15 @@ void rrdcontext_initial_processing_after_loading(RRDCONTEXT *rc);
RRDLABELS *rrdinstance_labels(RRDINSTANCE *ri);
bool rrdcontext_post_process_updates(RRDCONTEXT *rc, bool force, RRD_FLAGS reason, bool worker_jobs);
void rrdcontext_post_process_queued_contexts(RRDHOST *host);
void rrdcontext_dispatch_queued_contexts_to_hub(RRDHOST *host, usec_t now_ut);
usec_t rrdcontext_calculate_queued_dispatch_time_ut(RRDCONTEXT *rc, usec_t now_ut);
bool check_if_cloud_version_changed_unsafe(RRDCONTEXT *rc, bool sending);
bool rrdcontext_should_be_deleted(RRDCONTEXT *rc);
void rrdcontext_delete_from_sql_unsafe(RRDCONTEXT *rc);
static inline void rrdcontext_dequeue_from_post_processing(RRDCONTEXT *rc) {
rrdcontext_del_from_pp_queue(rc, false);
}
#endif //NETDATA_RRDCONTEXT_INTERNAL_H

View file

@ -1,6 +1,6 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "internal.h"
#include "rrdcontext-internal.h"
static __thread size_t th_ignored_metrics = 0, th_ignored_instances = 0, th_zero_retention_metrics = 0;

View file

@ -1,6 +1,6 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "internal.h"
#include "rrdcontext-internal.h"
void rrdmetric_trigger_updates(RRDMETRIC *rm, const char *function);

View file

@ -0,0 +1,272 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdcontext-internal.h"
typedef enum {
RRDCONTEXT_QUEUE_INVALID = 0,
RRDCONTEXT_QUEUE_ADDED,
RRDCONTEXT_QUEUE_FOUND,
} RRDCONTEXT_QUEUE_STATUS;
static inline RRDCONTEXT_QUEUE_STATUS rrdcontext_queue_add(RRDCONTEXT_QUEUE_JudyLSet *queue, RRDCONTEXT *rc, Word_t *idx, bool having_lock) {
RRDCONTEXT_QUEUE_STATUS ret = RRDCONTEXT_QUEUE_INVALID;
if(!queue || !rc || !idx) return ret;
if(!having_lock)
spinlock_lock(&queue->spinlock);
if(*idx) {
fatal_assert(RRDCONTEXT_QUEUE_GET(queue, *idx) == rc);
ret = RRDCONTEXT_QUEUE_FOUND;
}
else {
*idx = queue->id++;
RRDCONTEXT_QUEUE_SET(queue, *idx, rc);
__atomic_add_fetch(&queue->version, 1, __ATOMIC_RELAXED);
__atomic_add_fetch(&queue->entries, 1, __ATOMIC_RELAXED);
ret = RRDCONTEXT_QUEUE_ADDED;
}
if(!having_lock)
spinlock_unlock(&queue->spinlock);
return ret;
}
void rrdcontext_add_to_hub_queue(RRDCONTEXT *rc) {
if(!rc || !rc->rrdhost) return;
spinlock_lock(&rc->rrdhost->rrdctx.hub_queue.spinlock);
RRDCONTEXT_QUEUE_STATUS ret = rrdcontext_queue_add(&rc->rrdhost->rrdctx.hub_queue, rc, &rc->queue.idx, true);
if(ret == RRDCONTEXT_QUEUE_ADDED) {
rrd_flag_set(rc, RRD_FLAG_QUEUED_FOR_HUB);
rc->queue.queued_ut = now_realtime_usec();
rc->queue.queued_flags = rrd_flags_get(rc);
}
else if(ret == RRDCONTEXT_QUEUE_FOUND) {
rrd_flag_set(rc, RRD_FLAG_QUEUED_FOR_HUB);
rc->queue.queued_ut = now_realtime_usec();
rc->queue.queued_flags |= rrd_flags_get(rc);
}
spinlock_unlock(&rc->rrdhost->rrdctx.hub_queue.spinlock);
}
void rrdcontext_add_to_pp_queue(RRDCONTEXT *rc) {
if(!rc || !rc->rrdhost) return;
spinlock_lock(&rc->rrdhost->rrdctx.pp_queue.spinlock);
RRDCONTEXT_QUEUE_STATUS ret = rrdcontext_queue_add(&rc->rrdhost->rrdctx.pp_queue, rc, &rc->pp.idx, true);
if(ret == RRDCONTEXT_QUEUE_ADDED) {
rrd_flag_set(rc, RRD_FLAG_QUEUED_FOR_PP);
rc->pp.queued_flags = rc->flags;
rc->pp.queued_ut = now_realtime_usec();
}
else if(ret == RRDCONTEXT_QUEUE_FOUND) {
rrd_flag_set(rc, RRD_FLAG_QUEUED_FOR_PP);
rc->pp.queued_flags |= rc->flags;
}
spinlock_unlock(&rc->rrdhost->rrdctx.pp_queue.spinlock);
}
static inline bool rrdcontext_queue_del(RRDCONTEXT_QUEUE_JudyLSet *queue, RRDCONTEXT *rc, Word_t *idx, bool having_lock) {
bool ret = false;
if(!queue || !rc || !idx) return ret;
if(!having_lock)
spinlock_lock(&queue->spinlock);
RRDCONTEXT *rc_found = RRDCONTEXT_QUEUE_GET(queue, *idx);
if(rc_found == rc) {
RRDCONTEXT_QUEUE_DEL(queue, *idx);
__atomic_add_fetch(&queue->version, 1, __ATOMIC_RELAXED);
__atomic_sub_fetch(&queue->entries, 1, __ATOMIC_RELAXED);
ret = true;
}
*idx = 0;
if(!having_lock)
spinlock_unlock(&queue->spinlock);
return ret;
}
void rrdcontext_del_from_hub_queue(RRDCONTEXT *rc, bool having_lock) {
if(!rc || !rc->rrdhost) return;
if(!having_lock)
spinlock_lock(&rc->rrdhost->rrdctx.hub_queue.spinlock);
if(rrdcontext_queue_del(&rc->rrdhost->rrdctx.hub_queue, rc, &rc->queue.idx, true)) {
rrd_flag_clear(rc, RRD_FLAG_QUEUED_FOR_HUB);
}
if(!having_lock)
spinlock_unlock(&rc->rrdhost->rrdctx.hub_queue.spinlock);
}
void rrdcontext_del_from_pp_queue(RRDCONTEXT *rc, bool having_lock) {
if(!rc || !rc->rrdhost) return;
if(!having_lock)
spinlock_lock(&rc->rrdhost->rrdctx.pp_queue.spinlock);
if(rrdcontext_queue_del(&rc->rrdhost->rrdctx.pp_queue, rc, &rc->pp.idx, true)) {
rrd_flag_clear(rc, RRD_FLAG_QUEUED_FOR_PP);
rc->pp.dequeued_ut = now_realtime_usec();
}
if(!having_lock)
spinlock_unlock(&rc->rrdhost->rrdctx.pp_queue.spinlock);
}
uint32_t rrdcontext_queue_version(RRDCONTEXT_QUEUE_JudyLSet *queue) {
return __atomic_load_n(&queue->version, __ATOMIC_RELAXED);
}
int32_t rrdcontext_queue_entries(RRDCONTEXT_QUEUE_JudyLSet *queue) {
return __atomic_load_n(&queue->entries, __ATOMIC_RELAXED);
}
void rrdcontext_post_process_queued_contexts(RRDHOST *host) {
spinlock_lock(&host->rrdctx.pp_queue.spinlock);
Word_t idx = 0;
for(RRDCONTEXT *rc = RRDCONTEXT_QUEUE_FIRST(&host->rrdctx.pp_queue, &idx);
rc;
rc = RRDCONTEXT_QUEUE_NEXT(&host->rrdctx.pp_queue, &idx)) {
if(unlikely(!service_running(SERVICE_CONTEXT))) break;
const DICTIONARY_ITEM *item = dictionary_get_and_acquire_item(host->rrdctx.contexts, string2str(rc->id));
bool do_it = dictionary_acquired_item_value(item) == rc;
if(do_it)
rrdcontext_del_from_pp_queue(rc, true);
spinlock_unlock(&host->rrdctx.pp_queue.spinlock);
if(item) {
if (do_it)
rrdcontext_post_process_updates(rc, false, RRD_FLAG_NONE, true);
dictionary_acquired_item_release(host->rrdctx.contexts, item);
}
spinlock_lock(&host->rrdctx.pp_queue.spinlock);
}
spinlock_unlock(&host->rrdctx.pp_queue.spinlock);
}
void rrdcontext_dispatch_queued_contexts_to_hub(RRDHOST *host, usec_t now_ut) {
// check if we have received a streaming command for this host
if(UUIDiszero(host->node_id) || !rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_CONTEXTS) || !aclk_online_for_contexts())
return;
// check if there are queued items to send
if(!rrdcontext_queue_entries(&host->rrdctx.hub_queue))
return;
size_t messages_added = 0;
contexts_updated_t bundle = NULL;
spinlock_lock(&host->rrdctx.hub_queue.spinlock);
Word_t idx = 0;
for(RRDCONTEXT *rc = RRDCONTEXT_QUEUE_FIRST(&host->rrdctx.hub_queue, &idx);
rc;
rc = RRDCONTEXT_QUEUE_NEXT(&host->rrdctx.hub_queue, &idx)) {
if(unlikely(!service_running(SERVICE_CONTEXT))) break;
if(unlikely(messages_added >= MESSAGES_PER_BUNDLE_TO_SEND_TO_HUB_PER_HOST))
break;
const DICTIONARY_ITEM *item = dictionary_get_and_acquire_item(host->rrdctx.contexts, string2str(rc->id));
bool do_it = dictionary_acquired_item_value(item) == rc;
if(do_it) {
worker_is_busy(WORKER_JOB_DEQUEUE);
rrdcontext_del_from_hub_queue(rc, true);
}
spinlock_unlock(&host->rrdctx.hub_queue.spinlock);
if(item) {
if (do_it) {
worker_is_busy(WORKER_JOB_QUEUED);
usec_t dispatch_ut = rrdcontext_calculate_queued_dispatch_time_ut(rc, now_ut);
CLAIM_ID claim_id = claim_id_get();
if(unlikely(now_ut >= dispatch_ut) && claim_id_is_set(claim_id)) {
worker_is_busy(WORKER_JOB_CHECK);
rrdcontext_lock(rc);
if(check_if_cloud_version_changed_unsafe(rc, true)) {
worker_is_busy(WORKER_JOB_SEND);
if(!bundle) {
// prepare the bundle to send the messages
char uuid_str[UUID_STR_LEN];
uuid_unparse_lower(host->node_id.uuid, uuid_str);
bundle = contexts_updated_new(claim_id.str, uuid_str, 0, now_ut);
}
// update the hub data of the context, give a new version, pack the message
// and save an update to SQL
rrdcontext_message_send_unsafe(rc, false, bundle);
messages_added++;
rc->queue.dispatches++;
rc->queue.dequeued_ut = now_ut;
}
else
rc->version = rc->hub.version;
if(unlikely(rrdcontext_should_be_deleted(rc))) {
// this is a deleted context - delete it forever...
worker_is_busy(WORKER_JOB_CLEANUP_DELETE);
rrdcontext_dequeue_from_post_processing(rc);
rrdcontext_delete_from_sql_unsafe(rc);
STRING *id = string_dup(rc->id);
rrdcontext_unlock(rc);
// delete it from the master dictionary
if(!dictionary_del(host->rrdctx.contexts, string2str(rc->id)))
netdata_log_error("RRDCONTEXT: '%s' of host '%s' failed to be deleted from rrdcontext dictionary.",
string2str(id), rrdhost_hostname(host));
string_freez(id);
}
else
rrdcontext_unlock(rc);
}
}
dictionary_acquired_item_release(host->rrdctx.contexts, item);
}
spinlock_lock(&host->rrdctx.hub_queue.spinlock);
}
spinlock_unlock(&host->rrdctx.hub_queue.spinlock);
if(service_running(SERVICE_CONTEXT) && bundle) {
// we have a bundle to send messages
// update the version hash
contexts_updated_update_version_hash(bundle, rrdcontext_version_hash(host));
// send it
aclk_send_contexts_updated(bundle);
}
else if(bundle)
contexts_updated_delete(bundle);
}

View file

@ -1,6 +1,6 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "internal.h"
#include "rrdcontext-internal.h"
static struct {
bool enabled;
@ -14,17 +14,8 @@ static struct {
.active_vs_archived_percentage = 50,
};
static void rrdcontext_dequeue_from_hub_queue(RRDCONTEXT *rc);
static uint64_t rrdcontext_get_next_version(RRDCONTEXT *rc);
static bool check_if_cloud_version_changed_unsafe(RRDCONTEXT *rc, bool sending __maybe_unused);
static void rrdcontext_delete_from_sql_unsafe(RRDCONTEXT *rc);
static void rrdcontext_dequeue_from_post_processing(RRDCONTEXT *rc);
static bool rrdcontext_post_process_updates(RRDCONTEXT *rc, bool force, RRD_FLAGS reason, bool worker_jobs);
static void rrdcontext_garbage_collect_for_all_hosts(void);
extern usec_t rrdcontext_next_db_rotation_ut;
@ -260,7 +251,7 @@ static inline bool rrdinstance_should_be_deleted(RRDINSTANCE *ri) {
return true;
}
static inline bool rrdcontext_should_be_deleted(RRDCONTEXT *rc) {
bool rrdcontext_should_be_deleted(RRDCONTEXT *rc) {
if(likely(!rrd_flag_check(rc, RRD_FLAGS_REQUIRED_FOR_DELETIONS)))
return false;
@ -620,7 +611,7 @@ static bool rrdinstance_forcefully_clear_retention(RRDCONTEXT *rc, size_t count,
return false;
}
static bool rrdcontext_post_process_updates(RRDCONTEXT *rc, bool force, RRD_FLAGS reason, bool worker_jobs) {
bool rrdcontext_post_process_updates(RRDCONTEXT *rc, bool force, RRD_FLAGS reason, bool worker_jobs) {
bool ret = false;
if(reason != RRD_FLAG_NONE)
@ -804,11 +795,10 @@ static bool rrdcontext_post_process_updates(RRDCONTEXT *rc, bool force, RRD_FLAG
}
}
if(unlikely(rrd_flag_is_updated(rc) && rc->rrdhost->rrdctx.hub_queue)) {
if(unlikely(rrd_flag_is_updated(rc))) {
if(check_if_cloud_version_changed_unsafe(rc, false)) {
rc->version = rrdcontext_get_next_version(rc);
dictionary_set((DICTIONARY *)rc->rrdhost->rrdctx.hub_queue,
string2str(rc->id), rc, sizeof(*rc));
rrdcontext_add_to_hub_queue(rc);
}
}
@ -819,8 +809,6 @@ static bool rrdcontext_post_process_updates(RRDCONTEXT *rc, bool force, RRD_FLAG
}
void rrdcontext_queue_for_post_processing(RRDCONTEXT *rc, const char *function __maybe_unused, RRD_FLAGS flags __maybe_unused) {
if(unlikely(!rc->rrdhost->rrdctx.pp_queue)) return;
#if 0
if(string_strcmp(rc->id, "system.cpu") == 0) {
CLEAN_BUFFER *wb = buffer_create(0, NULL);
@ -837,15 +825,7 @@ void rrdcontext_queue_for_post_processing(RRDCONTEXT *rc, const char *function _
}
#endif
dictionary_set((DICTIONARY *)rc->rrdhost->rrdctx.pp_queue,
string2str(rc->id),
rc,
sizeof(*rc));
}
static void rrdcontext_dequeue_from_post_processing(RRDCONTEXT *rc) {
if(unlikely(!rc->rrdhost->rrdctx.pp_queue)) return;
dictionary_del(rc->rrdhost->rrdctx.pp_queue, string2str(rc->id));
rrdcontext_add_to_pp_queue(rc);
}
void rrdcontext_initial_processing_after_loading(RRDCONTEXT *rc) {
@ -854,24 +834,11 @@ void rrdcontext_initial_processing_after_loading(RRDCONTEXT *rc) {
}
void rrdcontext_delete_after_loading(RRDHOST *host, RRDCONTEXT *rc) {
rrdcontext_dequeue_from_hub_queue(rc);
rrdcontext_del_from_hub_queue(rc, false);
rrdcontext_dequeue_from_post_processing(rc);
dictionary_del(host->rrdctx.contexts, string2str(rc->id));
}
static void rrdcontext_post_process_queued_contexts(RRDHOST *host) {
if(unlikely(!host->rrdctx.pp_queue)) return;
RRDCONTEXT *rc;
dfe_start_reentrant(host->rrdctx.pp_queue, rc) {
if(unlikely(!service_running(SERVICE_CONTEXT))) break;
rrdcontext_dequeue_from_post_processing(rc);
rrdcontext_post_process_updates(rc, false, RRD_FLAG_NONE, true);
}
dfe_done(rc);
}
// ----------------------------------------------------------------------------
// dispatching contexts to cloud
@ -933,7 +900,7 @@ void rrdcontext_message_send_unsafe(RRDCONTEXT *rc, bool snapshot __maybe_unused
}
}
static bool check_if_cloud_version_changed_unsafe(RRDCONTEXT *rc, bool sending __maybe_unused) {
bool check_if_cloud_version_changed_unsafe(RRDCONTEXT *rc, bool sending __maybe_unused) {
bool id_changed = false,
title_changed = false,
units_changed = false,
@ -1004,7 +971,7 @@ static bool check_if_cloud_version_changed_unsafe(RRDCONTEXT *rc, bool sending _
return false;
}
static inline usec_t rrdcontext_calculate_queued_dispatch_time_ut(RRDCONTEXT *rc, usec_t now_ut) {
usec_t rrdcontext_calculate_queued_dispatch_time_ut(RRDCONTEXT *rc, usec_t now_ut) {
if(likely(rc->queue.delay_calc_ut >= rc->queue.queued_ut))
return rc->queue.scheduled_dispatch_ut;
@ -1031,102 +998,6 @@ static inline usec_t rrdcontext_calculate_queued_dispatch_time_ut(RRDCONTEXT *rc
return dispatch_ut;
}
static void rrdcontext_dequeue_from_hub_queue(RRDCONTEXT *rc) {
dictionary_del(rc->rrdhost->rrdctx.hub_queue, string2str(rc->id));
}
static void rrdcontext_dispatch_queued_contexts_to_hub(RRDHOST *host, usec_t now_ut) {
// check if we have received a streaming command for this host
if(UUIDiszero(host->node_id) || !rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_CONTEXTS) || !aclk_online_for_contexts() || !host->rrdctx.hub_queue)
return;
// check if there are queued items to send
if(!dictionary_entries(host->rrdctx.hub_queue))
return;
size_t messages_added = 0;
contexts_updated_t bundle = NULL;
RRDCONTEXT *rc;
dfe_start_reentrant(host->rrdctx.hub_queue, rc) {
if(unlikely(!service_running(SERVICE_CONTEXT))) break;
if(unlikely(messages_added >= MESSAGES_PER_BUNDLE_TO_SEND_TO_HUB_PER_HOST))
break;
worker_is_busy(WORKER_JOB_QUEUED);
usec_t dispatch_ut = rrdcontext_calculate_queued_dispatch_time_ut(rc, now_ut);
CLAIM_ID claim_id = claim_id_get();
if(unlikely(now_ut >= dispatch_ut) && claim_id_is_set(claim_id)) {
worker_is_busy(WORKER_JOB_CHECK);
rrdcontext_lock(rc);
if(check_if_cloud_version_changed_unsafe(rc, true)) {
worker_is_busy(WORKER_JOB_SEND);
if(!bundle) {
// prepare the bundle to send the messages
char uuid_str[UUID_STR_LEN];
uuid_unparse_lower(host->node_id.uuid, uuid_str);
bundle = contexts_updated_new(claim_id.str, uuid_str, 0, now_ut);
}
// update the hub data of the context, give a new version, pack the message
// and save an update to SQL
rrdcontext_message_send_unsafe(rc, false, bundle);
messages_added++;
rc->queue.dispatches++;
rc->queue.dequeued_ut = now_ut;
}
else
rc->version = rc->hub.version;
// remove it from the queue
worker_is_busy(WORKER_JOB_DEQUEUE);
rrdcontext_dequeue_from_hub_queue(rc);
if(unlikely(rrdcontext_should_be_deleted(rc))) {
// this is a deleted context - delete it forever...
worker_is_busy(WORKER_JOB_CLEANUP_DELETE);
rrdcontext_dequeue_from_post_processing(rc);
rrdcontext_delete_from_sql_unsafe(rc);
STRING *id = string_dup(rc->id);
rrdcontext_unlock(rc);
// delete it from the master dictionary
if(!dictionary_del(host->rrdctx.contexts, string2str(rc->id)))
netdata_log_error("RRDCONTEXT: '%s' of host '%s' failed to be deleted from rrdcontext dictionary.",
string2str(id), rrdhost_hostname(host));
string_freez(id);
}
else
rrdcontext_unlock(rc);
}
}
dfe_done(rc);
if(service_running(SERVICE_CONTEXT) && bundle) {
// we have a bundle to send messages
// update the version hash
contexts_updated_update_version_hash(bundle, rrdcontext_version_hash(host));
// send it
aclk_send_contexts_updated(bundle);
}
else if(bundle)
contexts_updated_delete(bundle);
}
// ----------------------------------------------------------------------------
// worker thread
@ -1209,17 +1080,11 @@ void *rrdcontext_main(void *ptr) {
worker_is_busy(WORKER_JOB_HOSTS);
if(host->rrdctx.pp_queue) {
pp_queued_contexts_for_all_hosts += dictionary_entries(host->rrdctx.pp_queue);
rrdcontext_post_process_queued_contexts(host);
dictionary_garbage_collect(host->rrdctx.pp_queue);
}
pp_queued_contexts_for_all_hosts += rrdcontext_queue_entries(&host->rrdctx.pp_queue);
rrdcontext_post_process_queued_contexts(host);
if(host->rrdctx.hub_queue) {
hub_queued_contexts_for_all_hosts += dictionary_entries(host->rrdctx.hub_queue);
rrdcontext_dispatch_queued_contexts_to_hub(host, now_ut);
dictionary_garbage_collect(host->rrdctx.hub_queue);
}
hub_queued_contexts_for_all_hosts += rrdcontext_queue_entries(&host->rrdctx.hub_queue);
rrdcontext_dispatch_queued_contexts_to_hub(host, now_ut);
if (host->rrdctx.contexts)
dictionary_garbage_collect(host->rrdctx.contexts);

View file

@ -1,6 +1,6 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "internal.h"
#include "rrdcontext-internal.h"
// ----------------------------------------------------------------------------
// visualizing flags

View file

@ -757,5 +757,8 @@ static inline bool query_target_has_percentage_units(QUERY_TARGET *qt) {
return query_target_has_percentage_of_group(qt);
}
uint32_t rrdcontext_queue_version(RRDCONTEXT_QUEUE_JudyLSet *queue);
int32_t rrdcontext_queue_entries(RRDCONTEXT_QUEUE_JudyLSet *queue);
#endif // NETDATA_RRDCONTEXT_H

View file

@ -25,6 +25,14 @@ typedef struct rrdhost_acquired RRDHOST_ACQUIRED;
#include "rrdlabels.h"
#include "health/health-alert-log.h"
struct rrdcontext;
DEFINE_JUDYL_TYPED_ADVANCED(RRDCONTEXT_QUEUE, struct rrdcontext *, JUDYL_TYPED_NO_CONVERSION, JUDYL_TYPED_NO_CONVERSION, \
SPINLOCK spinlock; \
Word_t id; \
uint32_t version; \
int32_t entries; \
);
// ----------------------------------------------------------------------------
// RRDHOST flags
// use this for configuration flags, not for state control
@ -304,8 +312,8 @@ struct rrdhost {
struct {
DICTIONARY *contexts;
DICTIONARY *hub_queue;
DICTIONARY *pp_queue;
RRDCONTEXT_QUEUE_JudyLSet pp_queue;
RRDCONTEXT_QUEUE_JudyLSet hub_queue;
uint32_t metrics_count; // atomic
uint32_t instances_count; // atomic
uint32_t contexts_count; // atomic

View file

@ -154,7 +154,7 @@ void aclk_check_node_info_and_collectors(void)
if(replicating_rcv)
continue;
bool pp_queue_empty = !(host->rrdctx.pp_queue && dictionary_entries(host->rrdctx.pp_queue));
bool pp_queue_empty = !rrdcontext_queue_entries(&host->rrdctx.pp_queue);
if (!pp_queue_empty && (aclk_host_config->node_info_send_time || aclk_host_config->node_collectors_send)) {
context_pp++;

View file

@ -3,7 +3,7 @@
#include "sqlite_functions.h"
#include "sqlite_context.h"
#include "sqlite_db_migration.h"
#include "database/contexts/internal.h"
#include "database/contexts/rrdcontext-internal.h"
#define DB_CONTEXT_METADATA_VERSION 1

View file

@ -5,80 +5,87 @@
#include <Judy.h>
#ifdef __cplusplus
#define DEFINED_JUDYL_CHECK_SIZE(TYPE, NAME)
#else
#define DEFINED_JUDYL_CHECK_SIZE(TYPE, NAME) _Static_assert(sizeof(TYPE) <= sizeof(Word_t), #NAME "_type_must_have_same_size_as_Word_t")
#endif
// Advanced macro for types requiring conversion
#define DEFINE_JUDYL_TYPED_ADVANCED(NAME, TYPE, PACK_MACRO, UNPACK_MACRO) \
_Static_assert(sizeof(TYPE) <= sizeof(Word_t), \
#NAME "_type_must_have_same_size_as_Word_t"); \
typedef struct { \
Pvoid_t judyl; \
} NAME##_JudyLSet; \
\
ALWAYS_INLINE \
static __attribute__((unused)) void NAME##_INIT(NAME##_JudyLSet *set) { \
set->judyl = NULL; \
} \
\
ALWAYS_INLINE \
static bool __attribute__((unused)) NAME##_SET(NAME##_JudyLSet *set, Word_t index, TYPE value) { \
Pvoid_t *pValue = JudyLIns(&set->judyl, index, PJE0); \
if (pValue == PJERR) return false; \
*pValue = (void *)PACK_MACRO(value); \
return true; \
} \
\
ALWAYS_INLINE \
static TYPE __attribute__((unused)) NAME##_GET(NAME##_JudyLSet *set, Word_t index) { \
Pvoid_t *pValue = JudyLGet(set->judyl, index, PJE0); \
return (pValue != NULL) ? (TYPE)UNPACK_MACRO(*pValue) : (TYPE){0}; \
} \
\
ALWAYS_INLINE \
static bool __attribute__((unused)) NAME##_DEL(NAME##_JudyLSet *set, Word_t index) { \
return JudyLDel(&set->judyl, index, PJE0) == 1; \
} \
\
ALWAYS_INLINE \
static TYPE __attribute__((unused)) NAME##_FIRST(NAME##_JudyLSet *set, Word_t *index) { \
Pvoid_t *pValue = JudyLFirst(set->judyl, index, PJE0); \
return (pValue != NULL) ? (TYPE)UNPACK_MACRO(*pValue) : (TYPE){0}; \
} \
\
ALWAYS_INLINE \
static TYPE __attribute__((unused)) NAME##_NEXT(NAME##_JudyLSet *set, Word_t *index) { \
Pvoid_t *pValue = JudyLNext(set->judyl, index, PJE0); \
return (pValue != NULL) ? (TYPE)UNPACK_MACRO(*pValue) : (TYPE){0}; \
} \
\
ALWAYS_INLINE \
static TYPE __attribute__((unused)) NAME##_LAST(NAME##_JudyLSet *set, Word_t *index) { \
Pvoid_t *pValue = JudyLLast(set->judyl, index, PJE0); \
return (pValue != NULL) ? (TYPE)UNPACK_MACRO(*pValue) : (TYPE){0}; \
} \
\
ALWAYS_INLINE \
static TYPE __attribute__((unused)) NAME##_PREV(NAME##_JudyLSet *set, Word_t *index) { \
Pvoid_t *pValue = JudyLPrev(set->judyl, index, PJE0); \
return (pValue != NULL) ? (TYPE)UNPACK_MACRO(*pValue) : (TYPE){0}; \
} \
\
ALWAYS_INLINE \
#define DEFINE_JUDYL_TYPED_ADVANCED(NAME, TYPE, PACK_MACRO, UNPACK_MACRO, EXTRA_MEMBERS) \
DEFINED_JUDYL_CHECK_SIZE(TYPE, NAME); \
\
typedef struct { \
Pvoid_t judyl; \
EXTRA_MEMBERS \
} NAME##_JudyLSet; \
ALWAYS_INLINE \
static __attribute__((unused)) void NAME##_INIT(NAME##_JudyLSet *set) { \
set->judyl = NULL; \
} \
\
ALWAYS_INLINE \
static bool __attribute__((unused)) NAME##_SET(NAME##_JudyLSet *set, Word_t index, TYPE value) { \
Pvoid_t *pValue = JudyLIns(&set->judyl, index, PJE0); \
if (pValue == PJERR) return false; \
*pValue = (void *)PACK_MACRO(value); \
return true; \
} \
\
ALWAYS_INLINE \
static TYPE __attribute__((unused)) NAME##_GET(NAME##_JudyLSet *set, Word_t index) { \
Pvoid_t *pValue = JudyLGet(set->judyl, index, PJE0); \
return (pValue != NULL) ? (TYPE)UNPACK_MACRO(*pValue) : (TYPE){0}; \
} \
\
ALWAYS_INLINE \
static bool __attribute__((unused)) NAME##_DEL(NAME##_JudyLSet *set, Word_t index) { \
return JudyLDel(&set->judyl, index, PJE0) == 1; \
} \
\
ALWAYS_INLINE \
static TYPE __attribute__((unused)) NAME##_FIRST(NAME##_JudyLSet *set, Word_t *index) { \
Pvoid_t *pValue = JudyLFirst(set->judyl, index, PJE0); \
return (pValue != NULL) ? (TYPE)UNPACK_MACRO(*pValue) : (TYPE){0}; \
} \
\
ALWAYS_INLINE \
static TYPE __attribute__((unused)) NAME##_NEXT(NAME##_JudyLSet *set, Word_t *index) { \
Pvoid_t *pValue = JudyLNext(set->judyl, index, PJE0); \
return (pValue != NULL) ? (TYPE)UNPACK_MACRO(*pValue) : (TYPE){0}; \
} \
\
ALWAYS_INLINE \
static TYPE __attribute__((unused)) NAME##_LAST(NAME##_JudyLSet *set, Word_t *index) { \
Pvoid_t *pValue = JudyLLast(set->judyl, index, PJE0); \
return (pValue != NULL) ? (TYPE)UNPACK_MACRO(*pValue) : (TYPE){0}; \
} \
\
ALWAYS_INLINE \
static TYPE __attribute__((unused)) NAME##_PREV(NAME##_JudyLSet *set, Word_t *index) { \
Pvoid_t *pValue = JudyLPrev(set->judyl, index, PJE0); \
return (pValue != NULL) ? (TYPE)UNPACK_MACRO(*pValue) : (TYPE){0}; \
} \
\
ALWAYS_INLINE \
static void __attribute__((unused)) NAME##_FREE(NAME##_JudyLSet *set, void (*callback)(Word_t, TYPE, void *), void *data) { \
Word_t index = 0; \
Pvoid_t *pValue; \
if (callback) { \
for (pValue = JudyLFirst(set->judyl, &index, PJE0); \
pValue != NULL; \
pValue = JudyLNext(set->judyl, &index, PJE0)) { \
callback(index, (TYPE)UNPACK_MACRO(*pValue), data); \
} \
} \
JudyLFreeArray(&set->judyl, PJE0); \
Word_t index = 0; \
Pvoid_t *pValue; \
if (callback) { \
for (pValue = JudyLFirst(set->judyl, &index, PJE0); \
pValue != NULL; \
pValue = JudyLNext(set->judyl, &index, PJE0)) { \
callback(index, (TYPE)UNPACK_MACRO(*pValue), data); \
} \
} \
JudyLFreeArray(&set->judyl, PJE0); \
}
// Basic macro for types with no conversion
#define JUDYL_TYPED_NO_CONVERSION(value) (uintptr_t)(value)
#define DEFINE_JUDYL_TYPED(NAME, TYPE) \
DEFINE_JUDYL_TYPED_ADVANCED(NAME, TYPE, JUDYL_TYPED_NO_CONVERSION, JUDYL_TYPED_NO_CONVERSION)
#define DEFINE_JUDYL_TYPED(NAME, TYPE) \
DEFINE_JUDYL_TYPED_ADVANCED(NAME, TYPE, JUDYL_TYPED_NO_CONVERSION, JUDYL_TYPED_NO_CONVERSION,)
#endif //NETDATA_JUDYL_TYPED_H

View file

@ -1,7 +1,7 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "function-metrics-cardinality.h"
#include "database/contexts/internal.h"
#include "database/contexts/rrdcontext-internal.h"
struct counts {
size_t nodes;