0
0
Fork 0
mirror of https://github.com/netdata/netdata.git synced 2025-04-30 23:50:04 +00:00

Add a checkpoint message to alerts stream ()

* pull aclk schemas

* resolve capas

* handle checkpoints and removed from health

* build with disable-cloud

* codacy 1

* misc changes

* one more char in hash

* free buffer

* change topic

* misc fixes

* skip removed alert variables

* change hash functions

* use create and destroy for compatibility with older openssl
This commit is contained in:
Emmanuel Vasilakis 2023-04-21 12:24:43 +03:00 committed by GitHub
parent 8d953cf206
commit 0d2c327ae5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 344 additions and 365 deletions

@ -1 +1 @@
Subproject commit 3252118bd547640251356629f0df05eaf952ac39 Subproject commit d3a5c636b6dacf364834f2ba99ce0170c71ef861

View file

@ -49,8 +49,6 @@ float last_backoff_value = 0;
time_t aclk_block_until = 0; time_t aclk_block_until = 0;
int aclk_alert_reloaded = 0; //1 on health log exchange, and again on health_reload
#ifdef ENABLE_ACLK #ifdef ENABLE_ACLK
mqtt_wss_client mqttwss_client; mqtt_wss_client mqttwss_client;
@ -928,14 +926,10 @@ static void fill_alert_status_for_host(BUFFER *wb, RRDHOST *host)
} }
buffer_sprintf(wb, buffer_sprintf(wb,
"\n\t\tUpdates: %d" "\n\t\tUpdates: %d"
"\n\t\tBatch ID: %"PRIu64
"\n\t\tLast Acked Seq ID: %"PRIu64
"\n\t\tPending Min Seq ID: %"PRIu64 "\n\t\tPending Min Seq ID: %"PRIu64
"\n\t\tPending Max Seq ID: %"PRIu64 "\n\t\tPending Max Seq ID: %"PRIu64
"\n\t\tLast Submitted Seq ID: %"PRIu64, "\n\t\tLast Submitted Seq ID: %"PRIu64,
status.alert_updates, status.alert_updates,
status.alerts_batch_id,
status.last_acked_sequence_id,
status.pending_min_sequence_id, status.pending_min_sequence_id,
status.pending_max_sequence_id, status.pending_max_sequence_id,
status.last_submitted_sequence_id status.last_submitted_sequence_id
@ -1043,12 +1037,6 @@ static void fill_alert_status_for_host_json(json_object *obj, RRDHOST *host)
json_object *tmp = json_object_new_int(status.alert_updates); json_object *tmp = json_object_new_int(status.alert_updates);
json_object_object_add(obj, "updates", tmp); json_object_object_add(obj, "updates", tmp);
tmp = json_object_new_int(status.alerts_batch_id);
json_object_object_add(obj, "batch-id", tmp);
tmp = json_object_new_int(status.last_acked_sequence_id);
json_object_object_add(obj, "last-acked-seq-id", tmp);
tmp = json_object_new_int(status.pending_min_sequence_id); tmp = json_object_new_int(status.pending_min_sequence_id);
json_object_object_add(obj, "pending-min-seq-id", tmp); json_object_object_add(obj, "pending-min-seq-id", tmp);

View file

@ -26,8 +26,6 @@ extern time_t aclk_block_until;
extern int disconnect_req; extern int disconnect_req;
extern int aclk_alert_reloaded;
#ifdef ENABLE_ACLK #ifdef ENABLE_ACLK
void *aclk_main(void *ptr); void *aclk_main(void *ptr);

View file

@ -8,12 +8,12 @@
#include "aclk.h" #include "aclk.h"
void aclk_send_alarm_log_health(struct alarm_log_health *log_health) void aclk_send_provide_alarm_checkpoint(struct alarm_checkpoint *checkpoint)
{ {
aclk_query_t query = aclk_query_new(ALARM_LOG_HEALTH); aclk_query_t query = aclk_query_new(ALARM_PROVIDE_CHECKPOINT);
query->data.bin_payload.payload = generate_alarm_log_health(&query->data.bin_payload.size, log_health); query->data.bin_payload.payload = generate_alarm_checkpoint(&query->data.bin_payload.size, checkpoint);
query->data.bin_payload.topic = ACLK_TOPICID_ALARM_HEALTH; query->data.bin_payload.topic = ACLK_TOPICID_ALARM_CHECKPOINT;
query->data.bin_payload.msg_name = "AlarmLogHealth"; query->data.bin_payload.msg_name = "AlarmCheckpoint";
QUEUE_IF_PAYLOAD_PRESENT(query); QUEUE_IF_PAYLOAD_PRESENT(query);
} }

View file

@ -6,7 +6,7 @@
#include "../daemon/common.h" #include "../daemon/common.h"
#include "schema-wrappers/schema_wrappers.h" #include "schema-wrappers/schema_wrappers.h"
void aclk_send_alarm_log_health(struct alarm_log_health *log_health); void aclk_send_provide_alarm_checkpoint(struct alarm_checkpoint *checkpoint);
void aclk_send_alarm_log_entry(struct alarm_log_entry *log_entry); void aclk_send_alarm_log_entry(struct alarm_log_entry *log_entry);
void aclk_send_provide_alarm_cfg(struct provide_alarm_configuration *cfg); void aclk_send_provide_alarm_cfg(struct provide_alarm_configuration *cfg);
void aclk_send_alarm_snapshot(alarm_snapshot_proto_ptr_t snapshot); void aclk_send_alarm_snapshot(alarm_snapshot_proto_ptr_t snapshot);

View file

@ -14,6 +14,7 @@ const struct capability *aclk_get_agent_capas()
{ .name = "ctx", .version = 1, .enabled = 1 }, { .name = "ctx", .version = 1, .enabled = 1 },
{ .name = "funcs", .version = 1, .enabled = 1 }, { .name = "funcs", .version = 1, .enabled = 1 },
{ .name = "http_api_v2", .version = 1, .enabled = 1 }, { .name = "http_api_v2", .version = 1, .enabled = 1 },
{ .name = "health", .version = 1, .enabled = 0 },
{ .name = NULL, .version = 0, .enabled = 0 } { .name = NULL, .version = 0, .enabled = 0 }
}; };
agent_capabilities[2].version = ml_capable() ? 1 : 0; agent_capabilities[2].version = ml_capable() ? 1 : 0;
@ -22,6 +23,8 @@ const struct capability *aclk_get_agent_capas()
agent_capabilities[3].version = enable_metric_correlations ? metric_correlations_version : 0; agent_capabilities[3].version = enable_metric_correlations ? metric_correlations_version : 0;
agent_capabilities[3].enabled = enable_metric_correlations; agent_capabilities[3].enabled = enable_metric_correlations;
agent_capabilities[7].enabled = localhost->health.health_enabled;
return agent_capabilities; return agent_capabilities;
} }
@ -36,6 +39,7 @@ struct capability *aclk_get_node_instance_capas(RRDHOST *host)
{ .name = "ctx", .version = 1, .enabled = 1 }, { .name = "ctx", .version = 1, .enabled = 1 },
{ .name = "funcs", .version = 0, .enabled = 0 }, { .name = "funcs", .version = 0, .enabled = 0 },
{ .name = "http_api_v2", .version = 1, .enabled = 1 }, { .name = "http_api_v2", .version = 1, .enabled = 1 },
{ .name = "health", .version = 1, .enabled = host->health.health_enabled },
{ .name = NULL, .version = 0, .enabled = 0 } { .name = NULL, .version = 0, .enabled = 0 }
}; };

View file

@ -185,19 +185,19 @@ static int send_bin_msg(struct aclk_query_thread *query_thr, aclk_query_t query)
const char *aclk_query_get_name(aclk_query_type_t qt, int unknown_ok) const char *aclk_query_get_name(aclk_query_type_t qt, int unknown_ok)
{ {
switch (qt) { switch (qt) {
case HTTP_API_V2: return "http_api_request_v2"; case HTTP_API_V2: return "http_api_request_v2";
case REGISTER_NODE: return "register_node"; case REGISTER_NODE: return "register_node";
case NODE_STATE_UPDATE: return "node_state_update"; case NODE_STATE_UPDATE: return "node_state_update";
case CHART_DIMS_UPDATE: return "chart_and_dim_update"; case CHART_DIMS_UPDATE: return "chart_and_dim_update";
case CHART_CONFIG_UPDATED: return "chart_config_updated"; case CHART_CONFIG_UPDATED: return "chart_config_updated";
case CHART_RESET: return "reset_chart_messages"; case CHART_RESET: return "reset_chart_messages";
case RETENTION_UPDATED: return "update_retention_info"; case RETENTION_UPDATED: return "update_retention_info";
case UPDATE_NODE_INFO: return "update_node_info"; case UPDATE_NODE_INFO: return "update_node_info";
case ALARM_LOG_HEALTH: return "alarm_log_health"; case ALARM_PROVIDE_CHECKPOINT: return "alarm_checkpoint";
case ALARM_PROVIDE_CFG: return "provide_alarm_config"; case ALARM_PROVIDE_CFG: return "provide_alarm_config";
case ALARM_SNAPSHOT: return "alarm_snapshot"; case ALARM_SNAPSHOT: return "alarm_snapshot";
case UPDATE_NODE_COLLECTORS: return "update_node_collectors"; case UPDATE_NODE_COLLECTORS: return "update_node_collectors";
case PROTO_BIN_MESSAGE: return "generic_binary_proto_message"; case PROTO_BIN_MESSAGE: return "generic_binary_proto_message";
default: default:
if (!unknown_ok) if (!unknown_ok)
error_report("Unknown query type used %d", (int) qt); error_report("Unknown query type used %d", (int) qt);

View file

@ -19,7 +19,7 @@ typedef enum {
CHART_RESET, CHART_RESET,
RETENTION_UPDATED, RETENTION_UPDATED,
UPDATE_NODE_INFO, UPDATE_NODE_INFO,
ALARM_LOG_HEALTH, ALARM_PROVIDE_CHECKPOINT,
ALARM_PROVIDE_CFG, ALARM_PROVIDE_CFG,
ALARM_SNAPSHOT, ALARM_SNAPSHOT,
UPDATE_NODE_COLLECTORS, UPDATE_NODE_COLLECTORS,

View file

@ -339,25 +339,27 @@ int update_chart_configs(const char *msg, size_t msg_len)
int start_alarm_streaming(const char *msg, size_t msg_len) int start_alarm_streaming(const char *msg, size_t msg_len)
{ {
struct start_alarm_streaming res = parse_start_alarm_streaming(msg, msg_len); struct start_alarm_streaming res = parse_start_alarm_streaming(msg, msg_len);
if (!res.node_id || !res.batch_id) { if (!res.node_id) {
error("Error parsing StartAlarmStreaming"); error("Error parsing StartAlarmStreaming");
freez(res.node_id);
return 1; return 1;
} }
aclk_start_alert_streaming(res.node_id, res.batch_id, res.start_seq_id); aclk_start_alert_streaming(res.node_id, res.resets);
freez(res.node_id); freez(res.node_id);
return 0; return 0;
} }
int send_alarm_log_health(const char *msg, size_t msg_len) int send_alarm_checkpoint(const char *msg, size_t msg_len)
{ {
char *node_id = parse_send_alarm_log_health(msg, msg_len); struct send_alarm_checkpoint sac = parse_send_alarm_checkpoint(msg, msg_len);
if (!node_id) { if (!sac.node_id || !sac.claim_id) {
error("Error parsing SendAlarmLogHealth"); error("Error parsing SendAlarmCheckpoint");
freez(sac.node_id);
freez(sac.claim_id);
return 1; return 1;
} }
aclk_send_alarm_health_log(node_id); aclk_send_alarm_checkpoint(sac.node_id, sac.claim_id);
freez(node_id); freez(sac.node_id);
freez(sac.claim_id);
return 0; return 0;
} }
@ -377,12 +379,12 @@ int send_alarm_configuration(const char *msg, size_t msg_len)
int send_alarm_snapshot(const char *msg, size_t msg_len) int send_alarm_snapshot(const char *msg, size_t msg_len)
{ {
struct send_alarm_snapshot *sas = parse_send_alarm_snapshot(msg, msg_len); struct send_alarm_snapshot *sas = parse_send_alarm_snapshot(msg, msg_len);
if (!sas->node_id || !sas->claim_id) { if (!sas->node_id || !sas->claim_id || !sas->snapshot_uuid) {
error("Error parsing SendAlarmSnapshot"); error("Error parsing SendAlarmSnapshot");
destroy_send_alarm_snapshot(sas); destroy_send_alarm_snapshot(sas);
return 1; return 1;
} }
aclk_process_send_alarm_snapshot(sas->node_id, sas->claim_id, sas->snapshot_id, sas->sequence_id); aclk_process_send_alarm_snapshot(sas->node_id, sas->claim_id, sas->snapshot_uuid);
destroy_send_alarm_snapshot(sas); destroy_send_alarm_snapshot(sas);
return 0; return 0;
} }
@ -458,7 +460,7 @@ new_cloud_rx_msg_t rx_msgs[] = {
{ .name = "ChartsAndDimensionsAck", .name_hash = 0, .fnc = charts_and_dimensions_ack }, { .name = "ChartsAndDimensionsAck", .name_hash = 0, .fnc = charts_and_dimensions_ack },
{ .name = "UpdateChartConfigs", .name_hash = 0, .fnc = update_chart_configs }, { .name = "UpdateChartConfigs", .name_hash = 0, .fnc = update_chart_configs },
{ .name = "StartAlarmStreaming", .name_hash = 0, .fnc = start_alarm_streaming }, { .name = "StartAlarmStreaming", .name_hash = 0, .fnc = start_alarm_streaming },
{ .name = "SendAlarmLogHealth", .name_hash = 0, .fnc = send_alarm_log_health }, { .name = "SendAlarmCheckpoint", .name_hash = 0, .fnc = send_alarm_checkpoint },
{ .name = "SendAlarmConfiguration", .name_hash = 0, .fnc = send_alarm_configuration }, { .name = "SendAlarmConfiguration", .name_hash = 0, .fnc = send_alarm_configuration },
{ .name = "SendAlarmSnapshot", .name_hash = 0, .fnc = send_alarm_snapshot }, { .name = "SendAlarmSnapshot", .name_hash = 0, .fnc = send_alarm_snapshot },
{ .name = "DisconnectReq", .name_hash = 0, .fnc = handle_disconnect_req }, { .name = "DisconnectReq", .name_hash = 0, .fnc = handle_disconnect_req },

View file

@ -120,10 +120,10 @@ struct topic_name {
{ .id = ACLK_TOPICID_CHART_RESET, .name = "reset-charts" }, { .id = ACLK_TOPICID_CHART_RESET, .name = "reset-charts" },
{ .id = ACLK_TOPICID_RETENTION_UPDATED, .name = "chart-retention-updated" }, { .id = ACLK_TOPICID_RETENTION_UPDATED, .name = "chart-retention-updated" },
{ .id = ACLK_TOPICID_NODE_INFO, .name = "node-instance-info" }, { .id = ACLK_TOPICID_NODE_INFO, .name = "node-instance-info" },
{ .id = ACLK_TOPICID_ALARM_LOG, .name = "alarm-log" }, { .id = ACLK_TOPICID_ALARM_LOG, .name = "alarm-log-v2" },
{ .id = ACLK_TOPICID_ALARM_HEALTH, .name = "alarm-health" }, { .id = ACLK_TOPICID_ALARM_CHECKPOINT, .name = "alarm-checkpoint" },
{ .id = ACLK_TOPICID_ALARM_CONFIG, .name = "alarm-config" }, { .id = ACLK_TOPICID_ALARM_CONFIG, .name = "alarm-config" },
{ .id = ACLK_TOPICID_ALARM_SNAPSHOT, .name = "alarm-snapshot" }, { .id = ACLK_TOPICID_ALARM_SNAPSHOT, .name = "alarm-snapshot-v2" },
{ .id = ACLK_TOPICID_NODE_COLLECTORS, .name = "node-instance-collectors" }, { .id = ACLK_TOPICID_NODE_COLLECTORS, .name = "node-instance-collectors" },
{ .id = ACLK_TOPICID_CTXS_SNAPSHOT, .name = "contexts-snapshot" }, { .id = ACLK_TOPICID_CTXS_SNAPSHOT, .name = "contexts-snapshot" },
{ .id = ACLK_TOPICID_CTXS_UPDATED, .name = "contexts-updated" }, { .id = ACLK_TOPICID_CTXS_UPDATED, .name = "contexts-updated" },
@ -146,7 +146,7 @@ enum aclk_topics compulsory_topics[] = {
ACLK_TOPICID_RETENTION_UPDATED, ACLK_TOPICID_RETENTION_UPDATED,
ACLK_TOPICID_NODE_INFO, ACLK_TOPICID_NODE_INFO,
ACLK_TOPICID_ALARM_LOG, ACLK_TOPICID_ALARM_LOG,
ACLK_TOPICID_ALARM_HEALTH, ACLK_TOPICID_ALARM_CHECKPOINT,
ACLK_TOPICID_ALARM_CONFIG, ACLK_TOPICID_ALARM_CONFIG,
ACLK_TOPICID_ALARM_SNAPSHOT, ACLK_TOPICID_ALARM_SNAPSHOT,
ACLK_TOPICID_NODE_COLLECTORS, ACLK_TOPICID_NODE_COLLECTORS,

View file

@ -85,7 +85,7 @@ enum aclk_topics {
ACLK_TOPICID_RETENTION_UPDATED = 12, ACLK_TOPICID_RETENTION_UPDATED = 12,
ACLK_TOPICID_NODE_INFO = 13, ACLK_TOPICID_NODE_INFO = 13,
ACLK_TOPICID_ALARM_LOG = 14, ACLK_TOPICID_ALARM_LOG = 14,
ACLK_TOPICID_ALARM_HEALTH = 15, ACLK_TOPICID_ALARM_CHECKPOINT = 15,
ACLK_TOPICID_ALARM_CONFIG = 16, ACLK_TOPICID_ALARM_CONFIG = 16,
ACLK_TOPICID_ALARM_SNAPSHOT = 17, ACLK_TOPICID_ALARM_SNAPSHOT = 17,
ACLK_TOPICID_NODE_COLLECTORS = 18, ACLK_TOPICID_NODE_COLLECTORS = 18,

View file

@ -21,57 +21,24 @@ struct start_alarm_streaming parse_start_alarm_streaming(const char *data, size_
return ret; return ret;
ret.node_id = strdupz(msg.node_id().c_str()); ret.node_id = strdupz(msg.node_id().c_str());
ret.batch_id = msg.batch_id(); ret.resets = msg.resets();
ret.start_seq_id = msg.start_sequnce_id();
return ret; return ret;
} }
char *parse_send_alarm_log_health(const char *data, size_t len) struct send_alarm_checkpoint parse_send_alarm_checkpoint(const char *data, size_t len)
{ {
SendAlarmLogHealth msg; struct send_alarm_checkpoint ret;
memset(&ret, 0, sizeof(ret));
SendAlarmCheckpoint msg;
if (!msg.ParseFromArray(data, len)) if (!msg.ParseFromArray(data, len))
return NULL; return ret;
return strdupz(msg.node_id().c_str());
}
char *generate_alarm_log_health(size_t *len, struct alarm_log_health *data) ret.node_id = strdupz(msg.node_id().c_str());
{ ret.claim_id = strdupz(msg.claim_id().c_str());
AlarmLogHealth msg;
LogEntries *entries;
msg.set_claim_id(data->claim_id); return ret;
msg.set_node_id(data->node_id);
msg.set_enabled(data->enabled);
switch (data->status) {
case alarm_log_status_aclk::ALARM_LOG_STATUS_IDLE:
msg.set_status(alarms::v1::ALARM_LOG_STATUS_IDLE);
break;
case alarm_log_status_aclk::ALARM_LOG_STATUS_RUNNING:
msg.set_status(alarms::v1::ALARM_LOG_STATUS_RUNNING);
break;
case alarm_log_status_aclk::ALARM_LOG_STATUS_UNSPECIFIED:
msg.set_status(alarms::v1::ALARM_LOG_STATUS_UNSPECIFIED);
break;
default:
error("Unknown status of AlarmLogHealth LogEntry");
return NULL;
}
entries = msg.mutable_log_entries();
entries->set_first_sequence_id(data->log_entries.first_seq_id);
entries->set_last_sequence_id(data->log_entries.last_seq_id);
set_google_timestamp_from_timeval(data->log_entries.first_when, entries->mutable_first_when());
set_google_timestamp_from_timeval(data->log_entries.last_when, entries->mutable_last_when());
*len = PROTO_COMPAT_MSG_SIZE(msg);
char *bin = (char*)mallocz(*len);
if (!msg.SerializeToArray(bin, *len))
return NULL;
return bin;
} }
static alarms::v1::AlarmStatus aclk_alarm_status_to_proto(enum aclk_alarm_status status) static alarms::v1::AlarmStatus aclk_alarm_status_to_proto(enum aclk_alarm_status status)
@ -131,8 +98,6 @@ static void fill_alarm_log_entry(struct alarm_log_entry *data, AlarmLogEntry *pr
if (data->family) if (data->family)
proto->set_family(data->family); proto->set_family(data->family);
proto->set_batch_id(data->batch_id);
proto->set_sequence_id(data->sequence_id);
proto->set_when(data->when); proto->set_when(data->when);
proto->set_config_hash(data->config_hash); proto->set_config_hash(data->config_hash);
@ -187,6 +152,24 @@ char *generate_alarm_log_entry(size_t *len, struct alarm_log_entry *data)
return bin; return bin;
} }
char *generate_alarm_checkpoint(size_t *len, struct alarm_checkpoint *data)
{
AlarmCheckpoint msg;
msg.set_claim_id(data->claim_id);
msg.set_node_id(data->node_id);
msg.set_checksum(data->checksum);
*len = PROTO_COMPAT_MSG_SIZE(msg);
char *bin = (char*)mallocz(*len);
if (!msg.SerializeToArray(bin, *len)) {
freez(bin);
return NULL;
}
return bin;
}
struct send_alarm_snapshot *parse_send_alarm_snapshot(const char *data, size_t len) struct send_alarm_snapshot *parse_send_alarm_snapshot(const char *data, size_t len)
{ {
SendAlarmSnapshot msg; SendAlarmSnapshot msg;
@ -198,8 +181,8 @@ struct send_alarm_snapshot *parse_send_alarm_snapshot(const char *data, size_t l
ret->claim_id = strdupz(msg.claim_id().c_str()); ret->claim_id = strdupz(msg.claim_id().c_str());
if (msg.node_id().c_str()) if (msg.node_id().c_str())
ret->node_id = strdupz(msg.node_id().c_str()); ret->node_id = strdupz(msg.node_id().c_str());
ret->snapshot_id = msg.snapshot_id(); if (msg.snapshot_uuid().c_str())
ret->sequence_id = msg.sequence_id(); ret->snapshot_uuid = strdupz(msg.snapshot_uuid().c_str());
return ret; return ret;
} }
@ -208,6 +191,7 @@ void destroy_send_alarm_snapshot(struct send_alarm_snapshot *ptr)
{ {
freez(ptr->claim_id); freez(ptr->claim_id);
freez(ptr->node_id); freez(ptr->node_id);
freez(ptr->snapshot_uuid);
freez(ptr); freez(ptr);
} }
@ -218,7 +202,7 @@ alarm_snapshot_proto_ptr_t generate_alarm_snapshot_proto(struct alarm_snapshot *
msg->set_node_id(data->node_id); msg->set_node_id(data->node_id);
msg->set_claim_id(data->claim_id); msg->set_claim_id(data->claim_id);
msg->set_snapshot_id(data->snapshot_id); msg->set_snapshot_uuid(data->snapshot_uuid);
msg->set_chunks(data->chunks); msg->set_chunks(data->chunks);
msg->set_chunk(data->chunk); msg->set_chunk(data->chunk);

View file

@ -11,38 +11,12 @@
extern "C" { extern "C" {
#endif #endif
enum alarm_log_status_aclk {
ALARM_LOG_STATUS_UNSPECIFIED = 0,
ALARM_LOG_STATUS_RUNNING = 1,
ALARM_LOG_STATUS_IDLE = 2
};
struct alarm_log_entries {
int64_t first_seq_id;
struct timeval first_when;
int64_t last_seq_id;
struct timeval last_when;
};
struct alarm_log_health {
char *claim_id;
char *node_id;
int enabled;
enum alarm_log_status_aclk status;
struct alarm_log_entries log_entries;
};
struct start_alarm_streaming { struct start_alarm_streaming {
char *node_id; char *node_id;
uint64_t batch_id; bool resets;
uint64_t start_seq_id;
}; };
struct start_alarm_streaming parse_start_alarm_streaming(const char *data, size_t len); struct start_alarm_streaming parse_start_alarm_streaming(const char *data, size_t len);
char *parse_send_alarm_log_health(const char *data, size_t len);
char *generate_alarm_log_health(size_t *len, struct alarm_log_health *data);
enum aclk_alarm_status { enum aclk_alarm_status {
ALARM_STATUS_NULL = 0, ALARM_STATUS_NULL = 0,
@ -101,17 +75,27 @@ struct alarm_log_entry {
char *chart_context; char *chart_context;
}; };
struct send_alarm_checkpoint {
char *node_id;
char *claim_id;
};
struct alarm_checkpoint {
char *node_id;
char *claim_id;
char *checksum;
};
struct send_alarm_snapshot { struct send_alarm_snapshot {
char *node_id; char *node_id;
char *claim_id; char *claim_id;
uint64_t snapshot_id; char *snapshot_uuid;
uint64_t sequence_id;
}; };
struct alarm_snapshot { struct alarm_snapshot {
char *node_id; char *node_id;
char *claim_id; char *claim_id;
uint64_t snapshot_id; char *snapshot_uuid;
uint32_t chunks; uint32_t chunks;
uint32_t chunk; uint32_t chunk;
}; };
@ -125,6 +109,9 @@ char *generate_alarm_log_entry(size_t *len, struct alarm_log_entry *data);
struct send_alarm_snapshot *parse_send_alarm_snapshot(const char *data, size_t len); struct send_alarm_snapshot *parse_send_alarm_snapshot(const char *data, size_t len);
void destroy_send_alarm_snapshot(struct send_alarm_snapshot *ptr); void destroy_send_alarm_snapshot(struct send_alarm_snapshot *ptr);
struct send_alarm_checkpoint parse_send_alarm_checkpoint(const char *data, size_t len);
char *generate_alarm_checkpoint(size_t *len, struct alarm_checkpoint *data);
alarm_snapshot_proto_ptr_t generate_alarm_snapshot_proto(struct alarm_snapshot *data); alarm_snapshot_proto_ptr_t generate_alarm_snapshot_proto(struct alarm_snapshot *data);
void add_alarm_log_entry2snapshot(alarm_snapshot_proto_ptr_t snapshot, struct alarm_log_entry *data); void add_alarm_log_entry2snapshot(alarm_snapshot_proto_ptr_t snapshot, struct alarm_log_entry *data);
char *generate_alarm_snapshot_bin(size_t *len, alarm_snapshot_proto_ptr_t snapshot); char *generate_alarm_snapshot_bin(size_t *len, alarm_snapshot_proto_ptr_t snapshot);

View file

@ -29,8 +29,8 @@ static google::protobuf::Message *msg_name_to_protomsg(const char *msgname)
return new nodeinstance::create::v1::CreateNodeInstance; return new nodeinstance::create::v1::CreateNodeInstance;
if (!strcmp(msgname, "UpdateNodeInfo")) if (!strcmp(msgname, "UpdateNodeInfo"))
return new nodeinstance::info::v1::UpdateNodeInfo; return new nodeinstance::info::v1::UpdateNodeInfo;
if (!strcmp(msgname, "AlarmLogHealth")) if (!strcmp(msgname, "AlarmCheckpoint"))
return new alarms::v1::AlarmLogHealth; return new alarms::v1::AlarmCheckpoint;
if (!strcmp(msgname, "ProvideAlarmConfiguration")) if (!strcmp(msgname, "ProvideAlarmConfiguration"))
return new alarms::v1::ProvideAlarmConfiguration; return new alarms::v1::ProvideAlarmConfiguration;
if (!strcmp(msgname, "AlarmSnapshot")) if (!strcmp(msgname, "AlarmSnapshot"))
@ -51,8 +51,8 @@ static google::protobuf::Message *msg_name_to_protomsg(const char *msgname)
return new agent::v1::SendNodeInstances; return new agent::v1::SendNodeInstances;
if (!strcmp(msgname, "StartAlarmStreaming")) if (!strcmp(msgname, "StartAlarmStreaming"))
return new alarms::v1::StartAlarmStreaming; return new alarms::v1::StartAlarmStreaming;
if (!strcmp(msgname, "SendAlarmLogHealth")) if (!strcmp(msgname, "SendAlarmCheckpoint"))
return new alarms::v1::SendAlarmLogHealth; return new alarms::v1::SendAlarmCheckpoint;
if (!strcmp(msgname, "SendAlarmConfiguration")) if (!strcmp(msgname, "SendAlarmConfiguration"))
return new alarms::v1::SendAlarmConfiguration; return new alarms::v1::SendAlarmConfiguration;
if (!strcmp(msgname, "SendAlarmSnapshot")) if (!strcmp(msgname, "SendAlarmSnapshot"))

View file

@ -367,12 +367,12 @@ static void aclk_synchronization(void *arg __maybe_unused)
service_register(SERVICE_THREAD_TYPE_EVENT_LOOP, NULL, NULL, NULL, true); service_register(SERVICE_THREAD_TYPE_EVENT_LOOP, NULL, NULL, NULL, true);
worker_register_job_name(ACLK_DATABASE_NOOP, "noop"); worker_register_job_name(ACLK_DATABASE_NOOP, "noop");
worker_register_job_name(ACLK_DATABASE_ALARM_HEALTH_LOG, "alert log");
worker_register_job_name(ACLK_DATABASE_CLEANUP, "cleanup"); worker_register_job_name(ACLK_DATABASE_CLEANUP, "cleanup");
worker_register_job_name(ACLK_DATABASE_DELETE_HOST, "node delete"); worker_register_job_name(ACLK_DATABASE_DELETE_HOST, "node delete");
worker_register_job_name(ACLK_DATABASE_NODE_STATE, "node state"); worker_register_job_name(ACLK_DATABASE_NODE_STATE, "node state");
worker_register_job_name(ACLK_DATABASE_PUSH_ALERT, "alert push"); worker_register_job_name(ACLK_DATABASE_PUSH_ALERT, "alert push");
worker_register_job_name(ACLK_DATABASE_PUSH_ALERT_CONFIG, "alert conf push"); worker_register_job_name(ACLK_DATABASE_PUSH_ALERT_CONFIG, "alert conf push");
worker_register_job_name(ACLK_DATABASE_PUSH_ALERT_CHECKPOINT,"alert checkpoint");
worker_register_job_name(ACLK_DATABASE_PUSH_ALERT_SNAPSHOT, "alert snapshot"); worker_register_job_name(ACLK_DATABASE_PUSH_ALERT_SNAPSHOT, "alert snapshot");
worker_register_job_name(ACLK_DATABASE_QUEUE_REMOVED_ALERTS, "alerts check"); worker_register_job_name(ACLK_DATABASE_QUEUE_REMOVED_ALERTS, "alerts check");
worker_register_job_name(ACLK_DATABASE_TIMER, "timer"); worker_register_job_name(ACLK_DATABASE_TIMER, "timer");
@ -437,9 +437,6 @@ static void aclk_synchronization(void *arg __maybe_unused)
case ACLK_DATABASE_PUSH_ALERT: case ACLK_DATABASE_PUSH_ALERT:
aclk_push_alert_events_for_all_hosts(); aclk_push_alert_events_for_all_hosts();
break; break;
case ACLK_DATABASE_ALARM_HEALTH_LOG:
aclk_push_alarm_health_log(cmd.param[0]);
break;
case ACLK_DATABASE_PUSH_ALERT_SNAPSHOT:; case ACLK_DATABASE_PUSH_ALERT_SNAPSHOT:;
aclk_push_alert_snapshot_event(cmd.param[0]); aclk_push_alert_snapshot_event(cmd.param[0]);
break; break;
@ -605,14 +602,6 @@ void aclk_push_node_alert_snapshot(const char *node_id)
} }
void aclk_push_node_health_log(const char *node_id)
{
if (unlikely(!aclk_sync_config.initialized))
return;
queue_aclk_sync_cmd(ACLK_DATABASE_ALARM_HEALTH_LOG, strdupz(node_id), NULL);
}
void aclk_push_node_removed_alerts(const char *node_id) void aclk_push_node_removed_alerts(const char *node_id)
{ {
if (unlikely(!aclk_sync_config.initialized)) if (unlikely(!aclk_sync_config.initialized))

View file

@ -41,13 +41,13 @@ static inline int claimed()
enum aclk_database_opcode { enum aclk_database_opcode {
ACLK_DATABASE_NOOP = 0, ACLK_DATABASE_NOOP = 0,
ACLK_DATABASE_ALARM_HEALTH_LOG,
ACLK_DATABASE_CLEANUP, ACLK_DATABASE_CLEANUP,
ACLK_DATABASE_DELETE_HOST, ACLK_DATABASE_DELETE_HOST,
ACLK_DATABASE_NODE_STATE, ACLK_DATABASE_NODE_STATE,
ACLK_DATABASE_PUSH_ALERT, ACLK_DATABASE_PUSH_ALERT,
ACLK_DATABASE_PUSH_ALERT_CONFIG, ACLK_DATABASE_PUSH_ALERT_CONFIG,
ACLK_DATABASE_PUSH_ALERT_SNAPSHOT, ACLK_DATABASE_PUSH_ALERT_SNAPSHOT,
ACLK_DATABASE_PUSH_ALERT_CHECKPOINT,
ACLK_DATABASE_QUEUE_REMOVED_ALERTS, ACLK_DATABASE_QUEUE_REMOVED_ALERTS,
ACLK_DATABASE_TIMER, ACLK_DATABASE_TIMER,
@ -72,14 +72,13 @@ struct aclk_database_cmdqueue {
struct aclk_sync_host_config { struct aclk_sync_host_config {
RRDHOST *host; RRDHOST *host;
int alert_updates; int alert_updates;
int alert_checkpoint_req;
int alert_queue_removed;
time_t node_info_send_time; time_t node_info_send_time;
time_t node_collectors_send; time_t node_collectors_send;
char uuid_str[UUID_STR_LEN]; char uuid_str[UUID_STR_LEN];
char node_id[UUID_STR_LEN]; char node_id[UUID_STR_LEN];
uint64_t alerts_batch_id; // batch id for alerts to use char *alerts_snapshot_uuid; // will contain the snapshot_uuid value if snapshot was requested
uint64_t alerts_start_seq_id; // cloud has asked to start streaming from
uint64_t alerts_snapshot_id; // will contain the snapshot_id value if snapshot was requested
uint64_t alerts_ack_sequence_id; // last sequence_id ack'ed from cloud via sendsnapshot message
}; };
extern sqlite3 *db_meta; extern sqlite3 *db_meta;

View file

@ -97,9 +97,6 @@ int should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae)
if (unlikely(uuid_is_null(ae->config_hash_id))) if (unlikely(uuid_is_null(ae->config_hash_id)))
return 0; return 0;
if (is_event_from_alert_variable_config(ae->unique_id, uuid_str))
return 0;
char sql[ACLK_SYNC_QUERY_SIZE]; char sql[ACLK_SYNC_QUERY_SIZE];
uuid_t config_hash_id; uuid_t config_hash_id;
RRDCALC_STATUS status; RRDCALC_STATUS status;
@ -193,10 +190,13 @@ int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, int skip_filter)
} }
} }
sqlite3_stmt *res_alert = NULL; char uuid_str[UUID_STR_LEN];
char uuid_str[GUID_LEN + 1];
uuid_unparse_lower_fix(&host->host_uuid, uuid_str); uuid_unparse_lower_fix(&host->host_uuid, uuid_str);
if (is_event_from_alert_variable_config(ae->unique_id, uuid_str))
return 0;
sqlite3_stmt *res_alert = NULL;
char sql[ACLK_SYNC_QUERY_SIZE]; char sql[ACLK_SYNC_QUERY_SIZE];
snprintfz(sql, ACLK_SYNC_QUERY_SIZE - 1, SQL_QUEUE_ALERT_TO_CLOUD, uuid_str); snprintfz(sql, ACLK_SYNC_QUERY_SIZE - 1, SQL_QUEUE_ALERT_TO_CLOUD, uuid_str);
@ -278,26 +278,6 @@ void aclk_push_alert_event(struct aclk_sync_host_config *wc)
BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
if (wc->alerts_start_seq_id != 0) {
buffer_sprintf(
sql,
"UPDATE aclk_alert_%s SET date_submitted = NULL, date_cloud_ack = NULL WHERE sequence_id >= %"PRIu64
"; UPDATE aclk_alert_%s SET date_cloud_ack = unixepoch() WHERE sequence_id < %"PRIu64
" and date_cloud_ack is null "
"; UPDATE aclk_alert_%s SET date_submitted = unixepoch() WHERE sequence_id < %"PRIu64
" and date_submitted is null",
wc->uuid_str,
wc->alerts_start_seq_id,
wc->uuid_str,
wc->alerts_start_seq_id,
wc->uuid_str,
wc->alerts_start_seq_id);
if (unlikely(db_execute(buffer_tostring(sql))))
error_report("Failed to reset ACLK alert entries");
buffer_reset(sql);
wc->alerts_start_seq_id = 0;
}
int limit = ACLK_MAX_ALERT_UPDATES; int limit = ACLK_MAX_ALERT_UPDATES;
sqlite3_stmt *res = NULL; sqlite3_stmt *res = NULL;
@ -359,8 +339,8 @@ void aclk_push_alert_event(struct aclk_sync_host_config *wc)
alarm_log.name = strdupz((char *)sqlite3_column_text(res, 11)); alarm_log.name = strdupz((char *)sqlite3_column_text(res, 11));
alarm_log.family = sqlite3_column_bytes(res, 13) > 0 ? strdupz((char *)sqlite3_column_text(res, 13)) : NULL; alarm_log.family = sqlite3_column_bytes(res, 13) > 0 ? strdupz((char *)sqlite3_column_text(res, 13)) : NULL;
alarm_log.batch_id = wc->alerts_batch_id; //alarm_log.batch_id = wc->alerts_batch_id;
alarm_log.sequence_id = (uint64_t) sqlite3_column_int64(res, 0); //alarm_log.sequence_id = (uint64_t) sqlite3_column_int64(res, 0);
alarm_log.when = (time_t) sqlite3_column_int64(res, 5); alarm_log.when = (time_t) sqlite3_column_int64(res, 5);
uuid_unparse_lower(*((uuid_t *) sqlite3_column_blob(res, 3)), uuid_str); uuid_unparse_lower(*((uuid_t *) sqlite3_column_blob(res, 3)), uuid_str);
@ -445,12 +425,11 @@ void aclk_push_alert_event(struct aclk_sync_host_config *wc)
} else { } else {
if (log_first_sequence_id) if (log_first_sequence_id)
log_access( log_access(
"ACLK RES [%s (%s)]: ALERTS SENT from %" PRIu64 " to %" PRIu64 " batch=%" PRIu64, "ACLK RES [%s (%s)]: ALERTS SENT from %" PRIu64 " to %" PRIu64 "",
wc->node_id, wc->node_id,
wc->host ? rrdhost_hostname(wc->host) : "N/A", wc->host ? rrdhost_hostname(wc->host) : "N/A",
log_first_sequence_id, log_first_sequence_id,
log_last_sequence_id, log_last_sequence_id);
wc->alerts_batch_id);
log_first_sequence_id = 0; log_first_sequence_id = 0;
log_last_sequence_id = 0; log_last_sequence_id = 0;
} }
@ -494,121 +473,17 @@ void sql_queue_existing_alerts_to_aclk(RRDHOST *host)
"where new_status <> 0 and new_status <> -2 and config_hash_id is not null and updated_by_id = 0 " \ "where new_status <> 0 and new_status <> -2 and config_hash_id is not null and updated_by_id = 0 " \
"order by unique_id asc on conflict (alert_unique_id) do nothing;", uuid_str, uuid_str, uuid_str); "order by unique_id asc on conflict (alert_unique_id) do nothing;", uuid_str, uuid_str, uuid_str);
netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock);
if (unlikely(db_execute(buffer_tostring(sql)))) if (unlikely(db_execute(buffer_tostring(sql))))
error_report("Failed to queue existing ACLK alert events for host %s", rrdhost_hostname(host)); error_report("Failed to queue existing ACLK alert events for host %s", rrdhost_hostname(host));
netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock);
buffer_free(sql); buffer_free(sql);
rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
} }
void aclk_send_alarm_health_log(char *node_id)
{
if (unlikely(!node_id))
return;
struct aclk_sync_host_config *wc = NULL;
RRDHOST *host = find_host_by_node_id(node_id);
if (unlikely(!host))
return;
wc = (struct aclk_sync_host_config *)host->aclk_sync_host_config;
if (unlikely(!wc)) {
log_access("ACLK REQ [%s (N/A)]: HEALTH LOG REQUEST RECEIVED FOR INVALID NODE", node_id);
return;
}
log_access("ACLK REQ [%s (%s)]: HEALTH LOG REQUEST RECEIVED", node_id, rrdhost_hostname(host));
aclk_push_node_health_log(node_id);
}
void aclk_push_alarm_health_log(char *node_id __maybe_unused)
{
#ifdef ENABLE_ACLK
int rc;
char *claim_id = get_agent_claimid();
if (unlikely(!claim_id)) {
freez(node_id);
return;
}
RRDHOST *host = find_host_by_node_id(node_id);
if (unlikely(!host)) {
log_access("AC [%s (N/A)]: Node id not found", node_id);
freez(claim_id);
freez(node_id);
return;
}
struct aclk_sync_host_config *wc = host->aclk_sync_host_config;
int64_t first_sequence = 0;
int64_t last_sequence = 0;
struct timeval first_timestamp;
struct timeval last_timestamp;
char sql[ACLK_SYNC_QUERY_SIZE];
sqlite3_stmt *res = NULL;
//TODO: make this better: include info from health log too
snprintfz(sql, ACLK_SYNC_QUERY_SIZE - 1, "SELECT MIN(sequence_id), MIN(date_created), " \
"MAX(sequence_id), MAX(date_created) FROM aclk_alert_%s;", wc->uuid_str);
rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0);
if (rc != SQLITE_OK) {
error_report("Failed to prepare statement to get health log statistics from the database");
freez(claim_id);
return;
}
first_timestamp.tv_sec = 0;
first_timestamp.tv_usec = 0;
last_timestamp.tv_sec = 0;
last_timestamp.tv_usec = 0;
while (sqlite3_step_monitored(res) == SQLITE_ROW) {
first_sequence = sqlite3_column_bytes(res, 0) > 0 ? (int64_t) sqlite3_column_int64(res, 0) : 0;
if (sqlite3_column_bytes(res, 1) > 0) {
first_timestamp.tv_sec = sqlite3_column_int64(res, 1);
}
last_sequence = sqlite3_column_bytes(res, 2) > 0 ? (int64_t) sqlite3_column_int64(res, 2) : 0;
if (sqlite3_column_bytes(res, 3) > 0) {
last_timestamp.tv_sec = sqlite3_column_int64(res, 3);
}
}
struct alarm_log_entries log_entries;
log_entries.first_seq_id = first_sequence;
log_entries.first_when = first_timestamp;
log_entries.last_seq_id = last_sequence;
log_entries.last_when = last_timestamp;
struct alarm_log_health alarm_log;
alarm_log.claim_id = claim_id;
alarm_log.node_id = wc->node_id;
alarm_log.log_entries = log_entries;
alarm_log.status = wc->alert_updates == 0 ? 2 : 1;
alarm_log.enabled = (int)host->health.health_enabled;
aclk_send_alarm_log_health(&alarm_log);
log_access("ACLK RES [%s (%s)]: HEALTH LOG SENT from %ld to %ld", wc->node_id, rrdhost_hostname(host), first_sequence, last_sequence);
rc = sqlite3_finalize(res);
if (unlikely(rc != SQLITE_OK))
error_report("Failed to reset statement to get health log statistics from the database, rc = %d", rc);
freez(claim_id);
freez(node_id);
aclk_alert_reloaded = 1;
#endif
}
void aclk_send_alarm_configuration(char *config_hash) void aclk_send_alarm_configuration(char *config_hash)
{ {
if (unlikely(!config_hash)) if (unlikely(!config_hash))
@ -755,7 +630,7 @@ bind_fail:
// Start streaming alerts // Start streaming alerts
void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start_seq_id) void aclk_start_alert_streaming(char *node_id, bool resets)
{ {
if (unlikely(!node_id)) if (unlikely(!node_id))
return; return;
@ -779,20 +654,22 @@ void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start
return; return;
} }
// TODO: CHECK if (resets) {
if (unlikely(batch_id == 1) && unlikely(start_seq_id == 1)) log_access("ACLK REQ [%s (%s)]: STREAM ALERTS ENABLED (RESET REQUESTED)", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A");
sql_queue_existing_alerts_to_aclk(host); sql_queue_existing_alerts_to_aclk(host);
} else
log_access("ACLK REQ [%s (%s)]: STREAM ALERTS ENABLED", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A");
log_access("ACLK REQ [%s (%s)]: ALERTS STREAM from %"PRIu64" batch=%"PRIu64, node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", start_seq_id, batch_id);
wc->alerts_batch_id = batch_id;
wc->alerts_start_seq_id = start_seq_id;
wc->alert_updates = 1; wc->alert_updates = 1;
wc->alert_queue_removed = SEND_REMOVED_AFTER_HEALTH_LOOPS;
} }
#define SQL_QUEUE_REMOVE_ALERTS "INSERT INTO aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " \ #define SQL_QUEUE_REMOVE_ALERTS "INSERT INTO aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " \
"SELECT unique_id alert_unique_id, UNIXEPOCH(), unique_id alert_unique_id FROM health_log_%s " \ "SELECT unique_id alert_unique_id, UNIXEPOCH(), unique_id alert_unique_id FROM health_log_%s " \
"WHERE new_status = -2 AND updated_by_id = 0 AND unique_id NOT IN " \ "WHERE new_status = -2 AND updated_by_id = 0 AND unique_id NOT IN " \
"(SELECT alert_unique_id FROM aclk_alert_%s) ORDER BY unique_id ASC " \ "(SELECT alert_unique_id FROM aclk_alert_%s) " \
"AND config_hash_id NOT IN (select hash_id from alert_hash where warn is null and crit is null) " \
"ORDER BY unique_id ASC " \
"ON CONFLICT (alert_unique_id) DO NOTHING;" "ON CONFLICT (alert_unique_id) DO NOTHING;"
void sql_process_queue_removed_alerts_to_aclk(char *node_id) void sql_process_queue_removed_alerts_to_aclk(char *node_id)
@ -814,7 +691,9 @@ void sql_process_queue_removed_alerts_to_aclk(char *node_id)
} }
else else
log_access("ACLK STA [%s (%s)]: QUEUED REMOVED ALERTS", wc->node_id, rrdhost_hostname(wc->host)); log_access("ACLK STA [%s (%s)]: QUEUED REMOVED ALERTS", wc->node_id, rrdhost_hostname(wc->host));
rrdhost_flag_set(wc->host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); rrdhost_flag_set(wc->host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
wc->alert_queue_removed = 0;
} }
void sql_queue_removed_alerts_to_aclk(RRDHOST *host) void sql_queue_removed_alerts_to_aclk(RRDHOST *host)
@ -831,7 +710,7 @@ void sql_queue_removed_alerts_to_aclk(RRDHOST *host)
aclk_push_node_removed_alerts(node_id); aclk_push_node_removed_alerts(node_id);
} }
void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id __maybe_unused, uint64_t snapshot_id, uint64_t sequence_id) void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id __maybe_unused, char *snapshot_uuid)
{ {
uuid_t node_uuid; uuid_t node_uuid;
if (unlikely(!node_id || uuid_parse(node_id, node_uuid))) if (unlikely(!node_id || uuid_parse(node_id, node_uuid)))
@ -851,19 +730,17 @@ void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id __maybe_unus
} }
log_access( log_access(
"IN [%s (%s)]: Request to send alerts snapshot, snapshot_id %" PRIu64 " and ack_sequence_id %" PRIu64, "IN [%s (%s)]: Request to send alerts snapshot, snapshot_uuid %s",
wc->node_id, node_id,
wc->host ? rrdhost_hostname(wc->host) : "N/A", wc->host ? rrdhost_hostname(wc->host) : "N/A",
snapshot_id, snapshot_uuid);
sequence_id); if (wc->alerts_snapshot_uuid && !strcmp(wc->alerts_snapshot_uuid,snapshot_uuid))
if (wc->alerts_snapshot_id == snapshot_id) return;
return; __sync_synchronize();
__sync_synchronize(); wc->alerts_snapshot_uuid = strdupz(snapshot_uuid);
wc->alerts_snapshot_id = snapshot_id; __sync_synchronize();
wc->alerts_ack_sequence_id = sequence_id;
__sync_synchronize();
aclk_push_node_alert_snapshot(node_id); aclk_push_node_alert_snapshot(node_id);
} }
#ifdef ENABLE_ACLK #ifdef ENABLE_ACLK
@ -934,8 +811,6 @@ static int have_recent_alarm(RRDHOST *host, uint32_t alarm_id, uint32_t mark)
#endif #endif
#define ALARM_EVENTS_PER_CHUNK 10 #define ALARM_EVENTS_PER_CHUNK 10
#define SQL_ALERT_CLOUD_ACK "UPDATE aclk_alert_%s SET date_cloud_ack=unixepoch() WHERE sequence_id <= %"PRIu64
void aclk_push_alert_snapshot_event(char *node_id __maybe_unused) void aclk_push_alert_snapshot_event(char *node_id __maybe_unused)
{ {
#ifdef ENABLE_ACLK #ifdef ENABLE_ACLK
@ -959,21 +834,14 @@ void aclk_push_alert_snapshot_event(char *node_id __maybe_unused)
return; return;
} }
if (unlikely(!wc->alerts_snapshot_id)) if (unlikely(!wc->alerts_snapshot_uuid))
return; return;
char *claim_id = get_agent_claimid(); char *claim_id = get_agent_claimid();
if (unlikely(!claim_id)) if (unlikely(!claim_id))
return; return;
log_access("ACLK REQ [%s (%s)]: Sending alerts snapshot, snapshot_id %" PRIu64, wc->node_id, rrdhost_hostname(wc->host), wc->alerts_snapshot_id); log_access("ACLK REQ [%s (%s)]: Sending alerts snapshot, snapshot_uuid %s", wc->node_id, rrdhost_hostname(wc->host), wc->alerts_snapshot_uuid);
if (wc->alerts_ack_sequence_id) {
char sql[512];
snprintfz(sql, 511, SQL_ALERT_CLOUD_ACK, wc->uuid_str, wc->alerts_ack_sequence_id);
if (unlikely(db_execute(sql)))
error_report("Failed to set ACLK alert entries cloud ACK status for host %s", rrdhost_hostname(host));
}
uint32_t cnt = 0; uint32_t cnt = 0;
char uuid_str[UUID_STR_LEN]; char uuid_str[UUID_STR_LEN];
@ -1009,7 +877,7 @@ void aclk_push_alert_snapshot_event(char *node_id __maybe_unused)
struct alarm_snapshot alarm_snap; struct alarm_snapshot alarm_snap;
alarm_snap.node_id = wc->node_id; alarm_snap.node_id = wc->node_id;
alarm_snap.claim_id = claim_id; alarm_snap.claim_id = claim_id;
alarm_snap.snapshot_id = wc->alerts_snapshot_id; alarm_snap.snapshot_uuid = wc->alerts_snapshot_uuid;
alarm_snap.chunks = chunks; alarm_snap.chunks = chunks;
alarm_snap.chunk = chunk; alarm_snap.chunk = chunk;
@ -1051,7 +919,7 @@ void aclk_push_alert_snapshot_event(char *node_id __maybe_unused)
struct alarm_snapshot alarm_snap; struct alarm_snapshot alarm_snap;
alarm_snap.node_id = wc->node_id; alarm_snap.node_id = wc->node_id;
alarm_snap.claim_id = claim_id; alarm_snap.claim_id = claim_id;
alarm_snap.snapshot_id = wc->alerts_snapshot_id; alarm_snap.snapshot_uuid = wc->alerts_snapshot_uuid;
alarm_snap.chunks = chunks; alarm_snap.chunks = chunks;
alarm_snap.chunk = chunk; alarm_snap.chunk = chunk;
@ -1065,7 +933,7 @@ void aclk_push_alert_snapshot_event(char *node_id __maybe_unused)
} }
netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock); netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock);
wc->alerts_snapshot_id = 0; wc->alerts_snapshot_uuid = NULL;
freez(claim_id); freez(claim_id);
#endif #endif
@ -1093,7 +961,6 @@ void sql_aclk_alert_clean_dead_entries(RRDHOST *host)
} }
#define SQL_GET_MIN_MAX_ALERT_SEQ "SELECT MIN(sequence_id), MAX(sequence_id), " \ #define SQL_GET_MIN_MAX_ALERT_SEQ "SELECT MIN(sequence_id), MAX(sequence_id), " \
"(SELECT MAX(sequence_id) FROM aclk_alert_%s WHERE date_cloud_ack IS NOT NULL), " \
"(SELECT MAX(sequence_id) FROM aclk_alert_%s WHERE date_submitted IS NOT NULL) " \ "(SELECT MAX(sequence_id) FROM aclk_alert_%s WHERE date_submitted IS NOT NULL) " \
"FROM aclk_alert_%s WHERE date_submitted IS NULL;" "FROM aclk_alert_%s WHERE date_submitted IS NULL;"
@ -1106,12 +973,11 @@ int get_proto_alert_status(RRDHOST *host, struct proto_alert_status *proto_alert
return 1; return 1;
proto_alert_status->alert_updates = wc->alert_updates; proto_alert_status->alert_updates = wc->alert_updates;
proto_alert_status->alerts_batch_id = wc->alerts_batch_id;
char sql[ACLK_SYNC_QUERY_SIZE]; char sql[ACLK_SYNC_QUERY_SIZE];
sqlite3_stmt *res = NULL; sqlite3_stmt *res = NULL;
snprintfz(sql, ACLK_SYNC_QUERY_SIZE - 1, SQL_GET_MIN_MAX_ALERT_SEQ, wc->uuid_str, wc->uuid_str, wc->uuid_str); snprintfz(sql, ACLK_SYNC_QUERY_SIZE - 1, SQL_GET_MIN_MAX_ALERT_SEQ, wc->uuid_str, wc->uuid_str);
rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0); rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0);
if (rc != SQLITE_OK) { if (rc != SQLITE_OK) {
@ -1122,8 +988,7 @@ int get_proto_alert_status(RRDHOST *host, struct proto_alert_status *proto_alert
while (sqlite3_step_monitored(res) == SQLITE_ROW) { while (sqlite3_step_monitored(res) == SQLITE_ROW) {
proto_alert_status->pending_min_sequence_id = sqlite3_column_bytes(res, 0) > 0 ? (uint64_t) sqlite3_column_int64(res, 0) : 0; proto_alert_status->pending_min_sequence_id = sqlite3_column_bytes(res, 0) > 0 ? (uint64_t) sqlite3_column_int64(res, 0) : 0;
proto_alert_status->pending_max_sequence_id = sqlite3_column_bytes(res, 1) > 0 ? (uint64_t) sqlite3_column_int64(res, 1) : 0; proto_alert_status->pending_max_sequence_id = sqlite3_column_bytes(res, 1) > 0 ? (uint64_t) sqlite3_column_int64(res, 1) : 0;
proto_alert_status->last_acked_sequence_id = sqlite3_column_bytes(res, 2) > 0 ? (uint64_t) sqlite3_column_int64(res, 2) : 0; proto_alert_status->last_submitted_sequence_id = sqlite3_column_bytes(res, 2) > 0 ? (uint64_t) sqlite3_column_int64(res, 2) : 0;
proto_alert_status->last_submitted_sequence_id = sqlite3_column_bytes(res, 3) > 0 ? (uint64_t) sqlite3_column_int64(res, 3) : 0;
} }
rc = sqlite3_finalize(res); rc = sqlite3_finalize(res);
@ -1132,3 +997,140 @@ int get_proto_alert_status(RRDHOST *host, struct proto_alert_status *proto_alert
return 0; return 0;
} }
void aclk_send_alarm_checkpoint(char *node_id, char *claim_id __maybe_unused)
{
if (unlikely(!node_id))
return;
struct aclk_sync_host_config *wc = NULL;
RRDHOST *host = find_host_by_node_id(node_id);
if (unlikely(!host))
return;
wc = (struct aclk_sync_host_config *)host->aclk_sync_host_config;
if (unlikely(!wc)) {
log_access("ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT REQUEST RECEIVED FOR INVALID NODE", node_id);
return;
}
log_access("ACLK REQ [%s (%s)]: ALERTS CHECKPOINT REQUEST RECEIVED", node_id, rrdhost_hostname(host));
wc->alert_checkpoint_req = SEND_CHECKPOINT_AFTER_HEALTH_LOOPS;
}
typedef struct active_alerts {
char *name;
char *chart;
RRDCALC_STATUS status;
} active_alerts_t;
static inline int compare_active_alerts(const void * a, const void * b) {
active_alerts_t *active_alerts_a = (active_alerts_t *)a;
active_alerts_t *active_alerts_b = (active_alerts_t *)b;
if( !(strcmp(active_alerts_a->name, active_alerts_b->name)) )
{
return strcmp(active_alerts_a->chart, active_alerts_b->chart);
}
else
return strcmp(active_alerts_a->name, active_alerts_b->name);
}
void aclk_push_alarm_checkpoint(RRDHOST *host __maybe_unused)
{
#ifdef ENABLE_ACLK
struct aclk_sync_host_config *wc = host->aclk_sync_host_config;
if (unlikely(!wc)) {
log_access("ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT REQUEST RECEIVED FOR INVALID NODE", rrdhost_hostname(host));
return;
}
//TODO: make sure all pending events are sent.
if (rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS)) {
//postpone checkpoint send
wc->alert_checkpoint_req++;
log_access("ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT POSTPONED", rrdhost_hostname(host));
return;
}
//TODO: lock rc here, or make sure it's called when health decides
//count them
RRDCALC *rc;
uint32_t cnt = 0;
size_t len = 0;
active_alerts_t *active_alerts = NULL;
foreach_rrdcalc_in_rrdhost_read(host, rc) {
if(unlikely(!rc->rrdset || !rc->rrdset->last_collected_time.tv_sec))
continue;
if (rc->status == RRDCALC_STATUS_WARNING ||
rc->status == RRDCALC_STATUS_CRITICAL) {
cnt++;
}
}
foreach_rrdcalc_in_rrdhost_done(rc);
if (cnt) {
active_alerts = callocz(cnt, sizeof(active_alerts_t));
cnt = 0;
foreach_rrdcalc_in_rrdhost_read(host, rc) {
if(unlikely(!rc->rrdset || !rc->rrdset->last_collected_time.tv_sec))
continue;
if (rc->status == RRDCALC_STATUS_WARNING ||
rc->status == RRDCALC_STATUS_CRITICAL) {
active_alerts[cnt].name = (char *)rrdcalc_name(rc);
len += string_strlen(rc->name);
active_alerts[cnt].chart = (char *)rrdcalc_chart_name(rc);
len += string_strlen(rc->chart);
active_alerts[cnt].status = rc->status;
len++;
cnt++;
}
}
foreach_rrdcalc_in_rrdhost_done(rc);
}
BUFFER *alarms_to_hash;
if (cnt) {
qsort (active_alerts, cnt, sizeof(active_alerts_t), compare_active_alerts);
alarms_to_hash = buffer_create(len, NULL);
for (uint32_t i=0;i<cnt;i++) {
buffer_strcat(alarms_to_hash, active_alerts[i].name);
buffer_strcat(alarms_to_hash, active_alerts[i].chart);
if (active_alerts[i].status == RRDCALC_STATUS_WARNING)
buffer_strcat(alarms_to_hash, "W");
else if (active_alerts[i].status == RRDCALC_STATUS_CRITICAL)
buffer_strcat(alarms_to_hash, "C");
}
} else {
alarms_to_hash = buffer_create(1, NULL);
buffer_strcat(alarms_to_hash, "");
len = 0;
}
char hash[SHA256_DIGEST_LENGTH + 1];
if (hash256_string((const unsigned char *)buffer_tostring(alarms_to_hash), len, hash)) {
hash[SHA256_DIGEST_LENGTH] = 0;
struct alarm_checkpoint alarm_checkpoint;
char *claim_id = get_agent_claimid();
alarm_checkpoint.claim_id = claim_id;
alarm_checkpoint.node_id = wc->node_id;
alarm_checkpoint.checksum = (char *)hash;
aclk_send_provide_alarm_checkpoint(&alarm_checkpoint);
log_access("ACLK RES [%s (%s)]: ALERTS CHECKPOINT SENT", wc->node_id, rrdhost_hostname(host));
} else {
log_access("ACLK RES [%s (%s)]: FAILED TO CREATE ALERTS CHECKPOINT HASH", wc->node_id, rrdhost_hostname(host));
}
wc->alert_checkpoint_req = 0;
buffer_free(alarms_to_hash);
#endif
}

View file

@ -5,10 +5,11 @@
extern sqlite3 *db_meta; extern sqlite3 *db_meta;
#define SEND_REMOVED_AFTER_HEALTH_LOOPS 3
#define SEND_CHECKPOINT_AFTER_HEALTH_LOOPS 4
struct proto_alert_status { struct proto_alert_status {
int alert_updates; int alert_updates;
uint64_t alerts_batch_id;
uint64_t last_acked_sequence_id;
uint64_t pending_min_sequence_id; uint64_t pending_min_sequence_id;
uint64_t pending_max_sequence_id; uint64_t pending_max_sequence_id;
uint64_t last_submitted_sequence_id; uint64_t last_submitted_sequence_id;
@ -16,16 +17,16 @@ struct proto_alert_status {
int aclk_add_alert_event(struct aclk_sync_host_config *wc, struct aclk_database_cmd cmd); int aclk_add_alert_event(struct aclk_sync_host_config *wc, struct aclk_database_cmd cmd);
void aclk_push_alert_event(struct aclk_sync_host_config *wc); void aclk_push_alert_event(struct aclk_sync_host_config *wc);
void aclk_send_alarm_health_log(char *node_id);
void aclk_push_alarm_health_log(char *node_id);
void aclk_send_alarm_configuration (char *config_hash); void aclk_send_alarm_configuration (char *config_hash);
int aclk_push_alert_config_event(char *node_id, char *config_hash); int aclk_push_alert_config_event(char *node_id, char *config_hash);
void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start_seq_id); void aclk_start_alert_streaming(char *node_id, bool resets);
void sql_queue_removed_alerts_to_aclk(RRDHOST *host); void sql_queue_removed_alerts_to_aclk(RRDHOST *host);
void sql_process_queue_removed_alerts_to_aclk(char *node_id); void sql_process_queue_removed_alerts_to_aclk(char *node_id);
void aclk_send_alarm_checkpoint(char *node_id, char *claim_id);
void aclk_push_alarm_checkpoint(RRDHOST *host);
void aclk_push_alert_snapshot_event(char *node_id); void aclk_push_alert_snapshot_event(char *node_id);
void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, uint64_t snapshot_id, uint64_t sequence_id); void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, char *snapshot_uuid);
int get_proto_alert_status(RRDHOST *host, struct proto_alert_status *proto_alert_status); int get_proto_alert_status(RRDHOST *host, struct proto_alert_status *proto_alert_status);
int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, int skip_filter); int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, int skip_filter);
void aclk_push_alert_events_for_all_hosts(void); void aclk_push_alert_events_for_all_hosts(void);

View file

@ -347,6 +347,15 @@ static void health_reload_host(RRDHOST *host) {
rrdcalctemplate_link_matching_templates_to_rrdset(st); rrdcalctemplate_link_matching_templates_to_rrdset(st);
} }
rrdset_foreach_done(st); rrdset_foreach_done(st);
#ifdef ENABLE_ACLK
if (netdata_cloud_setting) {
struct aclk_sync_host_config *wc = (struct aclk_sync_host_config *)host->aclk_sync_host_config;
if (likely(wc)) {
wc->alert_queue_removed = SEND_REMOVED_AFTER_HEALTH_LOOPS;
}
}
#endif
} }
/** /**
@ -362,12 +371,6 @@ void health_reload(void) {
health_reload_host(host); health_reload_host(host);
} }
dfe_done(host); dfe_done(host);
#ifdef ENABLE_ACLK
if (netdata_cloud_setting) {
aclk_alert_reloaded = 1;
}
#endif
} }
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
@ -977,9 +980,7 @@ void *health_main(void *ptr) {
rrdcalc_delete_alerts_not_matching_host_labels_from_all_hosts(); rrdcalc_delete_alerts_not_matching_host_labels_from_all_hosts();
unsigned int loop = 0; unsigned int loop = 0;
#ifdef ENABLE_ACLK
unsigned int marked_aclk_reload_loop = 0;
#endif
while(service_running(SERVICE_HEALTH)) { while(service_running(SERVICE_HEALTH)) {
loop++; loop++;
debug(D_HEALTH, "Health monitoring iteration no %u started", loop); debug(D_HEALTH, "Health monitoring iteration no %u started", loop);
@ -1008,11 +1009,6 @@ void *health_main(void *ptr) {
} }
} }
#ifdef ENABLE_ACLK
if (aclk_alert_reloaded && !marked_aclk_reload_loop)
marked_aclk_reload_loop = loop;
#endif
worker_is_busy(WORKER_HEALTH_JOB_RRD_LOCK); worker_is_busy(WORKER_HEALTH_JOB_RRD_LOCK);
dfe_start_reentrant(rrdhost_root_index, host) { dfe_start_reentrant(rrdhost_root_index, host) {
@ -1117,7 +1113,7 @@ void *health_main(void *ptr) {
rc->value = NAN; rc->value = NAN;
#ifdef ENABLE_ACLK #ifdef ENABLE_ACLK
if (netdata_cloud_setting && likely(!aclk_alert_reloaded)) if (netdata_cloud_setting)
sql_queue_alarm_to_aclk(host, ae, 1); sql_queue_alarm_to_aclk(host, ae, 1);
#endif #endif
} }
@ -1488,6 +1484,26 @@ void *health_main(void *ptr) {
} }
break; break;
} }
#ifdef ENABLE_ACLK
if (netdata_cloud_setting) {
struct aclk_sync_host_config *wc = (struct aclk_sync_host_config *)host->aclk_sync_host_config;
if (unlikely(!wc)) {
continue;
}
if (wc->alert_queue_removed == 1) {
sql_queue_removed_alerts_to_aclk(host);
} else if (wc->alert_queue_removed > 1) {
wc->alert_queue_removed--;
}
if (wc->alert_checkpoint_req == 1) {
aclk_push_alarm_checkpoint(host);
} else if (wc->alert_checkpoint_req > 1) {
wc->alert_checkpoint_req--;
}
}
#endif
} }
dfe_done(host); dfe_done(host);
@ -1500,23 +1516,6 @@ void *health_main(void *ptr) {
health_alarm_wait_for_execution(ae); health_alarm_wait_for_execution(ae);
} }
#ifdef ENABLE_ACLK
if (netdata_cloud_setting && unlikely(aclk_alert_reloaded) && loop > (marked_aclk_reload_loop + 2)) {
dfe_start_reentrant(rrdhost_root_index, host) {
if(unlikely(!service_running(SERVICE_HEALTH)))
break;
if (unlikely(!host->health.health_enabled))
continue;
sql_queue_removed_alerts_to_aclk(host);
}
dfe_done(host);
aclk_alert_reloaded = 0;
marked_aclk_reload_loop = 0;
}
#endif
if(unlikely(!service_running(SERVICE_HEALTH))) if(unlikely(!service_running(SERVICE_HEALTH)))
break; break;

View file

@ -2009,3 +2009,28 @@ void timing_action(TIMING_ACTION action, TIMING_STEP step) {
} }
} }
} }
int hash256_string(const unsigned char *string, size_t size, char *hash) {
EVP_MD_CTX *ctx;
ctx = EVP_MD_CTX_create();
if (!ctx)
return 0;
if (!EVP_DigestInit(ctx, EVP_sha256())) {
EVP_MD_CTX_destroy(ctx);
return 0;
}
if (!EVP_DigestUpdate(ctx, string, size)) {
EVP_MD_CTX_destroy(ctx);
return 0;
}
if (!EVP_DigestFinal(ctx, (unsigned char *)hash, NULL)) {
EVP_MD_CTX_destroy(ctx);
return 0;
}
return 1;
}

View file

@ -782,6 +782,7 @@ typedef enum {
#endif #endif
void timing_action(TIMING_ACTION action, TIMING_STEP step); void timing_action(TIMING_ACTION action, TIMING_STEP step);
int hash256_string(const unsigned char *string, size_t size, char *hash);
# ifdef __cplusplus # ifdef __cplusplus
} }
# endif # endif