0
0
Fork 0
mirror of https://github.com/netdata/netdata.git synced 2025-04-17 11:12:42 +00:00

Improve the ACLK sync process for the new cloud architecture ()

* Move retention code to the charts

* Log information about node registration and updates

* Prevent deadlock if aclk_database_enq_cmd locks for a node

* Improve message (indicate that it comes from alerts). This will be improved in a followup PR

* Disable parts that can't be used if the new cloud env is not available

* Set dimension FLAG if message has been queued

* Queue messages using the correct protocol enabled

* Cleanup unused functions
Rename functions that queue charts and dimensions
Improve the generic chart payload add function
Add a counter for pending charts/dimension payloads to avoid polling the db
Delay the retention update message until we are done with the updates
Fix full resync command to handle sequence_id = 0 correctly
Disable functions not needed when the new cloud env functionality is not compiled

* Add chart_payload count and retry count
Output information or error message if we fail to queue chart/dimension PUSH commands
Only try to queue commands if we have chart_payload_count>0
Remove the event loop shutdown opcode handle

* Improve detection of shutdown (check netdata_exit)

* Adjusting info messages
This commit is contained in:
Stelios Fragkakis 2021-11-03 19:18:35 +02:00 committed by GitHub
parent 339c5c436d
commit e9efad18e8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 437 additions and 438 deletions

View file

@ -978,6 +978,7 @@ void ng_aclk_host_state_update(RRDHOST *host, int cmd)
create_query->data.node_creation.hops = (uint32_t) host->system_info->hops;
create_query->data.node_creation.hostname = strdupz(host->hostname);
create_query->data.node_creation.machine_guid = strdupz(host->machine_guid);
info("Registering host=%s, hops=%u",host->machine_guid, host->system_info->hops);
aclk_queue_query(create_query);
return;
}
@ -992,6 +993,8 @@ void ng_aclk_host_state_update(RRDHOST *host, int cmd)
uuid_unparse_lower(node_id, (char*)query->data.node_update.node_id);
query->data.node_update.queryable = 1;
query->data.node_update.session_id = aclk_session_newarch;
info("Queuing status update for node=%s, live=%d, hops=%u",(char*)query->data.node_update.node_id, cmd,
host->system_info->hops);
aclk_queue_query(query);
}
@ -1015,6 +1018,9 @@ void aclk_send_node_instances()
uuid_unparse_lower(list->node_id, (char*)query->data.node_update.node_id);
query->data.node_update.queryable = 1;
query->data.node_update.session_id = aclk_session_newarch;
info("Queuing status update for node=%s, live=%d, hops=%d",(char*)query->data.node_update.node_id,
list->live,
list->hops);
aclk_queue_query(query);
} else {
aclk_query_t create_query;
@ -1026,6 +1032,8 @@ void aclk_send_node_instances()
create_query->data.node_creation.hostname = list->hostname;
create_query->data.node_creation.machine_guid = mallocz(UUID_STR_LEN);
uuid_unparse_lower(list->host_id, (char*)create_query->data.node_creation.machine_guid);
info("Queuing registration for host=%s, hops=%d",(char*)create_query->data.node_creation.machine_guid,
list->hops);
aclk_queue_query(create_query);
}

View file

