0
0
Fork 0
mirror of https://github.com/netdata/netdata.git synced 2025-04-30 07:30:04 +00:00

Metadata event loop code cleanup ()

* Remove query counter

* Refactor metadata worker structure to use stack-allocated event loop and improve initialization checks
Code cleanup

* Refactor metadata_queue_load_host_context to remove unnecessary parameter
This commit is contained in:
Stelios Fragkakis 2025-03-21 20:39:58 +02:00 committed by GitHub
parent b75958a4ab
commit de88c30dca
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 86 additions and 138 deletions

View file

@ -459,8 +459,7 @@ RRDHOST *rrdhost_create(
rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_INFO | RRDHOST_FLAG_METADATA_UPDATE); rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_INFO | RRDHOST_FLAG_METADATA_UPDATE);
if (is_localhost) { if (is_localhost) {
BUFFER *buf = buffer_create(0, NULL); BUFFER *buf = buffer_create(0, NULL);
size_t query_counter = 0; store_host_info_and_metadata(host, buf);
store_host_info_and_metadata(host, buf, &query_counter);
buffer_free(buf); buffer_free(buf);
} }
rrdhost_load_rrdcontext_data(host); rrdhost_load_rrdcontext_data(host);

View file

@ -958,7 +958,7 @@ void sql_aclk_sync_init(void)
netdata_log_info("Created %d archived hosts", number_of_children); netdata_log_info("Created %d archived hosts", number_of_children);
// Trigger host context load for hosts that have been created // Trigger host context load for hosts that have been created
metadata_queue_load_host_context(NULL); metadata_queue_load_host_context();
if (!number_of_children) if (!number_of_children)
aclk_queue_node_info(localhost, true); aclk_queue_node_info(localhost, true);

View file

