0
0
Fork 0
mirror of https://github.com/netdata/netdata.git synced 2025-04-14 01:29:11 +00:00

Use worker when dispatching alert transitions to the cloud ()

* Queue alert submission to worker

* Correctly register aclk job execution

* Correctly register aclk job execution (rename)

* Remove job
This commit is contained in:
Stelios Fragkakis 2025-01-15 01:13:19 +02:00 committed by GitHub
parent d24de5876e
commit de14425b80
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 61 additions and 12 deletions

View file

@ -56,6 +56,11 @@ void register_libuv_worker_jobs() {
worker_register_job_name(UV_EVENT_METADATA_CLEANUP, "metadata cleanup");
worker_register_job_name(UV_EVENT_METADATA_ML_LOAD, "metadata load ml models");
// aclk_sync
worker_register_job_name(UV_EVENT_ACLK_NODE_INFO, "aclk host node info");
worker_register_job_name(UV_EVENT_ACLK_ALERT_PUSH, "aclk alert push");
worker_register_job_name(UV_EVENT_ACLK_QUERY_EXECUTE, "aclk query execute");
// netdatacli
worker_register_job_name(UV_EVENT_SCHEDULE_CMD, "schedule command");

View file

@ -48,6 +48,11 @@ enum event_loop_job {
UV_EVENT_METADATA_CLEANUP,
UV_EVENT_METADATA_ML_LOAD,
// aclk_sync
UV_EVENT_ACLK_NODE_INFO,
UV_EVENT_ACLK_ALERT_PUSH,
UV_EVENT_ACLK_QUERY_EXECUTE,
// netdatacli
UV_EVENT_SCHEDULE_CMD,
};

View file

@ -20,6 +20,7 @@ struct aclk_sync_config_s {
bool initialized;
mqtt_wss_client client;
int aclk_queries_running;
bool alert_push_running;
SPINLOCK cmd_queue_lock;
struct aclk_database_cmd *cmd_base;
} aclk_sync_config = { 0 };
@ -285,7 +286,6 @@ static void timer_cb(uv_timer_t *handle)
if (aclk_online_for_alerts()) {
cmd.opcode = ACLK_DATABASE_PUSH_ALERT;
aclk_database_enq_cmd(&cmd);
aclk_check_node_info_and_collectors();
}
}
@ -297,7 +297,6 @@ struct aclk_query_payload {
static void after_aclk_run_query_job(uv_work_t *req, int status __maybe_unused)
{
worker_is_busy(ACLK_QUERY_EXECUTE);
struct aclk_query_payload *payload = req->data;
struct aclk_sync_config_s *config = payload->config;
config->aclk_queries_running--;
@ -321,11 +320,16 @@ static void aclk_run_query(struct aclk_sync_config_s *config, aclk_query_t query
static void aclk_run_query_job(uv_work_t *req)
{
register_libuv_worker_jobs();
worker_is_busy(UV_EVENT_ACLK_QUERY_EXECUTE);
struct aclk_query_payload *payload = req->data;
struct aclk_sync_config_s *config = payload->config;
aclk_query_t query = (aclk_query_t) payload->data;
aclk_run_query(config, query);
worker_is_idle();
}
static void node_update_timer_cb(uv_timer_t *handle)
@ -346,6 +350,34 @@ static void close_callback(uv_handle_t *handle, void *data __maybe_unused)
uv_close(handle, NULL); // Automatically close and free the handle
}
struct alert_push_data {
uv_work_t request;
struct aclk_sync_config_s *config;
};
static void after_start_alert_push(uv_work_t *req, int status __maybe_unused)
{
struct alert_push_data *data = req->data;
struct aclk_sync_config_s *config = data->config;
config->alert_push_running = false;
freez(data);
}
// Worker thread to scan hosts for pending metadata to store
static void start_alert_push(uv_work_t *req __maybe_unused)
{
register_libuv_worker_jobs();
worker_is_busy(UV_EVENT_ACLK_NODE_INFO);
aclk_check_node_info_and_collectors();
worker_is_idle();
worker_is_busy(UV_EVENT_ACLK_ALERT_PUSH);
aclk_push_alert_events_for_all_hosts();
worker_is_idle();
}
static void aclk_synchronization(void *arg)
{
struct aclk_sync_config_s *config = arg;
@ -357,9 +389,7 @@ static void aclk_synchronization(void *arg)
worker_register_job_name(ACLK_DATABASE_NODE_STATE, "node state");
worker_register_job_name(ACLK_DATABASE_PUSH_ALERT, "alert push");
worker_register_job_name(ACLK_DATABASE_PUSH_ALERT_CONFIG, "alert conf push");
worker_register_job_name(ACLK_QUERY_EXECUTE, "query execute");
worker_register_job_name(ACLK_QUERY_EXECUTE_SYNC, "query execute sync");
worker_register_job_name(ACLK_DATABASE_TIMER, "timer");
worker_register_job_name(ACLK_QUERY_EXECUTE_SYNC, "aclk query execute sync");
uv_loop_t *loop = &config->loop;
fatal_assert(0 == uv_loop_init(loop));
@ -378,6 +408,8 @@ static void aclk_synchronization(void *arg)
int query_thread_count = netdata_conf_cloud_query_threads();
netdata_log_info("Starting ACLK synchronization thread with %d parallel query threads", query_thread_count);
struct alert_push_data *data;
while (likely(service_running(SERVICE_ACLK))) {
enum aclk_database_opcode opcode;
worker_is_idle();
@ -441,9 +473,21 @@ static void aclk_synchronization(void *arg)
aclk_push_alert_config_event(cmd.param[0], cmd.param[1]);
break;
case ACLK_DATABASE_PUSH_ALERT:
aclk_push_alert_events_for_all_hosts();
break;
if (config->alert_push_running)
break;
config->alert_push_running = true;
data = mallocz(sizeof(*data));
data->request.data = data;
data->config = config;
if (uv_queue_work(loop, &data->request, start_alert_push, after_start_alert_push)) {
freez(data);
config->alert_push_running = false;
}
break;
case ACLK_MQTT_WSS_CLIENT:
config->client = (mqtt_wss_client) cmd.param[0];
break;

View file

@ -24,7 +24,6 @@ enum aclk_database_opcode {
ACLK_MQTT_WSS_CLIENT,
ACLK_QUERY_EXECUTE,
ACLK_QUERY_EXECUTE_SYNC,
ACLK_DATABASE_TIMER,
// leave this last
// we need it to check for worker utilization

View file

@ -594,10 +594,6 @@ void aclk_push_alert_events_for_all_hosts(void)
{
RRDHOST *host;
// Checking if we shutting down
if (!service_running(SERVICE_ACLK))
return;
dfe_start_reentrant(rrdhost_root_index, host) {
if (!rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS) ||
rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD))