diff --git a/aclk/aclk-schemas b/aclk/aclk-schemas index 3252118bd5..d3a5c636b6 160000 --- a/aclk/aclk-schemas +++ b/aclk/aclk-schemas @@ -1 +1 @@ -Subproject commit 3252118bd547640251356629f0df05eaf952ac39 +Subproject commit d3a5c636b6dacf364834f2ba99ce0170c71ef861 diff --git a/aclk/aclk.c b/aclk/aclk.c index 1ac3c9a2f9..399bc9876b 100644 --- a/aclk/aclk.c +++ b/aclk/aclk.c @@ -49,8 +49,6 @@ float last_backoff_value = 0; time_t aclk_block_until = 0; -int aclk_alert_reloaded = 0; //1 on health log exchange, and again on health_reload - #ifdef ENABLE_ACLK mqtt_wss_client mqttwss_client; @@ -928,14 +926,10 @@ static void fill_alert_status_for_host(BUFFER *wb, RRDHOST *host) } buffer_sprintf(wb, "\n\t\tUpdates: %d" - "\n\t\tBatch ID: %"PRIu64 - "\n\t\tLast Acked Seq ID: %"PRIu64 "\n\t\tPending Min Seq ID: %"PRIu64 "\n\t\tPending Max Seq ID: %"PRIu64 "\n\t\tLast Submitted Seq ID: %"PRIu64, status.alert_updates, - status.alerts_batch_id, - status.last_acked_sequence_id, status.pending_min_sequence_id, status.pending_max_sequence_id, status.last_submitted_sequence_id @@ -1043,12 +1037,6 @@ static void fill_alert_status_for_host_json(json_object *obj, RRDHOST *host) json_object *tmp = json_object_new_int(status.alert_updates); json_object_object_add(obj, "updates", tmp); - tmp = json_object_new_int(status.alerts_batch_id); - json_object_object_add(obj, "batch-id", tmp); - - tmp = json_object_new_int(status.last_acked_sequence_id); - json_object_object_add(obj, "last-acked-seq-id", tmp); - tmp = json_object_new_int(status.pending_min_sequence_id); json_object_object_add(obj, "pending-min-seq-id", tmp); diff --git a/aclk/aclk.h b/aclk/aclk.h index a56aa7b263..bd8375fb53 100644 --- a/aclk/aclk.h +++ b/aclk/aclk.h @@ -26,8 +26,6 @@ extern time_t aclk_block_until; extern int disconnect_req; -extern int aclk_alert_reloaded; - #ifdef ENABLE_ACLK void *aclk_main(void *ptr); diff --git a/aclk/aclk_alarm_api.c b/aclk/aclk_alarm_api.c index 7df51a7b54..664671f705 100644 --- 
a/aclk/aclk_alarm_api.c +++ b/aclk/aclk_alarm_api.c @@ -8,12 +8,12 @@ #include "aclk.h" -void aclk_send_alarm_log_health(struct alarm_log_health *log_health) +void aclk_send_provide_alarm_checkpoint(struct alarm_checkpoint *checkpoint) { - aclk_query_t query = aclk_query_new(ALARM_LOG_HEALTH); - query->data.bin_payload.payload = generate_alarm_log_health(&query->data.bin_payload.size, log_health); - query->data.bin_payload.topic = ACLK_TOPICID_ALARM_HEALTH; - query->data.bin_payload.msg_name = "AlarmLogHealth"; + aclk_query_t query = aclk_query_new(ALARM_PROVIDE_CHECKPOINT); + query->data.bin_payload.payload = generate_alarm_checkpoint(&query->data.bin_payload.size, checkpoint); + query->data.bin_payload.topic = ACLK_TOPICID_ALARM_CHECKPOINT; + query->data.bin_payload.msg_name = "AlarmCheckpoint"; QUEUE_IF_PAYLOAD_PRESENT(query); } diff --git a/aclk/aclk_alarm_api.h b/aclk/aclk_alarm_api.h index e3fa92b5b8..4d9d9447a9 100644 --- a/aclk/aclk_alarm_api.h +++ b/aclk/aclk_alarm_api.h @@ -6,7 +6,7 @@ #include "../daemon/common.h" #include "schema-wrappers/schema_wrappers.h" -void aclk_send_alarm_log_health(struct alarm_log_health *log_health); +void aclk_send_provide_alarm_checkpoint(struct alarm_checkpoint *checkpoint); void aclk_send_alarm_log_entry(struct alarm_log_entry *log_entry); void aclk_send_provide_alarm_cfg(struct provide_alarm_configuration *cfg); void aclk_send_alarm_snapshot(alarm_snapshot_proto_ptr_t snapshot); diff --git a/aclk/aclk_capas.c b/aclk/aclk_capas.c index e25088e231..63fe913373 100644 --- a/aclk/aclk_capas.c +++ b/aclk/aclk_capas.c @@ -14,6 +14,7 @@ const struct capability *aclk_get_agent_capas() { .name = "ctx", .version = 1, .enabled = 1 }, { .name = "funcs", .version = 1, .enabled = 1 }, { .name = "http_api_v2", .version = 1, .enabled = 1 }, + { .name = "health", .version = 1, .enabled = 0 }, { .name = NULL, .version = 0, .enabled = 0 } }; agent_capabilities[2].version = ml_capable() ? 
1 : 0; @@ -22,6 +23,8 @@ const struct capability *aclk_get_agent_capas() agent_capabilities[3].version = enable_metric_correlations ? metric_correlations_version : 0; agent_capabilities[3].enabled = enable_metric_correlations; + agent_capabilities[7].enabled = localhost->health.health_enabled; + return agent_capabilities; } @@ -36,6 +39,7 @@ struct capability *aclk_get_node_instance_capas(RRDHOST *host) { .name = "ctx", .version = 1, .enabled = 1 }, { .name = "funcs", .version = 0, .enabled = 0 }, { .name = "http_api_v2", .version = 1, .enabled = 1 }, + { .name = "health", .version = 1, .enabled = host->health.health_enabled }, { .name = NULL, .version = 0, .enabled = 0 } }; diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c index 3908b8bad4..fd6f8555bc 100644 --- a/aclk/aclk_query.c +++ b/aclk/aclk_query.c @@ -185,19 +185,19 @@ static int send_bin_msg(struct aclk_query_thread *query_thr, aclk_query_t query) const char *aclk_query_get_name(aclk_query_type_t qt, int unknown_ok) { switch (qt) { - case HTTP_API_V2: return "http_api_request_v2"; - case REGISTER_NODE: return "register_node"; - case NODE_STATE_UPDATE: return "node_state_update"; - case CHART_DIMS_UPDATE: return "chart_and_dim_update"; - case CHART_CONFIG_UPDATED: return "chart_config_updated"; - case CHART_RESET: return "reset_chart_messages"; - case RETENTION_UPDATED: return "update_retention_info"; - case UPDATE_NODE_INFO: return "update_node_info"; - case ALARM_LOG_HEALTH: return "alarm_log_health"; - case ALARM_PROVIDE_CFG: return "provide_alarm_config"; - case ALARM_SNAPSHOT: return "alarm_snapshot"; - case UPDATE_NODE_COLLECTORS: return "update_node_collectors"; - case PROTO_BIN_MESSAGE: return "generic_binary_proto_message"; + case HTTP_API_V2: return "http_api_request_v2"; + case REGISTER_NODE: return "register_node"; + case NODE_STATE_UPDATE: return "node_state_update"; + case CHART_DIMS_UPDATE: return "chart_and_dim_update"; + case CHART_CONFIG_UPDATED: return "chart_config_updated"; + case 
CHART_RESET: return "reset_chart_messages"; + case RETENTION_UPDATED: return "update_retention_info"; + case UPDATE_NODE_INFO: return "update_node_info"; + case ALARM_PROVIDE_CHECKPOINT: return "alarm_checkpoint"; + case ALARM_PROVIDE_CFG: return "provide_alarm_config"; + case ALARM_SNAPSHOT: return "alarm_snapshot"; + case UPDATE_NODE_COLLECTORS: return "update_node_collectors"; + case PROTO_BIN_MESSAGE: return "generic_binary_proto_message"; default: if (!unknown_ok) error_report("Unknown query type used %d", (int) qt); diff --git a/aclk/aclk_query_queue.h b/aclk/aclk_query_queue.h index ab94b63848..944fc0797a 100644 --- a/aclk/aclk_query_queue.h +++ b/aclk/aclk_query_queue.h @@ -19,7 +19,7 @@ typedef enum { CHART_RESET, RETENTION_UPDATED, UPDATE_NODE_INFO, - ALARM_LOG_HEALTH, + ALARM_PROVIDE_CHECKPOINT, ALARM_PROVIDE_CFG, ALARM_SNAPSHOT, UPDATE_NODE_COLLECTORS, diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c index 2d898079b2..b4dda5c425 100644 --- a/aclk/aclk_rx_msgs.c +++ b/aclk/aclk_rx_msgs.c @@ -339,25 +339,27 @@ int update_chart_configs(const char *msg, size_t msg_len) int start_alarm_streaming(const char *msg, size_t msg_len) { struct start_alarm_streaming res = parse_start_alarm_streaming(msg, msg_len); - if (!res.node_id || !res.batch_id) { + if (!res.node_id) { error("Error parsing StartAlarmStreaming"); - freez(res.node_id); return 1; } - aclk_start_alert_streaming(res.node_id, res.batch_id, res.start_seq_id); + aclk_start_alert_streaming(res.node_id, res.resets); freez(res.node_id); return 0; } -int send_alarm_log_health(const char *msg, size_t msg_len) +int send_alarm_checkpoint(const char *msg, size_t msg_len) { - char *node_id = parse_send_alarm_log_health(msg, msg_len); - if (!node_id) { - error("Error parsing SendAlarmLogHealth"); + struct send_alarm_checkpoint sac = parse_send_alarm_checkpoint(msg, msg_len); + if (!sac.node_id || !sac.claim_id) { + error("Error parsing SendAlarmCheckpoint"); + freez(sac.node_id); + freez(sac.claim_id); 
return 1; } - aclk_send_alarm_health_log(node_id); - freez(node_id); + aclk_send_alarm_checkpoint(sac.node_id, sac.claim_id); + freez(sac.node_id); + freez(sac.claim_id); return 0; } @@ -377,12 +379,12 @@ int send_alarm_configuration(const char *msg, size_t msg_len) int send_alarm_snapshot(const char *msg, size_t msg_len) { struct send_alarm_snapshot *sas = parse_send_alarm_snapshot(msg, msg_len); - if (!sas->node_id || !sas->claim_id) { + if (!sas->node_id || !sas->claim_id || !sas->snapshot_uuid) { error("Error parsing SendAlarmSnapshot"); destroy_send_alarm_snapshot(sas); return 1; } - aclk_process_send_alarm_snapshot(sas->node_id, sas->claim_id, sas->snapshot_id, sas->sequence_id); + aclk_process_send_alarm_snapshot(sas->node_id, sas->claim_id, sas->snapshot_uuid); destroy_send_alarm_snapshot(sas); return 0; } @@ -458,7 +460,7 @@ new_cloud_rx_msg_t rx_msgs[] = { { .name = "ChartsAndDimensionsAck", .name_hash = 0, .fnc = charts_and_dimensions_ack }, { .name = "UpdateChartConfigs", .name_hash = 0, .fnc = update_chart_configs }, { .name = "StartAlarmStreaming", .name_hash = 0, .fnc = start_alarm_streaming }, - { .name = "SendAlarmLogHealth", .name_hash = 0, .fnc = send_alarm_log_health }, + { .name = "SendAlarmCheckpoint", .name_hash = 0, .fnc = send_alarm_checkpoint }, { .name = "SendAlarmConfiguration", .name_hash = 0, .fnc = send_alarm_configuration }, { .name = "SendAlarmSnapshot", .name_hash = 0, .fnc = send_alarm_snapshot }, { .name = "DisconnectReq", .name_hash = 0, .fnc = handle_disconnect_req }, diff --git a/aclk/aclk_util.c b/aclk/aclk_util.c index ebf428ff9f..7d03f97fd1 100644 --- a/aclk/aclk_util.c +++ b/aclk/aclk_util.c @@ -120,10 +120,10 @@ struct topic_name { { .id = ACLK_TOPICID_CHART_RESET, .name = "reset-charts" }, { .id = ACLK_TOPICID_RETENTION_UPDATED, .name = "chart-retention-updated" }, { .id = ACLK_TOPICID_NODE_INFO, .name = "node-instance-info" }, - { .id = ACLK_TOPICID_ALARM_LOG, .name = "alarm-log" }, - { .id = ACLK_TOPICID_ALARM_HEALTH, 
.name = "alarm-health" }, + { .id = ACLK_TOPICID_ALARM_LOG, .name = "alarm-log-v2" }, + { .id = ACLK_TOPICID_ALARM_CHECKPOINT, .name = "alarm-checkpoint" }, { .id = ACLK_TOPICID_ALARM_CONFIG, .name = "alarm-config" }, - { .id = ACLK_TOPICID_ALARM_SNAPSHOT, .name = "alarm-snapshot" }, + { .id = ACLK_TOPICID_ALARM_SNAPSHOT, .name = "alarm-snapshot-v2" }, { .id = ACLK_TOPICID_NODE_COLLECTORS, .name = "node-instance-collectors" }, { .id = ACLK_TOPICID_CTXS_SNAPSHOT, .name = "contexts-snapshot" }, { .id = ACLK_TOPICID_CTXS_UPDATED, .name = "contexts-updated" }, @@ -146,7 +146,7 @@ enum aclk_topics compulsory_topics[] = { ACLK_TOPICID_RETENTION_UPDATED, ACLK_TOPICID_NODE_INFO, ACLK_TOPICID_ALARM_LOG, - ACLK_TOPICID_ALARM_HEALTH, + ACLK_TOPICID_ALARM_CHECKPOINT, ACLK_TOPICID_ALARM_CONFIG, ACLK_TOPICID_ALARM_SNAPSHOT, ACLK_TOPICID_NODE_COLLECTORS, diff --git a/aclk/aclk_util.h b/aclk/aclk_util.h index 76dc8cad98..6b7e4e9c2b 100644 --- a/aclk/aclk_util.h +++ b/aclk/aclk_util.h @@ -85,7 +85,7 @@ enum aclk_topics { ACLK_TOPICID_RETENTION_UPDATED = 12, ACLK_TOPICID_NODE_INFO = 13, ACLK_TOPICID_ALARM_LOG = 14, - ACLK_TOPICID_ALARM_HEALTH = 15, + ACLK_TOPICID_ALARM_CHECKPOINT = 15, ACLK_TOPICID_ALARM_CONFIG = 16, ACLK_TOPICID_ALARM_SNAPSHOT = 17, ACLK_TOPICID_NODE_COLLECTORS = 18, diff --git a/aclk/schema-wrappers/alarm_stream.cc b/aclk/schema-wrappers/alarm_stream.cc index f643933002..af0b891ca4 100644 --- a/aclk/schema-wrappers/alarm_stream.cc +++ b/aclk/schema-wrappers/alarm_stream.cc @@ -21,57 +21,24 @@ struct start_alarm_streaming parse_start_alarm_streaming(const char *data, size_ return ret; ret.node_id = strdupz(msg.node_id().c_str()); - ret.batch_id = msg.batch_id(); - ret.start_seq_id = msg.start_sequnce_id(); + ret.resets = msg.resets(); return ret; } -char *parse_send_alarm_log_health(const char *data, size_t len) +struct send_alarm_checkpoint parse_send_alarm_checkpoint(const char *data, size_t len) { - SendAlarmLogHealth msg; + struct send_alarm_checkpoint ret; + 
memset(&ret, 0, sizeof(ret)); + + SendAlarmCheckpoint msg; if (!msg.ParseFromArray(data, len)) - return NULL; - return strdupz(msg.node_id().c_str()); -} + return ret; -char *generate_alarm_log_health(size_t *len, struct alarm_log_health *data) -{ - AlarmLogHealth msg; - LogEntries *entries; + ret.node_id = strdupz(msg.node_id().c_str()); + ret.claim_id = strdupz(msg.claim_id().c_str()); - msg.set_claim_id(data->claim_id); - msg.set_node_id(data->node_id); - msg.set_enabled(data->enabled); - - switch (data->status) { - case alarm_log_status_aclk::ALARM_LOG_STATUS_IDLE: - msg.set_status(alarms::v1::ALARM_LOG_STATUS_IDLE); - break; - case alarm_log_status_aclk::ALARM_LOG_STATUS_RUNNING: - msg.set_status(alarms::v1::ALARM_LOG_STATUS_RUNNING); - break; - case alarm_log_status_aclk::ALARM_LOG_STATUS_UNSPECIFIED: - msg.set_status(alarms::v1::ALARM_LOG_STATUS_UNSPECIFIED); - break; - default: - error("Unknown status of AlarmLogHealth LogEntry"); - return NULL; - } - - entries = msg.mutable_log_entries(); - entries->set_first_sequence_id(data->log_entries.first_seq_id); - entries->set_last_sequence_id(data->log_entries.last_seq_id); - - set_google_timestamp_from_timeval(data->log_entries.first_when, entries->mutable_first_when()); - set_google_timestamp_from_timeval(data->log_entries.last_when, entries->mutable_last_when()); - - *len = PROTO_COMPAT_MSG_SIZE(msg); - char *bin = (char*)mallocz(*len); - if (!msg.SerializeToArray(bin, *len)) - return NULL; - - return bin; + return ret; } static alarms::v1::AlarmStatus aclk_alarm_status_to_proto(enum aclk_alarm_status status) @@ -131,8 +98,6 @@ static void fill_alarm_log_entry(struct alarm_log_entry *data, AlarmLogEntry *pr if (data->family) proto->set_family(data->family); - proto->set_batch_id(data->batch_id); - proto->set_sequence_id(data->sequence_id); proto->set_when(data->when); proto->set_config_hash(data->config_hash); @@ -187,6 +152,24 @@ char *generate_alarm_log_entry(size_t *len, struct alarm_log_entry *data) return 
bin; } +char *generate_alarm_checkpoint(size_t *len, struct alarm_checkpoint *data) +{ + AlarmCheckpoint msg; + + msg.set_claim_id(data->claim_id); + msg.set_node_id(data->node_id); + msg.set_checksum(data->checksum); + + *len = PROTO_COMPAT_MSG_SIZE(msg); + char *bin = (char*)mallocz(*len); + if (!msg.SerializeToArray(bin, *len)) { + freez(bin); + return NULL; + } + + return bin; +} + struct send_alarm_snapshot *parse_send_alarm_snapshot(const char *data, size_t len) { SendAlarmSnapshot msg; @@ -198,8 +181,8 @@ struct send_alarm_snapshot *parse_send_alarm_snapshot(const char *data, size_t l ret->claim_id = strdupz(msg.claim_id().c_str()); if (msg.node_id().c_str()) ret->node_id = strdupz(msg.node_id().c_str()); - ret->snapshot_id = msg.snapshot_id(); - ret->sequence_id = msg.sequence_id(); + if (msg.snapshot_uuid().c_str()) + ret->snapshot_uuid = strdupz(msg.snapshot_uuid().c_str()); return ret; } @@ -208,6 +191,7 @@ void destroy_send_alarm_snapshot(struct send_alarm_snapshot *ptr) { freez(ptr->claim_id); freez(ptr->node_id); + freez(ptr->snapshot_uuid); freez(ptr); } @@ -218,7 +202,7 @@ alarm_snapshot_proto_ptr_t generate_alarm_snapshot_proto(struct alarm_snapshot * msg->set_node_id(data->node_id); msg->set_claim_id(data->claim_id); - msg->set_snapshot_id(data->snapshot_id); + msg->set_snapshot_uuid(data->snapshot_uuid); msg->set_chunks(data->chunks); msg->set_chunk(data->chunk); diff --git a/aclk/schema-wrappers/alarm_stream.h b/aclk/schema-wrappers/alarm_stream.h index 63911da3fa..83e7c1bce3 100644 --- a/aclk/schema-wrappers/alarm_stream.h +++ b/aclk/schema-wrappers/alarm_stream.h @@ -11,38 +11,12 @@ extern "C" { #endif -enum alarm_log_status_aclk { - ALARM_LOG_STATUS_UNSPECIFIED = 0, - ALARM_LOG_STATUS_RUNNING = 1, - ALARM_LOG_STATUS_IDLE = 2 -}; - -struct alarm_log_entries { - int64_t first_seq_id; - struct timeval first_when; - - int64_t last_seq_id; - struct timeval last_when; -}; - -struct alarm_log_health { - char *claim_id; - char *node_id; - int 
enabled; - enum alarm_log_status_aclk status; - struct alarm_log_entries log_entries; -}; - struct start_alarm_streaming { char *node_id; - uint64_t batch_id; - uint64_t start_seq_id; + bool resets; }; struct start_alarm_streaming parse_start_alarm_streaming(const char *data, size_t len); -char *parse_send_alarm_log_health(const char *data, size_t len); - -char *generate_alarm_log_health(size_t *len, struct alarm_log_health *data); enum aclk_alarm_status { ALARM_STATUS_NULL = 0, @@ -101,17 +75,27 @@ struct alarm_log_entry { char *chart_context; }; +struct send_alarm_checkpoint { + char *node_id; + char *claim_id; +}; + +struct alarm_checkpoint { + char *node_id; + char *claim_id; + char *checksum; +}; + struct send_alarm_snapshot { char *node_id; char *claim_id; - uint64_t snapshot_id; - uint64_t sequence_id; + char *snapshot_uuid; }; struct alarm_snapshot { char *node_id; char *claim_id; - uint64_t snapshot_id; + char *snapshot_uuid; uint32_t chunks; uint32_t chunk; }; @@ -125,6 +109,9 @@ char *generate_alarm_log_entry(size_t *len, struct alarm_log_entry *data); struct send_alarm_snapshot *parse_send_alarm_snapshot(const char *data, size_t len); void destroy_send_alarm_snapshot(struct send_alarm_snapshot *ptr); +struct send_alarm_checkpoint parse_send_alarm_checkpoint(const char *data, size_t len); +char *generate_alarm_checkpoint(size_t *len, struct alarm_checkpoint *data); + alarm_snapshot_proto_ptr_t generate_alarm_snapshot_proto(struct alarm_snapshot *data); void add_alarm_log_entry2snapshot(alarm_snapshot_proto_ptr_t snapshot, struct alarm_log_entry *data); char *generate_alarm_snapshot_bin(size_t *len, alarm_snapshot_proto_ptr_t snapshot); diff --git a/aclk/schema-wrappers/proto_2_json.cc b/aclk/schema-wrappers/proto_2_json.cc index 8853b2e083..4294f7efe5 100644 --- a/aclk/schema-wrappers/proto_2_json.cc +++ b/aclk/schema-wrappers/proto_2_json.cc @@ -29,8 +29,8 @@ static google::protobuf::Message *msg_name_to_protomsg(const char *msgname) return new 
nodeinstance::create::v1::CreateNodeInstance; if (!strcmp(msgname, "UpdateNodeInfo")) return new nodeinstance::info::v1::UpdateNodeInfo; - if (!strcmp(msgname, "AlarmLogHealth")) - return new alarms::v1::AlarmLogHealth; + if (!strcmp(msgname, "AlarmCheckpoint")) + return new alarms::v1::AlarmCheckpoint; if (!strcmp(msgname, "ProvideAlarmConfiguration")) return new alarms::v1::ProvideAlarmConfiguration; if (!strcmp(msgname, "AlarmSnapshot")) @@ -51,8 +51,8 @@ static google::protobuf::Message *msg_name_to_protomsg(const char *msgname) return new agent::v1::SendNodeInstances; if (!strcmp(msgname, "StartAlarmStreaming")) return new alarms::v1::StartAlarmStreaming; - if (!strcmp(msgname, "SendAlarmLogHealth")) - return new alarms::v1::SendAlarmLogHealth; + if (!strcmp(msgname, "SendAlarmCheckpoint")) + return new alarms::v1::SendAlarmCheckpoint; if (!strcmp(msgname, "SendAlarmConfiguration")) return new alarms::v1::SendAlarmConfiguration; if (!strcmp(msgname, "SendAlarmSnapshot")) diff --git a/database/sqlite/sqlite_aclk.c b/database/sqlite/sqlite_aclk.c index 060f29be88..019d9056db 100644 --- a/database/sqlite/sqlite_aclk.c +++ b/database/sqlite/sqlite_aclk.c @@ -367,12 +367,12 @@ static void aclk_synchronization(void *arg __maybe_unused) service_register(SERVICE_THREAD_TYPE_EVENT_LOOP, NULL, NULL, NULL, true); worker_register_job_name(ACLK_DATABASE_NOOP, "noop"); - worker_register_job_name(ACLK_DATABASE_ALARM_HEALTH_LOG, "alert log"); worker_register_job_name(ACLK_DATABASE_CLEANUP, "cleanup"); worker_register_job_name(ACLK_DATABASE_DELETE_HOST, "node delete"); worker_register_job_name(ACLK_DATABASE_NODE_STATE, "node state"); worker_register_job_name(ACLK_DATABASE_PUSH_ALERT, "alert push"); worker_register_job_name(ACLK_DATABASE_PUSH_ALERT_CONFIG, "alert conf push"); + worker_register_job_name(ACLK_DATABASE_PUSH_ALERT_CHECKPOINT,"alert checkpoint"); worker_register_job_name(ACLK_DATABASE_PUSH_ALERT_SNAPSHOT, "alert snapshot"); 
worker_register_job_name(ACLK_DATABASE_QUEUE_REMOVED_ALERTS, "alerts check"); worker_register_job_name(ACLK_DATABASE_TIMER, "timer"); @@ -437,9 +437,6 @@ static void aclk_synchronization(void *arg __maybe_unused) case ACLK_DATABASE_PUSH_ALERT: aclk_push_alert_events_for_all_hosts(); break; - case ACLK_DATABASE_ALARM_HEALTH_LOG: - aclk_push_alarm_health_log(cmd.param[0]); - break; case ACLK_DATABASE_PUSH_ALERT_SNAPSHOT:; aclk_push_alert_snapshot_event(cmd.param[0]); break; @@ -605,14 +602,6 @@ void aclk_push_node_alert_snapshot(const char *node_id) } -void aclk_push_node_health_log(const char *node_id) -{ - if (unlikely(!aclk_sync_config.initialized)) - return; - - queue_aclk_sync_cmd(ACLK_DATABASE_ALARM_HEALTH_LOG, strdupz(node_id), NULL); -} - void aclk_push_node_removed_alerts(const char *node_id) { if (unlikely(!aclk_sync_config.initialized)) diff --git a/database/sqlite/sqlite_aclk.h b/database/sqlite/sqlite_aclk.h index 2ec93acbd0..d555a0cef6 100644 --- a/database/sqlite/sqlite_aclk.h +++ b/database/sqlite/sqlite_aclk.h @@ -41,13 +41,13 @@ static inline int claimed() enum aclk_database_opcode { ACLK_DATABASE_NOOP = 0, - ACLK_DATABASE_ALARM_HEALTH_LOG, ACLK_DATABASE_CLEANUP, ACLK_DATABASE_DELETE_HOST, ACLK_DATABASE_NODE_STATE, ACLK_DATABASE_PUSH_ALERT, ACLK_DATABASE_PUSH_ALERT_CONFIG, ACLK_DATABASE_PUSH_ALERT_SNAPSHOT, + ACLK_DATABASE_PUSH_ALERT_CHECKPOINT, ACLK_DATABASE_QUEUE_REMOVED_ALERTS, ACLK_DATABASE_TIMER, @@ -72,14 +72,13 @@ struct aclk_database_cmdqueue { struct aclk_sync_host_config { RRDHOST *host; int alert_updates; + int alert_checkpoint_req; + int alert_queue_removed; time_t node_info_send_time; time_t node_collectors_send; char uuid_str[UUID_STR_LEN]; char node_id[UUID_STR_LEN]; - uint64_t alerts_batch_id; // batch id for alerts to use - uint64_t alerts_start_seq_id; // cloud has asked to start streaming from - uint64_t alerts_snapshot_id; // will contain the snapshot_id value if snapshot was requested - uint64_t alerts_ack_sequence_id; // last 
sequence_id ack'ed from cloud via sendsnapshot message + char *alerts_snapshot_uuid; // will contain the snapshot_uuid value if snapshot was requested }; extern sqlite3 *db_meta; diff --git a/database/sqlite/sqlite_aclk_alert.c b/database/sqlite/sqlite_aclk_alert.c index c6db23b529..16d7352749 100644 --- a/database/sqlite/sqlite_aclk_alert.c +++ b/database/sqlite/sqlite_aclk_alert.c @@ -97,9 +97,6 @@ int should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae) if (unlikely(uuid_is_null(ae->config_hash_id))) return 0; - if (is_event_from_alert_variable_config(ae->unique_id, uuid_str)) - return 0; - char sql[ACLK_SYNC_QUERY_SIZE]; uuid_t config_hash_id; RRDCALC_STATUS status; @@ -193,10 +190,13 @@ int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, int skip_filter) } } - sqlite3_stmt *res_alert = NULL; - char uuid_str[GUID_LEN + 1]; + char uuid_str[UUID_STR_LEN]; uuid_unparse_lower_fix(&host->host_uuid, uuid_str); + if (is_event_from_alert_variable_config(ae->unique_id, uuid_str)) + return 0; + + sqlite3_stmt *res_alert = NULL; char sql[ACLK_SYNC_QUERY_SIZE]; snprintfz(sql, ACLK_SYNC_QUERY_SIZE - 1, SQL_QUEUE_ALERT_TO_CLOUD, uuid_str); @@ -278,26 +278,6 @@ void aclk_push_alert_event(struct aclk_sync_host_config *wc) BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); - if (wc->alerts_start_seq_id != 0) { - buffer_sprintf( - sql, - "UPDATE aclk_alert_%s SET date_submitted = NULL, date_cloud_ack = NULL WHERE sequence_id >= %"PRIu64 - "; UPDATE aclk_alert_%s SET date_cloud_ack = unixepoch() WHERE sequence_id < %"PRIu64 - " and date_cloud_ack is null " - "; UPDATE aclk_alert_%s SET date_submitted = unixepoch() WHERE sequence_id < %"PRIu64 - " and date_submitted is null", - wc->uuid_str, - wc->alerts_start_seq_id, - wc->uuid_str, - wc->alerts_start_seq_id, - wc->uuid_str, - wc->alerts_start_seq_id); - if (unlikely(db_execute(buffer_tostring(sql)))) - error_report("Failed to reset ACLK alert entries"); - buffer_reset(sql); - 
wc->alerts_start_seq_id = 0; - } - int limit = ACLK_MAX_ALERT_UPDATES; sqlite3_stmt *res = NULL; @@ -359,8 +339,8 @@ void aclk_push_alert_event(struct aclk_sync_host_config *wc) alarm_log.name = strdupz((char *)sqlite3_column_text(res, 11)); alarm_log.family = sqlite3_column_bytes(res, 13) > 0 ? strdupz((char *)sqlite3_column_text(res, 13)) : NULL; - alarm_log.batch_id = wc->alerts_batch_id; - alarm_log.sequence_id = (uint64_t) sqlite3_column_int64(res, 0); + //alarm_log.batch_id = wc->alerts_batch_id; + //alarm_log.sequence_id = (uint64_t) sqlite3_column_int64(res, 0); alarm_log.when = (time_t) sqlite3_column_int64(res, 5); uuid_unparse_lower(*((uuid_t *) sqlite3_column_blob(res, 3)), uuid_str); @@ -445,12 +425,11 @@ void aclk_push_alert_event(struct aclk_sync_host_config *wc) } else { if (log_first_sequence_id) log_access( - "ACLK RES [%s (%s)]: ALERTS SENT from %" PRIu64 " to %" PRIu64 " batch=%" PRIu64, + "ACLK RES [%s (%s)]: ALERTS SENT from %" PRIu64 " to %" PRIu64 "", wc->node_id, wc->host ? 
rrdhost_hostname(wc->host) : "N/A", log_first_sequence_id, - log_last_sequence_id, - wc->alerts_batch_id); + log_last_sequence_id); log_first_sequence_id = 0; log_last_sequence_id = 0; } @@ -494,121 +473,17 @@ void sql_queue_existing_alerts_to_aclk(RRDHOST *host) "where new_status <> 0 and new_status <> -2 and config_hash_id is not null and updated_by_id = 0 " \ "order by unique_id asc on conflict (alert_unique_id) do nothing;", uuid_str, uuid_str, uuid_str); + netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock); + if (unlikely(db_execute(buffer_tostring(sql)))) error_report("Failed to queue existing ACLK alert events for host %s", rrdhost_hostname(host)); + netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock); + buffer_free(sql); rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); } -void aclk_send_alarm_health_log(char *node_id) -{ - if (unlikely(!node_id)) - return; - - struct aclk_sync_host_config *wc = NULL; - RRDHOST *host = find_host_by_node_id(node_id); - - if (unlikely(!host)) - return; - - wc = (struct aclk_sync_host_config *)host->aclk_sync_host_config; - if (unlikely(!wc)) { - log_access("ACLK REQ [%s (N/A)]: HEALTH LOG REQUEST RECEIVED FOR INVALID NODE", node_id); - return; - } - - log_access("ACLK REQ [%s (%s)]: HEALTH LOG REQUEST RECEIVED", node_id, rrdhost_hostname(host)); - - aclk_push_node_health_log(node_id); -} - -void aclk_push_alarm_health_log(char *node_id __maybe_unused) -{ -#ifdef ENABLE_ACLK - int rc; - - char *claim_id = get_agent_claimid(); - if (unlikely(!claim_id)) { - freez(node_id); - return; - } - - RRDHOST *host = find_host_by_node_id(node_id); - - if (unlikely(!host)) { - log_access("AC [%s (N/A)]: Node id not found", node_id); - freez(claim_id); - freez(node_id); - return; - } - - struct aclk_sync_host_config *wc = host->aclk_sync_host_config; - - int64_t first_sequence = 0; - int64_t last_sequence = 0; - struct timeval first_timestamp; - struct timeval last_timestamp; - - char sql[ACLK_SYNC_QUERY_SIZE]; - - 
sqlite3_stmt *res = NULL; - - //TODO: make this better: include info from health log too - snprintfz(sql, ACLK_SYNC_QUERY_SIZE - 1, "SELECT MIN(sequence_id), MIN(date_created), " \ - "MAX(sequence_id), MAX(date_created) FROM aclk_alert_%s;", wc->uuid_str); - - rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0); - if (rc != SQLITE_OK) { - error_report("Failed to prepare statement to get health log statistics from the database"); - freez(claim_id); - return; - } - - first_timestamp.tv_sec = 0; - first_timestamp.tv_usec = 0; - last_timestamp.tv_sec = 0; - last_timestamp.tv_usec = 0; - - while (sqlite3_step_monitored(res) == SQLITE_ROW) { - first_sequence = sqlite3_column_bytes(res, 0) > 0 ? (int64_t) sqlite3_column_int64(res, 0) : 0; - if (sqlite3_column_bytes(res, 1) > 0) { - first_timestamp.tv_sec = sqlite3_column_int64(res, 1); - } - - last_sequence = sqlite3_column_bytes(res, 2) > 0 ? (int64_t) sqlite3_column_int64(res, 2) : 0; - if (sqlite3_column_bytes(res, 3) > 0) { - last_timestamp.tv_sec = sqlite3_column_int64(res, 3); - } - } - - struct alarm_log_entries log_entries; - log_entries.first_seq_id = first_sequence; - log_entries.first_when = first_timestamp; - log_entries.last_seq_id = last_sequence; - log_entries.last_when = last_timestamp; - - struct alarm_log_health alarm_log; - alarm_log.claim_id = claim_id; - alarm_log.node_id = wc->node_id; - alarm_log.log_entries = log_entries; - alarm_log.status = wc->alert_updates == 0 ? 
2 : 1; - alarm_log.enabled = (int)host->health.health_enabled; - - aclk_send_alarm_log_health(&alarm_log); - log_access("ACLK RES [%s (%s)]: HEALTH LOG SENT from %ld to %ld", wc->node_id, rrdhost_hostname(host), first_sequence, last_sequence); - - rc = sqlite3_finalize(res); - if (unlikely(rc != SQLITE_OK)) - error_report("Failed to reset statement to get health log statistics from the database, rc = %d", rc); - - freez(claim_id); - freez(node_id); - - aclk_alert_reloaded = 1; -#endif -} - void aclk_send_alarm_configuration(char *config_hash) { if (unlikely(!config_hash)) @@ -755,7 +630,7 @@ bind_fail: // Start streaming alerts -void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start_seq_id) +void aclk_start_alert_streaming(char *node_id, bool resets) { if (unlikely(!node_id)) return; @@ -779,20 +654,22 @@ void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start return; } - // TODO: CHECK - if (unlikely(batch_id == 1) && unlikely(start_seq_id == 1)) + if (resets) { + log_access("ACLK REQ [%s (%s)]: STREAM ALERTS ENABLED (RESET REQUESTED)", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A"); sql_queue_existing_alerts_to_aclk(host); + } else + log_access("ACLK REQ [%s (%s)]: STREAM ALERTS ENABLED", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A"); - log_access("ACLK REQ [%s (%s)]: ALERTS STREAM from %"PRIu64" batch=%"PRIu64, node_id, wc->host ? 
rrdhost_hostname(wc->host) : "N/A", start_seq_id, batch_id); - wc->alerts_batch_id = batch_id; - wc->alerts_start_seq_id = start_seq_id; wc->alert_updates = 1; + wc->alert_queue_removed = SEND_REMOVED_AFTER_HEALTH_LOOPS; } #define SQL_QUEUE_REMOVE_ALERTS "INSERT INTO aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " \ "SELECT unique_id alert_unique_id, UNIXEPOCH(), unique_id alert_unique_id FROM health_log_%s " \ "WHERE new_status = -2 AND updated_by_id = 0 AND unique_id NOT IN " \ - "(SELECT alert_unique_id FROM aclk_alert_%s) ORDER BY unique_id ASC " \ + "(SELECT alert_unique_id FROM aclk_alert_%s) " \ + "AND config_hash_id NOT IN (select hash_id from alert_hash where warn is null and crit is null) " \ + "ORDER BY unique_id ASC " \ "ON CONFLICT (alert_unique_id) DO NOTHING;" void sql_process_queue_removed_alerts_to_aclk(char *node_id) @@ -814,7 +691,9 @@ void sql_process_queue_removed_alerts_to_aclk(char *node_id) } else log_access("ACLK STA [%s (%s)]: QUEUED REMOVED ALERTS", wc->node_id, rrdhost_hostname(wc->host)); + rrdhost_flag_set(wc->host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); + wc->alert_queue_removed = 0; } void sql_queue_removed_alerts_to_aclk(RRDHOST *host) @@ -831,7 +710,7 @@ void sql_queue_removed_alerts_to_aclk(RRDHOST *host) aclk_push_node_removed_alerts(node_id); } -void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id __maybe_unused, uint64_t snapshot_id, uint64_t sequence_id) +void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id __maybe_unused, char *snapshot_uuid) { uuid_t node_uuid; if (unlikely(!node_id || uuid_parse(node_id, node_uuid))) @@ -851,19 +730,17 @@ void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id __maybe_unus } log_access( - "IN [%s (%s)]: Request to send alerts snapshot, snapshot_id %" PRIu64 " and ack_sequence_id %" PRIu64, - wc->node_id, + "IN [%s (%s)]: Request to send alerts snapshot, snapshot_uuid %s", + node_id, wc->host ? 
rrdhost_hostname(wc->host) : "N/A", - snapshot_id, - sequence_id); - if (wc->alerts_snapshot_id == snapshot_id) - return; - __sync_synchronize(); - wc->alerts_snapshot_id = snapshot_id; - wc->alerts_ack_sequence_id = sequence_id; - __sync_synchronize(); + snapshot_uuid); + if (wc->alerts_snapshot_uuid && !strcmp(wc->alerts_snapshot_uuid,snapshot_uuid)) + return; + __sync_synchronize(); + wc->alerts_snapshot_uuid = strdupz(snapshot_uuid); + __sync_synchronize(); - aclk_push_node_alert_snapshot(node_id); + aclk_push_node_alert_snapshot(node_id); } #ifdef ENABLE_ACLK @@ -934,8 +811,6 @@ static int have_recent_alarm(RRDHOST *host, uint32_t alarm_id, uint32_t mark) #endif #define ALARM_EVENTS_PER_CHUNK 10 -#define SQL_ALERT_CLOUD_ACK "UPDATE aclk_alert_%s SET date_cloud_ack=unixepoch() WHERE sequence_id <= %"PRIu64 - void aclk_push_alert_snapshot_event(char *node_id __maybe_unused) { #ifdef ENABLE_ACLK @@ -959,21 +834,14 @@ void aclk_push_alert_snapshot_event(char *node_id __maybe_unused) return; } - if (unlikely(!wc->alerts_snapshot_id)) + if (unlikely(!wc->alerts_snapshot_uuid)) return; char *claim_id = get_agent_claimid(); if (unlikely(!claim_id)) return; - log_access("ACLK REQ [%s (%s)]: Sending alerts snapshot, snapshot_id %" PRIu64, wc->node_id, rrdhost_hostname(wc->host), wc->alerts_snapshot_id); - - if (wc->alerts_ack_sequence_id) { - char sql[512]; - snprintfz(sql, 511, SQL_ALERT_CLOUD_ACK, wc->uuid_str, wc->alerts_ack_sequence_id); - if (unlikely(db_execute(sql))) - error_report("Failed to set ACLK alert entries cloud ACK status for host %s", rrdhost_hostname(host)); - } + log_access("ACLK REQ [%s (%s)]: Sending alerts snapshot, snapshot_uuid %s", wc->node_id, rrdhost_hostname(wc->host), wc->alerts_snapshot_uuid); uint32_t cnt = 0; char uuid_str[UUID_STR_LEN]; @@ -1009,7 +877,7 @@ void aclk_push_alert_snapshot_event(char *node_id __maybe_unused) struct alarm_snapshot alarm_snap; alarm_snap.node_id = wc->node_id; alarm_snap.claim_id = claim_id; - 
alarm_snap.snapshot_id = wc->alerts_snapshot_id; + alarm_snap.snapshot_uuid = wc->alerts_snapshot_uuid; alarm_snap.chunks = chunks; alarm_snap.chunk = chunk; @@ -1051,7 +919,7 @@ void aclk_push_alert_snapshot_event(char *node_id __maybe_unused) struct alarm_snapshot alarm_snap; alarm_snap.node_id = wc->node_id; alarm_snap.claim_id = claim_id; - alarm_snap.snapshot_id = wc->alerts_snapshot_id; + alarm_snap.snapshot_uuid = wc->alerts_snapshot_uuid; alarm_snap.chunks = chunks; alarm_snap.chunk = chunk; @@ -1065,7 +933,7 @@ void aclk_push_alert_snapshot_event(char *node_id __maybe_unused) } netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock); - wc->alerts_snapshot_id = 0; + wc->alerts_snapshot_uuid = NULL; freez(claim_id); #endif @@ -1093,7 +961,6 @@ void sql_aclk_alert_clean_dead_entries(RRDHOST *host) } #define SQL_GET_MIN_MAX_ALERT_SEQ "SELECT MIN(sequence_id), MAX(sequence_id), " \ - "(SELECT MAX(sequence_id) FROM aclk_alert_%s WHERE date_cloud_ack IS NOT NULL), " \ "(SELECT MAX(sequence_id) FROM aclk_alert_%s WHERE date_submitted IS NOT NULL) " \ "FROM aclk_alert_%s WHERE date_submitted IS NULL;" @@ -1106,12 +973,11 @@ int get_proto_alert_status(RRDHOST *host, struct proto_alert_status *proto_alert return 1; proto_alert_status->alert_updates = wc->alert_updates; - proto_alert_status->alerts_batch_id = wc->alerts_batch_id; char sql[ACLK_SYNC_QUERY_SIZE]; sqlite3_stmt *res = NULL; - snprintfz(sql, ACLK_SYNC_QUERY_SIZE - 1, SQL_GET_MIN_MAX_ALERT_SEQ, wc->uuid_str, wc->uuid_str, wc->uuid_str); + snprintfz(sql, ACLK_SYNC_QUERY_SIZE - 1, SQL_GET_MIN_MAX_ALERT_SEQ, wc->uuid_str, wc->uuid_str); rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0); if (rc != SQLITE_OK) { @@ -1122,8 +988,7 @@ int get_proto_alert_status(RRDHOST *host, struct proto_alert_status *proto_alert while (sqlite3_step_monitored(res) == SQLITE_ROW) { proto_alert_status->pending_min_sequence_id = sqlite3_column_bytes(res, 0) > 0 ? 
(uint64_t) sqlite3_column_int64(res, 0) : 0;
         proto_alert_status->pending_max_sequence_id = sqlite3_column_bytes(res, 1) > 0 ? (uint64_t) sqlite3_column_int64(res, 1) : 0;
-        proto_alert_status->last_acked_sequence_id = sqlite3_column_bytes(res, 2) > 0 ? (uint64_t) sqlite3_column_int64(res, 2) : 0;
-        proto_alert_status->last_submitted_sequence_id = sqlite3_column_bytes(res, 3) > 0 ? (uint64_t) sqlite3_column_int64(res, 3) : 0;
+        proto_alert_status->last_submitted_sequence_id = sqlite3_column_bytes(res, 2) > 0 ? (uint64_t) sqlite3_column_int64(res, 2) : 0;
     }
 
     rc = sqlite3_finalize(res);
@@ -1132,3 +997,173 @@ int get_proto_alert_status(RRDHOST *host, struct proto_alert_status *proto_alert
 
     return 0;
 }
+
+/*
+ * Handle an incoming alerts checkpoint request for the given node.
+ *
+ * Nothing is sent from here: the request is recorded by setting
+ * wc->alert_checkpoint_req to SEND_CHECKPOINT_AFTER_HEALTH_LOOPS. health_main()
+ * counts it down once per iteration and calls aclk_push_alarm_checkpoint()
+ * when it finds it equal to 1.
+ *
+ * node_id  - node the request refers to; the request is dropped if it is NULL
+ *            or no host is found for it
+ * claim_id - unused
+ */
+void aclk_send_alarm_checkpoint(char *node_id, char *claim_id __maybe_unused)
+{
+    if (unlikely(!node_id))
+        return;
+
+    struct aclk_sync_host_config *wc = NULL;
+    RRDHOST *host = find_host_by_node_id(node_id);
+
+    if (unlikely(!host))
+        return;
+
+    wc = (struct aclk_sync_host_config *)host->aclk_sync_host_config;
+    if (unlikely(!wc)) {
+        log_access("ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT REQUEST RECEIVED FOR INVALID NODE", node_id);
+        return;
+    }
+
+    log_access("ACLK REQ [%s (%s)]: ALERTS CHECKPOINT REQUEST RECEIVED", node_id, rrdhost_hostname(host));
+
+    wc->alert_checkpoint_req = SEND_CHECKPOINT_AFTER_HEALTH_LOOPS;
+}
+
+/* One alert in WARNING or CRITICAL state, collected for the checkpoint hash. */
+typedef struct active_alerts {
+    char *name;
+    char *chart;
+    RRDCALC_STATUS status;
+} active_alerts_t;
+
+/* qsort() comparator for active_alerts_t: order by alert name, then by chart name. */
+static inline int compare_active_alerts(const void * a, const void * b) {
+    const active_alerts_t *active_alerts_a = a;
+    const active_alerts_t *active_alerts_b = b;
+
+    int name_cmp = strcmp(active_alerts_a->name, active_alerts_b->name);
+    if (name_cmp)
+        return name_cmp;
+
+    return strcmp(active_alerts_a->chart, active_alerts_b->chart);
+}
+
+/*
+ * Build the alerts checkpoint of a host and hand it over for sending.
+ *
+ * The alert name and chart name of every alert currently in WARNING or
+ * CRITICAL state are sorted, concatenated (each pair followed by "W" or "C")
+ * and hashed with hash256_string(); the digest is passed to
+ * aclk_send_provide_alarm_checkpoint() as the checksum.
+ *
+ * While RRDHOST_FLAG_ACLK_STREAM_ALERTS is set on the host, the checkpoint is
+ * postponed by incrementing wc->alert_checkpoint_req instead.
+ */
+void aclk_push_alarm_checkpoint(RRDHOST *host __maybe_unused)
+{
+#ifdef ENABLE_ACLK
+    struct aclk_sync_host_config *wc = host->aclk_sync_host_config;
+    if (unlikely(!wc)) {
+        log_access("ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT REQUEST RECEIVED FOR INVALID NODE", rrdhost_hostname(host));
+        return;
+    }
+
+    //TODO: make sure all pending events are sent.
+    if (rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS)) {
+        //postpone checkpoint send
+        wc->alert_checkpoint_req++;
+        log_access("ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT POSTPONED", rrdhost_hostname(host));
+        return;
+    }
+
+    //TODO: lock rc here, or make sure it's called when health decides
+    //count them
+    RRDCALC *rc;
+    uint32_t cnt = 0;
+    uint32_t allocated = 0;
+    size_t len = 0;
+    active_alerts_t *active_alerts = NULL;
+
+    foreach_rrdcalc_in_rrdhost_read(host, rc) {
+        if(unlikely(!rc->rrdset || !rc->rrdset->last_collected_time.tv_sec))
+            continue;
+
+        if (rc->status == RRDCALC_STATUS_WARNING ||
+            rc->status == RRDCALC_STATUS_CRITICAL) {
+
+            cnt++;
+        }
+    }
+    foreach_rrdcalc_in_rrdhost_done(rc);
+
+    if (cnt) {
+        active_alerts = callocz(cnt, sizeof(active_alerts_t));
+        allocated = cnt;
+        cnt = 0;
+        foreach_rrdcalc_in_rrdhost_read(host, rc) {
+            if(unlikely(!rc->rrdset || !rc->rrdset->last_collected_time.tv_sec))
+                continue;
+
+            // nothing keeps the alerts unchanged between the two passes (see the
+            // TODO above), so never store more entries than were allocated
+            if (cnt < allocated &&
+                (rc->status == RRDCALC_STATUS_WARNING ||
+                 rc->status == RRDCALC_STATUS_CRITICAL)) {
+
+                active_alerts[cnt].name = (char *)rrdcalc_name(rc);
+                len += string_strlen(rc->name);
+                active_alerts[cnt].chart = (char *)rrdcalc_chart_name(rc);
+                len += string_strlen(rc->chart);
+                active_alerts[cnt].status = rc->status;
+                len++;
+                cnt++;
+            }
+        }
+        foreach_rrdcalc_in_rrdhost_done(rc);
+    }
+
+    BUFFER *alarms_to_hash;
+    if (cnt) {
+        qsort (active_alerts, cnt, sizeof(active_alerts_t), compare_active_alerts);
+
+        alarms_to_hash = buffer_create(len, NULL);
+        for (uint32_t i=0;i<cnt;i++) {
+            buffer_strcat(alarms_to_hash, active_alerts[i].name);
+            buffer_strcat(alarms_to_hash, active_alerts[i].chart);
+            if (active_alerts[i].status == RRDCALC_STATUS_WARNING)
+                buffer_strcat(alarms_to_hash, "W");
+            else if (active_alerts[i].status == RRDCALC_STATUS_CRITICAL)
+                buffer_strcat(alarms_to_hash, "C");
+        }
+    } else {
+        alarms_to_hash = buffer_create(1, NULL);
+        buffer_strcat(alarms_to_hash, "");
+        len = 0;
+    }
+
+    char hash[SHA256_DIGEST_LENGTH + 1];
+    if (hash256_string((const unsigned char *)buffer_tostring(alarms_to_hash), len, hash)) {
+        hash[SHA256_DIGEST_LENGTH] = 0;
+
+        struct alarm_checkpoint alarm_checkpoint;
+        // NOTE(review): other callers of get_agent_claimid() check for NULL - confirm a NULL claim id is handled downstream
+        char *claim_id = get_agent_claimid();
+        alarm_checkpoint.claim_id = claim_id;
+        alarm_checkpoint.node_id = wc->node_id;
+        alarm_checkpoint.checksum = (char *)hash;
+
+        aclk_send_provide_alarm_checkpoint(&alarm_checkpoint);
+        // the payload was generated by the call above, claim_id is ours to release
+        freez(claim_id);
+        log_access("ACLK RES [%s (%s)]: ALERTS CHECKPOINT SENT", wc->node_id, rrdhost_hostname(host));
+    } else {
+        log_access("ACLK RES [%s (%s)]: FAILED TO CREATE ALERTS CHECKPOINT HASH", wc->node_id, rrdhost_hostname(host));
+    }
+    wc->alert_checkpoint_req = 0;
+    buffer_free(alarms_to_hash);
+    freez(active_alerts);
+#endif
+}
diff --git a/database/sqlite/sqlite_aclk_alert.h b/database/sqlite/sqlite_aclk_alert.h
index 875b4e6c50..d7252aad6f 100644
--- a/database/sqlite/sqlite_aclk_alert.h
+++ b/database/sqlite/sqlite_aclk_alert.h
@@ -5,10 +5,11 @@
 
 extern sqlite3 *db_meta;
 
+#define SEND_REMOVED_AFTER_HEALTH_LOOPS 3
+#define SEND_CHECKPOINT_AFTER_HEALTH_LOOPS 4
+
 struct proto_alert_status {
     int alert_updates;
-    uint64_t alerts_batch_id;
-    uint64_t last_acked_sequence_id;
     uint64_t pending_min_sequence_id;
     uint64_t pending_max_sequence_id;
     uint64_t last_submitted_sequence_id;
@@ -16,16 +17,16 @@ struct proto_alert_status {
 
 int aclk_add_alert_event(struct aclk_sync_host_config *wc, struct aclk_database_cmd cmd);
 void aclk_push_alert_event(struct aclk_sync_host_config *wc);
-void aclk_send_alarm_health_log(char *node_id);
-void aclk_push_alarm_health_log(char *node_id);
 void aclk_send_alarm_configuration (char *config_hash);
 int aclk_push_alert_config_event(char *node_id, char *config_hash);
-void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start_seq_id);
+void aclk_start_alert_streaming(char *node_id, 
bool resets); void sql_queue_removed_alerts_to_aclk(RRDHOST *host); void sql_process_queue_removed_alerts_to_aclk(char *node_id); +void aclk_send_alarm_checkpoint(char *node_id, char *claim_id); +void aclk_push_alarm_checkpoint(RRDHOST *host); void aclk_push_alert_snapshot_event(char *node_id); -void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, uint64_t snapshot_id, uint64_t sequence_id); +void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, char *snapshot_uuid); int get_proto_alert_status(RRDHOST *host, struct proto_alert_status *proto_alert_status); int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, int skip_filter); void aclk_push_alert_events_for_all_hosts(void); diff --git a/health/health.c b/health/health.c index 4218ede6fb..5c2b85bc5a 100644 --- a/health/health.c +++ b/health/health.c @@ -347,6 +347,15 @@ static void health_reload_host(RRDHOST *host) { rrdcalctemplate_link_matching_templates_to_rrdset(st); } rrdset_foreach_done(st); + +#ifdef ENABLE_ACLK + if (netdata_cloud_setting) { + struct aclk_sync_host_config *wc = (struct aclk_sync_host_config *)host->aclk_sync_host_config; + if (likely(wc)) { + wc->alert_queue_removed = SEND_REMOVED_AFTER_HEALTH_LOOPS; + } + } +#endif } /** @@ -362,12 +371,6 @@ void health_reload(void) { health_reload_host(host); } dfe_done(host); - -#ifdef ENABLE_ACLK - if (netdata_cloud_setting) { - aclk_alert_reloaded = 1; - } -#endif } // ---------------------------------------------------------------------------- @@ -977,9 +980,7 @@ void *health_main(void *ptr) { rrdcalc_delete_alerts_not_matching_host_labels_from_all_hosts(); unsigned int loop = 0; -#ifdef ENABLE_ACLK - unsigned int marked_aclk_reload_loop = 0; -#endif + while(service_running(SERVICE_HEALTH)) { loop++; debug(D_HEALTH, "Health monitoring iteration no %u started", loop); @@ -1008,11 +1009,6 @@ void *health_main(void *ptr) { } } -#ifdef ENABLE_ACLK - if (aclk_alert_reloaded && !marked_aclk_reload_loop) - 
marked_aclk_reload_loop = loop;
-#endif
-
         worker_is_busy(WORKER_HEALTH_JOB_RRD_LOCK);
 
         dfe_start_reentrant(rrdhost_root_index, host) {
@@ -1117,7 +1113,7 @@ void *health_main(void *ptr) {
                         rc->value = NAN;
 
 #ifdef ENABLE_ACLK
-                        if (netdata_cloud_setting && likely(!aclk_alert_reloaded))
+                        if (netdata_cloud_setting)
                             sql_queue_alarm_to_aclk(host, ae, 1);
 #endif
                     }
@@ -1488,6 +1484,26 @@ void *health_main(void *ptr) {
                 }
                 break;
             }
+#ifdef ENABLE_ACLK
+            if (netdata_cloud_setting) {
+                struct aclk_sync_host_config *wc = (struct aclk_sync_host_config *)host->aclk_sync_host_config;
+                if (unlikely(!wc)) {
+                    continue;
+                }
+
+                if (wc->alert_queue_removed == 1) {
+                    sql_queue_removed_alerts_to_aclk(host);
+                } else if (wc->alert_queue_removed > 1) {
+                    wc->alert_queue_removed--;
+                }
+
+                if (wc->alert_checkpoint_req == 1) {
+                    aclk_push_alarm_checkpoint(host);
+                } else if (wc->alert_checkpoint_req > 1) {
+                    wc->alert_checkpoint_req--;
+                }
+            }
+#endif
         }
         dfe_done(host);
 
@@ -1500,23 +1516,6 @@ void *health_main(void *ptr) {
             health_alarm_wait_for_execution(ae);
         }
 
-#ifdef ENABLE_ACLK
-        if (netdata_cloud_setting && unlikely(aclk_alert_reloaded) && loop > (marked_aclk_reload_loop + 2)) {
-            dfe_start_reentrant(rrdhost_root_index, host) {
-                if(unlikely(!service_running(SERVICE_HEALTH)))
-                    break;
-
-                if (unlikely(!host->health.health_enabled))
-                    continue;
-
-                sql_queue_removed_alerts_to_aclk(host);
-            }
-            dfe_done(host);
-            aclk_alert_reloaded = 0;
-            marked_aclk_reload_loop = 0;
-        }
-#endif
-
         if(unlikely(!service_running(SERVICE_HEALTH)))
             break;
 
diff --git a/libnetdata/libnetdata.c b/libnetdata/libnetdata.c
index 3029b08d1c..a8f26c33b8 100644
--- a/libnetdata/libnetdata.c
+++ b/libnetdata/libnetdata.c
@@ -2009,3 +2009,28 @@ void timing_action(TIMING_ACTION action, TIMING_STEP step) {
         }
     }
 }
+
+/**
+ * Compute the SHA256 digest of a buffer.
+ *
+ * @param string the bytes to hash
+ * @param size the number of bytes of string to hash
+ * @param hash output buffer; receives the raw (binary) digest, not a hex
+ *             string. EVP_DigestFinal() writes the whole digest, so it must
+ *             have room for at least SHA256_DIGEST_LENGTH bytes.
+ *
+ * @return 1 on success, 0 on failure
+ */
+int hash256_string(const unsigned char *string, size_t size, char *hash) {
+    EVP_MD_CTX *ctx = EVP_MD_CTX_create();
+    if (!ctx)
+        return 0;
+
+    int ok = EVP_DigestInit(ctx, EVP_sha256()) &&
+             EVP_DigestUpdate(ctx, string, size) &&
+             EVP_DigestFinal(ctx, (unsigned char *)hash, NULL);
+
+    // release the context on every path, including success
+    EVP_MD_CTX_destroy(ctx);
+    return ok;
+}
diff --git a/libnetdata/libnetdata.h b/libnetdata/libnetdata.h
index 1f164e3a07..c244949308 100644
--- a/libnetdata/libnetdata.h
+++ b/libnetdata/libnetdata.h
@@ -782,6 +782,7 @@ typedef enum {
 #endif
 
 void timing_action(TIMING_ACTION action, TIMING_STEP step);
+int hash256_string(const unsigned char *string, size_t size, char *hash);
 # ifdef __cplusplus
 }
 # endif