mirror of
https://github.com/netdata/netdata.git
synced 2025-04-17 11:12:42 +00:00
Adjust the dimension liveness status check (#12933)
* Mark a chart to be exposed only if dimension is created or metadata changes * Add a calculate liveness for the dimension for collected to non collected (live -> stale) and vice versa * queue_dimension_to_aclk will have the rrdset and either 0 or last collected time If 0 then it will be marked as live else it will be marked as stale and last collected time will be sent to the cloud * Add an extra parameter to indicate if the payload check should be done in the database or it has been done already * Queue dimension sets dimension liveness and queues the exact payload to store in the database * Fix compilation error when --disable-cloud is specified
This commit is contained in:
parent
108bfb7918
commit
3b8d4c21e5
6 changed files with 132 additions and 65 deletions
|
@ -1274,7 +1274,9 @@ extern void rrddim_isnot_obsolete(RRDSET *st, RRDDIM *rd);
|
|||
|
||||
extern collected_number rrddim_set_by_pointer(RRDSET *st, RRDDIM *rd, collected_number value);
|
||||
extern collected_number rrddim_set(RRDSET *st, const char *id, collected_number value);
|
||||
|
||||
#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL)
|
||||
extern time_t calc_dimension_liveness(RRDDIM *rd, time_t now);
|
||||
#endif
|
||||
extern long align_entries_to_pagesize(RRD_MEMORY_MODE mode, long entries);
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
|
|
|
@ -135,15 +135,31 @@ void rrdcalc_link_to_rrddim(RRDDIM *rd, RRDSET *st, RRDHOST *host) {
|
|||
}
|
||||
}
|
||||
|
||||
// Return either
|
||||
// 0 : Dimension is live
|
||||
// last collected time : Dimension is not live
|
||||
|
||||
#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL)
|
||||
time_t calc_dimension_liveness(RRDDIM *rd, time_t now)
|
||||
{
|
||||
time_t last_updated = rd->last_collected_time.tv_sec;
|
||||
int live;
|
||||
if (rd->state->aclk_live_status == 1)
|
||||
live =
|
||||
((now - last_updated) <
|
||||
MIN(rrdset_free_obsolete_time, RRDSET_MINIMUM_DIM_OFFLINE_MULTIPLIER * rd->update_every));
|
||||
else
|
||||
live = ((now - last_updated) < RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * rd->update_every);
|
||||
return live ? 0 : last_updated;
|
||||
}
|
||||
#endif
|
||||
|
||||
RRDDIM *rrddim_add_custom(RRDSET *st, const char *id, const char *name, collected_number multiplier,
|
||||
collected_number divisor, RRD_ALGORITHM algorithm, RRD_MEMORY_MODE memory_mode)
|
||||
{
|
||||
RRDHOST *host = st->rrdhost;
|
||||
rrdset_wrlock(st);
|
||||
|
||||
rrdset_flag_set(st, RRDSET_FLAG_SYNC_CLOCK);
|
||||
rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
|
||||
|
||||
RRDDIM *rd = rrddim_find(st, id);
|
||||
if(unlikely(rd)) {
|
||||
debug(D_RRD_CALLS, "Cannot create rrd dimension '%s/%s', it already exists.", st->id, name?name:"<NONAME>");
|
||||
|
@ -168,11 +184,19 @@ RRDDIM *rrddim_add_custom(RRDSET *st, const char *id, const char *name, collecte
|
|||
debug(D_METADATALOG, "DIMENSION [%s] metadata updated", rd->id);
|
||||
(void)sql_store_dimension(&rd->state->metric_uuid, rd->rrdset->chart_uuid, rd->id, rd->name, rd->multiplier, rd->divisor,
|
||||
rd->algorithm);
|
||||
#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL)
|
||||
queue_dimension_to_aclk(rd, calc_dimension_liveness(rd, now_realtime_sec()));
|
||||
#endif
|
||||
rrdset_flag_set(st, RRDSET_FLAG_SYNC_CLOCK);
|
||||
rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
|
||||
}
|
||||
rrdset_unlock(st);
|
||||
return rd;
|
||||
}
|
||||
|
||||
rrdset_flag_set(st, RRDSET_FLAG_SYNC_CLOCK);
|
||||
rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
|
||||
|
||||
char filename[FILENAME_MAX + 1];
|
||||
char fullfilename[FILENAME_MAX + 1];
|
||||
|
||||
|
|
|
@ -1489,7 +1489,7 @@ restart_after_removal:
|
|||
}
|
||||
#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL)
|
||||
else
|
||||
queue_dimension_to_aclk(rd);
|
||||
queue_dimension_to_aclk(rd, rd->last_collected_time.tv_sec);
|
||||
#endif
|
||||
}
|
||||
last = rd;
|
||||
|
|
|
@ -1798,12 +1798,8 @@ after_second_database_work:
|
|||
|
||||
#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL)
|
||||
if (likely(!st->state->is_ar_chart)) {
|
||||
if (!rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN) && likely(rrdset_flag_check(st, RRDSET_FLAG_ACLK))) {
|
||||
int live =
|
||||
((mark - rd->last_collected_time.tv_sec) < RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * rd->update_every);
|
||||
if (unlikely(live != rd->state->aclk_live_status))
|
||||
queue_dimension_to_aclk(rd);
|
||||
}
|
||||
if (!rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN) && likely(rrdset_flag_check(st, RRDSET_FLAG_ACLK)))
|
||||
queue_dimension_to_aclk(rd, calc_dimension_liveness(rd, mark));
|
||||
}
|
||||
#endif
|
||||
if(unlikely(!rd->updated))
|
||||
|
@ -1906,7 +1902,7 @@ after_second_database_work:
|
|||
} else {
|
||||
/* Do not delete this dimension */
|
||||
#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL)
|
||||
queue_dimension_to_aclk(rd);
|
||||
queue_dimension_to_aclk(rd, calc_dimension_liveness(rd, mark));
|
||||
#endif
|
||||
last = rd;
|
||||
rd = rd->next;
|
||||
|
|
|
@ -58,19 +58,31 @@ bind_fail:
|
|||
return send_status;
|
||||
}
|
||||
|
||||
static int aclk_add_chart_payload(struct aclk_database_worker_config *wc, uuid_t *uuid, char *claim_id,
|
||||
ACLK_PAYLOAD_TYPE payload_type, void *payload, size_t payload_size, time_t *send_status)
|
||||
static int aclk_add_chart_payload(
|
||||
struct aclk_database_worker_config *wc,
|
||||
uuid_t *uuid,
|
||||
char *claim_id,
|
||||
ACLK_PAYLOAD_TYPE payload_type,
|
||||
void *payload,
|
||||
size_t payload_size,
|
||||
time_t *send_status,
|
||||
int check_sent)
|
||||
{
|
||||
static __thread sqlite3_stmt *res_chart = NULL;
|
||||
int rc;
|
||||
time_t date_submitted;
|
||||
|
||||
date_submitted = payload_sent(wc->uuid_str, uuid, payload, payload_size);
|
||||
if (send_status)
|
||||
*send_status = date_submitted;
|
||||
if (date_submitted)
|
||||
if (unlikely(!payload))
|
||||
return 0;
|
||||
|
||||
if (check_sent) {
|
||||
date_submitted = payload_sent(wc->uuid_str, uuid, payload, payload_size);
|
||||
if (send_status)
|
||||
*send_status = date_submitted;
|
||||
if (date_submitted)
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (unlikely(!res_chart)) {
|
||||
char sql[ACLK_SYNC_QUERY_SIZE];
|
||||
snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1,
|
||||
|
@ -160,7 +172,7 @@ int aclk_add_chart_event(struct aclk_database_worker_config *wc, struct aclk_dat
|
|||
size_t size;
|
||||
char *payload = generate_chart_instance_updated(&size, &chart_payload);
|
||||
if (likely(payload))
|
||||
rc = aclk_add_chart_payload(wc, st->chart_uuid, claim_id, ACLK_PAYLOAD_CHART, (void *) payload, size, NULL);
|
||||
rc = aclk_add_chart_payload(wc, st->chart_uuid, claim_id, ACLK_PAYLOAD_CHART, (void *) payload, size, NULL, 1);
|
||||
freez(payload);
|
||||
chart_instance_updated_destroy(&chart_payload);
|
||||
}
|
||||
|
@ -198,7 +210,7 @@ static inline int aclk_upd_dimension_event(struct aclk_database_worker_config *w
|
|||
dim_payload.last_timestamp.tv_sec = last_time;
|
||||
char *payload = generate_chart_dimension_updated(&size, &dim_payload);
|
||||
if (likely(payload))
|
||||
rc = aclk_add_chart_payload(wc, dim_uuid, claim_id, ACLK_PAYLOAD_DIMENSION, (void *)payload, size, send_status);
|
||||
rc = aclk_add_chart_payload(wc, dim_uuid, claim_id, ACLK_PAYLOAD_DIMENSION, (void *)payload, size, send_status, 1);
|
||||
freez(payload);
|
||||
return rc;
|
||||
}
|
||||
|
@ -272,39 +284,22 @@ bind_fail:
|
|||
|
||||
int aclk_add_dimension_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
|
||||
{
|
||||
int rc = 0;
|
||||
int rc = 1;
|
||||
CHECK_SQLITE_CONNECTION(db_meta);
|
||||
|
||||
struct aclk_chart_dimension_data *aclk_cd_data = cmd.data;
|
||||
|
||||
char *claim_id = is_agent_claimed();
|
||||
if (!claim_id)
|
||||
goto cleanup;
|
||||
|
||||
RRDDIM *rd = cmd.data;
|
||||
rc = aclk_add_chart_payload(wc, &aclk_cd_data->uuid, claim_id, ACLK_PAYLOAD_DIMENSION,
|
||||
(void *) aclk_cd_data->payload, aclk_cd_data->payload_size, NULL, 0);
|
||||
|
||||
if (likely(claim_id)) {
|
||||
time_t send_status = 0;
|
||||
time_t now = now_realtime_sec();
|
||||
|
||||
time_t first_t = rd->state->query_ops.oldest_time(rd);
|
||||
time_t last_t = rd->state->query_ops.latest_time(rd);
|
||||
|
||||
int live = ((now - last_t) < (RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * rd->update_every));
|
||||
|
||||
rc = aclk_upd_dimension_event(
|
||||
wc,
|
||||
claim_id,
|
||||
&rd->state->metric_uuid,
|
||||
rd->id,
|
||||
rd->name,
|
||||
rd->rrdset->id,
|
||||
first_t,
|
||||
live ? 0 : last_t,
|
||||
&send_status);
|
||||
|
||||
if (!send_status)
|
||||
rd->state->aclk_live_status = live;
|
||||
|
||||
freez(claim_id);
|
||||
}
|
||||
rrddim_flag_clear(rd, RRDDIM_FLAG_ACLK);
|
||||
freez(claim_id);
|
||||
cleanup:
|
||||
freez(aclk_cd_data->payload);
|
||||
freez(aclk_cd_data);
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
@ -1091,16 +1086,63 @@ void sql_get_last_chart_sequence(struct aclk_database_worker_config *wc)
|
|||
return;
|
||||
}
|
||||
|
||||
void queue_dimension_to_aclk(RRDDIM *rd)
|
||||
void queue_dimension_to_aclk(RRDDIM *rd, time_t last_updated)
|
||||
{
|
||||
if (rrddim_flag_check(rd, RRDDIM_FLAG_ACLK))
|
||||
int live = !last_updated;
|
||||
|
||||
if (likely(rd->state->aclk_live_status == live))
|
||||
return;
|
||||
|
||||
rrddim_flag_set(rd, RRDDIM_FLAG_ACLK);
|
||||
int rc = sql_queue_chart_payload((struct aclk_database_worker_config *) rd->rrdset->rrdhost->dbsync_worker,
|
||||
rd, ACLK_DATABASE_ADD_DIMENSION);
|
||||
if (unlikely(rc))
|
||||
rrddim_flag_clear(rd, RRDDIM_FLAG_ACLK);
|
||||
rd->state->aclk_live_status = live;
|
||||
|
||||
struct aclk_database_worker_config *wc = rd->rrdset->rrdhost->dbsync_worker;
|
||||
if (unlikely(!wc))
|
||||
return;
|
||||
|
||||
char *claim_id = is_agent_claimed();
|
||||
if (unlikely(!claim_id))
|
||||
return;
|
||||
|
||||
struct chart_dimension_updated dim_payload;
|
||||
memset(&dim_payload, 0, sizeof(dim_payload));
|
||||
dim_payload.node_id = wc->node_id;
|
||||
dim_payload.claim_id = claim_id;
|
||||
dim_payload.name = rd->name;
|
||||
dim_payload.id = rd->id;
|
||||
dim_payload.chart_id = rd->rrdset->id;
|
||||
dim_payload.created_at.tv_sec = rd->state->query_ops.oldest_time(rd);
|
||||
dim_payload.last_timestamp.tv_sec = last_updated;
|
||||
|
||||
size_t size = 0;
|
||||
char *payload = generate_chart_dimension_updated(&size, &dim_payload);
|
||||
|
||||
freez(claim_id);
|
||||
if (unlikely(!payload))
|
||||
return;
|
||||
|
||||
time_t date_submitted = payload_sent(wc->uuid_str, &rd->state->metric_uuid, payload, size);
|
||||
if (date_submitted) {
|
||||
freez(payload);
|
||||
return;
|
||||
}
|
||||
|
||||
struct aclk_chart_dimension_data *aclk_cd_data = mallocz(sizeof(*aclk_cd_data));
|
||||
uuid_copy(aclk_cd_data->uuid, rd->state->metric_uuid);
|
||||
aclk_cd_data->payload = payload;
|
||||
aclk_cd_data->payload_size = size;
|
||||
|
||||
struct aclk_database_cmd cmd;
|
||||
memset(&cmd, 0, sizeof(cmd));
|
||||
|
||||
cmd.opcode = ACLK_DATABASE_ADD_DIMENSION;
|
||||
cmd.data = aclk_cd_data;
|
||||
int rc = aclk_database_enq_cmd_noblock(wc, &cmd);
|
||||
|
||||
if (unlikely(rc)) {
|
||||
freez(aclk_cd_data->payload);
|
||||
freez(aclk_cd_data);
|
||||
rd->state->aclk_live_status = !live;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -1270,15 +1312,8 @@ void sql_check_chart_liveness(RRDSET *st) {
|
|||
|
||||
debug(D_ACLK_SYNC,"Check chart liveness [%s] scanning dimensions", st->name);
|
||||
rrddim_foreach_read(rd, st) {
|
||||
if (!rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)) {
|
||||
int live = (mark - rd->last_collected_time.tv_sec) < RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * rd->update_every;
|
||||
if (unlikely(live != rd->state->aclk_live_status)) {
|
||||
debug(D_ACLK_SYNC,"Dimension change [%s] on [%s] from live %d --> %d", rd->id, rd->rrdset->name, rd->state->aclk_live_status, live);
|
||||
queue_dimension_to_aclk(rd);
|
||||
}
|
||||
else
|
||||
debug(D_ACLK_SYNC,"Dimension check [%s] on [%s] liveness matches", rd->id, st->name);
|
||||
}
|
||||
if (!rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN))
|
||||
queue_dimension_to_aclk(rd, calc_dimension_liveness(rd, mark));
|
||||
}
|
||||
rrdset_unlock(st);
|
||||
}
|
||||
|
|
|
@ -16,10 +16,20 @@ extern sqlite3 *db_meta;
|
|||
#define RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER (3)
|
||||
#endif
|
||||
|
||||
#ifndef RRDSET_MINIMUM_DIM_OFFLINE_MULTIPLIER
|
||||
#define RRDSET_MINIMUM_DIM_OFFLINE_MULTIPLIER (30)
|
||||
#endif
|
||||
|
||||
#ifndef ACLK_MAX_DIMENSION_CLEANUP
|
||||
#define ACLK_MAX_DIMENSION_CLEANUP (500)
|
||||
#endif
|
||||
|
||||
struct aclk_chart_dimension_data {
|
||||
uuid_t uuid;
|
||||
char *payload;
|
||||
size_t payload_size;
|
||||
};
|
||||
|
||||
struct aclk_chart_sync_stats {
|
||||
int updates;
|
||||
uint64_t batch_id;
|
||||
|
@ -37,7 +47,7 @@ struct aclk_chart_sync_stats {
|
|||
};
|
||||
|
||||
extern int queue_chart_to_aclk(RRDSET *st);
|
||||
extern void queue_dimension_to_aclk(RRDDIM *rd);
|
||||
extern void queue_dimension_to_aclk(RRDDIM *rd, time_t last_updated);
|
||||
extern void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id);
|
||||
int aclk_add_chart_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
|
||||
int aclk_add_dimension_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
|
||||
|
|
Loading…
Add table
Reference in a new issue