0
0
Fork 0
mirror of https://github.com/netdata/netdata.git synced 2025-04-26 13:54:48 +00:00

Add some logging for cloud new architecture to access.log ()

* add some logging for ng arch to access.log

* change arrows to IN, OG, AC

* log also the params for aclk requests

* check for wc->host before using wc->host->hostname

* turn two messages to info

* reduce alert event logs

* used thread local variables
This commit is contained in:
Emmanuel Vasilakis 2021-11-18 11:56:49 +02:00 committed by GitHub
parent 33d3f6f06b
commit dc42e45c6a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 71 additions and 34 deletions

View file

@ -183,6 +183,7 @@ void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str)
#define RX_MSGLEN_MAX 4096
static void msg_callback_old_protocol(const char *topic, const void *msg, size_t msglen, int qos)
{
UNUSED(qos);
char cmsg[RX_MSGLEN_MAX];
size_t len = (msglen < RX_MSGLEN_MAX - 1) ? msglen : (RX_MSGLEN_MAX - 1);
const char *cmd_topic = aclk_get_topic(ACLK_TOPICID_COMMAND);
@ -227,6 +228,7 @@ static void msg_callback_old_protocol(const char *topic, const void *msg, size_t
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
static void msg_callback_new_protocol(const char *topic, const void *msg, size_t msglen, int qos)
{
UNUSED(qos);
if (msglen > RX_MSGLEN_MAX)
error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX);
@ -281,7 +283,7 @@ static void puback_callback(uint16_t packet_id)
#endif
if (aclk_shared_state.mqtt_shutdown_msg_id == (int)packet_id) {
error("Got PUBACK for shutdown message. Can exit gracefully.");
info("Shutdown message has been acknowledged by the cloud. Exiting gracefully");
aclk_shared_state.mqtt_shutdown_msg_rcvd = 1;
}
}
@ -314,7 +316,7 @@ static int handle_connection(mqtt_wss_client client)
// timeout 1000 to check at least once a second
// for netdata_exit
if (mqtt_wss_service(client, 1000) < 0){
error("Connection Error or Dropped");
error_report("Connection Error or Dropped");
return 1;
}
@ -450,7 +452,7 @@ static int wait_popcorning_finishes()
void aclk_graceful_disconnect(mqtt_wss_client client)
{
error("Preparing to Gracefully Shutdown the ACLK");
info("Preparing to gracefully shutdown ACLK connection");
aclk_queue_lock();
aclk_queue_flush();
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
@ -467,14 +469,16 @@ void aclk_graceful_disconnect(mqtt_wss_client client)
break;
}
if (aclk_shared_state.mqtt_shutdown_msg_rcvd) {
error("MQTT App Layer `disconnect` message sent successfully");
info("MQTT App Layer `disconnect` message sent successfully");
break;
}
}
info("ACLK link is down");
log_access("ACLK DISCONNECTED");
aclk_stats_upd_online(0);
aclk_connected = 0;
error("Attempting to Gracefully Shutdown MQTT/WSS connection");
info("Attempting to gracefully shutdown the MQTT/WSS connection");
mqtt_wss_disconnect(client, 1000);
}
@ -720,12 +724,13 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
json_object_put(lwt);
if (!ret) {
info("MQTTWSS connection succeeded");
info("ACLK connection successfully established");
log_access("ACLK CONNECTED");
mqtt_connected_actions(client);
return 0;
}
error("Connect failed\n");
error_report("Connect failed");
}
return 1;
@ -821,6 +826,7 @@ void *aclk_main(void *ptr)
if (handle_connection(mqttwss_client)) {
aclk_stats_upd_online(0);
aclk_connected = 0;
log_access("ACLK DISCONNECTED");
}
} while (!netdata_exit);

View file

