0
0
Fork 0
mirror of https://github.com/netdata/netdata.git synced 2025-04-17 11:12:42 +00:00

Schedule retention message calculation to a worker thread ()

* Move aclk_update_retention to the proper header file

* Do a scan but avoid going through all the dimensions if we have too much to delete -- do not generate a retention message in that case

* Schedule the retention calculation to a worker

* Adjust messages in the access log

* Fix compilation errors with --disable-cloud
This commit is contained in:
Stelios Fragkakis 2022-06-01 19:10:32 +03:00 committed by GitHub
parent 044d4c9d91
commit c261a771cc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 87 additions and 14 deletions

View file

@ -34,6 +34,28 @@ const char *aclk_sync_config[] = {
uv_mutex_t aclk_async_lock;
struct aclk_database_worker_config *aclk_thread_head = NULL;
int retention_running = 0;
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
static void stop_retention_run()
{
uv_mutex_lock(&aclk_async_lock);
retention_running = 0;
uv_mutex_unlock(&aclk_async_lock);
}
static int request_retention_run()
{
int rc = 0;
uv_mutex_lock(&aclk_async_lock);
if (unlikely(retention_running))
rc = 1;
else
retention_running = 1;
uv_mutex_unlock(&aclk_async_lock);
return rc;
}
#endif
int claimed()
{
@ -318,9 +340,6 @@ static void timer_cb(uv_timer_t* handle)
if (aclk_use_new_cloud_arch && aclk_connected) {
if (wc->rotation_after && wc->rotation_after < now) {
cmd.opcode = ACLK_DATABASE_NODE_INFO;
aclk_database_enq_cmd_noblock(wc, &cmd);
cmd.opcode = ACLK_DATABASE_UPD_RETENTION;
if (!aclk_database_enq_cmd_noblock(wc, &cmd))
wc->rotation_after += ACLK_DATABASE_ROTATION_INTERVAL;
@ -353,6 +372,38 @@ static void timer_cb(uv_timer_t* handle)
#endif
}
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
void after_send_retention(uv_work_t *req, int status)
{
struct aclk_database_worker_config *wc = req->data;
(void)status;
stop_retention_run();
wc->retention_running = 0;
struct aclk_database_cmd cmd;
memset(&cmd, 0, sizeof(cmd));
cmd.opcode = ACLK_DATABASE_DIM_DELETION;
if (aclk_database_enq_cmd_noblock(wc, &cmd))
info("Failed to queue a dimension deletion message");
cmd.opcode = ACLK_DATABASE_NODE_INFO;
if (aclk_database_enq_cmd_noblock(wc, &cmd))
info("Failed to queue a node update info message");
}
static void send_retention(uv_work_t *req)
{
struct aclk_database_worker_config *wc = req->data;
if (unlikely(wc->is_shutting_down))
return;
aclk_update_retention(wc);
}
#endif
#define MAX_CMD_BATCH_SIZE (256)
void aclk_database_worker(void *arg)
@ -429,6 +480,7 @@ void aclk_database_worker(void *arg)
memset(&cmd, 0, sizeof(cmd));
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
uv_work_t retention_work;
sql_get_last_chart_sequence(wc);
wc->chart_payload_count = sql_get_pending_count(wc);
if (!wc->chart_payload_count)
@ -440,6 +492,7 @@ void aclk_database_worker(void *arg)
wc->rotation_after = wc->startup_time + ACLK_DATABASE_ROTATION_DELAY;
debug(D_ACLK_SYNC,"Node %s reports pending message count = %u", wc->node_id, wc->chart_payload_count);
while (likely(!netdata_exit)) {
worker_is_idle();
uv_run(loop, UV_RUN_DEFAULT);
@ -538,9 +591,21 @@ void aclk_database_worker(void *arg)
aclk_process_dimension_deletion(wc, cmd);
break;
case ACLK_DATABASE_UPD_RETENTION:
if (unlikely(wc->retention_running))
break;
if (unlikely(request_retention_run())) {
wc->rotation_after = now_realtime_sec() + ACLK_DATABASE_RETENTION_RETRY;
break;
}
debug(D_ACLK_SYNC,"Sending retention info for %s", wc->uuid_str);
aclk_update_retention(wc, cmd);
aclk_process_dimension_deletion(wc, cmd);
retention_work.data = wc;
wc->retention_running = 1;
if (unlikely(uv_queue_work(loop, &retention_work, send_retention, after_send_retention))) {
wc->retention_running = 0;
stop_retention_run();
}
break;
// NODE_INSTANCE DETECTION

View file

@ -17,6 +17,7 @@
#define ACLK_MAX_ALERT_UPDATES (5)
#define ACLK_DATABASE_CLEANUP_FIRST (60)
#define ACLK_DATABASE_ROTATION_DELAY (180)
#define ACLK_DATABASE_RETENTION_RETRY (60)
#define ACLK_DATABASE_CLEANUP_INTERVAL (3600)
#define ACLK_DATABASE_ROTATION_INTERVAL (3600)
#define ACLK_DELETE_ACK_INTERNAL (600)
@ -197,6 +198,7 @@ struct aclk_database_worker_config {
int node_info_send;
int chart_pending;
int chart_reset_count;
int retention_running;
volatile unsigned is_shutting_down;
volatile unsigned is_orphan;
struct aclk_database_worker_config *next;

View file

@ -773,7 +773,7 @@ void sql_process_queue_removed_alerts_to_aclk(struct aclk_database_worker_config
db_execute(buffer_tostring(sql));
log_access("ACLK STA [%s (%s)]: Queued removed alerts.", wc->node_id, wc->host ? wc->host->hostname : "N/A");
log_access("ACLK STA [%s (%s)]: QUEUED REMOVED ALERTS", wc->node_id, wc->host ? wc->host->hostname : "N/A");
buffer_free(sql);

View file

@ -566,7 +566,7 @@ void aclk_receive_chart_ack(struct aclk_database_worker_config *wc, struct aclk_
error_report("Failed to ACK sequence id, rc = %d", rc);
else
log_access(
"ACLK STA [%s (%s)]: CHARTS ACKNOWLEDGED in the database upto %" PRIu64,
"ACLK STA [%s (%s)]: CHARTS ACKNOWLEDGED IN THE DATABASE UP TO %" PRIu64,
wc->node_id,
wc->host ? wc->host->hostname : "N/A",
cmd.param1);
@ -847,9 +847,8 @@ failed:
"SELECT distinct h.host_id, c.update_every, c.type||'.'||c.id FROM chart c, host h " \
"WHERE c.host_id = h.host_id AND c.host_id = @host_id ORDER BY c.update_every ASC;"
void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
void aclk_update_retention(struct aclk_database_worker_config *wc)
{
UNUSED(cmd);
int rc;
if (!aclk_use_new_cloud_arch || !aclk_connected)
@ -916,7 +915,9 @@ void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_d
rotate_data.node_id = strdupz(wc->node_id);
time_t now = now_realtime_sec();
while (sqlite3_step(res) == SQLITE_ROW) {
while (sqlite3_step(res) == SQLITE_ROW && dimension_update_count < ACLK_MAX_DIMENSION_CLEANUP) {
if (unlikely(netdata_exit))
break;
if (!update_every || update_every != (uint32_t)sqlite3_column_int(res, 1)) {
if (update_every) {
debug(D_ACLK_SYNC, "Update %s for %u oldest time = %ld", wc->host_guid, update_every, start_time);
@ -1003,8 +1004,12 @@ void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_d
if (!wc->host)
hostname = get_hostname_by_node_id(wc->node_id);
log_access("ACLK STA [%s (%s)]: UPDATES %d RETENTION MESSAGE SENT. CHECKED %u DIMENSIONS. %u DELETED, %u STOPPED COLLECTING",
wc->node_id, wc->host ? wc->host->hostname : hostname ? hostname : "N/A", wc->chart_updates, total_checked, total_deleted, total_stopped);
if (dimension_update_count < ACLK_MAX_DIMENSION_CLEANUP && !netdata_exit)
log_access("ACLK STA [%s (%s)]: UPDATES %d RETENTION MESSAGE SENT. CHECKED %u DIMENSIONS. %u DELETED, %u STOPPED COLLECTING",
wc->node_id, wc->host ? wc->host->hostname : hostname ? hostname : "N/A", wc->chart_updates, total_checked, total_deleted, total_stopped);
else
log_access("ACLK STA [%s (%s)]: UPDATES %d RETENTION MESSAGE NOT SENT. CHECKED %u DIMENSIONS. %u DELETED, %u STOPPED COLLECTING",
wc->node_id, wc->host ? wc->host->hostname : hostname ? hostname : "N/A", wc->chart_updates, total_checked, total_deleted, total_stopped);
freez(hostname);
#ifdef NETDATA_INTERNAL_CHECKS
@ -1017,7 +1022,8 @@ void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_d
rotate_data.interval_durations[i].update_every,
rotate_data.interval_durations[i].retention);
#endif
aclk_retention_updated(&rotate_data);
if (dimension_update_count < ACLK_MAX_DIMENSION_CLEANUP && !netdata_exit)
aclk_retention_updated(&rotate_data);
freez(rotate_data.node_id);
freez(rotate_data.interval_durations);

View file

@ -67,4 +67,5 @@ uint32_t sql_get_pending_count(struct aclk_database_worker_config *wc);
void aclk_send_dimension_update(RRDDIM *rd);
struct aclk_chart_sync_stats *aclk_get_chart_sync_stats(RRDHOST *host);
void sql_check_chart_liveness(RRDSET *st);
void aclk_update_retention(struct aclk_database_worker_config *wc);
#endif //NETDATA_SQLITE_ACLK_CHART_H

View file

@ -4,5 +4,4 @@
#define NETDATA_SQLITE_ACLK_NODE_H
void sql_build_node_info(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
#endif //NETDATA_SQLITE_ACLK_NODE_H