0
0
Fork 0
mirror of https://github.com/netdata/netdata.git synced 2025-04-27 14:16:20 +00:00

Fix RRDDIM_MEM storage engine index ()

* add more logging to rrddim fatal incorrect pointer returned from index

* increase status file version to 13

* remove reuse of old uuidmap ids

* debug info to find leaked refcount

* fix strings leak in streaming; count delayed dictionaries

* switch rd pointer when rrddim_mem finds the same uuidmap id on a different rrddim

* fix the rrdset-name cleanup

* cleanup
This commit is contained in:
Costa Tsaousis 2025-03-12 21:15:47 +00:00 committed by GitHub
parent a6cc2215f4
commit ffe402f023
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 109 additions and 88 deletions

View file

@ -35,7 +35,7 @@ static void watcher_wait_for_step(const watcher_step_id_t step_id, usec_t shutdo
watcher_steps[step_id].msg);
#if defined(FSANITIZE_ADDRESS)
fprintf(stderr, " > shutdown step: [%d/%d] - {at %s} started '%s'...\n",
fprintf(stdout, " > shutdown step: [%d/%d] - {at %s} started '%s'...\n",
(int)step_id + 1, (int)WATCHER_STEP_ID_MAX, start_duration_txt,
watcher_steps[step_id].msg);
#endif
@ -69,7 +69,7 @@ static void watcher_wait_for_step(const watcher_step_id_t step_id, usec_t shutdo
watcher_steps[step_id].msg, step_duration_txt);
#if defined(FSANITIZE_ADDRESS)
fprintf(stderr, " > shutdown step: [%d/%d] - {at %s} finished '%s' in %s\n",
fprintf(stdout, " > shutdown step: [%d/%d] - {at %s} finished '%s' in %s\n",
(int)step_id + 1, (int)WATCHER_STEP_ID_MAX, start_duration_txt,
watcher_steps[step_id].msg, step_duration_txt);
#endif
@ -81,7 +81,7 @@ static void watcher_wait_for_step(const watcher_step_id_t step_id, usec_t shutdo
watcher_steps[step_id].msg, step_duration_txt);
#if defined(FSANITIZE_ADDRESS)
fprintf(stderr, "shutdown step: [%d/%d] - {at %s} timeout '%s' takes too long (%s) - giving up...\n",
fprintf(stdout, "shutdown step: [%d/%d] - {at %s} timeout '%s' takes too long (%s) - giving up...\n",
(int)step_id + 1, (int)WATCHER_STEP_ID_MAX, start_duration_txt,
watcher_steps[step_id].msg, step_duration_txt);
#endif

View file

@ -330,8 +330,10 @@ void netdata_cleanup_and_exit(EXIT_REASON reason, const char *action, const char
rrdhost_free_all();
fprintf(stderr, "Cleaning up destroyed dictionaries...\n");
if(cleanup_destroyed_dictionaries())
fprintf(stderr, "WARNING: There are still dictionaries with references in them, that cannot be destroyed.\n");
size_t dictionaries_referenced = cleanup_destroyed_dictionaries();
if(dictionaries_referenced)
fprintf(stderr, "WARNING: There are %zu dictionaries with references in them, that cannot be destroyed.\n",
dictionaries_referenced);
// destroy the caches in reverse order (extent and open depend on main cache)
fprintf(stderr, "Destroying extent cache (PGC)...\n");

View file

@ -185,7 +185,7 @@ int help(int exitcode) {
*/
#if defined(FSANITIZE_ADDRESS)
#define LOG_TO_STDERR(...) fprintf(stderr, __VA_ARGS__)
#define LOG_TO_STDERR(...) fprintf(stdout, __VA_ARGS__)
#else
#define LOG_TO_STDERR(...)
#endif

View file

@ -152,6 +152,8 @@ static inline void svc_rrdhost_cleanup_charts_marked_obsolete(RRDHOST *host) {
}
rrdset_foreach_done(st);
dictionary_garbage_collect(host->rrdset_root_index);
if(partial_archives != partial_candidates)
rrdhost_flag_set(host, RRDHOST_FLAG_PENDING_OBSOLETE_DIMENSIONS);

View file

@ -70,8 +70,13 @@ STORAGE_METRIC_HANDLE *rrddim_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE
netdata_rwlock_wrunlock(&rrddim_Judy_rwlock);
}
if(unlikely(mh->rd != rd))
fatal("DB_RAM_ALLOC: incorrect pointer returned from index.");
if(unlikely(mh->rd != rd)) {
// this can happen when the old RRDDIM is being deleted,
// but the dictionary has not yet run the destructors
netdata_rwlock_wrlock(&rrddim_Judy_rwlock);
mh->rd = rd;
netdata_rwlock_wrunlock(&rrddim_Judy_rwlock);
}
return (STORAGE_METRIC_HANDLE *)mh;
}
@ -112,7 +117,7 @@ STORAGE_METRIC_HANDLE *rrddim_metric_dup(STORAGE_METRIC_HANDLE *smh) {
return smh;
}
void rrddim_metric_release(STORAGE_METRIC_HANDLE *smh __maybe_unused) {
void rrddim_metric_release(STORAGE_METRIC_HANDLE *smh) {
struct mem_metric_handle *mh = (struct mem_metric_handle *)smh;
if(refcount_release_and_acquire_for_deletion(&mh->refcount)) {

View file

@ -80,8 +80,14 @@ void rrdset_index_add_name(RRDHOST *host, RRDSET *st) {
const DICTIONARY_ITEM *sta = dictionary_get_and_acquire_item(host->rrdset_root_index, rrdset_id(st));
if(sta) {
const DICTIONARY_ITEM *sta2 = dictionary_view_set_and_acquire_item(host->rrdset_root_index_name, rrdset_name(st), sta);
if(sta2 && dictionary_acquired_item_value(sta2) == st)
rrdset_flag_set(st, RRDSET_FLAG_INDEXED_NAME);
if(sta2) {
if(dictionary_acquired_item_value(sta2) == st)
rrdset_flag_set(st, RRDSET_FLAG_INDEXED_NAME);
dictionary_acquired_item_release(host->rrdset_root_index_name, sta2);
}
dictionary_acquired_item_release(host->rrdset_root_index, sta);
}
}
@ -89,8 +95,13 @@ void rrdset_index_del_name(RRDHOST *host, RRDSET *st) {
if(rrdset_flag_check(st, RRDSET_FLAG_INDEXED_NAME)) {
const DICTIONARY_ITEM *sta = dictionary_get_and_acquire_item(host->rrdset_root_index_name, rrdset_name(st));
if(sta && dictionary_acquired_item_value(sta) == st)
dictionary_del(host->rrdset_root_index_name, rrdset_name(st));
if(sta) {
if(dictionary_acquired_item_value(sta) == st)
dictionary_del(host->rrdset_root_index_name, rrdset_name(st));
dictionary_acquired_item_release(host->rrdset_root_index_name, sta);
dictionary_garbage_collect(host->rrdset_root_index_name);
}
rrdset_flag_clear(st, RRDSET_FLAG_INDEXED_NAME);
}

View file

@ -330,13 +330,15 @@ static void dictionary_queue_for_destruction(DICTIONARY *dict) {
netdata_mutex_unlock(&dictionaries_waiting_to_be_destroyed_mutex);
}
bool cleanup_destroyed_dictionaries(void) {
size_t cleanup_destroyed_dictionaries(void) {
netdata_mutex_lock(&dictionaries_waiting_to_be_destroyed_mutex);
if (!dictionaries_waiting_to_be_destroyed) {
netdata_mutex_unlock(&dictionaries_waiting_to_be_destroyed_mutex);
return false;
return 0;
}
size_t remaining = 0;
DICTIONARY *dict, *last = NULL, *next = NULL;
for(dict = dictionaries_waiting_to_be_destroyed; dict ; dict = next) {
next = dict->next;
@ -353,7 +355,7 @@ bool cleanup_destroyed_dictionaries(void) {
internal_error(
true,
"DICTIONARY: freed dictionary with delayed destruction, created from %s() %zu@%s pid %d.",
"DICTIONARY DELAYED: freed dict created from %s() %zu@%s pid %d.",
function, line, file, pid);
if(last) last->next = next;
@ -363,18 +365,19 @@ bool cleanup_destroyed_dictionaries(void) {
internal_error(
true,
"DICTIONARY: cannot free dictionary with delayed destruction, created from %s() %zu@%s pid %d.",
"DICTIONARY DELAYED %zu: %zu referenced in dict created from %s() %zu@%s pid %d.",
remaining + 1, dictionary_referenced_items(dict),
function, line, file, pid);
DICTIONARY_STATS_DICT_DESTROY_QUEUED_PLUS1(dict);
last = dict;
remaining++;
}
}
bool ret = dictionaries_waiting_to_be_destroyed != NULL;
netdata_mutex_unlock(&dictionaries_waiting_to_be_destroyed_mutex);
return ret;
return remaining;
}
// ----------------------------------------------------------------------------

View file

@ -168,7 +168,7 @@ void dictionary_version_increment(DICTIONARY *dict);
void dictionary_garbage_collect(DICTIONARY *dict);
bool cleanup_destroyed_dictionaries(void);
size_t cleanup_destroyed_dictionaries(void);
// ----------------------------------------------------------------------------
// Set an item in the dictionary

View file

@ -2,8 +2,6 @@
#include "uuidmap.h"
#define UUIDMAP_REUSE_GAP 1000
struct uuidmap_entry {
nd_uuid_t uuid;
REFCOUNT refcount;
@ -12,9 +10,7 @@ struct uuidmap_entry {
struct uuidmap_partition {
Pvoid_t uuid_to_id; // JudyL: UUID string -> ID
Pvoid_t id_to_uuid; // JudyL: ID -> UUID binary
Pvoid_t freed_ids; // JudyL: the freed IDs
UUIDMAP_ID next_id; // Only use lower bits
UUIDMAP_ID next_free_id; // fifo id when reusing freed ids
RW_SPINLOCK spinlock;
int64_t memory;
@ -64,33 +60,13 @@ static void uuidmap_init_aral(void) {
}
static UUIDMAP_ID get_next_id_unsafe(struct uuidmap_partition *partition) {
UUIDMAP_ID id = 0;
// Check if we've reached the maximum ID value
if (partition->next_id >= 0x1FFFFFFF)
fatal("UUIDMAP: Maximum ID limit reached for partition %u. UUIDs exhausted.",
(unsigned int)(partition - uuid_map.p));
// Try to get a freed ID first if we have enough gap
Pvoid_t *PValue;
Word_t Index = 0;
PValue = JudyLFirst(partition->freed_ids, &Index, PJE0);
if (PValue && PValue != PJERR && *PValue) {
// Check if the stored ID is old enough to be reused
UUIDMAP_ID stored_id = *(UUIDMAP_ID *)PValue;
UUIDMAP_ID current_next_id = uuidmap_make_id(partition - uuid_map.p, partition->next_id);
if ((current_next_id - stored_id) >= UUIDMAP_REUSE_GAP) {
id = stored_id;
// Remove this entry from freed_ids since we're reusing it
int rc = JudyLDel(&partition->freed_ids, Index, PJE0);
if (unlikely(!rc))
fatal("UUIDMAP: cannot delete ID from freed_ids JudyL");
}
}
if (id == 0) {
// No reusable IDs available, get next sequential ID
id = uuidmap_make_id(partition - uuid_map.p, ++partition->next_id);
}
return id;
// Simply increment and return the next ID
return uuidmap_make_id(partition - uuid_map.p, ++partition->next_id);
}
static inline UUIDMAP_ID uuidmap_acquire_by_uuid(const nd_uuid_t uuid) {
@ -220,13 +196,6 @@ void uuidmap_free(UUIDMAP_ID id) {
if(unlikely(!rc))
fatal("UUIDMAP: cannot delete ID from JudyL");
// Add the freed ID to the freed_ids JudyL using next_free_id as index
Pvoid_t *PValue = JudyLIns(&uuid_map.p[partition].freed_ids, ++uuid_map.p[partition].next_free_id, PJE0);
if (!PValue || PValue == PJERR)
fatal("UUIDMAP: corrupted freed_ids JudyL array");
*(UUIDMAP_ID *)PValue = id; // Store the actual METRIC_ID as the value
uuid_map.p[partition].memory -= sizeof(*ue);
uuid_map.p[partition].entries--;
@ -288,7 +257,6 @@ size_t uuidmap_destroy(void) {
Pvoid_t uuid_to_id = uuid_map.p[partition].uuid_to_id;
Pvoid_t id_to_uuid = uuid_map.p[partition].id_to_uuid;
Pvoid_t freed_ids = uuid_map.p[partition].freed_ids;
// Process all entries in the id_to_uuid map
Word_t id_index = 0;
@ -313,7 +281,6 @@ size_t uuidmap_destroy(void) {
// Free all Judy arrays
JudyHSFreeArray(&uuid_to_id, PJE0);
JudyLFreeArray(&id_to_uuid, PJE0);
JudyLFreeArray(&freed_ids, PJE0);
// Reset partition data
memset(&uuid_map.p[partition], 0, sizeof(uuid_map.p[partition]));
@ -617,7 +584,7 @@ int uuidmap_unittest(void) {
double ops = (double)successful / secs;
fprintf(stderr, "uuidmap_uuid_ptr() : %.2f ops/sec (%.2f usec/op)\n",
ops, (double)(end_ut - start_ut) / successful);
ops, (double)(end_ut - start_ut) / (double)successful);
// Second benchmark: uuidmap_get_by_uuid()
successful = 0;
@ -636,7 +603,7 @@ int uuidmap_unittest(void) {
ops = (double)successful / secs;
fprintf(stderr, "uuidmap_acquire_by_uuid(): %.2f ops/sec (%.2f usec/op)\n",
ops, (double)(end_ut - start_ut) / successful);
ops, (double)(end_ut - start_ut) / (double)successful);
}
// Phase 2: Delete everything

View file

@ -629,6 +629,41 @@ static void *stream_connector_thread(void *ptr) {
return NULL;
}
void stream_connector_remove_host(RRDHOST *host) {
if(!host || !host->sender) return;
struct connector *sc = stream_connector_get(host->sender);
spinlock_lock(&sc->queue.spinlock);
Word_t idx = 0;
for(struct sender_state *s = SENDERS_FIRST(&sc->queue.senders, &idx);
s;
s = SENDERS_NEXT(&sc->queue.senders, &idx)) {
if(s != host->sender)
continue;
ND_LOG_STACK lgs[] = {
ND_LOG_FIELD_STR(NDF_NIDL_NODE, s->host->hostname),
ND_LOG_FIELD_UUID(NDF_MESSAGE_ID, &streaming_to_parent_msgid),
ND_LOG_FIELD_END(),
};
ND_LOG_STACK_PUSH(lgs);
SENDERS_DEL(&sc->queue.senders, idx);
spinlock_unlock(&sc->queue.spinlock);
// do not have the connector lock when calling these
stream_sender_on_disconnect(s);
stream_sender_remove(s, s->exit.reason);
spinlock_lock(&sc->queue.spinlock);
break;
}
spinlock_unlock(&sc->queue.spinlock);
}
bool stream_connector_init(struct sender_state *s) {
static SPINLOCK spinlock = SPINLOCK_INITIALIZER;
if(!s) return false;

View file

@ -441,23 +441,6 @@ void stream_receiver_move_to_running_unsafe(struct stream_thread *sth, struct re
"Failed to add receiver socket to nd_poll()",
sth->id, rrdhost_hostname(rpt->host), rpt->remote_ip, rpt->remote_port);
// put the client IP and port into the buffers used by plugins.d
{
char buf[CONFIG_MAX_NAME];
snprintfz(buf, sizeof(buf), "[%s]:%s", rpt->remote_ip, rpt->remote_port);
string_freez(rpt->thread.cd.id);
rpt->thread.cd.id = string_strdupz(buf);
string_freez(rpt->thread.cd.filename);
rpt->thread.cd.filename = NULL;
string_freez(rpt->thread.cd.fullfilename);
rpt->thread.cd.fullfilename = NULL;
string_freez(rpt->thread.cd.cmd);
rpt->thread.cd.cmd = NULL;
}
rpt->thread.compressed.start = 0;
rpt->thread.compressed.used = 0;
rpt->thread.compressed.enabled = stream_decompression_initialize(rpt);
@ -472,15 +455,25 @@ void stream_receiver_move_to_running_unsafe(struct stream_thread *sth, struct re
PARSER *parser = NULL;
{
rpt->thread.cd = (struct plugind){
.update_every = nd_profile.update_every,
.unsafe = {
.spinlock = SPINLOCK_INITIALIZER,
.running = true,
.enabled = true,
},
.started_t = now_realtime_sec(),
};
char buf[CONFIG_MAX_NAME];
snprintfz(buf, sizeof(buf), "[%s]:%s", rpt->remote_ip, rpt->remote_port);
string_freez(rpt->thread.cd.id);
rpt->thread.cd.id = string_strdupz(buf);
string_freez(rpt->thread.cd.filename);
rpt->thread.cd.filename = NULL;
string_freez(rpt->thread.cd.fullfilename);
rpt->thread.cd.fullfilename = NULL;
string_freez(rpt->thread.cd.cmd);
rpt->thread.cd.cmd = NULL;
rpt->thread.cd.update_every = (int)nd_profile.update_every;
spinlock_init(&rpt->thread.cd.unsafe.spinlock);
rpt->thread.cd.unsafe.running = true;
rpt->thread.cd.unsafe.enabled = true;
rpt->thread.cd.started_t = now_realtime_sec();
PARSER_USER_OBJECT user = {
.enabled = plugin_is_enabled(&rpt->thread.cd),

View file

@ -134,6 +134,8 @@ void stream_sender_signal_to_stop_and_wait(struct rrdhost *host, STREAM_HANDSHAK
stream_sender_send_opcode(host->sender, msg);
while(wait && rrdhost_flag_check(host, RRDHOST_FLAG_STREAM_SENDER_ADDED))
while(wait && rrdhost_flag_check(host, RRDHOST_FLAG_STREAM_SENDER_ADDED)) {
sleep_usec(10 * USEC_PER_MS);
stream_connector_remove_host(host);
}
}

View file

@ -19,6 +19,7 @@ struct receiver_state;
void *stream_sender_start_localhost(void *ptr);
void stream_sender_start_host(struct rrdhost *host);
void stream_sender_signal_to_stop_and_wait(struct rrdhost *host, STREAM_HANDSHAKE reason, bool wait);
void stream_connector_remove_host(RRDHOST *host);
// managing host sender structures
void stream_sender_structures_init(struct rrdhost *host, bool stream, STRING *parents, STRING *api_key, STRING *send_charts_matching);