@ -229,26 +229,25 @@ typedef enum {
struct metadata_wc { struct metadata_wc {
uv_thread_t thread; uv_thread_t thread;
uv_loop_t *loop; uv_loop_t loop;
uv_async_t async; uv_async_t async;
uv_timer_t timer_req; uv_timer_t timer_req;
time_t metadata_check_after; time_t metadata_check_after;
Pvoid_t ae_DelJudyL; Pvoid_t ae_DelJudyL;
METADATA_FLAG flags; METADATA_FLAG flags;
bool initialized;
SPINLOCK cmd_queue_lock;
struct completion start_stop_complete; struct completion start_stop_complete;
struct completion *scan_complete; struct completion *scan_complete;
/* FIFO command queue */ /* FIFO command queue */
SPINLOCK cmd_queue_lock;
struct metadata_cmd *cmd_base; struct metadata_cmd *cmd_base;
ARAL *ar; ARAL *ar;
}; } metasync_worker;
#define metadata_flag_check(target_flags, flag) (__atomic_load_n(&((target_flags)->flags), __ATOMIC_SEQ_CST) & (flag)) #define metadata_flag_check(target_flags, flag) (__atomic_load_n(&((target_flags)->flags), __ATOMIC_SEQ_CST) & (flag))
#define metadata_flag_set(target_flags, flag) __atomic_or_fetch(&((target_flags)->flags), (flag), __ATOMIC_SEQ_CST) #define metadata_flag_set(target_flags, flag) __atomic_or_fetch(&((target_flags)->flags), (flag), __ATOMIC_SEQ_CST)
#define metadata_flag_clear(target_flags, flag) __atomic_and_fetch(&((target_flags)->flags), ~(flag), __ATOMIC_SEQ_CST) #define metadata_flag_clear(target_flags, flag) __atomic_and_fetch(&((target_flags)->flags), ~(flag), __ATOMIC_SEQ_CST)
struct metadata_wc metasync_worker = {.loop = NULL};
// //
// For unittest // For unittest
// //
@ -883,7 +882,7 @@ static int chart_label_store_to_sql_callback(const char *name, const char *value
return 1; return 1;
} }
static int check_and_update_chart_labels(RRDSET *st, BUFFER *work_buffer, size_t *query_counter) static int check_and_update_chart_labels(RRDSET *st, BUFFER *work_buffer)
{ {
size_t old_version = st->rrdlabels_last_saved_version; size_t old_version = st->rrdlabels_last_saved_version;
size_t new_version = rrdlabels_version(st->rrdlabels); size_t new_version = rrdlabels_version(st->rrdlabels);
@ -896,10 +895,8 @@ static int check_and_update_chart_labels(RRDSET *st, BUFFER *work_buffer, size_t
rrdlabels_walkthrough_read(st->rrdlabels, chart_label_store_to_sql_callback, &tmp); rrdlabels_walkthrough_read(st->rrdlabels, chart_label_store_to_sql_callback, &tmp);
buffer_strcat(work_buffer, " ON CONFLICT (chart_id, label_key) DO UPDATE SET source_type = excluded.source_type, label_value=excluded.label_value, date_created=UNIXEPOCH()"); buffer_strcat(work_buffer, " ON CONFLICT (chart_id, label_key) DO UPDATE SET source_type = excluded.source_type, label_value=excluded.label_value, date_created=UNIXEPOCH()");
int rc = db_execute(db_meta, buffer_tostring(work_buffer)); int rc = db_execute(db_meta, buffer_tostring(work_buffer));
if (likely(!rc)) { if (likely(!rc))
st->rrdlabels_last_saved_version = new_version; st->rrdlabels_last_saved_version = new_version;
(*query_counter)++;
}
return rc; return rc;
} }
@ -1582,6 +1579,9 @@ static void metadata_free_cmd_queue(struct metadata_wc *wc)
static void metadata_enq_cmd(struct metadata_wc *wc, struct metadata_cmd *cmd) static void metadata_enq_cmd(struct metadata_wc *wc, struct metadata_cmd *cmd)
{ {
if(unlikely(!wc->initialized))
return;
if (cmd->opcode == METADATA_SYNC_SHUTDOWN) { if (cmd->opcode == METADATA_SYNC_SHUTDOWN) {
metadata_flag_set(wc, METADATA_FLAG_SHUTDOWN); metadata_flag_set(wc, METADATA_FLAG_SHUTDOWN);
goto wakeup_event_loop; goto wakeup_event_loop;
@ -1639,10 +1639,10 @@ static void timer_cb(uv_timer_t* handle)
uv_update_time(handle->loop); uv_update_time(handle->loop);
struct metadata_wc *wc = handle->data; struct metadata_wc *wc = handle->data;
struct metadata_cmd cmd;
memset(&cmd, 0, sizeof(cmd));
if (wc->metadata_check_after < now_realtime_sec()) { if (wc->metadata_check_after < now_realtime_sec()) {
struct metadata_cmd cmd;
memset(&cmd, 0, sizeof(cmd));
cmd.opcode = METADATA_SCAN_HOSTS; cmd.opcode = METADATA_SCAN_HOSTS;
metadata_enq_cmd(wc, &cmd); metadata_enq_cmd(wc, &cmd);
} }
@ -2058,7 +2058,7 @@ static void after_metadata_hosts(uv_work_t *req, int status __maybe_unused)
freez(data); freez(data);
} }
static void metadata_scan_host(RRDHOST *host, BUFFER *work_buffer, size_t *query_counter, bool shutting_down) static void metadata_scan_host(RRDHOST *host, BUFFER *work_buffer, bool shutting_down)
{ {
static bool skip_models = false; static bool skip_models = false;
RRDSET *st; RRDSET *st;
@ -2074,18 +2074,15 @@ static void metadata_scan_host(RRDHOST *host, BUFFER *work_buffer, size_t *query
rrdset_foreach_reentrant(st, host) { rrdset_foreach_reentrant(st, host) {
if(rrdset_flag_check(st, RRDSET_FLAG_METADATA_UPDATE)) { if(rrdset_flag_check(st, RRDSET_FLAG_METADATA_UPDATE)) {
(*query_counter)++;
rrdset_flag_clear(st, RRDSET_FLAG_METADATA_UPDATE); rrdset_flag_clear(st, RRDSET_FLAG_METADATA_UPDATE);
buffer_flush(work_buffer); buffer_flush(work_buffer);
worker_is_busy(UV_EVENT_STORE_CHART); worker_is_busy(UV_EVENT_STORE_CHART);
rc = check_and_update_chart_labels(st, work_buffer, query_counter); rc = check_and_update_chart_labels(st, work_buffer);
if (unlikely(rc)) if (unlikely(rc))
error_report("METADATA: 'host:%s': Failed to update labels for chart %s", rrdhost_hostname(host), rrdset_name(st)); error_report("METADATA: 'host:%s': Failed to update labels for chart %s", rrdhost_hostname(host), rrdset_name(st));
else
(*query_counter)++;
rc = store_chart_metadata(st, &store_chart); rc = store_chart_metadata(st, &store_chart);
if (unlikely(rc)) { if (unlikely(rc)) {
@ -2096,8 +2093,6 @@ static void metadata_scan_host(RRDHOST *host, BUFFER *work_buffer, size_t *query
rrdhost_hostname(host), rrdhost_hostname(host),
rrdset_name(st)); rrdset_name(st));
} }
else
(*query_counter)++;
worker_is_idle(); worker_is_idle();
} }
@ -2133,8 +2128,7 @@ static void metadata_scan_host(RRDHOST *host, BUFFER *work_buffer, size_t *query
rrdhost_hostname(host), rrdhost_hostname(host),
rrdset_name(st), rrdset_name(st),
rrddim_name(rd)); rrddim_name(rd));
} else }
(*query_counter)++;
worker_is_idle(); worker_is_idle();
} }
@ -2154,7 +2148,7 @@ static void metadata_scan_host(RRDHOST *host, BUFFER *work_buffer, size_t *query
} }
static void store_host_and_system_info(RRDHOST *host, size_t *query_counter) static void store_host_and_system_info(RRDHOST *host)
{ {
rrdhost_flag_clear(host, RRDHOST_FLAG_METADATA_INFO); rrdhost_flag_clear(host, RRDHOST_FLAG_METADATA_INFO);
@ -2162,19 +2156,11 @@ static void store_host_and_system_info(RRDHOST *host, size_t *query_counter)
error_report("METADATA: 'host:%s': Failed to store host updated system information in the database", rrdhost_hostname(host)); error_report("METADATA: 'host:%s': Failed to store host updated system information in the database", rrdhost_hostname(host));
rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_INFO | RRDHOST_FLAG_METADATA_UPDATE); rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_INFO | RRDHOST_FLAG_METADATA_UPDATE);
} }
else {
if (likely(query_counter))
(*query_counter)++;
}
if (unlikely(store_host_metadata(host))) { if (unlikely(store_host_metadata(host))) {
error_report("METADATA: 'host:%s': Failed to store host info in the database", rrdhost_hostname(host)); error_report("METADATA: 'host:%s': Failed to store host info in the database", rrdhost_hostname(host));
rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_INFO | RRDHOST_FLAG_METADATA_UPDATE); rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_INFO | RRDHOST_FLAG_METADATA_UPDATE);
} }
else {
if (likely(query_counter))
(*query_counter)++;
}
} }
struct judy_list_t { struct judy_list_t {
@ -2333,7 +2319,7 @@ static void store_sql_statements(struct judy_list_t *pending_sql_statement)
worker_is_idle(); worker_is_idle();
} }
static void meta_store_host_labels(RRDHOST *host, BUFFER *work_buffer, size_t *query_counter) static void meta_store_host_labels(RRDHOST *host, BUFFER *work_buffer)
{ {
rrdhost_flag_clear(host, RRDHOST_FLAG_METADATA_LABELS); rrdhost_flag_clear(host, RRDHOST_FLAG_METADATA_LABELS);
@ -2344,7 +2330,6 @@ static void meta_store_host_labels(RRDHOST *host, BUFFER *work_buffer, size_t *q
return; return;
} }
(*query_counter)++;
buffer_flush(work_buffer); buffer_flush(work_buffer);
struct query_build tmp = {.sql = work_buffer, .count = 0}; struct query_build tmp = {.sql = work_buffer, .count = 0};
@ -2358,11 +2343,10 @@ static void meta_store_host_labels(RRDHOST *host, BUFFER *work_buffer, size_t *q
if (unlikely(rc)) { if (unlikely(rc)) {
error_report("METADATA: 'host:%s': failed to update metadata host labels", rrdhost_hostname(host)); error_report("METADATA: 'host:%s': failed to update metadata host labels", rrdhost_hostname(host));
rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_LABELS | RRDHOST_FLAG_METADATA_UPDATE); rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_LABELS | RRDHOST_FLAG_METADATA_UPDATE);
} else }
(*query_counter)++;
} }
static void store_host_claim_id(RRDHOST *host, size_t *query_counter) static void store_host_claim_id(RRDHOST *host)
{ {
rrdhost_flag_clear(host, RRDHOST_FLAG_METADATA_CLAIMID); rrdhost_flag_clear(host, RRDHOST_FLAG_METADATA_CLAIMID);
int rc; int rc;
@ -2374,23 +2358,23 @@ static void store_host_claim_id(RRDHOST *host, size_t *query_counter)
if (unlikely(rc)) if (unlikely(rc))
rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_CLAIMID | RRDHOST_FLAG_METADATA_UPDATE); rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_CLAIMID | RRDHOST_FLAG_METADATA_UPDATE);
else
(*query_counter)++;
} }
void store_host_info_and_metadata(RRDHOST *host, BUFFER *work_buffer, size_t *query_counter)
void store_host_info_and_metadata(RRDHOST *host, BUFFER *work_buffer)
{ {
// Store labels (if needed) // Store labels (if needed)
if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_LABELS))) if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_LABELS)))
meta_store_host_labels(host, work_buffer, query_counter); meta_store_host_labels(host, work_buffer);
// Store claim id (if needed) // Store claim id (if needed)
if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_CLAIMID))) if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_CLAIMID)))
store_host_claim_id(host, query_counter); store_host_claim_id(host);
// Store host and system info (if needed); // Store host and system info (if needed);
if (rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_INFO)) if (rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_INFO))
store_host_and_system_info(host, query_counter); store_host_and_system_info(host);
} }
// Worker thread to scan hosts for pending metadata to store // Worker thread to scan hosts for pending metadata to store
@ -2419,26 +2403,17 @@ static void start_metadata_hosts(uv_work_t *req)
continue; continue;
usec_t started_ut = now_monotonic_usec(); usec_t started_ut = now_monotonic_usec();
size_t query_counter = 0;
rrdhost_flag_clear(host,RRDHOST_FLAG_METADATA_UPDATE); rrdhost_flag_clear(host,RRDHOST_FLAG_METADATA_UPDATE);
worker_is_busy(UV_EVENT_STORE_HOST); worker_is_busy(UV_EVENT_STORE_HOST);
// store labels, claim_id, host and system info (if needed) // store labels, claim_id, host and system info (if needed)
store_host_info_and_metadata(host, work_buffer, &query_counter); store_host_info_and_metadata(host, work_buffer);
worker_is_idle(); worker_is_idle();
metadata_scan_host(host, work_buffer, &query_counter, shutting_down); metadata_scan_host(host, work_buffer, shutting_down);
COMPUTE_DURATION(report_duration, "us", started_ut, now_monotonic_usec()); COMPUTE_DURATION(report_duration, "us", started_ut, now_monotonic_usec());
nd_log_daemon( nd_log_daemon(NDLP_DEBUG, "Host %s saved metadata in %s", rrdhost_hostname(host), report_duration);
NDLP_DEBUG,
"Host %s saved metadata with %zu SQL statements, in %s",
rrdhost_hostname(host),
query_counter,
report_duration);
} }
dfe_done(host); dfe_done(host);
@ -2463,9 +2438,16 @@ static void close_callback(uv_handle_t *handle, void *data __maybe_unused)
uv_close(handle, NULL); // Automatically close and free the handle uv_close(handle, NULL); // Automatically close and free the handle
} }
#define EVENT_LOOP_NAME "METASYNC"
static void metadata_event_loop(void *arg) static void metadata_event_loop(void *arg)
{ {
worker_register("METASYNC"); struct metadata_wc *config = arg;
uv_thread_set_name_np(EVENT_LOOP_NAME);
worker_register(EVENT_LOOP_NAME);
config->ar = aral_by_size_acquire(sizeof(struct metadata_cmd));
worker_register_job_name(METADATA_DATABASE_NOOP, "noop"); worker_register_job_name(METADATA_DATABASE_NOOP, "noop");
worker_register_job_name(METADATA_DEL_DIMENSION, "delete dimension"); worker_register_job_name(METADATA_DEL_DIMENSION, "delete dimension");
worker_register_job_name(METADATA_STORE_CLAIM_ID, "add claim id"); worker_register_job_name(METADATA_STORE_CLAIM_ID, "add claim id");
@ -2476,48 +2458,25 @@ static void metadata_event_loop(void *arg)
worker_register_job_name(METADATA_DEL_HOST_AE, "delete host alert entry"); worker_register_job_name(METADATA_DEL_HOST_AE, "delete host alert entry");
worker_register_job_name(METADATA_EXECUTE_STORE_STATEMENT, "add sql statement"); worker_register_job_name(METADATA_EXECUTE_STORE_STATEMENT, "add sql statement");
int ret;
unsigned cmd_batch_size; unsigned cmd_batch_size;
struct metadata_wc *wc = arg;
enum metadata_opcode opcode;
wc->ar = aral_by_size_acquire(sizeof(struct metadata_cmd)); uv_loop_t *loop = &config->loop;
fatal_assert(0 == uv_loop_init(loop));
uv_thread_set_name_np("METASYNC"); fatal_assert(0 == uv_async_init(loop, &config->async, async_cb));
uv_loop_t *loop = wc->loop = mallocz(sizeof(uv_loop_t)); fatal_assert(0 == uv_timer_init(loop, &config->timer_req));
ret = uv_loop_init(loop); fatal_assert(0 == uv_timer_start(&config->timer_req, timer_cb, TIMER_INITIAL_PERIOD_MS, TIMER_REPEAT_PERIOD_MS));
if (ret) { loop->data = config;
netdata_log_error("uv_loop_init(): %s", uv_strerror(ret)); config->async.data = config;
goto error_after_loop_init; config->timer_req.data = config;
}
loop->data = wc;
ret = uv_async_init(wc->loop, &wc->async, async_cb);
if (ret) {
netdata_log_error("uv_async_init(): %s", uv_strerror(ret));
goto error_after_async_init;
}
wc->async.data = wc;
ret = uv_timer_init(loop, &wc->timer_req);
if (ret) {
netdata_log_error("uv_timer_init(): %s", uv_strerror(ret));
goto error_after_timer_init;
}
wc->timer_req.data = wc;
fatal_assert(0 == uv_timer_start(&wc->timer_req, timer_cb, TIMER_INITIAL_PERIOD_MS, TIMER_REPEAT_PERIOD_MS));
nd_log(NDLS_DAEMON, NDLP_DEBUG, "Starting metadata sync thread"); nd_log(NDLS_DAEMON, NDLP_DEBUG, "Starting metadata sync thread");
struct metadata_cmd cmd; struct metadata_cmd cmd;
memset(&cmd, 0, sizeof(cmd)); memset(&cmd, 0, sizeof(cmd));
metadata_flag_clear(wc, METADATA_FLAG_PROCESSING); metadata_flag_clear(config, METADATA_FLAG_PROCESSING);
config->metadata_check_after = now_realtime_sec() + METADATA_HOST_CHECK_FIRST_CHECK;
completion_mark_complete(&config->start_stop_complete);
wc->metadata_check_after = now_realtime_sec() + METADATA_HOST_CHECK_FIRST_CHECK;
int shutdown = 0;
completion_mark_complete(&wc->start_stop_complete);
BUFFER *work_buffer = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); BUFFER *work_buffer = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
struct scan_metadata_payload *data; struct scan_metadata_payload *data;
Pvoid_t *Pvalue; Pvoid_t *Pvalue;
@ -2526,11 +2485,14 @@ static void metadata_event_loop(void *arg)
struct judy_list_t *pending_uuid_deletion = NULL; struct judy_list_t *pending_uuid_deletion = NULL;
struct judy_list_t *pending_sql_statement = NULL; struct judy_list_t *pending_sql_statement = NULL;
while (shutdown == 0 || (wc->flags & METADATA_FLAG_PROCESSING)) { int shutdown = 0;
config->initialized = true;
while (shutdown == 0 || (config->flags & METADATA_FLAG_PROCESSING)) {
nd_uuid_t *uuid; nd_uuid_t *uuid;
RRDHOST *host = NULL; RRDHOST *host = NULL;
ALARM_ENTRY *ae = NULL; ALARM_ENTRY *ae = NULL;
sqlite3_stmt *stmt; sqlite3_stmt *stmt;
enum metadata_opcode opcode;
worker_is_idle(); worker_is_idle();
uv_run(loop, UV_RUN_DEFAULT); uv_run(loop, UV_RUN_DEFAULT);
@ -2541,10 +2503,10 @@ static void metadata_event_loop(void *arg)
if (unlikely(cmd_batch_size >= METADATA_MAX_BATCH_SIZE)) if (unlikely(cmd_batch_size >= METADATA_MAX_BATCH_SIZE))
break; break;
cmd = metadata_deq_cmd(wc); cmd = metadata_deq_cmd(config);
opcode = cmd.opcode; opcode = cmd.opcode;
if (unlikely(opcode == METADATA_DATABASE_NOOP && metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) { if (unlikely(opcode == METADATA_DATABASE_NOOP && metadata_flag_check(config, METADATA_FLAG_SHUTDOWN))) {
shutdown = 1; shutdown = 1;
continue; continue;
} }
@ -2593,7 +2555,7 @@ static void metadata_event_loop(void *arg)
} }
break; break;
case METADATA_SCAN_HOSTS: case METADATA_SCAN_HOSTS:
if (unlikely(metadata_flag_check(wc, METADATA_FLAG_PROCESSING))) if (unlikely(metadata_flag_check(config, METADATA_FLAG_PROCESSING)))
break; break;
if (unittest_running) if (unittest_running)
@ -2601,7 +2563,7 @@ static void metadata_event_loop(void *arg)
data = mallocz(sizeof(*data)); data = mallocz(sizeof(*data));
data->request.data = data; data->request.data = data;
data->wc = wc; data->wc = config;
data->pending_alert_list = pending_ae_list; data->pending_alert_list = pending_ae_list;
data->pending_ctx_cleanup_list = pending_ctx_cleanup_list; data->pending_ctx_cleanup_list = pending_ctx_cleanup_list;
data->pending_uuid_deletion = pending_uuid_deletion; data->pending_uuid_deletion = pending_uuid_deletion;
@ -2616,16 +2578,16 @@ static void metadata_event_loop(void *arg)
if (unlikely(cmd.completion)) if (unlikely(cmd.completion))
cmd.completion = NULL; // Do not complete after launching worker (worker will do) cmd.completion = NULL; // Do not complete after launching worker (worker will do)
metadata_flag_set(wc, METADATA_FLAG_PROCESSING); metadata_flag_set(config, METADATA_FLAG_PROCESSING);
if (uv_queue_work(loop, &data->request, start_metadata_hosts, after_metadata_hosts)) { if (uv_queue_work(loop, &data->request, start_metadata_hosts, after_metadata_hosts)) {
// Failed to launch worker -- let the event loop handle completion // Failed to launch worker -- let the event loop handle completion
cmd.completion = wc->scan_complete; cmd.completion = config->scan_complete;
pending_ae_list = data->pending_alert_list; pending_ae_list = data->pending_alert_list;
pending_ctx_cleanup_list = data->pending_ctx_cleanup_list; pending_ctx_cleanup_list = data->pending_ctx_cleanup_list;
pending_uuid_deletion = data->pending_uuid_deletion; pending_uuid_deletion = data->pending_uuid_deletion;
pending_sql_statement = data->pending_sql_statement; pending_sql_statement = data->pending_sql_statement;
freez(data); freez(data);
metadata_flag_clear(wc, METADATA_FLAG_PROCESSING); metadata_flag_clear(config, METADATA_FLAG_PROCESSING);
} }
break; break;
case METADATA_LOAD_HOST_CONTEXT: case METADATA_LOAD_HOST_CONTEXT:
@ -2634,7 +2596,7 @@ static void metadata_event_loop(void *arg)
data = callocz(1,sizeof(*data)); data = callocz(1,sizeof(*data));
data->request.data = data; data->request.data = data;
data->wc = wc; data->wc = config;
if (uv_queue_work(loop, &data->request, start_all_host_load_context, after_start_host_load_context)) { if (uv_queue_work(loop, &data->request, start_all_host_load_context, after_start_host_load_context)) {
freez(data); freez(data);
} }
@ -2655,7 +2617,7 @@ static void metadata_event_loop(void *arg)
*Pvalue = (void *)ae; *Pvalue = (void *)ae;
break; break;
case METADATA_DEL_HOST_AE: case METADATA_DEL_HOST_AE:
(void) JudyLIns(&wc->ae_DelJudyL, (Word_t) (void *) cmd.param[0], PJE0); (void) JudyLIns(&config->ae_DelJudyL, (Word_t) (void *) cmd.param[0], PJE0);
break; break;
case METADATA_EXECUTE_STORE_STATEMENT: case METADATA_EXECUTE_STORE_STATEMENT:
stmt = (sqlite3_stmt *) cmd.param[0]; stmt = (sqlite3_stmt *) cmd.param[0];
@ -2679,6 +2641,7 @@ static void metadata_event_loop(void *arg)
completion_mark_complete(cmd.completion); completion_mark_complete(cmd.completion);
} while (opcode != METADATA_DATABASE_NOOP); } while (opcode != METADATA_DATABASE_NOOP);
} }
config->initialized = false;
uv_walk(loop, (uv_walk_cb) close_callback, NULL); uv_walk(loop, (uv_walk_cb) close_callback, NULL);
uv_run(loop, UV_RUN_NOWAIT); uv_run(loop, UV_RUN_NOWAIT);
@ -2692,10 +2655,10 @@ static void metadata_event_loop(void *arg)
worker_unregister(); worker_unregister();
nd_log(NDLS_DAEMON, NDLP_DEBUG, "Shutting down metadata thread"); nd_log(NDLS_DAEMON, NDLP_DEBUG, "Shutting down metadata thread");
completion_mark_complete(&wc->start_stop_complete); completion_mark_complete(&config->start_stop_complete);
if (wc->scan_complete) { if (config->scan_complete) {
completion_destroy(wc->scan_complete); completion_destroy(config->scan_complete);
freez(wc->scan_complete); freez(config->scan_complete);
} }
Word_t Index; Word_t Index;
@ -2720,16 +2683,8 @@ static void metadata_event_loop(void *arg)
freez(pending_ctx_cleanup_list); freez(pending_ctx_cleanup_list);
} }
metadata_free_cmd_queue(wc); metadata_free_cmd_queue(config);
return; aral_by_size_release(config->ar);
error_after_timer_init:
uv_close((uv_handle_t *)&wc->async, NULL);
error_after_async_init:
fatal_assert(0 == uv_loop_close(loop));
error_after_loop_init:
freez(loop);
aral_by_size_release(wc->ar);
worker_unregister(); worker_unregister();
} }
@ -2753,7 +2708,7 @@ void metadata_sync_shutdown(void)
void metadata_sync_shutdown_prepare(void) void metadata_sync_shutdown_prepare(void)
{ {
static bool running = false; static bool running = false;
if (unlikely(!metasync_worker.loop || running)) if (unlikely(!metasync_worker.initialized || running))
return; return;
running = true; running = true;
@ -2804,20 +2759,16 @@ void metadata_sync_shutdown_background_wait(void) {
void metadata_sync_init(void) void metadata_sync_init(void)
{ {
struct metadata_wc *wc = &metasync_worker; memset(&metasync_worker, 0, sizeof(metasync_worker));
completion_init(&metasync_worker.start_stop_complete);
memset(wc, 0, sizeof(*wc)); fatal_assert(0 == uv_thread_create(&metasync_worker.thread, metadata_event_loop, &metasync_worker));
completion_init(&wc->start_stop_complete);
fatal_assert(0 == uv_thread_create(&(wc->thread), metadata_event_loop, wc));
completion_wait_for(&wc->start_stop_complete);
completion_destroy(&wc->start_stop_complete);
completion_wait_for(&metasync_worker.start_stop_complete);
completion_destroy(&metasync_worker.start_stop_complete);
nd_log(NDLS_DAEMON, NDLP_DEBUG, "SQLite metadata sync initialization complete"); nd_log(NDLS_DAEMON, NDLP_DEBUG, "SQLite metadata sync initialization complete");
} }
// Helpers // Helpers
static inline void queue_metadata_cmd(enum metadata_opcode opcode, const void *param0, const void *param1) static inline void queue_metadata_cmd(enum metadata_opcode opcode, const void *param0, const void *param1)
@ -2833,8 +2784,9 @@ static inline void queue_metadata_cmd(enum metadata_opcode opcode, const void *p
// Public // Public
void metaqueue_delete_dimension_uuid(nd_uuid_t *uuid) void metaqueue_delete_dimension_uuid(nd_uuid_t *uuid)
{ {
if (unlikely(!metasync_worker.loop)) if (unlikely(!uuid))
return; return;
nd_uuid_t *use_uuid = mallocz(sizeof(*uuid)); nd_uuid_t *use_uuid = mallocz(sizeof(*uuid));
uuid_copy(*use_uuid, *uuid); uuid_copy(*use_uuid, *uuid);
queue_metadata_cmd(METADATA_DEL_DIMENSION, use_uuid, NULL); queue_metadata_cmd(METADATA_DEL_DIMENSION, use_uuid, NULL);
@ -2861,17 +2813,15 @@ void metaqueue_ml_load_models(RRDDIM *rd)
rrddim_flag_set(rd, RRDDIM_FLAG_ML_MODEL_LOAD); rrddim_flag_set(rd, RRDDIM_FLAG_ML_MODEL_LOAD);
} }
void metadata_queue_load_host_context(RRDHOST *host) void metadata_queue_load_host_context()
{ {
if (unlikely(!metasync_worker.loop)) queue_metadata_cmd(METADATA_LOAD_HOST_CONTEXT, NULL, NULL);
return;
queue_metadata_cmd(METADATA_LOAD_HOST_CONTEXT, host, NULL);
nd_log(NDLS_DAEMON, NDLP_DEBUG, "Queued command to load host contexts"); nd_log(NDLS_DAEMON, NDLP_DEBUG, "Queued command to load host contexts");
} }
void metadata_queue_ctx_host_cleanup(nd_uuid_t *host_uuid, const char *context) void metadata_queue_ctx_host_cleanup(nd_uuid_t *host_uuid, const char *context)
{ {
if (unlikely(!metasync_worker.loop)) if (unlikely(!host_uuid || !context))
return; return;
struct host_ctx_cleanup_s *ctx_cleanup = mallocz(sizeof(*ctx_cleanup)); struct host_ctx_cleanup_s *ctx_cleanup = mallocz(sizeof(*ctx_cleanup));
@ -2884,8 +2834,9 @@ void metadata_queue_ctx_host_cleanup(nd_uuid_t *host_uuid, const char *context)
void metadata_queue_ae_save(RRDHOST *host, ALARM_ENTRY *ae) void metadata_queue_ae_save(RRDHOST *host, ALARM_ENTRY *ae)
{ {
if (unlikely(!metasync_worker.loop)) if (unlikely(!host || !ae))
return; return;
__atomic_add_fetch(&host->health.pending_transitions, 1, __ATOMIC_RELAXED); __atomic_add_fetch(&host->health.pending_transitions, 1, __ATOMIC_RELAXED);
__atomic_add_fetch(&ae->pending_save_count, 1, __ATOMIC_RELAXED); __atomic_add_fetch(&ae->pending_save_count, 1, __ATOMIC_RELAXED);
queue_metadata_cmd(METADATA_ADD_HOST_AE, host, ae); queue_metadata_cmd(METADATA_ADD_HOST_AE, host, ae);
@ -2893,7 +2844,7 @@ void metadata_queue_ae_save(RRDHOST *host, ALARM_ENTRY *ae)
void metadata_queue_ae_deletion(ALARM_ENTRY *ae) void metadata_queue_ae_deletion(ALARM_ENTRY *ae)
{ {
if (unlikely(!metasync_worker.loop)) if (unlikely(!ae))
return; return;
queue_metadata_cmd(METADATA_DEL_HOST_AE, ae, NULL); queue_metadata_cmd(METADATA_DEL_HOST_AE, ae, NULL);
@ -2901,16 +2852,14 @@ void metadata_queue_ae_deletion(ALARM_ENTRY *ae)
void metadata_execute_store_statement(sqlite3_stmt *stmt) void metadata_execute_store_statement(sqlite3_stmt *stmt)
{ {
if (unlikely(!metasync_worker.loop)) if (unlikely(!stmt))
return; return;
queue_metadata_cmd(METADATA_EXECUTE_STORE_STATEMENT, stmt, NULL); queue_metadata_cmd(METADATA_EXECUTE_STORE_STATEMENT, stmt, NULL);
} }
void commit_alert_transitions(RRDHOST *host __maybe_unused) void commit_alert_transitions(RRDHOST *host __maybe_unused)
{ {
if (unlikely(!metasync_worker.loop))
return;
queue_metadata_cmd(METADATA_SCAN_HOSTS, NULL, NULL); queue_metadata_cmd(METADATA_SCAN_HOSTS, NULL, NULL);
} }

View file

@ -32,7 +32,7 @@ void metaqueue_delete_dimension_uuid(nd_uuid_t *uuid);
void metaqueue_store_claim_id(nd_uuid_t *host_uuid, nd_uuid_t *claim_uuid); void metaqueue_store_claim_id(nd_uuid_t *host_uuid, nd_uuid_t *claim_uuid);
void metaqueue_ml_load_models(RRDDIM *rd); void metaqueue_ml_load_models(RRDDIM *rd);
void detect_machine_guid_change(nd_uuid_t *host_uuid); void detect_machine_guid_change(nd_uuid_t *host_uuid);
void metadata_queue_load_host_context(RRDHOST *host); void metadata_queue_load_host_context();
void vacuum_database(sqlite3 *database, const char *db_alias, int threshold, int vacuum_pc); void vacuum_database(sqlite3 *database, const char *db_alias, int threshold, int vacuum_pc);
int sql_metadata_cache_stats(int op); int sql_metadata_cache_stats(int op);
@ -60,7 +60,7 @@ void commit_alert_transitions(RRDHOST *host);
void metadata_sync_shutdown_background(void); void metadata_sync_shutdown_background(void);
void metadata_sync_shutdown_background_wait(void); void metadata_sync_shutdown_background_wait(void);
void metadata_queue_ctx_host_cleanup(nd_uuid_t *host_uuid, const char *context); void metadata_queue_ctx_host_cleanup(nd_uuid_t *host_uuid, const char *context);
void store_host_info_and_metadata(RRDHOST *host, BUFFER *work_buffer, size_t *query_counter); void store_host_info_and_metadata(RRDHOST *host, BUFFER *work_buffer);
void metadata_execute_store_statement(sqlite3_stmt *stmt); void metadata_execute_store_statement(sqlite3_stmt *stmt);
// UNIT TEST // UNIT TEST