@ -81,6 +81,7 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
int retval = 0;
usec_t t;
BUFFER *local_buffer = NULL;
BUFFER *log_buffer = buffer_create(NETDATA_WEB_REQUEST_URL_SIZE);
RRDHOST *query_host = localhost;
#ifdef NETDATA_WITH_ZLIB
@ -116,6 +117,8 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
}
}
buffer_strcat(log_buffer, query->data.http_api_v2.query);
char *mysep = strchr(query->data.http_api_v2.query, '?');
if (mysep) {
url_decode_r(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE + 1);
@ -226,7 +229,7 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
, dt_usec(&tv, &w->tv_ready) / 1000.0
, dt_usec(&tv, &w->tv_in) / 1000.0
, w->response.code
, strip_control_characters(query->data.http_api_v2.query)
, strip_control_characters((char *)buffer_tostring(log_buffer))
);
cleanup:
@ -240,6 +243,7 @@ cleanup:
buffer_free(w->response.header_output);
freez(w);
buffer_free(local_buffer);
buffer_free(log_buffer);
return retval;
}

View file

@ -125,7 +125,7 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d
int rc;
if (unlikely(!wc->alert_updates)) {
debug(D_ACLK_SYNC,"Ignoring alert push event, updates have been turned off for node %s", wc->node_id);
log_access("AC [%s (%s)]: Ignoring alert push event, updates have been turned off for this node.", wc->node_id, wc->host ? wc->host->hostname : "N/A");
return;
}
@ -177,6 +177,8 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d
char uuid_str[GUID_LEN + 1];
uint64_t first_sequence_id = 0;
uint64_t last_sequence_id = 0;
static __thread uint64_t log_first_sequence_id = 0;
static __thread uint64_t log_last_sequence_id = 0;
while (sqlite3_step(res) == SQLITE_ROW) {
struct alarm_log_entry alarm_log;
@ -244,7 +246,12 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d
if (first_sequence_id == 0)
first_sequence_id = (uint64_t) sqlite3_column_int64(res, 0);
if (log_first_sequence_id == 0)
log_first_sequence_id = (uint64_t) sqlite3_column_int64(res, 0);
last_sequence_id = (uint64_t) sqlite3_column_int64(res, 0);
log_last_sequence_id = (uint64_t) sqlite3_column_int64(res, 0);
destroy_alarm_log_entry(&alarm_log);
freez(edit_command);
@ -256,6 +263,11 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d
"WHERE date_submitted IS NULL AND sequence_id BETWEEN %" PRIu64 " AND %" PRIu64 ";",
wc->uuid_str, first_sequence_id, last_sequence_id);
db_execute(buffer_tostring(sql));
} else {
if (log_first_sequence_id)
log_access("OG [%s (%s)]: Sent alert events, first sequence_id %"PRIu64", last sequence_id %"PRIu64, wc->node_id, wc->host ? wc->host->hostname : "N/A", log_first_sequence_id, log_last_sequence_id);
log_first_sequence_id = 0;
log_last_sequence_id = 0;
}
rc = sqlite3_finalize(res);
@ -274,6 +286,8 @@ void aclk_send_alarm_health_log(char *node_id)
if (unlikely(!node_id))
return;
log_access("IN [%s (N/A)]: Request to send alarm health log.", node_id);
struct aclk_database_worker_config *wc = NULL;
struct aclk_database_cmd cmd;
memset(&cmd, 0, sizeof(cmd));
@ -358,6 +372,7 @@ void aclk_push_alarm_health_log(struct aclk_database_worker_config *wc, struct a
wc->alert_sequence_id = last_sequence;
aclk_send_alarm_log_health(&alarm_log);
log_access("OG [%s (%s)]: Alarm health log sent, first sequence id %ld, last sequence id %ld.", wc->node_id, wc->host ? wc->host->hostname : "N/A", first_sequence, last_sequence);
rc = sqlite3_finalize(res);
if (unlikely(rc != SQLITE_OK))
@ -381,6 +396,8 @@ void aclk_send_alarm_configuration(char *config_hash)
return;
}
log_access("IN [%s (%s)]: Request to send alert config %s.", wc->node_id, wc->host ? wc->host->hostname : "N/A", config_hash);
struct aclk_database_cmd cmd;
memset(&cmd, 0, sizeof(cmd));
cmd.opcode = ACLK_DATABASE_PUSH_ALERT_CONFIG;
@ -489,7 +506,7 @@ int aclk_push_alert_config_event(struct aclk_database_worker_config *wc, struct
}
if (likely(p_alarm_config.cfg_hash)) {
debug(D_ACLK_SYNC, "Sending alert config for %s", config_hash);
log_access("OG [%s (%s)]: Sent alert config %s.", wc->node_id, wc->host ? wc->host->hostname : "N/A", config_hash);
aclk_send_provide_alarm_cfg(&p_alarm_config);
freez((char *) cmd.data_param);
freez(p_alarm_config.cfg_hash);
@ -516,6 +533,8 @@ void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start
if (unlikely(!node_id))
return;
log_access("IN [%s (N/A)]: Start streaming alerts with batch_id %"PRIu64" and start_seq_id %"PRIu64".", node_id, batch_id, start_seq_id);
uuid_t node_uuid;
if (uuid_parse(node_id, node_uuid))
return;
@ -528,12 +547,12 @@ void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start
rrd_unlock();
if (unlikely(!host->health_enabled)) {
info("Ignoring request to stream alert state changes, health is disabled for %s", host->machine_guid);
log_access("AC [%s (%s)]: Ignoring request to stream alert state changes, health is disabled.", node_id, wc->host ? wc->host->hostname : "N/A");
return;
}
if (likely(wc)) {
info("START streaming alerts for %s enabled with batch_id %"PRIu64" and start_seq_id %"PRIu64, node_id, batch_id, start_seq_id);
log_access("AC [%s (%s)]: Start streaming alerts enabled with batch_id %"PRIu64" and start_seq_id %"PRIu64".", node_id, wc->host ? wc->host->hostname : "N/A", batch_id, start_seq_id);
__sync_synchronize();
wc->alerts_batch_id = batch_id;
wc->alerts_start_seq_id = start_seq_id;
@ -541,7 +560,7 @@ void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start
__sync_synchronize();
}
else
error("ACLK synchronization thread is not active for host %s", host->hostname);
log_access("AC [%s (%s)]: ACLK synchronization thread is not active.", node_id, wc->host ? wc->host->hostname : "N/A");
#else
UNUSED(node_id);
@ -609,9 +628,10 @@ void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, uint64_t sn
rrd_unlock();
if (likely(wc)) {
info(
"Send alerts snapshot requested for %s with snapshot_id %" PRIu64 " and ack sequence_id %" PRIu64,
node_id,
log_access(
"IN [%s (%s)]: Request to send alerts snapshot, snapshot_id %" PRIu64 " and ack_sequence_id %" PRIu64,
wc->node_id,
wc->host ? wc->host->hostname : "N/A",
snapshot_id,
sequence_id);
__sync_synchronize();
@ -725,7 +745,7 @@ void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, stru
UNUSED(cmd);
// we perhaps we don't need this for snapshots
if (unlikely(!wc->alert_updates)) {
debug(D_ACLK_SYNC, "Ignoring alert push snapshot event, updates have been turned off for node %s", wc->node_id);
log_access("AC [%s (%s)]: Ignoring alert snapshot event, updates have been turned off for this node.", wc->node_id, wc->host ? wc->host->hostname : "N/A");
return;
}
@ -733,6 +753,8 @@ void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, stru
if (unlikely(!claim_id))
return;
log_access("OG [%s (%s)]: Sending alerts snapshot, snapshot_id %" PRIu64, wc->node_id, wc->host ? wc->host->hostname : "N/A", wc->alerts_snapshot_id);
aclk_mark_alert_cloud_ack(wc->uuid_str, wc->alerts_ack_sequence_id);
RRDHOST *host = wc->host;

View file

@ -302,7 +302,7 @@ void aclk_send_chart_event(struct aclk_database_worker_config *wc, struct aclk_d
wc->chart_pending = 0;
if (unlikely(!wc->chart_updates)) {
debug(D_ACLK_SYNC,"Ignoring chart push event, updates have been turned off for node %s", wc->node_id);
log_access("AC [%s (%s)]: Ignoring chart push event, updates have been turned off for this node.", wc->node_id, wc->host ? wc->host->hostname : "N/A");
return;
}
@ -406,6 +406,7 @@ void aclk_send_chart_event(struct aclk_database_worker_config *wc, struct aclk_d
db_unlock();
aclk_chart_inst_and_dim_update(payload_list, payload_list_size, is_dim, position_list, wc->batch_id);
log_access("OG [%s (%s)]: Sending charts and dimensions update, batch_id %ld, first sequence %ld, last sequence %ld", wc->node_id, wc->host ? wc->host->hostname : "N/A", wc->batch_id, first_sequence, last_sequence);
wc->chart_sequence_id = last_sequence;
wc->chart_timestamp = last_timestamp;
}
@ -422,7 +423,7 @@ void aclk_send_chart_event(struct aclk_database_worker_config *wc, struct aclk_d
else {
wc->chart_payload_count = sql_get_pending_count(wc);
if (!wc->chart_payload_count)
info("%s: sync of charts and dimensions done in %ld seconds", wc->host->hostname, now_realtime_sec() - wc->startup_time);
log_access("AC [%s (%s)]: Sync of charts and dimensions done in %ld seconds.", wc->node_id, wc->host ? wc->host->hostname : "N/A", now_realtime_sec() - wc->startup_time);
}
for (int i = 0; i <= limit; ++i)
@ -495,12 +496,12 @@ int aclk_send_chart_config(struct aclk_database_worker_config *wc, struct aclk_d
}
if (likely(chart_config.config_hash)) {
debug(D_ACLK_SYNC, "Sending chart config for %s", hash_id);
log_access("OG [%s (%s)]: Sending chart config for %s.", wc->node_id, wc->host ? wc->host->hostname : "N/A", hash_id);
aclk_chart_config_updated(&chart_config, 1);
destroy_chart_config_updated(&chart_config);
}
else
info("Chart config for %s not found", hash_id);
log_access("AC [%s (%s)]: Chart config for %s not found.", wc->node_id, wc->host ? wc->host->hostname : "N/A", hash_id);
bind_fail:
rc = sqlite3_finalize(res);
@ -518,6 +519,8 @@ void aclk_receive_chart_ack(struct aclk_database_worker_config *wc, struct aclk_
int rc;
sqlite3_stmt *res = NULL;
log_access("IN [%s (%s)]: Received ack chart sequence id %ld.", wc->node_id, wc->host ? wc->host->hostname : "N/A", cmd.param1);
BUFFER *sql = buffer_create(1024);
buffer_sprintf(sql, "UPDATE aclk_chart_%s SET date_updated=strftime('%%s','now') WHERE sequence_id <= @sequence_id "
@ -555,7 +558,7 @@ void aclk_receive_chart_reset(struct aclk_database_worker_config *wc, struct acl
if (cmd.param1 == 1) {
db_lock();
buffer_flush(sql);
info("Received full resync for %s", wc->uuid_str);
log_access("IN [%s (%s)]: Received chart full resync.", wc->node_id, wc->host ? wc->host->hostname : "N/A");
buffer_sprintf(sql, "DELETE FROM aclk_chart_payload_%s; DELETE FROM aclk_chart_%s; " \
"DELETE FROM aclk_chart_latest_%s;", wc->uuid_str, wc->uuid_str, wc->uuid_str);
@ -583,7 +586,7 @@ void aclk_receive_chart_reset(struct aclk_database_worker_config *wc, struct acl
rrdhost_unlock(host);
}
else {
info("Restarting chart sync for %s from sequence=%"PRIu64, wc->uuid_str, cmd.param1);
log_access("AC [%s (%s)]: Restarting chart sync from sequence %"PRIu64, wc->node_id, wc->host ? wc->host->hostname : "N/A", cmd.param1);
wc->chart_payload_count = sql_get_pending_count(wc);
sql_get_last_chart_sequence(wc);
}
@ -608,7 +611,7 @@ void aclk_get_chart_config(char **hash_id)
cmd.opcode = ACLK_DATABASE_PUSH_CHART_CONFIG;
for (int i = 0; hash_id[i]; ++i) {
// TODO: Verify that we have a valid hash_id
debug(D_ACLK_SYNC,"Request %d for chart config with hash [%s] received", i, hash_id[i]);
log_access("IN [%s (%s)]: Request %d for chart config with hash %s received.", wc->node_id, wc->host ? wc->host->hostname : "N/A", i, hash_id[i]);
cmd.data_param = (void *)strdupz(hash_id[i]);
aclk_database_enq_cmd(wc, &cmd);
}
@ -638,7 +641,7 @@ static void aclk_submit_param_command(char *node_id, enum aclk_database_opcode a
aclk_database_enq_cmd(wc, &cmd);
else {
if (aclk_worker_enq_cmd(node_id, &cmd))
error_report("ACLK synchronization thread is not active for node id %s", node_id);
log_access("AC [%s (%s)]: ACLK synchronization thread is not active.", node_id, host ? host->hostname : "N/A");
}
return;
}
@ -648,7 +651,7 @@ void aclk_ack_chart_sequence_id(char *node_id, uint64_t last_sequence_id)
if (unlikely(!node_id))
return;
debug(D_ACLK_SYNC, "NODE %s reports last sequence id received %"PRIu64, node_id, last_sequence_id);
log_access("AC [%s (N/A)]: Node reports last sequence id received %"PRIu64, node_id, last_sequence_id);
aclk_submit_param_command(node_id, ACLK_DATABASE_CHART_ACK, last_sequence_id);
return;
}
@ -660,8 +663,9 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at
if (unlikely(!node_id))
return;
debug(D_ACLK_SYNC,"START streaming charts for node %s from sequence %"PRIu64" t=%ld, batch=%"PRIu64, node_id,
log_access("IN [%s (N/A)]: Start streaming charts from sequence %"PRIu64" t=%ld, batch=%"PRIu64, node_id,
sequence_id, created_at, batch_id);
uuid_t node_uuid;
if (uuid_parse(node_id, node_uuid))
return;
@ -681,9 +685,9 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at
__sync_synchronize();
wc->batch_created = now_realtime_sec();
if (sequence_id > wc->chart_sequence_id || wc->chart_reset_count > 10) {
info("Requesting full resync from the cloud for node id %s "
log_access("AC [%s (%s)]: Requesting full resync from the cloud "
"(reset=%d, remote_seq=%"PRIu64", local_seq=%"PRIu64")"
, wc->node_id, wc->chart_reset_count, sequence_id, wc->chart_sequence_id);
, wc->node_id, wc->host ? wc->host->hostname : "N/A", wc->chart_reset_count, sequence_id, wc->chart_sequence_id);
chart_reset_t chart_reset;
chart_reset.claim_id = is_agent_claimed();
if (chart_reset.claim_id) {
@ -699,8 +703,8 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at
memset(&cmd, 0, sizeof(cmd));
// TODO: handle timestamp
if (sequence_id < wc->chart_sequence_id || !sequence_id) { // || created_at != wc->chart_timestamp) {
info("RESET streaming charts for %s from sequence %"PRIu64 \
" t=%ld (reset count=%d)", wc->node_id, wc->chart_sequence_id,
log_access("AC [%s (%s)]: Reset streaming charts from sequence %"PRIu64 \
" t=%ld (reset count=%d)", wc->node_id, wc->host ? wc->host->hostname : "N/A", wc->chart_sequence_id,
wc->chart_timestamp, wc->chart_reset_count);
cmd.opcode = ACLK_DATABASE_RESET_CHART;
cmd.param1 = sequence_id + 1;
@ -708,8 +712,8 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at
aclk_database_enq_cmd(wc, &cmd);
}
else {
info("START streaming charts for %s enabled -- last streamed sequence %"PRIu64 \
" t=%ld (reset count=%d)", wc->node_id, wc->chart_sequence_id,
log_access("AC [%s (%s)]: Start streaming charts enabled -- last streamed sequence %"PRIu64 \
" t=%ld (reset count=%d)", wc->node_id, wc->host ? wc->host->hostname : "N/A", wc->chart_sequence_id,
wc->chart_timestamp, wc->chart_reset_count);
wc->chart_reset_count = 0;
wc->chart_updates = 1;
@ -717,7 +721,7 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at
}
}
else
error("ACLK synchronization thread is not active for host %s", host->hostname);
log_access("AC [%s (%s)]: ACLK synchronization thread is not active.", wc->node_id, wc->host ? wc->host->hostname : "N/A");
return;
}
host = host->next;

View file

@ -52,6 +52,7 @@ void sql_build_node_info(struct aclk_database_worker_config *wc, struct aclk_dat
node_info.data.host_labels_head = labels->head;
aclk_update_node_info(&node_info);
log_access("OG [%s (%s)]: Sending node info for guid [%s] (%s).", wc->node_id, wc->host ? wc->host->hostname : "N/A", wc->host_guid, wc->host == localhost ? "parent" : "child");
netdata_rwlock_unlock(&labels->labels_rwlock);
rrd_unlock();