@ -1391,19 +1391,12 @@ void rrdset_done(RRDSET *st) {
rrdset_rdlock(st);
#ifdef ENABLE_ACLK
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
if (unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ACLK))) {
if (st->counter_done >= RRDSET_MINIMUM_LIVE_COUNT) {
if (likely(!sql_queue_chart_to_aclk(st)))
if (likely(!queue_chart_to_aclk(st)))
rrdset_flag_set(st, RRDSET_FLAG_ACLK);
}
}
#else
if (unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ACLK))) {
rrdset_flag_set(st, RRDSET_FLAG_ACLK);
aclk_update_chart(st->rrdhost, st->id, 1);
}
#endif
#endif
if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE))) {
@ -1813,12 +1806,13 @@ after_second_database_work:
if (rrddim_flag_check(rd, RRDDIM_FLAG_ARCHIVED))
continue;
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL)
int live = ((mark - rd->last_collected_time.tv_sec) < (RRDSET_MINIMUM_LIVE_COUNT * rd->update_every));
if (unlikely(live != rd->state->aclk_live_status)) {
if (likely(rrdset_flag_check(st, RRDSET_FLAG_ACLK))) {
if (likely(!sql_queue_dimension_to_aclk(rd))) {
if (likely(!queue_dimension_to_aclk(rd))) {
rd->state->aclk_live_status = live;
rrddim_flag_set(rd, RRDDIM_FLAG_ACLK);
}
}
}

View file

@ -165,9 +165,9 @@ int aclk_worker_enq_cmd(char *node_id, struct aclk_database_cmd *cmd)
break;
wc = wc->next;
}
uv_mutex_unlock(&aclk_async_lock);
if (wc)
aclk_database_enq_cmd(wc, cmd);
uv_mutex_unlock(&aclk_async_lock);
return (wc == NULL);
}
@ -261,12 +261,22 @@ static void timer_cb(uv_timer_t* handle)
wc->rotation_after += ACLK_DATABASE_ROTATION_INTERVAL;
}
if (wc->chart_updates && !wc->chart_pending) {
if (wc->chart_updates && !wc->chart_pending && wc->chart_payload_count) {
cmd.opcode = ACLK_DATABASE_PUSH_CHART;
cmd.count = ACLK_MAX_CHART_BATCH;
cmd.param1 = ACLK_MAX_CHART_BATCH_COUNT;
if (!aclk_database_enq_cmd_noblock(wc, &cmd))
if (!aclk_database_enq_cmd_noblock(wc, &cmd)) {
if (wc->retry_count)
info("Queued chart/dimension payload command %s, retry count = %u", wc->host_guid, wc->retry_count);
wc->chart_pending = 1;
wc->retry_count = 0;
} else {
wc->retry_count++;
if (wc->retry_count % 100 == 0)
error_report("Failed to queue chart/dimension payload command %s, retry count = %u",
wc->host_guid,
wc->retry_count);
}
}
if (wc->alert_updates) {
@ -284,7 +294,7 @@ void aclk_database_worker(void *arg)
{
struct aclk_database_worker_config *wc = arg;
uv_loop_t *loop;
int shutdown, ret;
int ret;
enum aclk_database_opcode opcode;
uv_timer_t timer_req;
struct aclk_database_cmd cmd;
@ -324,25 +334,27 @@ void aclk_database_worker(void *arg)
timer_req.data = wc;
fatal_assert(0 == uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS));
wc->error = 0;
shutdown = 0;
wc->retry_count = 0;
wc->node_info_send = (wc->host && !localhost);
aclk_add_worker_thread(wc);
info("Starting ACLK sync thread for host %s -- scratch area %lu bytes", wc->host_guid, sizeof(*wc));
memset(&cmd, 0, sizeof(cmd));
sql_get_last_chart_sequence(wc, cmd);
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
sql_get_last_chart_sequence(wc);
wc->chart_payload_count = sql_get_pending_count(wc);
if (!wc->chart_payload_count)
info("%s: No pending charts and dimensions detected during startup", wc->host_guid);
#endif
wc->chart_updates = 0;
wc->alert_updates = 0;
wc->startup_time = now_realtime_sec();
wc->cleanup_after = wc->startup_time + ACLK_DATABASE_CLEANUP_FIRST;
wc->rotation_after = wc->startup_time + ACLK_DATABASE_ROTATION_DELAY;
while (likely(shutdown == 0)) {
uv_run(loop, UV_RUN_DEFAULT);
wc->alert_updates = 0;
if (netdata_exit)
shutdown = 1;
debug(D_ACLK_SYNC,"Node %s reports pending message count = %u", wc->node_id, wc->chart_payload_count);
while (likely(!netdata_exit)) {
uv_run(loop, UV_RUN_DEFAULT);
/* wait for commands */
cmd_batch_size = 0;
@ -350,6 +362,10 @@ void aclk_database_worker(void *arg)
if (unlikely(cmd_batch_size >= MAX_CMD_BATCH_SIZE))
break;
cmd = aclk_database_deq_cmd(wc);
if (netdata_exit)
break;
opcode = cmd.opcode;
++cmd_batch_size;
switch (opcode) {
@ -370,6 +386,7 @@ void aclk_database_worker(void *arg)
break;
// CHART / DIMENSION OPERATIONS
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
case ACLK_DATABASE_ADD_CHART:
debug(D_ACLK_SYNC, "Adding chart event for %s", wc->host_guid);
aclk_add_chart_event(wc, cmd);
@ -394,7 +411,7 @@ void aclk_database_worker(void *arg)
debug(D_ACLK_SYNC, "RESET chart SEQ for %s to %"PRIu64, wc->uuid_str, (uint64_t) cmd.param1);
aclk_receive_chart_reset(wc, cmd);
break;
#endif
// ALERTS
case ACLK_DATABASE_ADD_ALERT:
debug(D_ACLK_SYNC,"Adding alert event for %s", wc->host_guid);
@ -426,10 +443,15 @@ void aclk_database_worker(void *arg)
debug(D_ACLK_SYNC,"Sending node info for %s", wc->uuid_str);
sql_build_node_info(wc, cmd);
break;
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
case ACLK_DATABASE_DIM_DELETION:
debug(D_ACLK_SYNC,"Sending dimension deletion information %s", wc->uuid_str);
break;
case ACLK_DATABASE_UPD_RETENTION:
debug(D_ACLK_SYNC,"Sending retention info for %s", wc->uuid_str);
aclk_update_retention(wc, cmd);
break;
#endif
// NODE_INSTANCE DETECTION
case ACLK_DATABASE_TIMER:
@ -437,7 +459,7 @@ void aclk_database_worker(void *arg)
if (claimed()) {
wc->host = rrdhost_find_by_guid(wc->host_guid, 0);
if (wc->host) {
info("HOST %s detected as active and claimed !!!", wc->host->hostname);
info("HOST %s (%s) detected as active", wc->host->hostname, wc->host_guid);
snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "AS_%s", wc->host->hostname);
uv_thread_set_name_np(wc->thread, threadname);
wc->host->dbsync_worker = wc;
@ -452,11 +474,6 @@ void aclk_database_worker(void *arg)
wc->node_info_send = aclk_database_enq_cmd_noblock(wc, &cmd);
}
break;
case ACLK_DATABASE_SHUTDOWN:
shutdown = 1;
fatal_assert(0 == uv_timer_stop(&timer_req));
uv_close((uv_handle_t *)&timer_req, NULL);
break;
default:
debug(D_ACLK_SYNC, "%s: default.", __func__);
break;
@ -466,8 +483,11 @@ void aclk_database_worker(void *arg)
} while (opcode != ACLK_DATABASE_NOOP);
}
if (!uv_timer_stop(&timer_req))
uv_close((uv_handle_t *)&timer_req, NULL);
/* cleanup operations of the event loop */
info("Shutting down ACLK_DATABASE engine event loop.");
info("Shutting down ACLK sync event loop.");
/*
* uv_async_send after uv_close does not seem to crash in linux at the moment,
@ -477,7 +497,7 @@ void aclk_database_worker(void *arg)
uv_close((uv_handle_t *)&wc->async, NULL);
uv_run(loop, UV_RUN_DEFAULT);
info("Shutting down ACLK_DATABASE engine event loop complete.");
info("Shutting down ACLK sync event loop complete.");
/* TODO: don't let the API block by waiting to enqueue commands */
uv_cond_destroy(&wc->cmd_cond);
/* uv_mutex_destroy(&wc->cmd_mutex); */
@ -503,8 +523,6 @@ error_after_async_init:
fatal_assert(0 == uv_loop_close(loop));
error_after_loop_init:
freez(loop);
wc->error = UV_EAGAIN;
}
// -------------------------------------------------------------

View file

@ -115,10 +115,18 @@ static inline char *get_str_from_uuid(uuid_t *uuid)
enum aclk_database_opcode {
ACLK_DATABASE_NOOP = 0,
ACLK_DATABASE_ADD_ALERT,
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
ACLK_DATABASE_ADD_CHART,
ACLK_DATABASE_ADD_DIMENSION,
ACLK_DATABASE_ALARM_HEALTH_LOG,
ACLK_DATABASE_PUSH_CHART,
ACLK_DATABASE_PUSH_CHART_CONFIG,
ACLK_DATABASE_RESET_CHART,
ACLK_DATABASE_CHART_ACK,
ACLK_DATABASE_UPD_RETENTION,
ACLK_DATABASE_DIM_DELETION,
#endif
ACLK_DATABASE_ALARM_HEALTH_LOG,
ACLK_DATABASE_CLEANUP,
ACLK_DATABASE_DELETE_HOST,
ACLK_DATABASE_NODE_INFO,
@ -126,12 +134,7 @@ enum aclk_database_opcode {
ACLK_DATABASE_PUSH_ALERT_CONFIG,
ACLK_DATABASE_PUSH_ALERT_SNAPSHOT,
ACLK_DATABASE_QUEUE_REMOVED_ALERTS,
ACLK_DATABASE_PUSH_CHART,
ACLK_DATABASE_PUSH_CHART_CONFIG,
ACLK_DATABASE_RESET_CHART,
ACLK_DATABASE_SHUTDOWN,
ACLK_DATABASE_TIMER,
ACLK_DATABASE_UPD_RETENTION
ACLK_DATABASE_TIMER
};
struct aclk_chart_payload_t {
@ -172,6 +175,7 @@ struct aclk_database_worker_config {
uint64_t alerts_batch_id; // batch id for alerts to use
uint64_t alerts_start_seq_id; // cloud has asked to start streaming from
uint64_t alert_sequence_id; // last alert sequence_id
uint32_t chart_payload_count;
uint64_t alerts_snapshot_id; //will contain the snapshot_id value if snapshot was requested
uint64_t alerts_ack_sequence_id; //last sequence_id ack'ed from cloud via sendsnapshot message
uv_loop_t *loop;
@ -182,7 +186,7 @@ struct aclk_database_worker_config {
uv_cond_t cmd_cond;
volatile unsigned queue_size;
struct aclk_database_cmdqueue cmd_queue;
int error;
uint32_t retry_count;
int chart_updates;
int alert_updates;
time_t batch_created;

View file

@ -583,7 +583,7 @@ void sql_queue_removed_alerts_to_aclk(RRDHOST *host)
{
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
if (unlikely(!host->dbsync_worker)) {
error("ACLK synchronization thread is not active for host %s", host->hostname);
error("ACLK synchronization thread is not active for host %s when trying to queue removed alerts", host->hostname);
return;
}

View file

@ -6,12 +6,11 @@
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
#include "../../aclk/aclk_charts_api.h"
#include "../../aclk/aclk.h"
#endif
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
static inline int sql_queue_chart_payload(struct aclk_database_worker_config *wc,
void *data, enum aclk_database_opcode opcode)
{
int rc;
if (unlikely(!wc))
return 1;
@ -19,8 +18,8 @@ static inline int sql_queue_chart_payload(struct aclk_database_worker_config *wc
memset(&cmd, 0, sizeof(cmd));
cmd.opcode = opcode;
cmd.data = data;
aclk_database_enq_cmd(wc, &cmd);
return 0;
rc = aclk_database_enq_cmd_noblock(wc, &cmd);
return rc;
}
static int payload_sent(char *uuid_str, uuid_t *uuid, void *payload, size_t payload_size)
@ -60,13 +59,13 @@ bind_fail:
return send_status;
}
static int aclk_add_chart_payload(char *uuid_str, uuid_t *uuid, char *claim_id, ACLK_PAYLOAD_TYPE payload_type,
void *payload, size_t payload_size)
static int aclk_add_chart_payload(struct aclk_database_worker_config *wc, uuid_t *uuid, char *claim_id,
ACLK_PAYLOAD_TYPE payload_type, void *payload, size_t payload_size)
{
static __thread sqlite3_stmt *res_chart = NULL;
int rc;
rc = payload_sent(uuid_str, uuid, payload, payload_size);
rc = payload_sent(wc->uuid_str, uuid, payload, payload_size);
if (rc == 1)
return 0;
@ -74,7 +73,7 @@ static int aclk_add_chart_payload(char *uuid_str, uuid_t *uuid, char *claim_id,
BUFFER *sql = buffer_create(1024);
buffer_sprintf(sql,"INSERT INTO aclk_chart_payload_%s (unique_id, uuid, claim_id, date_created, type, payload) " \
"VALUES (@unique_id, @uuid, @claim_id, strftime('%%s','now'), @type, @payload);", uuid_str);
"VALUES (@unique_id, @uuid, @claim_id, strftime('%%s','now'), @type, @payload);", wc->uuid_str);
rc = prepare_statement(db_meta, (char *) buffer_tostring(sql), &res_chart);
buffer_free(sql);
@ -115,20 +114,25 @@ static int aclk_add_chart_payload(char *uuid_str, uuid_t *uuid, char *claim_id,
rc = execute_insert(res_chart);
if (unlikely(rc != SQLITE_DONE))
error_report("Failed store chart payload event, rc = %d", rc);
else {
wc->chart_payload_count++;
time_t now = now_realtime_sec();
if (wc->rotation_after > now && wc->rotation_after < now + ACLK_DATABASE_ROTATION_DELAY)
wc->rotation_after = now + ACLK_DATABASE_ROTATION_DELAY;
}
bind_fail:
if (unlikely(sqlite3_reset(res_chart) != SQLITE_OK))
error_report("Failed to reset statement in store chart payload, rc = %d", rc);
return (rc != SQLITE_DONE);
}
#endif
int aclk_add_chart_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
{
int rc = 0;
CHECK_SQLITE_CONNECTION(db_meta);
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
char *claim_id = is_agent_claimed();
RRDSET *st = cmd.data;
@ -158,14 +162,38 @@ int aclk_add_chart_event(struct aclk_database_worker_config *wc, struct aclk_dat
size_t size;
char *payload = generate_chart_instance_updated(&size, &chart_payload);
if (likely(payload))
rc = aclk_add_chart_payload(wc->uuid_str, st->chart_uuid, claim_id, ACLK_PAYLOAD_CHART, (void *) payload, size);
rc = aclk_add_chart_payload(wc, st->chart_uuid, claim_id, ACLK_PAYLOAD_CHART, (void *) payload, size);
freez(payload);
chart_instance_updated_destroy(&chart_payload);
}
#else
UNUSED(wc);
UNUSED(cmd);
#endif
return rc;
}
static inline int aclk_upd_dimension_event(struct aclk_database_worker_config *wc, char *claim_id, uuid_t *dim_uuid,
const char *dim_id, const char *dim_name, const char *chart_name, time_t first_time, time_t last_time)
{
int rc = 0;
size_t size;
if (unlikely(!dim_uuid || !dim_id || !dim_name || !chart_name))
return 0;
struct chart_dimension_updated dim_payload;
memset(&dim_payload, 0, sizeof(dim_payload));
if (!first_time)
info("DEBUG: Deleting dimension [%s] [%s] [%s] [%s] [%s]", wc->node_id, claim_id, dim_id, dim_name, chart_name);
dim_payload.node_id = wc->node_id;
dim_payload.claim_id = claim_id;
dim_payload.name = dim_name;
dim_payload.id = dim_id;
dim_payload.chart_id = chart_name;
dim_payload.created_at.tv_sec = first_time;
dim_payload.last_timestamp.tv_sec = last_time;
char *payload = generate_chart_dimension_updated(&size, &dim_payload);
if (likely(payload))
rc = aclk_add_chart_payload(wc, dim_uuid, claim_id, ACLK_PAYLOAD_DIMENSION, (void *)payload, size);
freez(payload);
return rc;
}
@ -174,7 +202,6 @@ int aclk_add_dimension_event(struct aclk_database_worker_config *wc, struct aclk
int rc = 0;
CHECK_SQLITE_CONNECTION(db_meta);
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
char *claim_id = is_agent_claimed();
RRDDIM *rd = cmd.data;
@ -187,37 +214,25 @@ int aclk_add_dimension_event(struct aclk_database_worker_config *wc, struct aclk
int live = ((now - last_t) < (RRDSET_MINIMUM_LIVE_COUNT * rd->update_every));
struct chart_dimension_updated dim_payload;
size_t size;
rc = aclk_upd_dimension_event(
wc,
claim_id,
&rd->state->metric_uuid,
rd->id,
rd->name,
rd->rrdset->id,
first_t,
live ? 0 : last_t);
memset(&dim_payload, 0, sizeof(dim_payload));
dim_payload.node_id = wc->node_id;
dim_payload.claim_id = claim_id;
dim_payload.name = rd->name;
dim_payload.id = rd->id;
dim_payload.chart_id = rd->rrdset->name;
dim_payload.created_at.tv_sec = first_t;
if (unlikely(!live))
dim_payload.last_timestamp.tv_sec = last_t;
char *payload = generate_chart_dimension_updated(&size, &dim_payload);
if (likely(payload))
rc = aclk_add_chart_payload(wc->uuid_str, &rd->state->metric_uuid, claim_id, ACLK_PAYLOAD_DIMENSION, (void *)payload, size);
freez(payload);
freez(claim_id);
}
rrddim_flag_clear(rd, RRDDIM_FLAG_ACLK);
#else
UNUSED(wc);
UNUSED(cmd);
#endif
return rc;
}
void aclk_send_chart_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
{
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
int rc;
wc->chart_pending = 0;
@ -269,6 +284,8 @@ void aclk_send_chart_event(struct aclk_database_worker_config *wc, struct aclk_d
int loop = cmd.param1;
uint64_t start_sequence_id = wc->chart_sequence_id;
while (loop > 0) {
uint64_t previous_sequence_id = wc->chart_sequence_id;
int count = 0;
@ -294,6 +311,8 @@ void aclk_send_chart_event(struct aclk_database_worker_config *wc, struct aclk_d
previous_sequence_id = last_sequence;
is_dim[count] = sqlite3_column_int(res, 3) > 0;
count++;
if (wc->chart_payload_count)
wc->chart_payload_count--;
}
freez(payload_list[count]);
payload_list_max_size[count] = 0;
@ -325,9 +344,22 @@ void aclk_send_chart_event(struct aclk_database_worker_config *wc, struct aclk_d
wc->chart_sequence_id = last_sequence;
wc->chart_timestamp = last_timestamp;
}
else
break;
--loop;
}
if (start_sequence_id != wc->chart_sequence_id) {
time_t now = now_realtime_sec();
if (wc->rotation_after > now && wc->rotation_after < now + ACLK_DATABASE_ROTATION_DELAY)
wc->rotation_after = now + ACLK_DATABASE_ROTATION_DELAY;
}
else {
wc->chart_payload_count = sql_get_pending_count(wc);
if (!wc->chart_payload_count)
info("%s: sync of charts and dimensions done in %ld seconds", wc->host->hostname, now_realtime_sec() - wc->startup_time);
}
for (int i = 0; i <= limit; ++i)
freez(payload_list[i]);
@ -344,19 +376,13 @@ bind_fail:
buffer_free(sql);
freez(claim_id);
#else
UNUSED(wc);
UNUSED(cmd);
#endif
return;
}
// Push one chart config to the cloud
int aclk_send_chart_config(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
{
UNUSED(wc);
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
CHECK_SQLITE_CONNECTION(db_meta);
@ -419,12 +445,9 @@ int aclk_send_chart_config(struct aclk_database_worker_config *wc, struct aclk_d
freez((char *) cmd.data_param);
buffer_free(sql);
return rc;
#else
UNUSED(cmd);
return 0;
#endif
}
void aclk_receive_chart_ack(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
{
int rc;
@ -460,7 +483,6 @@ void aclk_receive_chart_ack(struct aclk_database_worker_config *wc, struct aclk_
void aclk_receive_chart_reset(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
{
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
BUFFER *sql = buffer_create(1024);
buffer_sprintf(sql, "UPDATE aclk_chart_%s SET status = NULL, date_submitted = NULL WHERE sequence_id >= %"PRIu64";",
wc->uuid_str, cmd.param1);
@ -479,35 +501,33 @@ void aclk_receive_chart_reset(struct aclk_database_worker_config *wc, struct acl
db_unlock();
wc->chart_sequence_id = 0;
wc->chart_timestamp = 0;
wc->chart_payload_count = 0;
RRDHOST *host = wc->host;
rrdhost_rdlock(host);
RRDSET *st;
rrdset_foreach_read(st, host) {
rrdset_rdlock(st);
rrdset_flag_clear(st, RRDSET_FLAG_ACLK);
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
rd->state->aclk_live_status = (rd->state->aclk_live_status == 0);
}
rrdset_flag_clear(st, RRDSET_FLAG_ACLK);
rrdset_unlock(st);
}
rrdhost_unlock(host);
}
else {
//sql_chart_deduplicate(wc, cmd);
sql_get_last_chart_sequence(wc, cmd);
info("Restarting chart sync for %s from sequence=%"PRIu64, wc->uuid_str, cmd.param1);
wc->chart_payload_count = sql_get_pending_count(wc);
sql_get_last_chart_sequence(wc);
}
buffer_free(sql);
wc->chart_updates = 1;
#else
UNUSED(wc);
UNUSED(cmd);
#endif
return;
}
//
// Functions called directly from ACLK threads and will queue commands
//
@ -568,118 +588,282 @@ void aclk_ack_chart_sequence_id(char *node_id, uint64_t last_sequence_id)
return;
}
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
void aclk_reset_chart_event(char *node_id, uint64_t last_sequence_id)
// Start streaming charts / dimensions for node_id
void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at, uint64_t batch_id)
{
UNUSED(created_at);
if (unlikely(!node_id))
return;
debug(D_ACLK_SYNC, "NODE %s wants to resync from %"PRIu64, node_id, last_sequence_id);
aclk_submit_param_command(node_id, ACLK_DATABASE_RESET_CHART, last_sequence_id);
return;
}
#endif
debug(D_ACLK_SYNC,"START streaming charts for node %s from sequence %"PRIu64" t=%ld, batch=%"PRIu64, node_id,
sequence_id, created_at, batch_id);
uuid_t node_uuid;
if (uuid_parse(node_id, node_uuid))
return;
// ST is read locked
int sql_queue_chart_to_aclk(RRDSET *st)
{
#ifdef ENABLE_ACLK
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
if (!aclk_use_new_cloud_arch)
#endif
{
rrdset_flag_clear(st, RRDSET_FLAG_ACLK);
aclk_update_chart(st->rrdhost, st->id, 1);
return 0;
struct aclk_database_worker_config *wc = NULL;
rrd_wrlock();
RRDHOST *host = localhost;
while(host) {
if (host->node_id && !(uuid_compare(*host->node_id, node_uuid))) {
rrd_unlock();
wc = (struct aclk_database_worker_config *)host->dbsync_worker;
if (likely(wc)) {
wc->chart_reset_count++;
__sync_synchronize();
wc->chart_updates = 0;
wc->batch_id = batch_id;
__sync_synchronize();
wc->batch_created = now_realtime_sec();
if (sequence_id > wc->chart_sequence_id || wc->chart_reset_count > 10) {
info("Requesting full resync from the cloud for node id %s "
"(reset=%d, remote_seq=%"PRIu64", local_seq=%"PRIu64")"
, wc->node_id, wc->chart_reset_count, sequence_id, wc->chart_sequence_id);
chart_reset_t chart_reset;
chart_reset.claim_id = is_agent_claimed();
if (chart_reset.claim_id) {
chart_reset.node_id = node_id;
chart_reset.reason = SEQ_ID_NOT_EXISTS;
aclk_chart_reset(chart_reset);
freez(chart_reset.claim_id);
wc->chart_reset_count = -1;
}
return;
} else {
struct aclk_database_cmd cmd;
memset(&cmd, 0, sizeof(cmd));
// TODO: handle timestamp
if (sequence_id < wc->chart_sequence_id || !sequence_id) { // || created_at != wc->chart_timestamp) {
info("RESET streaming charts for %s from sequence %"PRIu64 \
" t=%ld (reset count=%d)", wc->node_id, wc->chart_sequence_id,
wc->chart_timestamp, wc->chart_reset_count);
cmd.opcode = ACLK_DATABASE_RESET_CHART;
cmd.param1 = sequence_id + 1;
cmd.completion = NULL;
aclk_database_enq_cmd(wc, &cmd);
}
else {
info("START streaming charts for %s enabled -- last streamed sequence %"PRIu64 \
" t=%ld (reset count=%d)", wc->node_id, wc->chart_sequence_id,
wc->chart_timestamp, wc->chart_reset_count);
wc->chart_reset_count = 0;
wc->chart_updates = 1;
}
}
}
else
error("ACLK synchronization thread is not active for host %s", host->hostname);
return;
}
host = host->next;
}
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
return sql_queue_chart_payload((struct aclk_database_worker_config *) st->rrdhost->dbsync_worker,
st, ACLK_DATABASE_ADD_CHART);
#else
return 0;
#endif
#else
UNUSED(st);
return 0;
#endif
}
int sql_queue_dimension_to_aclk(RRDDIM *rd)
{
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
if (!aclk_use_new_cloud_arch)
return 0;
int rc = sql_queue_chart_payload((struct aclk_database_worker_config *) rd->rrdset->rrdhost->dbsync_worker,
rd, ACLK_DATABASE_ADD_DIMENSION);
if (likely(!rc))
rrddim_flag_set(rd, RRDDIM_FLAG_ACLK);
return rc;
#else
UNUSED(rd);
return 0;
#endif
}
void sql_chart_deduplicate(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
{
UNUSED(cmd);
BUFFER *sql = buffer_create(1024);
db_lock();
buffer_sprintf(sql, "DROP TABLE IF EXISTS t_%s;", wc->uuid_str);
db_execute(buffer_tostring(sql));
buffer_flush(sql);
buffer_sprintf(sql, "CREATE TABLE t_%s AS SELECT * FROM aclk_chart_payload_%s WHERE unique_id IN "
"(SELECT unique_id from aclk_chart_%s WHERE date_submitted IS NULL AND update_count > 0);",
wc->uuid_str, wc->uuid_str, wc->uuid_str);
db_execute(buffer_tostring(sql));
buffer_flush(sql);
buffer_sprintf(sql, "DELETE FROM aclk_chart_payload_%s WHERE unique_id IN (SELECT unique_id FROM t_%s); " ,
wc->uuid_str, wc->uuid_str);
db_execute(buffer_tostring(sql));
buffer_flush(sql);
buffer_sprintf(sql, "DELETE FROM aclk_chart_%s WHERE unique_id IN (SELECT unique_id FROM t_%s);",
wc->uuid_str, wc->uuid_str);
db_execute(buffer_tostring(sql));
buffer_flush(sql);
buffer_sprintf(sql, "DELETE FROM aclk_chart_latest_%s WHERE unique_id IN (SELECT unique_id FROM t_%s);",
wc->uuid_str, wc->uuid_str);
db_execute(buffer_tostring(sql));
buffer_flush(sql);
buffer_sprintf(sql, "INSERT INTO aclk_chart_payload_%s SELECT * FROM t_%s ORDER BY DATE_CREATED ASC;",
wc->uuid_str, wc->uuid_str);
db_execute(buffer_tostring(sql));
buffer_flush(sql);
buffer_sprintf(sql, "INSERT OR REPLACE INTO aclk_chart_latest_%s (uuid, unique_id, date_submitted) "
"SELECT uuid, unique_id, date_submitted FROM aclk_chart_%s where sequence_id IN "
"(SELECT sequence_id FROM aclk_chart_%s WHERE date_submitted IS NOT NULL "
"GROUP BY uuid HAVING sequence_id = MAX(sequence_id));"
, wc->uuid_str, wc->uuid_str, wc->uuid_str);
db_execute(buffer_tostring(sql));
buffer_flush(sql);
buffer_sprintf(sql, "DROP TABLE IF EXISTS t_%s;", wc->uuid_str);
db_execute(buffer_tostring(sql));
db_unlock();
sql_get_last_chart_sequence(wc, cmd);
buffer_free(sql);
rrd_unlock();
return;
}
void sql_get_last_chart_sequence(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
#define SQL_SELECT_HOST_MEMORY_MODE "SELECT memory_mode FROM chart WHERE host_id = @host_id LIMIT 1;"
static RRD_MEMORY_MODE sql_get_host_memory_mode(uuid_t *host_id)
{
int rc;
RRD_MEMORY_MODE memory_mode = RRD_MEMORY_MODE_RAM;
sqlite3_stmt *res = NULL;
rc = sqlite3_prepare_v2(db_meta, SQL_SELECT_HOST_MEMORY_MODE, -1, &res, 0);
if (unlikely(rc != SQLITE_OK)) {
error_report("Failed to prepare statement to read host memory mode");
return memory_mode;
}
rc = sqlite3_bind_blob(res, 1, host_id, sizeof(*host_id), SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK)) {
error_report("Failed to bind host parameter to fetch host memory mode");
goto failed;
}
while (sqlite3_step(res) == SQLITE_ROW) {
memory_mode = (RRD_MEMORY_MODE) sqlite3_column_int(res, 0);
}
failed:
rc = sqlite3_finalize(res);
if (unlikely(rc != SQLITE_OK))
error_report("Failed to finalize the prepared statement when reading host memory mode");
return memory_mode;
}
#define SELECT_HOST_DIMENSION_LIST "SELECT d.dim_id, c.update_every, c.type||'.'||c.id, d.id, d.name FROM chart c, dimension d " \
"WHERE d.chart_id = c.chart_id AND c.host_id = @host_id ORDER BY c.update_every ASC;"
#define SELECT_HOST_CHART_LIST "SELECT distinct h.host_id, c.update_every, c.type||'.'||c.id FROM chart c, host h " \
"WHERE c.host_id = h.host_id AND c.host_id = @host_id ORDER BY c.update_every ASC;"
void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
{
UNUSED(cmd);
int rc;
if (!aclk_use_new_cloud_arch || !aclk_connected)
return;
char *claim_id = is_agent_claimed();
if (unlikely(!claim_id))
return;
sqlite3_stmt *res = NULL;
RRD_MEMORY_MODE memory_mode;
uuid_t host_uuid;
rc = uuid_parse(wc->host_guid, host_uuid);
if (unlikely(rc)) {
freez(claim_id);
return;
}
if (wc->host)
memory_mode = wc->host->rrd_memory_mode;
else
memory_mode = sql_get_host_memory_mode(&host_uuid);
if (memory_mode == RRD_MEMORY_MODE_DBENGINE)
rc = sqlite3_prepare_v2(db_meta, SELECT_HOST_DIMENSION_LIST, -1, &res, 0);
else
rc = sqlite3_prepare_v2(db_meta, SELECT_HOST_CHART_LIST, -1, &res, 0);
if (unlikely(rc != SQLITE_OK)) {
error_report("Failed to prepare statement to fetch host dimensions");
freez(claim_id);
return;
}
rc = sqlite3_bind_blob(res, 1, &host_uuid, sizeof(host_uuid), SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK)) {
error_report("Failed to bind host parameter to fetch host dimensions");
goto failed;
}
time_t start_time = LONG_MAX;
time_t first_entry_t;
time_t last_entry_t;
uint32_t update_every = 0;
struct retention_updated rotate_data;
memset(&rotate_data, 0, sizeof(rotate_data));
int max_intervals = 32;
rotate_data.interval_duration_count = 0;
rotate_data.interval_durations = callocz(max_intervals, sizeof(*rotate_data.interval_durations));
now_realtime_timeval(&rotate_data.rotation_timestamp);
rotate_data.memory_mode = memory_mode;
rotate_data.claim_id = claim_id;
rotate_data.node_id = strdupz(wc->node_id);
// time_t now = now_realtime_sec();
while (sqlite3_step(res) == SQLITE_ROW) {
if (!update_every || update_every != (uint32_t) sqlite3_column_int(res, 1)) {
if (update_every) {
debug(D_ACLK_SYNC,"Update %s for %u oldest time = %ld", wc->host_guid, update_every, start_time);
rotate_data.interval_durations[rotate_data.interval_duration_count].retention = rotate_data.rotation_timestamp.tv_sec - start_time;
rotate_data.interval_duration_count++;
}
update_every = (uint32_t) sqlite3_column_int(res, 1);
rotate_data.interval_durations[rotate_data.interval_duration_count].update_every = update_every;
start_time = LONG_MAX;
}
#ifdef ENABLE_DBENGINE
if (memory_mode == RRD_MEMORY_MODE_DBENGINE)
rc = rrdeng_metric_latest_time_by_uuid((uuid_t *)sqlite3_column_blob(res, 0), &first_entry_t, &last_entry_t);
else
#endif
{
if (wc->host) {
RRDSET *st = NULL;
rc = (st = rrdset_find(wc->host, (const char *)sqlite3_column_text(res, 2))) ? 0 : 1;
if (!rc) {
first_entry_t = rrdset_first_entry_t(st);
last_entry_t = rrdset_last_entry_t(st);
}
}
else {
rc = 0;
first_entry_t = rotate_data.rotation_timestamp.tv_sec;
}
}
if (likely(!rc && first_entry_t))
start_time = MIN(start_time, first_entry_t);
// if (memory_mode == RRD_MEMORY_MODE_DBENGINE) {
// int live = ((now - last_entry_t) < (RRDSET_MINIMUM_LIVE_COUNT * update_every));
// (void) aclk_upd_dimension_event(
// wc->uuid_str,
// claim_id,
// (uuid_t *)sqlite3_column_blob(res, 0),
// (const char *)(const char *)sqlite3_column_text(res, 3),
// (const char *)(const char *)sqlite3_column_text(res, 4),
// (const char *)(const char *)sqlite3_column_text(res, 2),
// first_entry_t,
// live ? 0 : last_entry_t);
// }
}
if (update_every) {
debug(D_ACLK_SYNC, "Update %s for %u oldest time = %ld", wc->host_guid, update_every, start_time);
rotate_data.interval_durations[rotate_data.interval_duration_count].retention = rotate_data.rotation_timestamp.tv_sec - start_time;
rotate_data.interval_duration_count++;
}
for (int i = 0; i < rotate_data.interval_duration_count; ++i) {
#ifdef NETDATA_INTERNAL_CHECKS
info("%d --> Update %s for %u Retention = %u", i, wc->host_guid,
rotate_data.interval_durations[i].update_every, rotate_data.interval_durations[i].retention);
#endif
};
aclk_retention_updated(&rotate_data);
freez(rotate_data.node_id);
freez(rotate_data.interval_durations);
failed:
freez(claim_id);
rc = sqlite3_finalize(res);
if (unlikely(rc != SQLITE_OK))
error_report("Failed to finalize the prepared statement when reading host dimensions");
return;
}
uint32_t sql_get_pending_count(struct aclk_database_worker_config *wc)
{
BUFFER *sql = buffer_create(1024);
sqlite3_stmt *res = NULL;
buffer_sprintf(sql,"SELECT count(1) FROM aclk_chart_%s ac WHERE ac.date_submitted IS NULL;", wc->uuid_str);
int rc;
uint32_t chart_payload_count = 0;
rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0);
if (rc != SQLITE_OK) {
error_report("Failed to prepare statement to count pending messages");
goto fail;
}
while (sqlite3_step(res) == SQLITE_ROW)
chart_payload_count = (uint32_t) sqlite3_column_int(res, 0);
rc = sqlite3_finalize(res);
if (unlikely(rc != SQLITE_OK))
error_report("Failed to reset statement when fetching pending messages, rc = %d", rc);
fail:
buffer_free(sql);
return chart_payload_count;
}
void sql_get_last_chart_sequence(struct aclk_database_worker_config *wc)
{
BUFFER *sql = buffer_create(1024);
buffer_sprintf(sql,"SELECT ac.sequence_id, ac.date_created FROM aclk_chart_%s ac " \
@ -711,76 +895,30 @@ fail:
return;
}
// Start streaming charts / dimensions for node_id
void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at, uint64_t batch_id)
int queue_dimension_to_aclk(RRDDIM *rd)
{
UNUSED(created_at);
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
if (unlikely(!node_id))
return;
debug(D_ACLK_SYNC,"START streaming charts for node %s from sequence %"PRIu64" t=%ld, batch=%"PRIu64, node_id,
sequence_id, created_at, batch_id);
uuid_t node_uuid;
if (uuid_parse(node_id, node_uuid))
return;
struct aclk_database_worker_config *wc = NULL;
rrd_wrlock();
RRDHOST *host = localhost;
while(host) {
if (host->node_id && !(uuid_compare(*host->node_id, node_uuid))) {
rrd_unlock();
wc = (struct aclk_database_worker_config *)host->dbsync_worker;
if (likely(wc)) {
wc->chart_reset_count++;
__sync_synchronize();
wc->chart_updates = 0;
wc->batch_id = batch_id;
__sync_synchronize();
wc->batch_created = now_realtime_sec();
if (sequence_id > wc->chart_sequence_id || wc->chart_reset_count > 10) {
debug(D_ACLK_SYNC,"Requesting full resync from the cloud -- reset_count=%d", wc->chart_reset_count);
chart_reset_t chart_reset;
chart_reset.claim_id = is_agent_claimed();
if (chart_reset.claim_id) {
chart_reset.node_id = node_id;
chart_reset.reason = SEQ_ID_NOT_EXISTS;
aclk_chart_reset(chart_reset);
freez(chart_reset.claim_id);
wc->chart_reset_count = -1;
}
return;
} else {
struct aclk_database_cmd cmd;
memset(&cmd, 0, sizeof(cmd));
// TODO: handle timestamp
if (sequence_id < wc->chart_sequence_id) { // || created_at != wc->chart_timestamp) {
cmd.opcode = ACLK_DATABASE_RESET_CHART;
cmd.param1 = sequence_id + 1;
cmd.completion = NULL;
aclk_database_enq_cmd(wc, &cmd);
}
else {
debug(D_ACLK_SYNC,"START streaming charts for %s enabled -- last streamed sequence %"PRIu64 \
" t=%ld (reset count=%d)", wc->host_guid, wc->chart_sequence_id,
wc->chart_timestamp, wc->chart_reset_count);
wc->chart_reset_count = 0;
wc->chart_updates = 1;
}
}
}
else
error("ACLK synchronization thread is not active for host %s", host->hostname);
return;
}
host = host->next;
}
rrd_unlock();
#else
UNUSED(node_id);
UNUSED(sequence_id);
UNUSED(batch_id);
#endif
return;
int rc = sql_queue_chart_payload((struct aclk_database_worker_config *) rd->rrdset->rrdhost->dbsync_worker,
rd, ACLK_DATABASE_ADD_DIMENSION);
return rc;
}
#endif //ENABLE_NEW_CLOUD_PROTOCOL
// ST is read locked
int queue_chart_to_aclk(RRDSET *st)
{
#ifndef ENABLE_NEW_CLOUD_PROTOCOL
#ifdef ENABLE_ACLK
aclk_update_chart(st->rrdhost, st->id, 1);
#endif
return 0;
#else
if (!aclk_use_new_cloud_arch && aclk_connected) {
aclk_update_chart(st->rrdhost, st->id, 1);
return 0;
}
return sql_queue_chart_payload((struct aclk_database_worker_config *) st->rrdhost->dbsync_worker,
st, ACLK_DATABASE_ADD_CHART);
#endif
}

View file

@ -16,25 +16,21 @@ extern sqlite3 *db_meta;
#define RRDSET_MINIMUM_LIVE_COUNT 3
#endif
//void aclk_status_chart_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
extern int sql_queue_chart_to_aclk(RRDSET *st);
extern int sql_queue_dimension_to_aclk(RRDDIM *rd);
extern int queue_chart_to_aclk(RRDSET *st);
extern int queue_dimension_to_aclk(RRDDIM *rd);
extern void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id);
extern void sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae);
int aclk_add_chart_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
int aclk_add_dimension_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
int aclk_add_offline_dimension_event(struct aclk_database_worker_config *wc, char *node_id, char *chart_name, uuid_t *dim_uuid, char *rd_id, char *rd_name, time_t first_entry_t, time_t last_entry_t);
int aclk_send_chart_config(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
void aclk_ack_chart_sequence_id(char *node_id, uint64_t last_sequence_id);
void aclk_get_chart_config(char **hash_id_list);
void aclk_send_chart_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
void aclk_reset_chart_event(char *node_id, uint64_t last_sequence_id);
void aclk_start_streaming(char *node_id, uint64_t seq_id, time_t created_at, uint64_t batch_id);
void sql_chart_deduplicate(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
void sql_check_dimension_state(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
void sql_check_rotation_state(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
void sql_get_last_chart_sequence(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
void sql_get_last_chart_sequence(struct aclk_database_worker_config *wc);
void aclk_receive_chart_reset(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
void aclk_receive_chart_ack(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
void sql_update_metric_statistics(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
uint32_t sql_get_pending_count(struct aclk_database_worker_config *wc);
#endif //NETDATA_SQLITE_ACLK_CHART_H

View file

@ -5,7 +5,6 @@
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
#include "../../aclk/aclk_charts_api.h"
#include "../../aclk/aclk.h"
#endif
void sql_build_node_info(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
@ -63,161 +62,3 @@ void sql_build_node_info(struct aclk_database_worker_config *wc, struct aclk_dat
return;
}
#define SQL_SELECT_HOST_MEMORY_MODE "select memory_mode from chart where host_id = @host_id limit 1;"
static RRD_MEMORY_MODE sql_get_host_memory_mode(uuid_t *host_id)
{
int rc;
RRD_MEMORY_MODE memory_mode = RRD_MEMORY_MODE_RAM;
sqlite3_stmt *res = NULL;
rc = sqlite3_prepare_v2(db_meta, SQL_SELECT_HOST_MEMORY_MODE, -1, &res, 0);
if (unlikely(rc != SQLITE_OK)) {
error_report("Failed to prepare statement to read host memory mode");
return memory_mode;
}
rc = sqlite3_bind_blob(res, 1, host_id, sizeof(*host_id), SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK)) {
error_report("Failed to bind host parameter to fetch host memory mode");
goto failed;
}
while (sqlite3_step(res) == SQLITE_ROW) {
memory_mode = (RRD_MEMORY_MODE) sqlite3_column_int(res, 0);
}
failed:
rc = sqlite3_finalize(res);
if (unlikely(rc != SQLITE_OK))
error_report("Failed to finalize the prepared statement when reading host memory mode");
return memory_mode;
}
#define SELECT_HOST_DIMENSION_LIST "SELECT d.dim_id, c.update_every, c.type||'.'||c.id FROM chart c, dimension d, host h " \
"WHERE d.chart_id = c.chart_id AND c.host_id = h.host_id AND c.host_id = @host_id ORDER BY c.update_every ASC;"
#define SELECT_HOST_CHART_LIST "SELECT distinct h.host_id, c.update_every, c.type||'.'||c.id FROM chart c, host h " \
"WHERE c.host_id = h.host_id AND c.host_id = @host_id ORDER BY c.update_every ASC;"
void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
{
UNUSED(cmd);
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
int rc;
if (!aclk_use_new_cloud_arch || !aclk_connected)
return;
char *claim_id = is_agent_claimed();
if (unlikely(!claim_id))
return;
sqlite3_stmt *res = NULL;
RRD_MEMORY_MODE memory_mode;
uuid_t host_uuid;
rc = uuid_parse(wc->host_guid, host_uuid);
if (unlikely(rc)) {
freez(claim_id);
return;
}
if (wc->host)
memory_mode = wc->host->rrd_memory_mode;
else
memory_mode = sql_get_host_memory_mode(&host_uuid);
if (memory_mode == RRD_MEMORY_MODE_DBENGINE)
rc = sqlite3_prepare_v2(db_meta, SELECT_HOST_DIMENSION_LIST, -1, &res, 0);
else
rc = sqlite3_prepare_v2(db_meta, SELECT_HOST_CHART_LIST, -1, &res, 0);
if (unlikely(rc != SQLITE_OK)) {
error_report("Failed to prepare statement to fetch host dimensions");
freez(claim_id);
return;
}
rc = sqlite3_bind_blob(res, 1, &host_uuid, sizeof(host_uuid), SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK)) {
error_report("Failed to bind host parameter to fetch host dimensions");
goto failed;
}
time_t start_time = LONG_MAX;
time_t first_entry_t;
uint32_t update_every = 0;
struct retention_updated rotate_data;
memset(&rotate_data, 0, sizeof(rotate_data));
int max_intervals = 32;
rotate_data.interval_duration_count = 0;
rotate_data.interval_durations = callocz(max_intervals, sizeof(*rotate_data.interval_durations));
now_realtime_timeval(&rotate_data.rotation_timestamp);
rotate_data.memory_mode = memory_mode;
rotate_data.claim_id = claim_id;
rotate_data.node_id = strdupz(wc->node_id);
while (sqlite3_step(res) == SQLITE_ROW) {
if (!update_every || update_every != (uint32_t) sqlite3_column_int(res, 1)) {
if (update_every) {
debug(D_ACLK_SYNC,"Update %s for %u oldest time = %ld", wc->host_guid, update_every, start_time);
rotate_data.interval_durations[rotate_data.interval_duration_count].retention = rotate_data.rotation_timestamp.tv_sec - start_time;
rotate_data.interval_duration_count++;
}
update_every = (uint32_t) sqlite3_column_int(res, 1);
rotate_data.interval_durations[rotate_data.interval_duration_count].update_every = update_every;
start_time = LONG_MAX;
}
#ifdef ENABLE_DBENGINE
time_t last_entry_t;
if (memory_mode == RRD_MEMORY_MODE_DBENGINE)
rc = rrdeng_metric_latest_time_by_uuid((uuid_t *)sqlite3_column_blob(res, 0), &first_entry_t, &last_entry_t);
else
#endif
{
if (wc->host) {
RRDSET *st = NULL;
rc = (st = rrdset_find(wc->host, (const char *)sqlite3_column_text(res, 2))) ? 0 : 1;
if (!rc)
first_entry_t = rrdset_first_entry_t(st);
}
else {
rc = 0;
first_entry_t = rotate_data.rotation_timestamp.tv_sec;
}
}
if (likely(!rc && first_entry_t))
start_time = MIN(start_time, first_entry_t);
}
if (update_every) {
debug(D_ACLK_SYNC, "Update %s for %u oldest time = %ld", wc->host_guid, update_every, start_time);
rotate_data.interval_durations[rotate_data.interval_duration_count].retention = rotate_data.rotation_timestamp.tv_sec - start_time;
rotate_data.interval_duration_count++;
}
for (int i = 0; i < rotate_data.interval_duration_count; ++i) {
debug(D_ACLK_SYNC,"%d --> Update %s for %u Retention = %u", i, wc->host_guid,
rotate_data.interval_durations[i].update_every, rotate_data.interval_durations[i].retention);
};
aclk_retention_updated(&rotate_data);
freez(rotate_data.node_id);
freez(rotate_data.interval_durations);
failed:
freez(claim_id);
rc = sqlite3_finalize(res);
if (unlikely(rc != SQLITE_OK))
error_report("Failed to finalize the prepared statement when reading host dimensions");
#else
UNUSED(wc);
#endif
return;
}