0
0
Fork 0
mirror of https://github.com/netdata/netdata.git synced 2025-04-17 11:12:42 +00:00

Store and submit dimension delete messages for new cloud architecture ()

* Enhance the dimension delete table and adjust the trigger to include chart_id and host_id
* Add the aclk_process_dimension_deletion function
* Change variable chart_name in aclk_upd_dimension_event (it is st->id from st.type dot st.id)

* Process dimension deletion when retention updates are sent

* Do not send charts if we don't have dimensions

* Add check for uuid_parse return code
This commit is contained in:
Stelios Fragkakis 2021-11-09 21:25:04 +02:00 committed by GitHub
parent f44dd56681
commit a2852377d0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 82 additions and 5 deletions

View file

@ -1392,7 +1392,7 @@ void rrdset_done(RRDSET *st) {
#ifdef ENABLE_ACLK
if (unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ACLK))) {
if (st->counter_done >= RRDSET_MINIMUM_LIVE_COUNT) {
if (st->counter_done >= RRDSET_MINIMUM_LIVE_COUNT && st->dimensions) {
if (likely(!queue_chart_to_aclk(st)))
rrdset_flag_set(st, RRDSET_FLAG_ACLK);
}

View file

@ -11,6 +11,15 @@
#endif
const char *aclk_sync_config[] = {
"CREATE TABLE IF NOT EXISTS dimension_delete (dimension_id blob, dimension_name text, chart_type_id text, "
"dim_id blob, chart_id blob, host_id blob, date_created);",
"CREATE INDEX IF NOT EXISTS ind_h1 ON dimension_delete (host_id);",
"CREATE TRIGGER IF NOT EXISTS tr_dim_del AFTER DELETE ON dimension BEGIN INSERT INTO dimension_delete "
"(dimension_id, dimension_name, chart_type_id, dim_id, chart_id, host_id, date_created)"
" select old.id, old.name, c.type||\".\"||c.id, old.dim_id, old.chart_id, c.host_id, strftime('%s') FROM"
" chart c WHERE c.chart_id = old.chart_id; END;",
NULL,
};
@ -446,10 +455,12 @@ void aclk_database_worker(void *arg)
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
case ACLK_DATABASE_DIM_DELETION:
debug(D_ACLK_SYNC,"Sending dimension deletion information %s", wc->uuid_str);
aclk_process_dimension_deletion(wc, cmd);
break;
case ACLK_DATABASE_UPD_RETENTION:
debug(D_ACLK_SYNC,"Sending retention info for %s", wc->uuid_str);
aclk_update_retention(wc, cmd);
aclk_process_dimension_deletion(wc, cmd);
break;
#endif

View file

@ -170,24 +170,28 @@ int aclk_add_chart_event(struct aclk_database_worker_config *wc, struct aclk_dat
}
static inline int aclk_upd_dimension_event(struct aclk_database_worker_config *wc, char *claim_id, uuid_t *dim_uuid,
const char *dim_id, const char *dim_name, const char *chart_name, time_t first_time, time_t last_time)
const char *dim_id, const char *dim_name, const char *chart_type_id, time_t first_time, time_t last_time)
{
int rc = 0;
size_t size;
if (unlikely(!dim_uuid || !dim_id || !dim_name || !chart_name))
if (unlikely(!dim_uuid || !dim_id || !dim_name || !chart_type_id))
return 0;
struct chart_dimension_updated dim_payload;
memset(&dim_payload, 0, sizeof(dim_payload));
#ifdef NETDATA_INTERNAL_CHECKS
if (!first_time)
info("DEBUG: Deleting dimension [%s] [%s] [%s] [%s] [%s]", wc->node_id, claim_id, dim_id, dim_name, chart_name);
info("Host %s (node %s) deleting dimension id=[%s] name=[%s] chart=[%s]",
wc->host_guid, wc->node_id, dim_id, dim_name, chart_type_id);
#endif
dim_payload.node_id = wc->node_id;
dim_payload.claim_id = claim_id;
dim_payload.name = dim_name;
dim_payload.id = dim_id;
dim_payload.chart_id = chart_name;
dim_payload.chart_id = chart_type_id;
dim_payload.created_at.tv_sec = first_time;
dim_payload.last_timestamp.tv_sec = last_time;
char *payload = generate_chart_dimension_updated(&size, &dim_payload);
@ -197,6 +201,67 @@ static inline int aclk_upd_dimension_event(struct aclk_database_worker_config *w
return rc;
}
void aclk_process_dimension_deletion(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
{
int rc = 0;
sqlite3_stmt *res = NULL;
if (!aclk_use_new_cloud_arch || !aclk_connected)
return;
if (unlikely(!db_meta))
return;
uuid_t host_id;
if (uuid_parse(wc->host_guid, host_id))
return;
char *claim_id = is_agent_claimed();
if (!claim_id)
return;
rc = sqlite3_prepare_v2(db_meta, "DELETE FROM dimension_delete where host_id = @host_id " \
"RETURNING dimension_id, dimension_name, chart_type_id, dim_id LIMIT 10;", -1, &res, 0);
if (rc != SQLITE_OK) {
error_report("Failed to prepare statement when trying to delete dimension deletes");
freez(claim_id);
return;
}
rc = sqlite3_bind_blob(res, 1, &host_id , sizeof(host_id), SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
unsigned count = 0;
while (sqlite3_step(res) == SQLITE_ROW) {
(void) aclk_upd_dimension_event(
wc,
claim_id,
(uuid_t *)sqlite3_column_text(res, 3),
(const char *)sqlite3_column_text(res, 0),
(const char *)sqlite3_column_text(res, 1),
(const char *)sqlite3_column_text(res, 2),
0,
0);
count++;
}
if (count) {
memset(&cmd, 0, sizeof(cmd));
cmd.opcode = ACLK_DATABASE_DIM_DELETION;
if (aclk_database_enq_cmd_noblock(wc, &cmd))
info("Failed to queue a dimension deletion message");
}
bind_fail:
rc = sqlite3_finalize(res);
if (unlikely(rc != SQLITE_OK))
error_report("Failed to finalize statement when adding dimension deletion events, rc = %d", rc);
freez(claim_id);
return;
}
int aclk_add_dimension_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
{
int rc = 0;

View file

@ -32,5 +32,6 @@ void sql_check_rotation_state(struct aclk_database_worker_config *wc, struct acl
void sql_get_last_chart_sequence(struct aclk_database_worker_config *wc);
void aclk_receive_chart_reset(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
void aclk_receive_chart_ack(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
void aclk_process_dimension_deletion(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
uint32_t sql_get_pending_count(struct aclk_database_worker_config *wc);
#endif //NETDATA_SQLITE_ACLK_CHART_H