mirror of
https://github.com/netdata/netdata.git
synced 2025-04-17 11:12:42 +00:00
Improve agent cloud chart synchronization (#12655)
* Try to queue dimension always when: Trying to clean obsolete charts If chart has been sent and liveness apparently changed * delay rotation and skip chart check if not send to cloud * No need to CLEAR flag during database rotation Do not clear chart ACLK status for dimension requests * Change payload_sent to return timestamp of submitted message * Clear the dimension ACLK flag if we are processing all the charts again * Check if dimension is already queued to ACLK and ignore it If queue fails then reset it to retry Already try to queue the dimension * Improve dimension cleanup during the retention message calculation * Change queue_dimension_to_aclk to return void * If no time range for this dimension then assume it is deleted * Start streaming for inactive nodes * Remove dead code * Correctly report hostname in the access log * Schedule a dimension deletion without trying to submit a message immediately * Enable dimension cleanup -- also delete dimension if not found in the dbengine files Free hostname
This commit is contained in:
parent
cbff54ac71
commit
154cf74d6a
6 changed files with 125 additions and 103 deletions
|
@ -194,9 +194,6 @@ void rrdcalc_link_to_rrddim(RRDDIM *rd, RRDSET *st, RRDHOST *host) {
|
|||
}
|
||||
}
|
||||
}
|
||||
#ifdef ENABLE_ACLK
|
||||
rrdset_flag_clear(st, RRDSET_FLAG_ACLK);
|
||||
#endif
|
||||
}
|
||||
|
||||
RRDDIM *rrddim_add_custom(RRDSET *st, const char *id, const char *name, collected_number multiplier,
|
||||
|
@ -441,9 +438,6 @@ RRDDIM *rrddim_add_custom(RRDSET *st, const char *id, const char *name, collecte
|
|||
ml_new_dimension(rd);
|
||||
|
||||
rrdset_unlock(st);
|
||||
#ifdef ENABLE_ACLK
|
||||
rrdset_flag_clear(st, RRDSET_FLAG_ACLK);
|
||||
#endif
|
||||
return(rd);
|
||||
}
|
||||
|
||||
|
@ -516,10 +510,6 @@ void rrddim_free_custom(RRDSET *st, RRDDIM *rd, int db_rotated)
|
|||
freez(rd);
|
||||
break;
|
||||
}
|
||||
#ifdef ENABLE_ACLK
|
||||
if (db_rotated || RRD_MEMORY_MODE_DBENGINE != rrd_memory_mode)
|
||||
rrdset_flag_clear(st, RRDSET_FLAG_ACLK);
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
|
@ -539,9 +529,6 @@ int rrddim_hide(RRDSET *st, const char *id) {
|
|||
(void) sql_set_dimension_option(&rd->state->metric_uuid, "hidden");
|
||||
|
||||
rrddim_flag_set(rd, RRDDIM_FLAG_HIDDEN);
|
||||
#ifdef ENABLE_ACLK
|
||||
rrdset_flag_clear(st, RRDSET_FLAG_ACLK);
|
||||
#endif
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -557,9 +544,6 @@ int rrddim_unhide(RRDSET *st, const char *id) {
|
|||
(void) sql_set_dimension_option(&rd->state->metric_uuid, NULL);
|
||||
|
||||
rrddim_flag_clear(rd, RRDDIM_FLAG_HIDDEN);
|
||||
#ifdef ENABLE_ACLK
|
||||
rrdset_flag_clear(st, RRDSET_FLAG_ACLK);
|
||||
#endif
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -572,18 +556,12 @@ inline void rrddim_is_obsolete(RRDSET *st, RRDDIM *rd) {
|
|||
}
|
||||
rrddim_flag_set(rd, RRDDIM_FLAG_OBSOLETE);
|
||||
rrdset_flag_set(st, RRDSET_FLAG_OBSOLETE_DIMENSIONS);
|
||||
#ifdef ENABLE_ACLK
|
||||
rrdset_flag_clear(st, RRDSET_FLAG_ACLK);
|
||||
#endif
|
||||
}
|
||||
|
||||
inline void rrddim_isnot_obsolete(RRDSET *st __maybe_unused, RRDDIM *rd) {
|
||||
debug(D_RRD_CALLS, "rrddim_isnot_obsolete() for chart %s, dimension %s", st->name, rd->name);
|
||||
|
||||
rrddim_flag_clear(rd, RRDDIM_FLAG_OBSOLETE);
|
||||
#ifdef ENABLE_ACLK
|
||||
rrdset_flag_clear(st, RRDSET_FLAG_ACLK);
|
||||
#endif
|
||||
}
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
|
|
|
@ -1488,9 +1488,8 @@ restart_after_removal:
|
|||
continue;
|
||||
}
|
||||
#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL)
|
||||
else {
|
||||
aclk_send_dimension_update(rd);
|
||||
}
|
||||
else
|
||||
queue_dimension_to_aclk(rd);
|
||||
#endif
|
||||
}
|
||||
last = rd;
|
||||
|
|
|
@ -1368,8 +1368,9 @@ void rrdset_done(RRDSET *st) {
|
|||
#ifdef ENABLE_ACLK
|
||||
if (likely(!st->state->is_ar_chart)) {
|
||||
if (unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ACLK))) {
|
||||
if (likely(st->dimensions && st->counter_done && !queue_chart_to_aclk(st)))
|
||||
if (likely(st->dimensions && st->counter_done && !queue_chart_to_aclk(st))) {
|
||||
rrdset_flag_set(st, RRDSET_FLAG_ACLK);
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
@ -1796,20 +1797,14 @@ after_second_database_work:
|
|||
continue;
|
||||
|
||||
#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL)
|
||||
if (likely(!st->state->is_ar_chart)) {
|
||||
if (!rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)) {
|
||||
int live =
|
||||
((mark - rd->last_collected_time.tv_sec) < RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * rd->update_every);
|
||||
if (unlikely(live != rd->state->aclk_live_status)) {
|
||||
if (likely(rrdset_flag_check(st, RRDSET_FLAG_ACLK))) {
|
||||
if (likely(!queue_dimension_to_aclk(rd))) {
|
||||
rd->state->aclk_live_status = live;
|
||||
rrddim_flag_set(rd, RRDDIM_FLAG_ACLK);
|
||||
}
|
||||
}
|
||||
if (likely(!st->state->is_ar_chart)) {
|
||||
if (!rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN) && likely(rrdset_flag_check(st, RRDSET_FLAG_ACLK))) {
|
||||
int live =
|
||||
((mark - rd->last_collected_time.tv_sec) < RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * rd->update_every);
|
||||
if (unlikely(live != rd->state->aclk_live_status))
|
||||
queue_dimension_to_aclk(rd);
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif
|
||||
if(unlikely(!rd->updated))
|
||||
continue;
|
||||
|
@ -1911,7 +1906,7 @@ after_second_database_work:
|
|||
} else {
|
||||
/* Do not delete this dimension */
|
||||
#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL)
|
||||
aclk_send_dimension_update(rd);
|
||||
queue_dimension_to_aclk(rd);
|
||||
#endif
|
||||
last = rd;
|
||||
rd = rd->next;
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
#endif
|
||||
#define ACLK_MAX_ALERT_UPDATES (5)
|
||||
#define ACLK_DATABASE_CLEANUP_FIRST (60)
|
||||
#define ACLK_DATABASE_ROTATION_DELAY (60)
|
||||
#define ACLK_DATABASE_ROTATION_DELAY (180)
|
||||
#define ACLK_DATABASE_CLEANUP_INTERVAL (3600)
|
||||
#define ACLK_DATABASE_ROTATION_INTERVAL (3600)
|
||||
#define ACLK_DELETE_ACK_INTERNAL (600)
|
||||
|
|
|
@ -22,20 +22,20 @@ sql_queue_chart_payload(struct aclk_database_worker_config *wc, void *data, enum
|
|||
return rc;
|
||||
}
|
||||
|
||||
static int payload_sent(char *uuid_str, uuid_t *uuid, void *payload, size_t payload_size)
|
||||
static time_t payload_sent(char *uuid_str, uuid_t *uuid, void *payload, size_t payload_size)
|
||||
{
|
||||
static __thread sqlite3_stmt *res = NULL;
|
||||
int rc;
|
||||
int send_status = 0;
|
||||
time_t send_status = 0;
|
||||
|
||||
if (unlikely(!res)) {
|
||||
char sql[ACLK_SYNC_QUERY_SIZE];
|
||||
snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "SELECT 1 FROM aclk_chart_latest_%s acl, aclk_chart_payload_%s acp "
|
||||
"WHERE acl.unique_id = acp.unique_id AND acl.uuid = @uuid AND acp.payload = @payload;",
|
||||
uuid_str, uuid_str);
|
||||
snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "SELECT acl.date_submitted FROM aclk_chart_latest_%s acl, aclk_chart_payload_%s acp "
|
||||
"WHERE acl.unique_id = acp.unique_id AND acl.uuid = @uuid AND acp.payload = @payload;",
|
||||
uuid_str, uuid_str);
|
||||
rc = prepare_statement(db_meta, sql, &res);
|
||||
if (rc != SQLITE_OK) {
|
||||
error_report("Failed to prepare statement to check payload data");
|
||||
error_report("Failed to prepare statement to check payload data on %s", sql);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
@ -49,7 +49,7 @@ static int payload_sent(char *uuid_str, uuid_t *uuid, void *payload, size_t payl
|
|||
goto bind_fail;
|
||||
|
||||
while (sqlite3_step(res) == SQLITE_ROW) {
|
||||
send_status = sqlite3_column_int(res, 0);
|
||||
send_status = (time_t) sqlite3_column_int64(res, 0);
|
||||
}
|
||||
|
||||
bind_fail:
|
||||
|
@ -59,22 +59,23 @@ bind_fail:
|
|||
}
|
||||
|
||||
static int aclk_add_chart_payload(struct aclk_database_worker_config *wc, uuid_t *uuid, char *claim_id,
|
||||
ACLK_PAYLOAD_TYPE payload_type, void *payload, size_t payload_size, int *send_status)
|
||||
ACLK_PAYLOAD_TYPE payload_type, void *payload, size_t payload_size, time_t *send_status)
|
||||
{
|
||||
static __thread sqlite3_stmt *res_chart = NULL;
|
||||
int rc;
|
||||
time_t date_submitted;
|
||||
|
||||
rc = payload_sent(wc->uuid_str, uuid, payload, payload_size);
|
||||
date_submitted = payload_sent(wc->uuid_str, uuid, payload, payload_size);
|
||||
if (send_status)
|
||||
*send_status = rc;
|
||||
if (rc == 1)
|
||||
*send_status = date_submitted;
|
||||
if (date_submitted)
|
||||
return 0;
|
||||
|
||||
if (unlikely(!res_chart)) {
|
||||
char sql[ACLK_SYNC_QUERY_SIZE];
|
||||
snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1,
|
||||
"INSERT INTO aclk_chart_payload_%s (unique_id, uuid, claim_id, date_created, type, payload) " \
|
||||
"VALUES (@unique_id, @uuid, @claim_id, strftime('%%s','now'), @type, @payload);", wc->uuid_str);
|
||||
"INSERT INTO aclk_chart_payload_%s (unique_id, uuid, claim_id, date_created, type, payload) " \
|
||||
"VALUES (@unique_id, @uuid, @claim_id, strftime('%%s','now'), @type, @payload);", wc->uuid_str);
|
||||
rc = prepare_statement(db_meta, sql, &res_chart);
|
||||
if (rc != SQLITE_OK) {
|
||||
error_report("Failed to prepare statement to store chart payload data");
|
||||
|
@ -168,7 +169,7 @@ int aclk_add_chart_event(struct aclk_database_worker_config *wc, struct aclk_dat
|
|||
|
||||
static inline int aclk_upd_dimension_event(struct aclk_database_worker_config *wc, char *claim_id, uuid_t *dim_uuid,
|
||||
const char *dim_id, const char *dim_name, const char *chart_type_id, time_t first_time, time_t last_time,
|
||||
int *send_status)
|
||||
time_t *send_status)
|
||||
{
|
||||
int rc = 0;
|
||||
size_t size;
|
||||
|
@ -279,7 +280,7 @@ int aclk_add_dimension_event(struct aclk_database_worker_config *wc, struct aclk
|
|||
RRDDIM *rd = cmd.data;
|
||||
|
||||
if (likely(claim_id)) {
|
||||
int send_status = 0;
|
||||
time_t send_status = 0;
|
||||
time_t now = now_realtime_sec();
|
||||
|
||||
time_t first_t = rd->state->query_ops.oldest_time(rd);
|
||||
|
@ -337,6 +338,12 @@ void aclk_send_chart_event(struct aclk_database_worker_config *wc, struct aclk_d
|
|||
char sql[ACLK_SYNC_QUERY_SIZE];
|
||||
static __thread sqlite3_stmt *res = NULL;
|
||||
|
||||
char *hostname = NULL;
|
||||
if (wc->host)
|
||||
hostname = strdupz(wc->host->hostname);
|
||||
else
|
||||
hostname = get_hostname_by_node_id(wc->node_id);
|
||||
|
||||
if (unlikely(!res)) {
|
||||
snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1,"SELECT ac.sequence_id, acp.payload, ac.date_created, ac.type, ac.uuid " \
|
||||
"FROM aclk_chart_%s ac, aclk_chart_payload_%s acp " \
|
||||
|
@ -419,7 +426,7 @@ void aclk_send_chart_event(struct aclk_database_worker_config *wc, struct aclk_d
|
|||
log_access(
|
||||
"ACLK RES [%s (%s)]: CHARTS SENT from %" PRIu64 " to %" PRIu64 " batch=%" PRIu64,
|
||||
wc->node_id,
|
||||
wc->host ? wc->host->hostname : "N/A",
|
||||
hostname ? hostname : "N/A",
|
||||
first_sequence,
|
||||
last_sequence,
|
||||
wc->batch_id);
|
||||
|
@ -440,7 +447,7 @@ void aclk_send_chart_event(struct aclk_database_worker_config *wc, struct aclk_d
|
|||
log_access(
|
||||
"ACLK STA [%s (%s)]: Sync of charts and dimensions done in %ld seconds.",
|
||||
wc->node_id,
|
||||
wc->host ? wc->host->hostname : "N/A",
|
||||
hostname ? hostname : "N/A",
|
||||
now_realtime_sec() - wc->startup_time);
|
||||
}
|
||||
|
||||
|
@ -459,6 +466,7 @@ bind_fail:
|
|||
error_report("Failed to reset statement when pushing chart events, rc = %d", rc);
|
||||
|
||||
freez(claim_id);
|
||||
freez(hostname);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -583,8 +591,13 @@ void aclk_receive_chart_reset(struct aclk_database_worker_config *wc, struct acl
|
|||
cmd.param1);
|
||||
db_execute(buffer_tostring(sql));
|
||||
if (cmd.param1 == 1) {
|
||||
char *hostname = NULL;
|
||||
if (wc->host)
|
||||
hostname = strdupz(wc->host->hostname);
|
||||
else
|
||||
hostname = get_hostname_by_node_id(wc->node_id);
|
||||
buffer_flush(sql);
|
||||
log_access("ACLK REQ [%s (%s)]: Received chart full resync.", wc->node_id, wc->host ? wc->host->hostname : "N/A");
|
||||
log_access("ACLK REQ [%s (%s)]: Received chart full resync.", wc->node_id, hostname? hostname : "N/A");
|
||||
buffer_sprintf(sql, "DELETE FROM aclk_chart_payload_%s; DELETE FROM aclk_chart_%s; " \
|
||||
"DELETE FROM aclk_chart_latest_%s;", wc->uuid_str, wc->uuid_str, wc->uuid_str);
|
||||
db_lock();
|
||||
|
@ -609,6 +622,7 @@ void aclk_receive_chart_reset(struct aclk_database_worker_config *wc, struct acl
|
|||
RRDDIM *rd;
|
||||
rrddim_foreach_read(rd, st)
|
||||
{
|
||||
rrddim_flag_clear(rd, RRDDIM_FLAG_ACLK);
|
||||
rd->state->aclk_live_status = (rd->state->aclk_live_status == 0);
|
||||
}
|
||||
rrdset_unlock(st);
|
||||
|
@ -616,6 +630,7 @@ void aclk_receive_chart_reset(struct aclk_database_worker_config *wc, struct acl
|
|||
rrdhost_unlock(host);
|
||||
} else
|
||||
error_report("ACLK synchronization thread for %s is not linked to HOST", wc->host_guid);
|
||||
freez(hostname);
|
||||
} else {
|
||||
log_access(
|
||||
"ACLK STA [%s (%s)]: Restarting chart sync from sequence %" PRIu64,
|
||||
|
@ -705,25 +720,28 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at
|
|||
if (unlikely(!node_id))
|
||||
return;
|
||||
|
||||
// log_access("ACLK REQ [%s (N/A)]: CHARTS STREAM from %"PRIu64" t=%ld batch=%"PRIu64, node_id,
|
||||
// sequence_id, created_at, batch_id);
|
||||
|
||||
uuid_t node_uuid;
|
||||
if (uuid_parse(node_id, node_uuid)) {
|
||||
log_access("ACLK REQ [%s (N/A)]: CHARTS STREAM ignored, invalid node id", node_id);
|
||||
return;
|
||||
}
|
||||
|
||||
struct aclk_database_worker_config *wc = NULL;
|
||||
struct aclk_database_worker_config *wc = find_inactive_wc_by_node_id(node_id);
|
||||
rrd_rdlock();
|
||||
RRDHOST *host = localhost;
|
||||
while(host) {
|
||||
if (host->node_id && !(uuid_compare(*host->node_id, node_uuid))) {
|
||||
if (wc || (host->node_id && !(uuid_compare(*host->node_id, node_uuid)))) {
|
||||
rrd_unlock();
|
||||
wc = (struct aclk_database_worker_config *)host->dbsync_worker ?
|
||||
(struct aclk_database_worker_config *)host->dbsync_worker :
|
||||
(struct aclk_database_worker_config *)find_inactive_wc_by_node_id(node_id);
|
||||
if (!wc)
|
||||
wc = (struct aclk_database_worker_config *)host->dbsync_worker ?
|
||||
(struct aclk_database_worker_config *)host->dbsync_worker :
|
||||
(struct aclk_database_worker_config *)find_inactive_wc_by_node_id(node_id);
|
||||
char *hostname = NULL;
|
||||
if (likely(wc)) {
|
||||
if (wc->host)
|
||||
hostname = strdupz(wc->host->hostname);
|
||||
else
|
||||
hostname = get_hostname_by_node_id(node_id);
|
||||
wc->chart_reset_count++;
|
||||
__sync_synchronize();
|
||||
wc->chart_updates = 0;
|
||||
|
@ -733,7 +751,7 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at
|
|||
log_access(
|
||||
"ACLK REQ [%s (%s)]: CHARTS STREAM from %" PRIu64 " t=%ld resets=%d",
|
||||
wc->node_id,
|
||||
wc->host ? wc->host->hostname : "N/A",
|
||||
hostname ? hostname : "N/A",
|
||||
wc->chart_sequence_id,
|
||||
wc->chart_timestamp,
|
||||
wc->chart_reset_count);
|
||||
|
@ -742,7 +760,7 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at
|
|||
"ACLK RES [%s (%s)]: CHARTS FULL RESYNC REQUEST "
|
||||
"remote_seq=%" PRIu64 " local_seq=%" PRIu64 " resets=%d ",
|
||||
wc->node_id,
|
||||
wc->host ? wc->host->hostname : "N/A",
|
||||
hostname ? hostname : "N/A",
|
||||
sequence_id,
|
||||
wc->chart_sequence_id,
|
||||
wc->chart_reset_count);
|
||||
|
@ -766,7 +784,7 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at
|
|||
log_access(
|
||||
"ACLK REQ [%s (%s)]: CHART RESET from %" PRIu64 " t=%ld batch=%" PRIu64,
|
||||
wc->node_id,
|
||||
wc->host ? wc->host->hostname : "N/A",
|
||||
hostname ? hostname : "N/A",
|
||||
wc->chart_sequence_id,
|
||||
wc->chart_timestamp,
|
||||
wc->batch_id);
|
||||
|
@ -775,20 +793,16 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at
|
|||
cmd.completion = NULL;
|
||||
aclk_database_enq_cmd(wc, &cmd);
|
||||
} else {
|
||||
// log_access(
|
||||
// "ACLK RES [%s (%s)]: CHARTS STREAM from %" PRIu64
|
||||
// " t=%ld resets=%d",
|
||||
// wc->node_id,
|
||||
// wc->host ? wc->host->hostname : "N/A",
|
||||
// wc->chart_sequence_id,
|
||||
// wc->chart_timestamp,
|
||||
// wc->chart_reset_count);
|
||||
wc->chart_reset_count = 0;
|
||||
wc->chart_updates = 1;
|
||||
}
|
||||
}
|
||||
} else
|
||||
log_access("ACLK STA [%s (N/A)]: ACLK synchronization thread is not active.", node_id);
|
||||
freez(hostname);
|
||||
} else {
|
||||
hostname = get_hostname_by_node_id(node_id);
|
||||
log_access("ACLK STA [%s (%s)]: ACLK synchronization thread is not active.", node_id, hostname ? hostname : "N/A");
|
||||
freez(hostname);
|
||||
}
|
||||
return;
|
||||
}
|
||||
host = host->next;
|
||||
|
@ -887,7 +901,10 @@ void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_d
|
|||
time_t last_entry_t;
|
||||
uint32_t update_every = 0;
|
||||
uint32_t dimension_update_count = 0;
|
||||
int send_status;
|
||||
uint32_t total_checked = 0;
|
||||
uint32_t total_deleted= 0;
|
||||
uint32_t total_stopped= 0;
|
||||
time_t send_status;
|
||||
|
||||
struct retention_updated rotate_data;
|
||||
|
||||
|
@ -942,23 +959,40 @@ void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_d
|
|||
if (likely(!rc && first_entry_t))
|
||||
start_time = MIN(start_time, first_entry_t);
|
||||
|
||||
if (memory_mode == RRD_MEMORY_MODE_DBENGINE && wc->chart_updates) {
|
||||
if (memory_mode == RRD_MEMORY_MODE_DBENGINE && wc->chart_updates && (dimension_update_count < ACLK_MAX_DIMENSION_CLEANUP)) {
|
||||
int live = ((now - last_entry_t) < (RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * update_every));
|
||||
if ((!live || !first_entry_t) && (dimension_update_count < ACLK_MAX_DIMENSION_CLEANUP)) {
|
||||
(void)aclk_upd_dimension_event(
|
||||
wc,
|
||||
claim_id,
|
||||
(uuid_t *)sqlite3_column_blob(res, 0),
|
||||
(const char *)(const char *)sqlite3_column_text(res, 3),
|
||||
(const char *)(const char *)sqlite3_column_text(res, 4),
|
||||
(const char *)(const char *)sqlite3_column_text(res, 2),
|
||||
first_entry_t,
|
||||
live ? 0 : last_entry_t,
|
||||
&send_status);
|
||||
if (!send_status)
|
||||
if (rc) {
|
||||
first_entry_t = 0;
|
||||
last_entry_t = 0;
|
||||
live = 0;
|
||||
}
|
||||
if (!wc->host || !first_entry_t) {
|
||||
if (!first_entry_t) {
|
||||
delete_dimension_uuid((uuid_t *)sqlite3_column_blob(res, 0));
|
||||
total_deleted++;
|
||||
dimension_update_count++;
|
||||
}
|
||||
else {
|
||||
(void)aclk_upd_dimension_event(
|
||||
wc,
|
||||
claim_id,
|
||||
(uuid_t *)sqlite3_column_blob(res, 0),
|
||||
(const char *)(const char *)sqlite3_column_text(res, 3),
|
||||
(const char *)(const char *)sqlite3_column_text(res, 4),
|
||||
(const char *)(const char *)sqlite3_column_text(res, 2),
|
||||
first_entry_t,
|
||||
live ? 0 : last_entry_t,
|
||||
&send_status);
|
||||
|
||||
if (!send_status) {
|
||||
if (last_entry_t)
|
||||
total_stopped++;
|
||||
dimension_update_count++;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
total_checked++;
|
||||
}
|
||||
if (update_every) {
|
||||
debug(D_ACLK_SYNC, "Update %s for %u oldest time = %ld", wc->host_guid, update_every, start_time);
|
||||
|
@ -970,7 +1004,16 @@ void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_d
|
|||
rotate_data.interval_duration_count++;
|
||||
}
|
||||
|
||||
char *hostname = NULL;
|
||||
if (!wc->host)
|
||||
hostname = get_hostname_by_node_id(wc->node_id);
|
||||
|
||||
log_access("ACLK STA [%s (%s)]: UPDATES %d RETENTION MESSAGE SENT. CHECKED %u DIMENSIONS. %u DELETED, %u STOPPED COLLECTING",
|
||||
wc->node_id, wc->host ? wc->host->hostname : hostname ? hostname : "N/A", wc->chart_updates, total_checked, total_deleted, total_stopped);
|
||||
freez(hostname);
|
||||
|
||||
#ifdef NETDATA_INTERNAL_CHECKS
|
||||
info("Retention update for %s (chart updates = %d)", wc->host_guid, wc->chart_updates);
|
||||
for (int i = 0; i < rotate_data.interval_duration_count; ++i)
|
||||
info(
|
||||
"Update for host %s (node %s) for %u Retention = %u",
|
||||
|
@ -1048,11 +1091,17 @@ void sql_get_last_chart_sequence(struct aclk_database_worker_config *wc)
|
|||
return;
|
||||
}
|
||||
|
||||
int queue_dimension_to_aclk(RRDDIM *rd)
|
||||
void queue_dimension_to_aclk(RRDDIM *rd)
|
||||
{
|
||||
if (rrddim_flag_check(rd, RRDDIM_FLAG_ACLK))
|
||||
return;
|
||||
|
||||
rrddim_flag_set(rd, RRDDIM_FLAG_ACLK);
|
||||
int rc = sql_queue_chart_payload((struct aclk_database_worker_config *) rd->rrdset->rrdhost->dbsync_worker,
|
||||
rd, ACLK_DATABASE_ADD_DIMENSION);
|
||||
return rc;
|
||||
if (unlikely(rc))
|
||||
rrddim_flag_clear(rd, RRDDIM_FLAG_ACLK);
|
||||
return;
|
||||
}
|
||||
|
||||
void aclk_send_dimension_update(RRDDIM *rd)
|
||||
|
@ -1203,6 +1252,12 @@ void sql_check_chart_liveness(RRDSET *st) {
|
|||
return;
|
||||
|
||||
rrdset_rdlock(st);
|
||||
|
||||
if (unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ACLK))) {
|
||||
rrdset_unlock(st);
|
||||
return;
|
||||
}
|
||||
|
||||
if (unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ACLK))) {
|
||||
if (likely(st->dimensions && st->counter_done && !queue_chart_to_aclk(st))) {
|
||||
debug(D_ACLK_SYNC,"Check chart liveness [%s] submit chart definition", st->name);
|
||||
|
@ -1218,13 +1273,8 @@ void sql_check_chart_liveness(RRDSET *st) {
|
|||
if (!rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)) {
|
||||
int live = (mark - rd->last_collected_time.tv_sec) < RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * rd->update_every;
|
||||
if (unlikely(live != rd->state->aclk_live_status)) {
|
||||
if (likely(rrdset_flag_check(st, RRDSET_FLAG_ACLK))) {
|
||||
if (likely(!queue_dimension_to_aclk(rd))) {
|
||||
debug(D_ACLK_SYNC,"Dimension change [%s] on [%s] from live %d --> %d", rd->id, rd->rrdset->name, rd->state->aclk_live_status, live);
|
||||
rd->state->aclk_live_status = live;
|
||||
rrddim_flag_set(rd, RRDDIM_FLAG_ACLK);
|
||||
}
|
||||
}
|
||||
debug(D_ACLK_SYNC,"Dimension change [%s] on [%s] from live %d --> %d", rd->id, rd->rrdset->name, rd->state->aclk_live_status, live);
|
||||
queue_dimension_to_aclk(rd);
|
||||
}
|
||||
else
|
||||
debug(D_ACLK_SYNC,"Dimension check [%s] on [%s] liveness matches", rd->id, st->name);
|
||||
|
|
|
@ -37,7 +37,7 @@ struct aclk_chart_sync_stats {
|
|||
};
|
||||
|
||||
extern int queue_chart_to_aclk(RRDSET *st);
|
||||
extern int queue_dimension_to_aclk(RRDDIM *rd);
|
||||
extern void queue_dimension_to_aclk(RRDDIM *rd);
|
||||
extern void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id);
|
||||
int aclk_add_chart_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
|
||||
int aclk_add_dimension_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
|
||||
|
|
Loading…
Add table
Reference in a new issue