diff --git a/src/daemon/daemon-shutdown-watcher.c b/src/daemon/daemon-shutdown-watcher.c index 1dbfc05a42..46dbabfdf8 100644 --- a/src/daemon/daemon-shutdown-watcher.c +++ b/src/daemon/daemon-shutdown-watcher.c @@ -35,7 +35,7 @@ static void watcher_wait_for_step(const watcher_step_id_t step_id, usec_t shutdo watcher_steps[step_id].msg); #if defined(FSANITIZE_ADDRESS) - fprintf(stderr, " > shutdown step: [%d/%d] - {at %s} started '%s'...\n", + fprintf(stdout, " > shutdown step: [%d/%d] - {at %s} started '%s'...\n", (int)step_id + 1, (int)WATCHER_STEP_ID_MAX, start_duration_txt, watcher_steps[step_id].msg); #endif @@ -69,7 +69,7 @@ static void watcher_wait_for_step(const watcher_step_id_t step_id, usec_t shutdo watcher_steps[step_id].msg, step_duration_txt); #if defined(FSANITIZE_ADDRESS) - fprintf(stderr, " > shutdown step: [%d/%d] - {at %s} finished '%s' in %s\n", + fprintf(stdout, " > shutdown step: [%d/%d] - {at %s} finished '%s' in %s\n", (int)step_id + 1, (int)WATCHER_STEP_ID_MAX, start_duration_txt, watcher_steps[step_id].msg, step_duration_txt); #endif @@ -81,7 +81,7 @@ static void watcher_wait_for_step(const watcher_step_id_t step_id, usec_t shutdo watcher_steps[step_id].msg, step_duration_txt); #if defined(FSANITIZE_ADDRESS) - fprintf(stderr, "shutdown step: [%d/%d] - {at %s} timeout '%s' takes too long (%s) - giving up...\n", + fprintf(stdout, "shutdown step: [%d/%d] - {at %s} timeout '%s' takes too long (%s) - giving up...\n", (int)step_id + 1, (int)WATCHER_STEP_ID_MAX, start_duration_txt, watcher_steps[step_id].msg, step_duration_txt); #endif diff --git a/src/daemon/daemon-shutdown.c b/src/daemon/daemon-shutdown.c index 1d93069c33..db6c73bf62 100644 --- a/src/daemon/daemon-shutdown.c +++ b/src/daemon/daemon-shutdown.c @@ -330,8 +330,10 @@ void netdata_cleanup_and_exit(EXIT_REASON reason, const char *action, const char rrdhost_free_all(); fprintf(stderr, "Cleaning up destroyed dictionaries...\n"); - if(cleanup_destroyed_dictionaries()) - fprintf(stderr, "WARNING: There are still dictionaries with references in them, that cannot be destroyed.\n"); + size_t dictionaries_referenced = cleanup_destroyed_dictionaries(); + if(dictionaries_referenced) + fprintf(stderr, "WARNING: There are %zu dictionaries with references in them, that cannot be destroyed.\n", + dictionaries_referenced); // destroy the caches in reverse order (extent and open depend on main cache) fprintf(stderr, "Destroying extent cache (PGC)...\n"); diff --git a/src/daemon/main.c b/src/daemon/main.c index a547fc7f02..dbe6b03887 100644 --- a/src/daemon/main.c +++ b/src/daemon/main.c @@ -185,7 +185,7 @@ int help(int exitcode) { */ #if defined(FSANITIZE_ADDRESS) -#define LOG_TO_STDERR(...) fprintf(stderr, __VA_ARGS__) +#define LOG_TO_STDERR(...) fprintf(stdout, __VA_ARGS__) #else #define LOG_TO_STDERR(...) #endif diff --git a/src/daemon/service.c b/src/daemon/service.c index e82c6edc07..4d0c7f0fa8 100644 --- a/src/daemon/service.c +++ b/src/daemon/service.c @@ -152,6 +152,8 @@ static inline void svc_rrdhost_cleanup_charts_marked_obsolete(RRDHOST *host) { } rrdset_foreach_done(st); + dictionary_garbage_collect(host->rrdset_root_index); + if(partial_archives != partial_candidates) rrdhost_flag_set(host, RRDHOST_FLAG_PENDING_OBSOLETE_DIMENSIONS); diff --git a/src/database/ram/rrddim_mem.c b/src/database/ram/rrddim_mem.c index f53b6efea4..f8c76da503 100644 --- a/src/database/ram/rrddim_mem.c +++ b/src/database/ram/rrddim_mem.c @@ -70,8 +70,13 @@ STORAGE_METRIC_HANDLE *rrddim_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE netdata_rwlock_wrunlock(&rrddim_Judy_rwlock); } - if(unlikely(mh->rd != rd)) - fatal("DB_RAM_ALLOC: incorrect pointer returned from index."); + if(unlikely(mh->rd != rd)) { + // this can happen when the old RRDDIM is being deleted, + // but the dictionary has not yet run the destructors + netdata_rwlock_wrlock(&rrddim_Judy_rwlock); + mh->rd = rd; + netdata_rwlock_wrunlock(&rrddim_Judy_rwlock); + } return (STORAGE_METRIC_HANDLE *)mh; } @@ -112,7 +117,7 @@ STORAGE_METRIC_HANDLE *rrddim_metric_dup(STORAGE_METRIC_HANDLE *smh) { return smh; } -void rrddim_metric_release(STORAGE_METRIC_HANDLE *smh __maybe_unused) { +void rrddim_metric_release(STORAGE_METRIC_HANDLE *smh) { struct mem_metric_handle *mh = (struct mem_metric_handle *)smh; if(refcount_release_and_acquire_for_deletion(&mh->refcount)) { diff --git a/src/database/rrdset-index-name.c b/src/database/rrdset-index-name.c index 34c5f1cadd..e13568b962 100644 --- a/src/database/rrdset-index-name.c +++ b/src/database/rrdset-index-name.c @@ -80,8 +80,14 @@ void rrdset_index_add_name(RRDHOST *host, RRDSET *st) { const DICTIONARY_ITEM *sta = dictionary_get_and_acquire_item(host->rrdset_root_index, rrdset_id(st)); if(sta) { const DICTIONARY_ITEM *sta2 = dictionary_view_set_and_acquire_item(host->rrdset_root_index_name, rrdset_name(st), sta); - if(sta2 && dictionary_acquired_item_value(sta2) == st) - rrdset_flag_set(st, RRDSET_FLAG_INDEXED_NAME); + if(sta2) { + if(dictionary_acquired_item_value(sta2) == st) + rrdset_flag_set(st, RRDSET_FLAG_INDEXED_NAME); + + dictionary_acquired_item_release(host->rrdset_root_index_name, sta2); + } + + dictionary_acquired_item_release(host->rrdset_root_index, sta); } } @@ -89,8 +95,13 @@ void rrdset_index_del_name(RRDHOST *host, RRDSET *st) { if(rrdset_flag_check(st, RRDSET_FLAG_INDEXED_NAME)) { const DICTIONARY_ITEM *sta = dictionary_get_and_acquire_item(host->rrdset_root_index_name, rrdset_name(st)); - if(sta && dictionary_acquired_item_value(sta) == st) - dictionary_del(host->rrdset_root_index_name, rrdset_name(st)); + if(sta) { + if(dictionary_acquired_item_value(sta) == st) + dictionary_del(host->rrdset_root_index_name, rrdset_name(st)); + + dictionary_acquired_item_release(host->rrdset_root_index_name, sta); + dictionary_garbage_collect(host->rrdset_root_index_name); + } rrdset_flag_clear(st, RRDSET_FLAG_INDEXED_NAME); } diff --git a/src/libnetdata/dictionary/dictionary.c b/src/libnetdata/dictionary/dictionary.c index 253dfa6324..5bf91991c9 100644 --- a/src/libnetdata/dictionary/dictionary.c +++ b/src/libnetdata/dictionary/dictionary.c @@ -330,13 +330,15 @@ static void dictionary_queue_for_destruction(DICTIONARY *dict) { netdata_mutex_unlock(&dictionaries_waiting_to_be_destroyed_mutex); } -bool cleanup_destroyed_dictionaries(void) { +size_t cleanup_destroyed_dictionaries(void) { netdata_mutex_lock(&dictionaries_waiting_to_be_destroyed_mutex); if (!dictionaries_waiting_to_be_destroyed) { netdata_mutex_unlock(&dictionaries_waiting_to_be_destroyed_mutex); - return false; + return 0; } + size_t remaining = 0; + DICTIONARY *dict, *last = NULL, *next = NULL; for(dict = dictionaries_waiting_to_be_destroyed; dict ; dict = next) { next = dict->next; @@ -353,7 +355,7 @@ bool cleanup_destroyed_dictionaries(void) { internal_error( true, - "DICTIONARY: freed dictionary with delayed destruction, created from %s() %zu@%s pid %d.", + "DICTIONARY DELAYED: freed dict created from %s() %zu@%s pid %d.", function, line, file, pid); if(last) last->next = next; @@ -363,18 +365,19 @@ bool cleanup_destroyed_dictionaries(void) { internal_error( true, - "DICTIONARY: cannot free dictionary with delayed destruction, created from %s() %zu@%s pid %d.", + "DICTIONARY DELAYED %zu: %zu referenced in dict created from %s() %zu@%s pid %d.", + remaining + 1, dictionary_referenced_items(dict), function, line, file, pid); DICTIONARY_STATS_DICT_DESTROY_QUEUED_PLUS1(dict); last = dict; + remaining++; } } - bool ret = dictionaries_waiting_to_be_destroyed != NULL; netdata_mutex_unlock(&dictionaries_waiting_to_be_destroyed_mutex); - return ret; + return remaining; } // ---------------------------------------------------------------------------- diff --git a/src/libnetdata/dictionary/dictionary.h b/src/libnetdata/dictionary/dictionary.h index 93c95e4a80..6d73159a3b 100644 --- a/src/libnetdata/dictionary/dictionary.h +++ b/src/libnetdata/dictionary/dictionary.h @@ -168,7 +168,7 @@ void dictionary_version_increment(DICTIONARY *dict); void dictionary_garbage_collect(DICTIONARY *dict); -bool cleanup_destroyed_dictionaries(void); +size_t cleanup_destroyed_dictionaries(void); // ---------------------------------------------------------------------------- // Set an item in the dictionary diff --git a/src/libnetdata/uuid/uuidmap.c b/src/libnetdata/uuid/uuidmap.c index f9231537a4..33e8bd867a 100644 --- a/src/libnetdata/uuid/uuidmap.c +++ b/src/libnetdata/uuid/uuidmap.c @@ -2,8 +2,6 @@ #include "uuidmap.h" -#define UUIDMAP_REUSE_GAP 1000 - struct uuidmap_entry { nd_uuid_t uuid; REFCOUNT refcount; @@ -12,9 +10,7 @@ struct uuidmap_entry { struct uuidmap_partition { Pvoid_t uuid_to_id; // JudyL: UUID string -> ID Pvoid_t id_to_uuid; // JudyL: ID -> UUID binary - Pvoid_t freed_ids; // JudyL: the freed IDs UUIDMAP_ID next_id; // Only use lower bits - UUIDMAP_ID next_free_id; // fifo id when reusing freed ids RW_SPINLOCK spinlock; int64_t memory; @@ -64,33 +60,13 @@ static void uuidmap_init_aral(void) { } static UUIDMAP_ID get_next_id_unsafe(struct uuidmap_partition *partition) { - UUIDMAP_ID id = 0; + // Check if we've reached the maximum ID value + if (partition->next_id >= 0x1FFFFFFF) + fatal("UUIDMAP: Maximum ID limit reached for partition %u. UUIDs exhausted.", + (unsigned int)(partition - uuid_map.p)); - // Try to get a freed ID first if we have enough gap - Pvoid_t *PValue; - Word_t Index = 0; - - PValue = JudyLFirst(partition->freed_ids, &Index, PJE0); - if (PValue && PValue != PJERR && *PValue) { - // Check if the stored ID is old enough to be reused - UUIDMAP_ID stored_id = *(UUIDMAP_ID *)PValue; - UUIDMAP_ID current_next_id = uuidmap_make_id(partition - uuid_map.p, partition->next_id); - - if ((current_next_id - stored_id) >= UUIDMAP_REUSE_GAP) { - id = stored_id; - // Remove this entry from freed_ids since we're reusing it - int rc = JudyLDel(&partition->freed_ids, Index, PJE0); - if (unlikely(!rc)) - fatal("UUIDMAP: cannot delete ID from freed_ids JudyL"); - } - } - - if (id == 0) { - // No reusable IDs available, get next sequential ID - id = uuidmap_make_id(partition - uuid_map.p, ++partition->next_id); - } - - return id; + // Simply increment and return the next ID + return uuidmap_make_id(partition - uuid_map.p, ++partition->next_id); } static inline UUIDMAP_ID uuidmap_acquire_by_uuid(const nd_uuid_t uuid) { @@ -220,13 +196,6 @@ void uuidmap_free(UUIDMAP_ID id) { if(unlikely(!rc)) fatal("UUIDMAP: cannot delete ID from JudyL"); - // Add the freed ID to the freed_ids JudyL using next_free_id as index - Pvoid_t *PValue = JudyLIns(&uuid_map.p[partition].freed_ids, ++uuid_map.p[partition].next_free_id, PJE0); - if (!PValue || PValue == PJERR) - fatal("UUIDMAP: corrupted freed_ids JudyL array"); - - *(UUIDMAP_ID *)PValue = id; // Store the actual METRIC_ID as the value - uuid_map.p[partition].memory -= sizeof(*ue); uuid_map.p[partition].entries--; @@ -288,7 +257,6 @@ size_t uuidmap_destroy(void) { Pvoid_t uuid_to_id = uuid_map.p[partition].uuid_to_id; Pvoid_t id_to_uuid = uuid_map.p[partition].id_to_uuid; - Pvoid_t freed_ids = uuid_map.p[partition].freed_ids; // Process all entries in the id_to_uuid map Word_t id_index = 0; @@ -313,7 +281,6 @@ size_t uuidmap_destroy(void) { // Free all Judy arrays JudyHSFreeArray(&uuid_to_id, PJE0); JudyLFreeArray(&id_to_uuid, PJE0); - JudyLFreeArray(&freed_ids, PJE0); // Reset partition data memset(&uuid_map.p[partition], 0, sizeof(uuid_map.p[partition])); @@ -617,7 +584,7 @@ int uuidmap_unittest(void) { double ops = (double)successful / secs; fprintf(stderr, "uuidmap_uuid_ptr() : %.2f ops/sec (%.2f usec/op)\n", - ops, (double)(end_ut - start_ut) / successful); + ops, (double)(end_ut - start_ut) / (double)successful); // Second benchmark: uuidmap_get_by_uuid() successful = 0; @@ -636,7 +603,7 @@ int uuidmap_unittest(void) { ops = (double)successful / secs; fprintf(stderr, "uuidmap_acquire_by_uuid(): %.2f ops/sec (%.2f usec/op)\n", - ops, (double)(end_ut - start_ut) / successful); + ops, (double)(end_ut - start_ut) / (double)successful); } // Phase 2: Delete everything diff --git a/src/streaming/stream-connector.c b/src/streaming/stream-connector.c index f672f78a9a..e88c83a46d 100644 --- a/src/streaming/stream-connector.c +++ b/src/streaming/stream-connector.c @@ -629,6 +629,41 @@ static void *stream_connector_thread(void *ptr) { return NULL; } +void stream_connector_remove_host(RRDHOST *host) { + if(!host || !host->sender) return; + + struct connector *sc = stream_connector_get(host->sender); + + spinlock_lock(&sc->queue.spinlock); + Word_t idx = 0; + for(struct sender_state *s = SENDERS_FIRST(&sc->queue.senders, &idx); + s; + s = SENDERS_NEXT(&sc->queue.senders, &idx)) { + + if(s != host->sender) + continue; + + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_STR(NDF_NIDL_NODE, s->host->hostname), + ND_LOG_FIELD_UUID(NDF_MESSAGE_ID, &streaming_to_parent_msgid), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); + + SENDERS_DEL(&sc->queue.senders, idx); + spinlock_unlock(&sc->queue.spinlock); + + // do not have the connector lock when calling these + stream_sender_on_disconnect(s); + stream_sender_remove(s, s->exit.reason); + + spinlock_lock(&sc->queue.spinlock); + break; + } + + spinlock_unlock(&sc->queue.spinlock); +} + bool stream_connector_init(struct sender_state *s) { static SPINLOCK spinlock = SPINLOCK_INITIALIZER; if(!s) return false; diff --git a/src/streaming/stream-receiver.c b/src/streaming/stream-receiver.c index 6f8f7633d5..d9be48fa99 100644 --- a/src/streaming/stream-receiver.c +++ b/src/streaming/stream-receiver.c @@ -441,23 +441,6 @@ void stream_receiver_move_to_running_unsafe(struct stream_thread *sth, struct re "Failed to add receiver socket to nd_poll()", sth->id, rrdhost_hostname(rpt->host), rpt->remote_ip, rpt->remote_port); - // put the client IP and port into the buffers used by plugins.d - { - char buf[CONFIG_MAX_NAME]; - snprintfz(buf, sizeof(buf), "[%s]:%s", rpt->remote_ip, rpt->remote_port); - string_freez(rpt->thread.cd.id); - rpt->thread.cd.id = string_strdupz(buf); - - string_freez(rpt->thread.cd.filename); - rpt->thread.cd.filename = NULL; - - string_freez(rpt->thread.cd.fullfilename); - rpt->thread.cd.fullfilename = NULL; - - string_freez(rpt->thread.cd.cmd); - rpt->thread.cd.cmd = NULL; - } - rpt->thread.compressed.start = 0; rpt->thread.compressed.used = 0; rpt->thread.compressed.enabled = stream_decompression_initialize(rpt); @@ -472,15 +455,25 @@ void stream_receiver_move_to_running_unsafe(struct stream_thread *sth, struct re PARSER *parser = NULL; { - rpt->thread.cd = (struct plugind){ - .update_every = nd_profile.update_every, - .unsafe = { - .spinlock = SPINLOCK_INITIALIZER, - .running = true, - .enabled = true, - }, - .started_t = now_realtime_sec(), - }; + char buf[CONFIG_MAX_NAME]; + snprintfz(buf, sizeof(buf), "[%s]:%s", rpt->remote_ip, rpt->remote_port); + string_freez(rpt->thread.cd.id); + rpt->thread.cd.id = string_strdupz(buf); + + string_freez(rpt->thread.cd.filename); + rpt->thread.cd.filename = NULL; + + string_freez(rpt->thread.cd.fullfilename); + rpt->thread.cd.fullfilename = NULL; + + string_freez(rpt->thread.cd.cmd); + rpt->thread.cd.cmd = NULL; + + rpt->thread.cd.update_every = (int)nd_profile.update_every; + spinlock_init(&rpt->thread.cd.unsafe.spinlock); + rpt->thread.cd.unsafe.running = true; + rpt->thread.cd.unsafe.enabled = true; + rpt->thread.cd.started_t = now_realtime_sec(); PARSER_USER_OBJECT user = { .enabled = plugin_is_enabled(&rpt->thread.cd), diff --git a/src/streaming/stream-sender-api.c b/src/streaming/stream-sender-api.c index d43a582cf1..141270eb19 100644 --- a/src/streaming/stream-sender-api.c +++ b/src/streaming/stream-sender-api.c @@ -134,6 +134,8 @@ void stream_sender_signal_to_stop_and_wait(struct rrdhost *host, STREAM_HANDSHAK stream_sender_send_opcode(host->sender, msg); - while(wait && rrdhost_flag_check(host, RRDHOST_FLAG_STREAM_SENDER_ADDED)) + while(wait && rrdhost_flag_check(host, RRDHOST_FLAG_STREAM_SENDER_ADDED)) { sleep_usec(10 * USEC_PER_MS); + stream_connector_remove_host(host); + } } diff --git a/src/streaming/stream.h b/src/streaming/stream.h index 67bc7e41fb..ad1d68d8a6 100644 --- a/src/streaming/stream.h +++ b/src/streaming/stream.h @@ -19,6 +19,7 @@ struct receiver_state; void *stream_sender_start_localhost(void *ptr); void stream_sender_start_host(struct rrdhost *host); void stream_sender_signal_to_stop_and_wait(struct rrdhost *host, STREAM_HANDSHAKE reason, bool wait); +void stream_connector_remove_host(RRDHOST *host); // managing host sender structures void stream_sender_structures_init(struct rrdhost *host, bool stream, STRING *parents, STRING *api_key, STRING *send_charts_matching);