diff --git a/src/daemon/libuv_workers.c b/src/daemon/libuv_workers.c index ac31ad5d56..ef308129e2 100644 --- a/src/daemon/libuv_workers.c +++ b/src/daemon/libuv_workers.c @@ -56,6 +56,11 @@ void register_libuv_worker_jobs() { worker_register_job_name(UV_EVENT_METADATA_CLEANUP, "metadata cleanup"); worker_register_job_name(UV_EVENT_METADATA_ML_LOAD, "metadata load ml models"); + // aclk_sync + worker_register_job_name(UV_EVENT_ACLK_NODE_INFO, "aclk host node info"); + worker_register_job_name(UV_EVENT_ACLK_ALERT_PUSH, "aclk alert push"); + worker_register_job_name(UV_EVENT_ACLK_QUERY_EXECUTE, "aclk query execute"); + // netdatacli worker_register_job_name(UV_EVENT_SCHEDULE_CMD, "schedule command"); diff --git a/src/daemon/libuv_workers.h b/src/daemon/libuv_workers.h index faa6b1771c..079d3ad4aa 100644 --- a/src/daemon/libuv_workers.h +++ b/src/daemon/libuv_workers.h @@ -48,6 +48,11 @@ enum event_loop_job { UV_EVENT_METADATA_CLEANUP, UV_EVENT_METADATA_ML_LOAD, + // aclk_sync + UV_EVENT_ACLK_NODE_INFO, + UV_EVENT_ACLK_ALERT_PUSH, + UV_EVENT_ACLK_QUERY_EXECUTE, + // netdatacli UV_EVENT_SCHEDULE_CMD, }; diff --git a/src/database/sqlite/sqlite_aclk.c b/src/database/sqlite/sqlite_aclk.c index 4e14935905..6a76c8a5fa 100644 --- a/src/database/sqlite/sqlite_aclk.c +++ b/src/database/sqlite/sqlite_aclk.c @@ -20,6 +20,7 @@ struct aclk_sync_config_s { bool initialized; mqtt_wss_client client; int aclk_queries_running; + bool alert_push_running; SPINLOCK cmd_queue_lock; struct aclk_database_cmd *cmd_base; } aclk_sync_config = { 0 }; @@ -285,7 +286,6 @@ static void timer_cb(uv_timer_t *handle) if (aclk_online_for_alerts()) { cmd.opcode = ACLK_DATABASE_PUSH_ALERT; aclk_database_enq_cmd(&cmd); - aclk_check_node_info_and_collectors(); } } @@ -297,7 +297,6 @@ struct aclk_query_payload { static void after_aclk_run_query_job(uv_work_t *req, int status __maybe_unused) { - worker_is_busy(ACLK_QUERY_EXECUTE); struct aclk_query_payload *payload = req->data; struct aclk_sync_config_s *config = payload->config; config->aclk_queries_running--; @@ -321,11 +320,16 @@ static void aclk_run_query(struct aclk_sync_config_s *config, aclk_query_t query static void aclk_run_query_job(uv_work_t *req) { + register_libuv_worker_jobs(); + + worker_is_busy(UV_EVENT_ACLK_QUERY_EXECUTE); + struct aclk_query_payload *payload = req->data; struct aclk_sync_config_s *config = payload->config; aclk_query_t query = (aclk_query_t) payload->data; aclk_run_query(config, query); + worker_is_idle(); } static void node_update_timer_cb(uv_timer_t *handle) @@ -346,6 +350,34 @@ static void close_callback(uv_handle_t *handle, void *data __maybe_unused) uv_close(handle, NULL); // Automatically close and free the handle } +struct alert_push_data { + uv_work_t request; + struct aclk_sync_config_s *config; +}; + +static void after_start_alert_push(uv_work_t *req, int status __maybe_unused) +{ + struct alert_push_data *data = req->data; + struct aclk_sync_config_s *config = data->config; + + config->alert_push_running = false; + freez(data); +} + +// Worker thread to scan hosts for pending metadata to store +static void start_alert_push(uv_work_t *req __maybe_unused) +{ + register_libuv_worker_jobs(); + + worker_is_busy(UV_EVENT_ACLK_NODE_INFO); + aclk_check_node_info_and_collectors(); + worker_is_idle(); + + worker_is_busy(UV_EVENT_ACLK_ALERT_PUSH); + aclk_push_alert_events_for_all_hosts(); + worker_is_idle(); +} + static void aclk_synchronization(void *arg) { struct aclk_sync_config_s *config = arg; @@ -357,9 +389,7 @@ static void aclk_synchronization(void *arg) worker_register_job_name(ACLK_DATABASE_NODE_STATE, "node state"); worker_register_job_name(ACLK_DATABASE_PUSH_ALERT, "alert push"); worker_register_job_name(ACLK_DATABASE_PUSH_ALERT_CONFIG, "alert conf push"); - worker_register_job_name(ACLK_QUERY_EXECUTE, "query execute"); - worker_register_job_name(ACLK_QUERY_EXECUTE_SYNC, "query execute sync"); - worker_register_job_name(ACLK_DATABASE_TIMER, "timer"); + worker_register_job_name(ACLK_QUERY_EXECUTE_SYNC, "aclk query execute sync"); uv_loop_t *loop = &config->loop; fatal_assert(0 == uv_loop_init(loop)); @@ -378,6 +408,8 @@ static void aclk_synchronization(void *arg) int query_thread_count = netdata_conf_cloud_query_threads(); netdata_log_info("Starting ACLK synchronization thread with %d parallel query threads", query_thread_count); + struct alert_push_data *data; + while (likely(service_running(SERVICE_ACLK))) { enum aclk_database_opcode opcode; worker_is_idle(); @@ -441,9 +473,21 @@ static void aclk_synchronization(void *arg) aclk_push_alert_config_event(cmd.param[0], cmd.param[1]); break; case ACLK_DATABASE_PUSH_ALERT: - aclk_push_alert_events_for_all_hosts(); - break; + if (config->alert_push_running) + break; + + config->alert_push_running = true; + + data = mallocz(sizeof(*data)); + data->request.data = data; + data->config = config; + + if (uv_queue_work(loop, &data->request, start_alert_push, after_start_alert_push)) { + freez(data); + config->alert_push_running = false; + } + break; case ACLK_MQTT_WSS_CLIENT: config->client = (mqtt_wss_client) cmd.param[0]; break; diff --git a/src/database/sqlite/sqlite_aclk.h b/src/database/sqlite/sqlite_aclk.h index 90e980ad3f..ff2b256874 100644 --- a/src/database/sqlite/sqlite_aclk.h +++ b/src/database/sqlite/sqlite_aclk.h @@ -24,7 +24,6 @@ enum aclk_database_opcode { ACLK_MQTT_WSS_CLIENT, ACLK_QUERY_EXECUTE, ACLK_QUERY_EXECUTE_SYNC, - ACLK_DATABASE_TIMER, // leave this last // we need it to check for worker utilization diff --git a/src/database/sqlite/sqlite_aclk_alert.c b/src/database/sqlite/sqlite_aclk_alert.c index 7ddc53f8ae..39055ced15 100644 --- a/src/database/sqlite/sqlite_aclk_alert.c +++ b/src/database/sqlite/sqlite_aclk_alert.c @@ -594,10 +594,6 @@ void aclk_push_alert_events_for_all_hosts(void) { RRDHOST *host; - // Checking if we shutting down - if (!service_running(SERVICE_ACLK)) - return; - dfe_start_reentrant(rrdhost_root_index, host) { if (!rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS) || rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD))