mirror of
https://github.com/netdata/netdata.git
synced 2025-04-17 03:02:41 +00:00
Reduce alert events sent to the cloud. (#12544)
* filter * update filter * queue removed directly * more * logging * cleanup * cleanup 2 * cleanup 3 * finalize instead of reset
This commit is contained in:
parent
90c2fcb838
commit
d6b1756ea7
7 changed files with 147 additions and 15 deletions
|
@ -628,7 +628,7 @@ void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id)
|
|||
db_execute(buffer_tostring(sql));
|
||||
buffer_flush(sql);
|
||||
|
||||
buffer_sprintf(sql, TABLE_ACLK_ALERT, uuid_str, uuid_str, uuid_str);
|
||||
buffer_sprintf(sql, TABLE_ACLK_ALERT, uuid_str);
|
||||
db_execute(buffer_tostring(sql));
|
||||
buffer_flush(sql);
|
||||
|
||||
|
|
|
@ -103,9 +103,7 @@ static inline char *get_str_from_uuid(uuid_t *uuid)
|
|||
|
||||
#define TABLE_ACLK_ALERT "CREATE TABLE IF NOT EXISTS aclk_alert_%s (sequence_id INTEGER PRIMARY KEY, " \
|
||||
"alert_unique_id, date_created, date_submitted, date_cloud_ack, " \
|
||||
"unique(alert_unique_id)); " \
|
||||
"insert into aclk_alert_%s (alert_unique_id, date_created) " \
|
||||
"select unique_id alert_unique_id, strftime('%%s') date_created from health_log_%s where new_status <> 0 and new_status <> -2 order by unique_id asc on conflict (alert_unique_id) do nothing;"
|
||||
"unique(alert_unique_id));"
|
||||
|
||||
#define INDEX_ACLK_CHART "CREATE INDEX IF NOT EXISTS aclk_chart_index_%s ON aclk_chart_%s (unique_id);"
|
||||
|
||||
|
|
|
@ -8,9 +8,120 @@
|
|||
#include "../../aclk/aclk.h"
|
||||
#endif
|
||||
|
||||
time_t removed_when(uint32_t alarm_id, uint32_t before_unique_id, uint32_t after_unique_id, char *uuid_str) {
|
||||
sqlite3_stmt *res = NULL;
|
||||
int rc = 0;
|
||||
time_t when = 0;
|
||||
char sql[ACLK_SYNC_QUERY_SIZE];
|
||||
|
||||
snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "select when_key from health_log_%s where alarm_id = %u " \
|
||||
"and unique_id > %u and unique_id < %u " \
|
||||
"and new_status = -2;", uuid_str, alarm_id, after_unique_id, before_unique_id);
|
||||
|
||||
rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0);
|
||||
if (rc != SQLITE_OK) {
|
||||
error_report("Failed to prepare statement when trying to find removed gap.");
|
||||
return 0;
|
||||
}
|
||||
|
||||
rc = sqlite3_step(res);
|
||||
if (likely(rc == SQLITE_ROW)) {
|
||||
when = (time_t) sqlite3_column_int64(res, 0);
|
||||
}
|
||||
|
||||
rc = sqlite3_finalize(res);
|
||||
if (unlikely(rc != SQLITE_OK))
|
||||
error_report("Failed to finalize statement when trying to find removed gap, rc = %d", rc);
|
||||
|
||||
return when;
|
||||
}
|
||||
|
||||
#define MAX_REMOVED_PERIOD 900
|
||||
//decide if some events should be sent or not
|
||||
int should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae)
|
||||
{
|
||||
sqlite3_stmt *res = NULL;
|
||||
char uuid_str[GUID_LEN + 1];
|
||||
uuid_unparse_lower_fix(&host->host_uuid, uuid_str);
|
||||
int send = 1, rc = 0;
|
||||
|
||||
if (ae->new_status == RRDCALC_STATUS_REMOVED || ae->new_status == RRDCALC_STATUS_UNINITIALIZED) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (unlikely(uuid_is_null(ae->config_hash_id)))
|
||||
return 0;
|
||||
|
||||
char sql[ACLK_SYNC_QUERY_SIZE];
|
||||
uuid_t config_hash_id;
|
||||
RRDCALC_STATUS status;
|
||||
uint32_t unique_id;
|
||||
|
||||
//get the previous sent event of this alarm_id
|
||||
snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "select hl.new_status, hl.config_hash_id, hl.unique_id from health_log_%s hl, aclk_alert_%s aa \
|
||||
where hl.unique_id = aa.alert_unique_id \
|
||||
and hl.alarm_id = %u and hl.unique_id <> %u \
|
||||
order by alarm_event_id desc LIMIT 1;", uuid_str, uuid_str, ae->alarm_id, ae->unique_id);
|
||||
|
||||
rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0);
|
||||
if (rc != SQLITE_OK) {
|
||||
error_report("Failed to prepare statement when trying to filter alert events.");
|
||||
send = 1;
|
||||
return send;
|
||||
}
|
||||
|
||||
rc = sqlite3_step(res);
|
||||
if (likely(rc == SQLITE_ROW)) {
|
||||
status = (RRDCALC_STATUS) sqlite3_column_int(res, 0);
|
||||
if (sqlite3_column_type(res, 1) != SQLITE_NULL)
|
||||
uuid_copy(config_hash_id, *((uuid_t *) sqlite3_column_blob(res, 1)));
|
||||
unique_id = (uint32_t) sqlite3_column_int64(res, 2);
|
||||
|
||||
} else {
|
||||
send = 1;
|
||||
goto done;
|
||||
}
|
||||
|
||||
if (ae->new_status != (RRDCALC_STATUS)status) {
|
||||
send = 1;
|
||||
goto done;
|
||||
}
|
||||
|
||||
if (uuid_compare(ae->config_hash_id, config_hash_id)) {
|
||||
send = 1;
|
||||
goto done;
|
||||
}
|
||||
|
||||
//same status, same config
|
||||
if (ae->new_status == RRDCALC_STATUS_CLEAR) {
|
||||
send = 0;
|
||||
goto done;
|
||||
}
|
||||
|
||||
//detect a long off period of the agent, TODO make global
|
||||
if (ae->new_status == RRDCALC_STATUS_WARNING || ae->new_status == RRDCALC_STATUS_CRITICAL) {
|
||||
time_t when = removed_when(ae->alarm_id, ae->unique_id, unique_id, uuid_str);
|
||||
|
||||
if (when && (when + (time_t)MAX_REMOVED_PERIOD) < ae->when) {
|
||||
send = 1;
|
||||
goto done;
|
||||
} else {
|
||||
send = 0;
|
||||
goto done;
|
||||
}
|
||||
}
|
||||
|
||||
done:
|
||||
rc = sqlite3_finalize(res);
|
||||
if (unlikely(rc != SQLITE_OK))
|
||||
error_report("Failed to finalize statement when trying to filter alert events, rc = %d", rc);
|
||||
|
||||
return send;
|
||||
}
|
||||
|
||||
// will replace call to aclk_update_alarm in health/health_log.c
|
||||
// and handle both cases
|
||||
int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae)
|
||||
int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, int skip_filter)
|
||||
{
|
||||
//check aclk architecture and handle old json alarm update to cloud
|
||||
//include also the valid statuses for this case
|
||||
|
@ -30,17 +141,18 @@ int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae)
|
|||
if (!claimed())
|
||||
return 0;
|
||||
|
||||
if (ae->flags & HEALTH_ENTRY_FLAG_ACLK_QUEUED)
|
||||
return 0;
|
||||
|
||||
if (ae->new_status == RRDCALC_STATUS_REMOVED || ae->new_status == RRDCALC_STATUS_UNINITIALIZED)
|
||||
return 0;
|
||||
|
||||
if (unlikely(!host->dbsync_worker))
|
||||
return 1;
|
||||
|
||||
if (unlikely(uuid_is_null(ae->config_hash_id)))
|
||||
if (ae->flags & HEALTH_ENTRY_FLAG_ACLK_QUEUED) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (!skip_filter) {
|
||||
if (!should_send_to_cloud(host, ae)) {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
int rc = 0;
|
||||
|
||||
|
@ -296,6 +408,22 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d
|
|||
return;
|
||||
}
|
||||
|
||||
void sql_queue_existing_alerts_to_aclk(RRDHOST *host)
|
||||
{
|
||||
char uuid_str[GUID_LEN + 1];
|
||||
uuid_unparse_lower_fix(&host->host_uuid, uuid_str);
|
||||
BUFFER *sql = buffer_create(1024);
|
||||
|
||||
buffer_sprintf(sql,"insert into aclk_alert_%s (alert_unique_id, date_created) " \
|
||||
"select unique_id alert_unique_id, strftime('%%s') date_created from health_log_%s " \
|
||||
"where new_status <> 0 and new_status <> -2 and config_hash_id is not null and updated_by_id = 0 " \
|
||||
"order by unique_id asc on conflict (alert_unique_id) do nothing;", uuid_str, uuid_str);
|
||||
|
||||
db_execute(buffer_tostring(sql));
|
||||
|
||||
buffer_free(sql);
|
||||
}
|
||||
|
||||
void aclk_send_alarm_health_log(char *node_id)
|
||||
{
|
||||
if (unlikely(!node_id))
|
||||
|
@ -593,6 +721,9 @@ void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start
|
|||
log_access("ACLK STA [%s (N/A)]: Ignoring request to stream alert state changes, health is disabled.", node_id);
|
||||
return;
|
||||
}
|
||||
|
||||
if (unlikely(batch_id == 1) && unlikely(start_seq_id == 1))
|
||||
sql_queue_existing_alerts_to_aclk(host);
|
||||
} else
|
||||
wc = (struct aclk_database_worker_config *)find_inactive_wc_by_node_id(node_id);
|
||||
|
||||
|
@ -644,6 +775,9 @@ void sql_queue_removed_alerts_to_aclk(RRDHOST *host)
|
|||
if (unlikely(!host->dbsync_worker))
|
||||
return;
|
||||
|
||||
if (!claimed())
|
||||
return;
|
||||
|
||||
struct aclk_database_cmd cmd;
|
||||
memset(&cmd, 0, sizeof(cmd));
|
||||
cmd.opcode = ACLK_DATABASE_QUEUE_REMOVED_ALERTS;
|
||||
|
|
|
@ -26,5 +26,6 @@ void sql_process_queue_removed_alerts_to_aclk(struct aclk_database_worker_config
|
|||
void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
|
||||
void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, uint64_t snapshot_id, uint64_t sequence_id);
|
||||
int get_proto_alert_status(RRDHOST *host, struct proto_alert_status *proto_alert_status);
|
||||
extern int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, int skip_filter);
|
||||
|
||||
#endif //NETDATA_SQLITE_ACLK_ALERT_H
|
||||
|
|
|
@ -39,7 +39,6 @@ struct aclk_chart_sync_stats {
|
|||
extern int queue_chart_to_aclk(RRDSET *st);
|
||||
extern int queue_dimension_to_aclk(RRDDIM *rd);
|
||||
extern void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id);
|
||||
extern int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae);
|
||||
int aclk_add_chart_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
|
||||
int aclk_add_dimension_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
|
||||
int aclk_send_chart_config(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
|
||||
|
|
|
@ -804,7 +804,7 @@ void *health_main(void *ptr) {
|
|||
rc->value = NAN;
|
||||
#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL)
|
||||
if (netdata_cloud_setting && likely(!aclk_alert_reloaded))
|
||||
sql_queue_removed_alerts_to_aclk(host);
|
||||
sql_queue_alarm_to_aclk(host, ae, 1);
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
|
|
@ -162,7 +162,7 @@ inline void health_alarm_log_save(RRDHOST *host, ALARM_ENTRY *ae) {
|
|||
|
||||
#ifdef ENABLE_ACLK
|
||||
if (netdata_cloud_setting) {
|
||||
sql_queue_alarm_to_aclk(host, ae);
|
||||
sql_queue_alarm_to_aclk(host, ae, 0);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue