0
0
Fork 0
mirror of https://github.com/netdata/netdata.git synced 2025-05-17 14:42:21 +00:00

Enable additional functionality for the new cloud architecture ()

This commit is contained in:
Stelios Fragkakis 2021-10-06 20:55:31 +03:00 committed by GitHub
parent af93cc31ed
commit 12f16063f5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 721 additions and 204 deletions
database/sqlite

View file

@ -3,8 +3,12 @@
#include "sqlite_functions.h"
#include "sqlite_aclk_chart.h"
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
#include "../../aclk/aclk_charts_api.h"
#include "../../aclk/aclk.h"
#endif
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
static inline int sql_queue_chart_payload(struct aclk_database_worker_config *wc,
void *data, enum aclk_database_opcode opcode)
{
@ -117,6 +121,7 @@ bind_fail:
error_report("Failed to reset statement in store chart payload, rc = %d", rc);
return (rc != SQLITE_DONE);
}
#endif
int aclk_add_chart_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
{
@ -134,8 +139,8 @@ int aclk_add_chart_event(struct aclk_database_worker_config *wc, struct aclk_dat
chart_payload.config_hash = get_str_from_uuid(&st->state->hash_id);
chart_payload.update_every = st->update_every;
chart_payload.memory_mode = st->rrd_memory_mode;
chart_payload.name = strdupz((char *)st->name);
chart_payload.node_id = strdupz(wc->node_id);
chart_payload.name = (char *)st->name;
chart_payload.node_id = wc->node_id;
chart_payload.claim_id = claim_id;
chart_payload.id = strdupz(st->id);
@ -186,12 +191,12 @@ int aclk_add_dimension_event(struct aclk_database_worker_config *wc, struct aclk
size_t size;
memset(&dim_payload, 0, sizeof(dim_payload));
dim_payload.node_id = strdupz(wc->node_id);
dim_payload.node_id = wc->node_id;
dim_payload.claim_id = claim_id;
dim_payload.name = strdupz(rd->name);
dim_payload.id = strdupz(rd->id);
dim_payload.name = rd->name;
dim_payload.id = rd->id;
dim_payload.chart_id = strdupz(rd->rrdset->name);
dim_payload.chart_id = rd->rrdset->name;
dim_payload.created_at.tv_sec = first_t;
if (unlikely(!live))
dim_payload.last_timestamp.tv_sec = last_t;
@ -199,10 +204,6 @@ int aclk_add_dimension_event(struct aclk_database_worker_config *wc, struct aclk
char *payload = generate_chart_dimension_updated(&size, &dim_payload);
if (likely(payload))
rc = aclk_add_chart_payload(wc->uuid_str, &rd->state->metric_uuid, claim_id, ACLK_PAYLOAD_DIMENSION, (void *)payload, size);
freez((char *)dim_payload.node_id);
freez((char *)dim_payload.chart_id);
freez((char *)dim_payload.name);
freez((char *)dim_payload.id);
freez(payload);
freez(claim_id);
}
@ -229,6 +230,10 @@ void aclk_send_chart_event(struct aclk_database_worker_config *wc, struct aclk_d
if (unlikely(!claim_id))
return;
uuid_t claim_uuid;
if (uuid_parse(claim_id, claim_uuid))
return;
int limit = cmd.count > 0 ? cmd.count : 1;
uint64_t first_sequence;
@ -252,7 +257,7 @@ void aclk_send_chart_event(struct aclk_database_worker_config *wc, struct aclk_d
return;
}
rc = sqlite3_bind_text(res, 1, claim_id , -1, SQLITE_STATIC);
rc = sqlite3_bind_blob(res, 1, claim_uuid , sizeof(claim_uuid), SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
@ -404,7 +409,7 @@ int aclk_send_chart_config(struct aclk_database_worker_config *wc, struct aclk_d
destroy_chart_config_updated(&chart_config);
}
else
info("DEBUG: Chart config for %s not found", hash_id);
info("Chart config for %s not found", hash_id);
bind_fail:
rc = sqlite3_finalize(res);
@ -455,6 +460,7 @@ void aclk_receive_chart_ack(struct aclk_database_worker_config *wc, struct aclk_
void aclk_receive_chart_reset(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
{
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
BUFFER *sql = buffer_create(1024);
buffer_sprintf(sql, "UPDATE aclk_chart_%s SET status = NULL, date_submitted = NULL WHERE sequence_id >= %"PRIu64";",
wc->uuid_str, cmd.param1);
@ -462,15 +468,18 @@ void aclk_receive_chart_reset(struct aclk_database_worker_config *wc, struct acl
if (cmd.param1 == 1) {
db_lock();
buffer_flush(sql);
info("DEBUG: Deleting all data for %s", wc->uuid_str);
buffer_sprintf(sql, "DELETE FROM aclk_chart_payload_%s; DELETE FROM aclk_chart_%s; DELETE FROM aclk_chart_latest_%s;",
wc->uuid_str, wc->uuid_str, wc->uuid_str);
info("Received full resync for %s", wc->uuid_str);
buffer_sprintf(sql, "DELETE FROM aclk_chart_payload_%s; DELETE FROM aclk_chart_%s; " \
"DELETE FROM aclk_chart_latest_%s;", wc->uuid_str, wc->uuid_str, wc->uuid_str);
db_execute("BEGIN TRANSACTION;");
db_execute(buffer_tostring(sql));
db_execute("COMMIT TRANSACTION;");
db_unlock();
wc->chart_sequence_id = 0;
wc->chart_timestamp = 0;
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
RRDHOST *host = wc->host;
rrdhost_rdlock(host);
RRDSET *st;
@ -484,7 +493,6 @@ void aclk_receive_chart_reset(struct aclk_database_worker_config *wc, struct acl
rrdset_unlock(st);
}
rrdhost_unlock(host);
#endif
}
else {
//sql_chart_deduplicate(wc, cmd);
@ -492,6 +500,11 @@ void aclk_receive_chart_reset(struct aclk_database_worker_config *wc, struct acl
}
buffer_free(sql);
wc->chart_updates = 1;
#else
UNUSED(wc);
UNUSED(cmd);
#endif
return;
}
@ -555,6 +568,7 @@ void aclk_ack_chart_sequence_id(char *node_id, uint64_t last_sequence_id)
return;
}
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
void aclk_reset_chart_event(char *node_id, uint64_t last_sequence_id)
{
if (unlikely(!node_id))
@ -564,21 +578,47 @@ void aclk_reset_chart_event(char *node_id, uint64_t last_sequence_id)
aclk_submit_param_command(node_id, ACLK_DATABASE_RESET_CHART, last_sequence_id);
return;
}
#endif
// ST is read locked
int sql_queue_chart_to_aclk(RRDSET *st)
{
#ifdef ENABLE_ACLK
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
if (!aclk_use_new_cloud_arch)
#endif
{
rrdset_flag_clear(st, RRDSET_FLAG_ACLK);
aclk_update_chart(st->rrdhost, st->id, 1);
return 0;
}
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
return sql_queue_chart_payload((struct aclk_database_worker_config *) st->rrdhost->dbsync_worker,
st, ACLK_DATABASE_ADD_CHART);
#else
return 0;
#endif
#else
UNUSED(st);
return 0;
#endif
}
int sql_queue_dimension_to_aclk(RRDDIM *rd)
{
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
if (!aclk_use_new_cloud_arch)
return 0;
int rc = sql_queue_chart_payload((struct aclk_database_worker_config *) rd->rrdset->rrdhost->dbsync_worker,
rd, ACLK_DATABASE_ADD_DIMENSION);
if (likely(!rc))
rrddim_flag_set(rd, RRDDIM_FLAG_ACLK);
return rc;
#else
UNUSED(rd);
return 0;
#endif
}
void sql_chart_deduplicate(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
@ -674,6 +714,7 @@ fail:
// Start streaming charts / dimensions for node_id
void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at, uint64_t batch_id)
{
UNUSED(created_at);
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
if (unlikely(!node_id))
return;
@ -692,52 +733,41 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at
rrd_unlock();
wc = (struct aclk_database_worker_config *)host->dbsync_worker;
if (likely(wc)) {
// if (unlikely(!wc->chart_updates)) {
// struct aclk_database_cmd cmd;
// cmd.opcode = ACLK_DATABASE_NODE_INFO;
// cmd.completion = NULL;
// aclk_database_enq_cmd(wc, &cmd);
// }
wc->chart_reset_count++;
__sync_synchronize();
wc->chart_updates = 0;
wc->batch_id = batch_id;
__sync_synchronize();
wc->batch_created = now_realtime_sec();
info("DEBUG: START streaming charts for %s (%s) enabled -- last streamed sequence %"PRIu64" t=%ld (reset count=%d)", node_id, wc->uuid_str,
wc->chart_sequence_id, wc->chart_timestamp, wc->chart_reset_count);
// If mismatch detected
if (sequence_id > wc->chart_sequence_id || wc->chart_reset_count > 10) {
info("DEBUG: Full resync requested -- reset_count=%d", wc->chart_reset_count);
debug(D_ACLK_SYNC,"Requesting full resync from the cloud -- reset_count=%d", wc->chart_reset_count);
chart_reset_t chart_reset;
chart_reset.node_id = strdupz(node_id);
chart_reset.claim_id = is_agent_claimed();
chart_reset.reason = SEQ_ID_NOT_EXISTS;
aclk_chart_reset(chart_reset);
// wc->chart_updates = 0;
wc->chart_reset_count = -1;
if (chart_reset.claim_id) {
chart_reset.node_id = node_id;
chart_reset.reason = SEQ_ID_NOT_EXISTS;
aclk_chart_reset(chart_reset);
freez(chart_reset.claim_id);
wc->chart_reset_count = -1;
}
return;
} else {
struct aclk_database_cmd cmd;
memset(&cmd, 0, sizeof(cmd));
// TODO: handle timestamp
// if (!wc->chart_reset_count)
// wc->chart_delay = now_realtime_sec() + 60;
// else
// wc->chart_delay = 0;
if (sequence_id < wc->chart_sequence_id) { // || created_at != wc->chart_timestamp) {
// wc->chart_updates = 0;
if (sequence_id)
info("DEBUG: Synchonization mismatch detected");
else
info("DEBUG: Synchonization mismatch detected; full resync ACKed from the cloud");
cmd.opcode = ACLK_DATABASE_RESET_CHART;
cmd.param1 = sequence_id + 1;
cmd.completion = NULL;
aclk_database_enq_cmd(wc, &cmd);
}
else
else {
debug(D_ACLK_SYNC,"START streaming charts for %s enabled -- last streamed sequence %"PRIu64 \
" t=%ld (reset count=%d)", wc->host_guid, wc->chart_sequence_id,
wc->chart_timestamp, wc->chart_reset_count);
wc->chart_reset_count = 0;
wc->chart_updates = 1;
}
}
}
else
@ -750,7 +780,6 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at
#else
UNUSED(node_id);
UNUSED(sequence_id);
UNUSED(created_at);
UNUSED(batch_id);
#endif
return;