diff --git a/src/daemon/daemon-shutdown.c b/src/daemon/daemon-shutdown.c index 29fa7f1a81..1fbb0f002a 100644 --- a/src/daemon/daemon-shutdown.c +++ b/src/daemon/daemon-shutdown.c @@ -220,7 +220,7 @@ void netdata_cleanup_and_exit(EXIT_REASON reason, const char *action, const char #ifdef ENABLE_DBENGINE if(!ret && dbengine_enabled) // flush all dirty pages now that all collectors and streaming completed - rrdeng_flush_everything_and_wait(false, false, false); + rrdeng_flush_everything_and_wait(false, false, true); #endif service_wait_exit(SERVICE_REPLICATION, 3 * USEC_PER_SEC); @@ -322,6 +322,43 @@ void netdata_cleanup_and_exit(EXIT_REASON reason, const char *action, const char daemon_status_file_shutdown_step(NULL); daemon_status_file_update_status(DAEMON_STATUS_EXITED); +#if defined(FSANITIZE_ADDRESS) + fprintf(stderr, "\n"); + + fprintf(stderr, "Freeing all RRDHOSTs...\n"); + rrdhost_free_all(); + + fprintf(stderr, "Cleaning up destroyed dictionaries...\n"); + if(cleanup_destroyed_dictionaries()) + fprintf(stderr, "WARNING: There are still dictionaries with references in them, that cannot be destroyed.\n"); + + // destroy the caches in reverse order (extent and open depend on main cache) + fprintf(stderr, "Destroying extent cache (PGC)...\n"); + pgc_destroy(extent_cache); + fprintf(stderr, "Destroying open cache (PGC)...\n"); + pgc_destroy(open_cache); + fprintf(stderr, "Destroying main cache (PGC)...\n"); + pgc_destroy(main_cache); + + fprintf(stderr, "Destroying metrics registry (MRG)...\n"); + size_t metrics_referenced = mrg_destroy(main_mrg); + if(metrics_referenced) + fprintf(stderr, "WARNING: MRG had %zu metrics referenced.\n", + metrics_referenced); + + fprintf(stderr, "Destroying UUIDMap...\n"); + size_t uuid_referenced = uuidmap_destroy(); + if(uuid_referenced) + fprintf(stderr, "WARNING: UUIDMAP had %zu UUIDs referenced.\n", + uuid_referenced); + + // strings_destroy(); + // functions_destroy(); + // dyncfg_destroy(); + + fprintf(stderr, "All done, 
exiting...\n"); +#endif + #ifdef OS_WINDOWS curl_global_cleanup(); return; diff --git a/src/daemon/daemon-status-file.c b/src/daemon/daemon-status-file.c index 0d2548bba1..7ecd1bdc3f 100644 --- a/src/daemon/daemon-status-file.c +++ b/src/daemon/daemon-status-file.c @@ -9,7 +9,7 @@ #include <openssl/pem.h> #include <openssl/err.h> -#define STATUS_FILE_VERSION 12 +#define STATUS_FILE_VERSION 13 #define STATUS_FILENAME "status-netdata.json" diff --git a/src/daemon/pulse/pulse-aral.c b/src/daemon/pulse/pulse-aral.c index ff6ae15990..4050a016ee 100644 --- a/src/daemon/pulse/pulse-aral.c +++ b/src/daemon/pulse/pulse-aral.c @@ -33,6 +33,17 @@ void pulse_aral_register_statistics(struct aral_statistics *stats, const char *n spinlock_unlock(&globals.spinlock); } +void pulse_aral_unregister_statistics(struct aral_statistics *stats) { + spinlock_lock(&globals.spinlock); + struct aral_info *ai = ARAL_STATS_GET(&globals.idx, (Word_t)stats); + if(ai) { + ARAL_STATS_DEL(&globals.idx, (Word_t)stats); + freez((void *)ai->name); + freez(ai); + } + spinlock_unlock(&globals.spinlock); +} + void pulse_aral_register(ARAL *ar, const char *name) { if(!ar) return; @@ -47,15 +58,7 @@ void pulse_aral_register(ARAL *ar, const char *name) { void pulse_aral_unregister(ARAL *ar) { if(!ar) return; struct aral_statistics *stats = aral_get_statistics(ar); - - spinlock_lock(&globals.spinlock); - struct aral_info *ai = ARAL_STATS_GET(&globals.idx, (Word_t)stats); - if(ai) { - ARAL_STATS_DEL(&globals.idx, (Word_t)stats); - freez((void *)ai->name); - freez(ai); - } - spinlock_unlock(&globals.spinlock); + pulse_aral_unregister_statistics(stats); } void pulse_aral_init(void) { diff --git a/src/daemon/pulse/pulse-aral.h b/src/daemon/pulse/pulse-aral.h index 6425773607..70b47cb47f 100644 --- a/src/daemon/pulse/pulse-aral.h +++ b/src/daemon/pulse/pulse-aral.h @@ -6,6 +6,8 @@ #include "daemon/common.h" void pulse_aral_register_statistics(struct aral_statistics *stats, const char *name); +void 
pulse_aral_unregister_statistics(struct aral_statistics *stats); + void pulse_aral_register(ARAL *ar, const char *name); void pulse_aral_unregister(ARAL *ar); diff --git a/src/database/engine/metric.c b/src/database/engine/metric.c index c0765b2c8f..955d138571 100644 --- a/src/database/engine/metric.c +++ b/src/database/engine/metric.c @@ -357,14 +357,75 @@ struct aral_statistics *mrg_aral_stats(void) { return &mrg_aral_statistics; } -ALWAYS_INLINE void mrg_destroy(MRG *mrg __maybe_unused) { - // no destruction possible - // we can't traverse the metrics list +size_t mrg_destroy(MRG *mrg) { + if (unlikely(!mrg)) + return 0; - // to delete entries, the caller needs to keep pointers to them - // and delete them one by one + size_t referenced = 0; - pulse_aral_unregister(mrg->index[0].aral); + // Traverse all partitions + for (size_t partition = 0; partition < UUIDMAP_PARTITIONS; partition++) { + // Lock the partition to prevent new entries while we're cleaning up + mrg_index_write_lock(mrg, partition); + + Pvoid_t uuid_judy = mrg->index[partition].uuid_judy; + Word_t uuid_index = 0; + Pvoid_t *uuid_pvalue; + + // Traverse all UUIDs in this partition + for (uuid_pvalue = JudyLFirst(uuid_judy, &uuid_index, PJE0); + uuid_pvalue != NULL && uuid_pvalue != PJERR; + uuid_pvalue = JudyLNext(uuid_judy, &uuid_index, PJE0)) { + + if (!(*uuid_pvalue)) + continue; + + // Get the sections judy for this UUID + Pvoid_t sections_judy = *uuid_pvalue; + Word_t section_index = 0; + Pvoid_t *section_pvalue; + + // Traverse all sections for this UUID + for (section_pvalue = JudyLFirst(sections_judy, &section_index, PJE0); + section_pvalue != NULL && section_pvalue != PJERR; + section_pvalue = JudyLNext(sections_judy, &section_index, PJE0)) { + + if (!(*section_pvalue)) + continue; + + METRIC *metric = *section_pvalue; + + // Try to acquire metric for deletion + if (!refcount_acquire_for_deletion(&metric->refcount)) + referenced++; + + uuidmap_free(metric->uuid); + 
aral_freez(mrg->index[partition].aral, metric); + MRG_STATS_DELETED_METRIC(mrg, partition); + } + + JudyLFreeArray(&sections_judy, PJE0); + } + + JudyLFreeArray(&uuid_judy, PJE0); + + // Update the main Judy array reference + mrg->index[partition].uuid_judy = uuid_judy; + + // Unlock the partition + mrg_index_write_unlock(mrg, partition); + + // Destroy the aral for this partition + aral_destroy(mrg->index[partition].aral); + } + + // Unregister the aral statistics + pulse_aral_unregister_statistics(&mrg_aral_statistics); + + // Free the MRG structure + freez(mrg); + + return referenced; } ALWAYS_INLINE METRIC *mrg_metric_add_and_acquire(MRG *mrg, MRG_ENTRY entry, bool *ret) { diff --git a/src/database/engine/metric.h b/src/database/engine/metric.h index a1dfa6504d..af5935a9ae 100644 --- a/src/database/engine/metric.h +++ b/src/database/engine/metric.h @@ -41,7 +41,9 @@ struct mrg_statistics { }; MRG *mrg_create(void); -void mrg_destroy(MRG *mrg); + +// returns the number of metrics that were freed, but were still referenced +size_t mrg_destroy(MRG *mrg); METRIC *mrg_metric_dup(MRG *mrg, METRIC *metric); void mrg_metric_release(MRG *mrg, METRIC *metric); diff --git a/src/database/rrdfunctions-inline.c b/src/database/rrdfunctions-inline.c index a8e2c2357b..617b73ee2d 100644 --- a/src/database/rrdfunctions-inline.c +++ b/src/database/rrdfunctions-inline.c @@ -2,22 +2,18 @@ #include "rrdfunctions-inline.h" -struct rrd_function_inline { - rrd_function_execute_inline_cb_t cb; -}; - static int rrd_function_run_inline(struct rrd_function_execute *rfe, void *data) { // IMPORTANT: this function MUST call the result_cb even on failures - struct rrd_function_inline *fi = data; + rrd_function_execute_inline_cb_t execute_cb = data; int code; if(rfe->is_cancelled.cb && rfe->is_cancelled.cb(rfe->is_cancelled.data)) code = HTTP_RESP_CLIENT_CLOSED_REQUEST; else - code = fi->cb(rfe->result.wb, rfe->function, 
rfe->payload, rfe->source); if(code == HTTP_RESP_CLIENT_CLOSED_REQUEST || (rfe->is_cancelled.cb && rfe->is_cancelled.cb(rfe->is_cancelled.data))) { buffer_flush(rfe->result.wb); @@ -36,10 +32,7 @@ void rrd_function_add_inline(RRDHOST *host, RRDSET *st, const char *name, int ti rrd_collector_started(); // this creates a collector that runs for as long as netdata runs - struct rrd_function_inline *fi = callocz(1, sizeof(struct rrd_function_inline)); - fi->cb = execute_cb; - rrd_function_add(host, st, name, timeout, priority, version, help, tags, access, true, - rrd_function_run_inline, fi); + rrd_function_run_inline, execute_cb); } diff --git a/src/database/rrdhost.c b/src/database/rrdhost.c index 01342b94ad..112a51259d 100644 --- a/src/database/rrdhost.c +++ b/src/database/rrdhost.c @@ -132,12 +132,14 @@ static inline RRDHOST *rrdhost_index_add_hostname(RRDHOST *host) { if(ret_hostname == host) rrdhost_option_set(host, RRDHOST_OPTION_INDEXED_HOSTNAME); else { - //have the same hostname but it's not the same host - //keep the new one only if the old one is orphan or archived + // have the same hostname, but it's not the same host + // keep the new one only if the old one is orphan or archived if (rrdhost_flag_check(ret_hostname, RRDHOST_FLAG_ORPHAN) || rrdhost_flag_check(ret_hostname, RRDHOST_FLAG_ARCHIVED)) { rrdhost_index_del_hostname(ret_hostname); rrdhost_index_add_hostname(host); } + else + rrdhost_option_clear(host, RRDHOST_OPTION_INDEXED_HOSTNAME); } return host; @@ -838,6 +840,7 @@ void rrdhost_free___while_having_rrd_wrlock(RRDHOST *host) { rrdhost_destroy_rrdcontexts(host); rrdlabels_destroy(host->rrdlabels); + destroy_aclk_config(host); string_freez(host->hostname); string_freez(host->os); @@ -863,5 +866,12 @@ void rrdhost_free_all(void) { if(localhost) rrdhost_free___while_having_rrd_wrlock(localhost); + localhost = NULL; + + dictionary_destroy(rrdhost_root_index_hostname); + dictionary_destroy(rrdhost_root_index); + rrdhost_root_index_hostname = NULL; + 
rrdhost_root_index = NULL; + rrd_wrunlock(); } diff --git a/src/database/sqlite/sqlite_aclk.c b/src/database/sqlite/sqlite_aclk.c index 1eae4b1a62..c218fe2e88 100644 --- a/src/database/sqlite/sqlite_aclk.c +++ b/src/database/sqlite/sqlite_aclk.c @@ -909,6 +909,15 @@ void create_aclk_config(RRDHOST *host __maybe_unused, nd_uuid_t *host_uuid __may wc->node_info_send_time = (host == localhost || NULL == localhost) ? now - 25 : now; } +void destroy_aclk_config(RRDHOST *host) +{ + if (!host || !host->aclk_config) + return; + + freez(host->aclk_config); + host->aclk_config = NULL; +} + #define SQL_FETCH_ALL_HOSTS \ "SELECT host_id, hostname, registry_hostname, update_every, os, " \ "timezone, hops, memory_mode, abbrev_timezone, utc_offset, program_name, " \ diff --git a/src/database/sqlite/sqlite_aclk.h b/src/database/sqlite/sqlite_aclk.h index eb72a6ce08..d0776e2077 100644 --- a/src/database/sqlite/sqlite_aclk.h +++ b/src/database/sqlite/sqlite_aclk.h @@ -50,10 +50,10 @@ typedef struct aclk_sync_cfg_t { time_t node_info_send_time; time_t node_collectors_send; char node_id[UUID_STR_LEN]; - char *alerts_snapshot_uuid; // will contain the snapshot_uuid value if snapshot was requested } aclk_sync_cfg_t; void create_aclk_config(RRDHOST *host, nd_uuid_t *host_uuid, nd_uuid_t *node_id); +void destroy_aclk_config(RRDHOST *host); void sql_aclk_sync_init(void); void aclk_push_alert_config(const char *node_id, const char *config_hash); void schedule_node_state_update(RRDHOST *host, uint64_t delay); diff --git a/src/libnetdata/dictionary/dictionary.c b/src/libnetdata/dictionary/dictionary.c index 41f584043f..569c81e05b 100644 --- a/src/libnetdata/dictionary/dictionary.c +++ b/src/libnetdata/dictionary/dictionary.c @@ -330,11 +330,11 @@ static void dictionary_queue_for_destruction(DICTIONARY *dict) { netdata_mutex_unlock(&dictionaries_waiting_to_be_destroyed_mutex); } -void cleanup_destroyed_dictionaries(void) { +bool cleanup_destroyed_dictionaries(void) { 
netdata_mutex_lock(&dictionaries_waiting_to_be_destroyed_mutex); if (!dictionaries_waiting_to_be_destroyed) { netdata_mutex_unlock(&dictionaries_waiting_to_be_destroyed_mutex); - return; + return false; } DICTIONARY *dict, *last = NULL, *next = NULL; @@ -371,7 +371,10 @@ void cleanup_destroyed_dictionaries(void) { } } + bool ret = dictionaries_waiting_to_be_destroyed != NULL; netdata_mutex_unlock(&dictionaries_waiting_to_be_destroyed_mutex); + + return ret; } // ---------------------------------------------------------------------------- diff --git a/src/libnetdata/dictionary/dictionary.h b/src/libnetdata/dictionary/dictionary.h index cc792cbe96..97d9a14d4c 100644 --- a/src/libnetdata/dictionary/dictionary.h +++ b/src/libnetdata/dictionary/dictionary.h @@ -168,7 +168,7 @@ void dictionary_version_increment(DICTIONARY *dict); void dictionary_garbage_collect(DICTIONARY *dict); -void cleanup_destroyed_dictionaries(void); +bool cleanup_destroyed_dictionaries(void); // ---------------------------------------------------------------------------- // Set an item in the dictionary diff --git a/src/libnetdata/threads/threads.c b/src/libnetdata/threads/threads.c index ca75ec77de..db060a8bf0 100644 --- a/src/libnetdata/threads/threads.c +++ b/src/libnetdata/threads/threads.c @@ -421,6 +421,7 @@ void nd_thread_signal_cancel(ND_THREAD *nti) { spinlock_unlock(&nti->canceller.spinlock); } +ALWAYS_INLINE bool nd_thread_signaled_to_cancel(void) { if(!_nd_thread_info) return false; return __atomic_load_n(&_nd_thread_info->cancel_atomic, __ATOMIC_RELAXED); diff --git a/src/libnetdata/uuid/uuidmap.c b/src/libnetdata/uuid/uuidmap.c index ce24e81c22..f9231537a4 100644 --- a/src/libnetdata/uuid/uuidmap.c +++ b/src/libnetdata/uuid/uuidmap.c @@ -278,6 +278,59 @@ UUIDMAP_ID uuidmap_dup(UUIDMAP_ID id) { return id; } +size_t uuidmap_destroy(void) { + size_t referenced = 0; + + // Traverse all partitions + for (size_t partition = 0; partition < UUIDMAP_PARTITIONS; partition++) { + // Lock the 
partition to prevent new entries while we're cleaning up + rw_spinlock_write_lock(&uuid_map.p[partition].spinlock); + + Pvoid_t uuid_to_id = uuid_map.p[partition].uuid_to_id; + Pvoid_t id_to_uuid = uuid_map.p[partition].id_to_uuid; + Pvoid_t freed_ids = uuid_map.p[partition].freed_ids; + + // Process all entries in the id_to_uuid map + Word_t id_index = 0; + Pvoid_t *id_pvalue; + + for (id_pvalue = JudyLFirst(id_to_uuid, &id_index, PJE0); + id_pvalue != NULL && id_pvalue != PJERR; + id_pvalue = JudyLNext(id_to_uuid, &id_index, PJE0)) { + + if (!(*id_pvalue)) + continue; + + struct uuidmap_entry *ue = *id_pvalue; + + // Try to acquire for deletion + if (!refcount_acquire_for_deletion(&ue->refcount)) + referenced++; + + aral_freez(uuid_map.ar, ue); + } + + // Free all Judy arrays + JudyHSFreeArray(&uuid_to_id, PJE0); + JudyLFreeArray(&id_to_uuid, PJE0); + JudyLFreeArray(&freed_ids, PJE0); + + // Reset partition data + memset(&uuid_map.p[partition], 0, sizeof(uuid_map.p[partition])); + + rw_spinlock_write_unlock(&uuid_map.p[partition].spinlock); + } + + // Destroy ARAL + if (uuid_map.ar) { + aral_destroy(uuid_map.ar); + uuid_map.ar = NULL; + } + + memset(&uuid_map, 0, sizeof(uuid_map)); + return referenced; +} + // -------------------------------------------------------------------------------------------------------------------- static volatile bool stop_flag = false; diff --git a/src/libnetdata/uuid/uuidmap.h b/src/libnetdata/uuid/uuidmap.h index 8ba144d7fc..1c8a46fbb7 100644 --- a/src/libnetdata/uuid/uuidmap.h +++ b/src/libnetdata/uuid/uuidmap.h @@ -24,6 +24,9 @@ static inline UUIDMAP_ID uuidmap_make_id(uint8_t partition, uint32_t id) { // returns ID, or zero on error UUIDMAP_ID uuidmap_create(const nd_uuid_t uuid); +// returns the number of entries still referenced (although freed) +size_t uuidmap_destroy(void); + // delete a uuid from the map void uuidmap_free(UUIDMAP_ID id); diff --git a/src/streaming/stream-connector.c b/src/streaming/stream-connector.c index 
e2f1fe6c5c..3b578b5d21 100644 --- a/src/streaming/stream-connector.c +++ b/src/streaming/stream-connector.c @@ -531,7 +531,7 @@ static void *stream_connector_thread(void *ptr) { worker_register_job_custom_metric(WORKER_SENDER_CONNECTOR_JOB_CANCELLED_NODES, "cancelled nodes", "nodes", WORKER_METRIC_ABSOLUTE); unsigned job_id = 0; - while(!nd_thread_signaled_to_cancel() && service_running(SERVICE_STREAMING)) { + while(service_running(SERVICE_STREAMING)) { worker_is_idle(); job_id = completion_wait_for_a_job_with_timeout(&sc->completion, job_id, 1000); @@ -619,6 +619,27 @@ static void *stream_connector_thread(void *ptr) { worker_set_metric(WORKER_SENDER_CONNECTOR_JOB_CANCELLED_NODES, (NETDATA_DOUBLE)cancelled_nodes); } +#if defined(FSANITIZE_ADDRESS) + // sometimes this thread exits, with localhost still in the queue + sleep(3); +#endif + + spinlock_lock(&sc->queue.spinlock); + Word_t idx = 0; + for(struct sender_state *s = SENDERS_FIRST(&sc->queue.senders, &idx); + s; + s = SENDERS_NEXT(&sc->queue.senders, &idx)) { + SENDERS_DEL(&sc->queue.senders, idx); + spinlock_unlock(&sc->queue.spinlock); + + // do not have the connector lock when calling these + stream_sender_on_disconnect(s); + stream_sender_remove(s, s->exit.reason); + + spinlock_lock(&sc->queue.spinlock); + } + spinlock_unlock(&sc->queue.spinlock); + return NULL; } diff --git a/src/streaming/stream-replication-sender.c b/src/streaming/stream-replication-sender.c index 54ac66f2d1..2149e20bcc 100644 --- a/src/streaming/stream-replication-sender.c +++ b/src/streaming/stream-replication-sender.c @@ -1718,7 +1718,6 @@ void *replication_thread_main(void *ptr) { for(size_t i = 0; i < threads ;i++) { char tag[NETDATA_THREAD_TAG_MAX + 1]; snprintfz(tag, NETDATA_THREAD_TAG_MAX, "REPLAY[%zu]", i + 2); - replication_globals.main_thread.threads_ptrs[i] = mallocz(sizeof(ND_THREAD *)); __atomic_add_fetch(&replication_buffers_allocated, sizeof(ND_THREAD *), __ATOMIC_RELAXED); replication_globals.main_thread.threads_ptrs[i] 
= nd_thread_create(tag, NETDATA_THREAD_OPTION_JOINABLE, replication_worker_thread, NULL); diff --git a/src/streaming/stream-sender.c b/src/streaming/stream-sender.c index 38a3d8bd2a..50e843c177 100644 --- a/src/streaming/stream-sender.c +++ b/src/streaming/stream-sender.c @@ -367,9 +367,7 @@ void stream_sender_remove(struct sender_state *s, STREAM_HANDSHAKE reason) { s->exit.reason = 0; __atomic_store_n(&s->exit.shutdown, false, __ATOMIC_RELAXED); - rrdhost_flag_clear(s->host, - RRDHOST_FLAG_STREAM_SENDER_ADDED | RRDHOST_FLAG_STREAM_SENDER_CONNECTED | - RRDHOST_FLAG_STREAM_SENDER_READY_4_METRICS); + rrdhost_flag_clear(s->host, RRDHOST_FLAG_STREAM_SENDER_ADDED | RRDHOST_FLAG_STREAM_SENDER_CONNECTED | RRDHOST_FLAG_STREAM_SENDER_READY_4_METRICS); s->last_state_since_t = now_realtime_sec(); stream_parent_set_host_disconnect_reason(s->host, reason, s->last_state_since_t);