diff --git a/aclk/aclk.c b/aclk/aclk.c index 9b6a8c3a34..a24d258c51 100644 --- a/aclk/aclk.c +++ b/aclk/aclk.c @@ -183,6 +183,7 @@ void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str) #define RX_MSGLEN_MAX 4096 static void msg_callback_old_protocol(const char *topic, const void *msg, size_t msglen, int qos) { + UNUSED(qos); char cmsg[RX_MSGLEN_MAX]; size_t len = (msglen < RX_MSGLEN_MAX - 1) ? msglen : (RX_MSGLEN_MAX - 1); const char *cmd_topic = aclk_get_topic(ACLK_TOPICID_COMMAND); @@ -227,6 +228,7 @@ static void msg_callback_old_protocol(const char *topic, const void *msg, size_t #ifdef ENABLE_NEW_CLOUD_PROTOCOL static void msg_callback_new_protocol(const char *topic, const void *msg, size_t msglen, int qos) { + UNUSED(qos); if (msglen > RX_MSGLEN_MAX) error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX); @@ -281,7 +283,7 @@ static void puback_callback(uint16_t packet_id) #endif if (aclk_shared_state.mqtt_shutdown_msg_id == (int)packet_id) { - error("Got PUBACK for shutdown message. Can exit gracefully."); + info("Shutdown message has been acknowledged by the cloud. Exiting gracefully"); aclk_shared_state.mqtt_shutdown_msg_rcvd = 1; } } @@ -314,7 +316,7 @@ static int handle_connection(mqtt_wss_client client) // timeout 1000 to check at least once a second // for netdata_exit if (mqtt_wss_service(client, 1000) < 0){ - error("Connection Error or Dropped"); + error_report("Connection Error or Dropped"); return 1; } @@ -450,7 +452,7 @@ static int wait_popcorning_finishes() void aclk_graceful_disconnect(mqtt_wss_client client) { - error("Preparing to Gracefully Shutdown the ACLK"); + info("Preparing to gracefully shutdown ACLK connection"); aclk_queue_lock(); aclk_queue_flush(); #ifdef ENABLE_NEW_CLOUD_PROTOCOL @@ -467,14 +469,16 @@ void aclk_graceful_disconnect(mqtt_wss_client client) break; } if (aclk_shared_state.mqtt_shutdown_msg_rcvd) { - error("MQTT App Layer `disconnect` message sent successfully"); + info("MQTT App Layer `disconnect` message sent successfully"); break; } } + info("ACLK link is down"); + log_access("ACLK DISCONNECTED"); aclk_stats_upd_online(0); aclk_connected = 0; - error("Attempting to Gracefully Shutdown MQTT/WSS connection"); + info("Attempting to gracefully shutdown the MQTT/WSS connection"); mqtt_wss_disconnect(client, 1000); } @@ -720,12 +724,13 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) json_object_put(lwt); if (!ret) { - info("MQTTWSS connection succeeded"); + info("ACLK connection successfully established"); + log_access("ACLK CONNECTED"); mqtt_connected_actions(client); return 0; } - error("Connect failed\n"); + error_report("Connect failed"); } return 1; @@ -821,6 +826,7 @@ void *aclk_main(void *ptr) if (handle_connection(mqttwss_client)) { aclk_stats_upd_online(0); aclk_connected = 0; + log_access("ACLK DISCONNECTED"); } } while (!netdata_exit); diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c index 2b66b51325..001c1ba02f 100644 --- a/aclk/aclk_query.c +++ b/aclk/aclk_query.c @@ -81,6 +81,7 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) int retval = 0; usec_t t; BUFFER *local_buffer = NULL; + BUFFER *log_buffer = buffer_create(NETDATA_WEB_REQUEST_URL_SIZE); RRDHOST *query_host = localhost; #ifdef NETDATA_WITH_ZLIB @@ -116,6 +117,8 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) } } + buffer_strcat(log_buffer, query->data.http_api_v2.query); + char *mysep = strchr(query->data.http_api_v2.query, '?'); if (mysep) { url_decode_r(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE + 1); @@ -226,7 +229,7 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) , dt_usec(&tv, &w->tv_ready) / 1000.0 , dt_usec(&tv, &w->tv_in) / 1000.0 , w->response.code - , strip_control_characters(query->data.http_api_v2.query) + , strip_control_characters((char *)buffer_tostring(log_buffer)) ); cleanup: @@ -240,6 +243,7 @@ cleanup: buffer_free(w->response.header_output); freez(w); buffer_free(local_buffer); + buffer_free(log_buffer); return retval; } diff --git a/database/sqlite/sqlite_aclk_alert.c b/database/sqlite/sqlite_aclk_alert.c index 079fc16ea0..950eadbf60 100644 --- a/database/sqlite/sqlite_aclk_alert.c +++ b/database/sqlite/sqlite_aclk_alert.c @@ -125,7 +125,7 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d int rc; if (unlikely(!wc->alert_updates)) { - debug(D_ACLK_SYNC,"Ignoring alert push event, updates have been turned off for node %s", wc->node_id); + log_access("AC [%s (%s)]: Ignoring alert push event, updates have been turned off for this node.", wc->node_id, wc->host ? wc->host->hostname : "N/A"); return; } @@ -177,6 +177,8 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d char uuid_str[GUID_LEN + 1]; uint64_t first_sequence_id = 0; uint64_t last_sequence_id = 0; + static __thread uint64_t log_first_sequence_id = 0; + static __thread uint64_t log_last_sequence_id = 0; while (sqlite3_step(res) == SQLITE_ROW) { struct alarm_log_entry alarm_log; @@ -244,7 +246,12 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d if (first_sequence_id == 0) first_sequence_id = (uint64_t) sqlite3_column_int64(res, 0); + + if (log_first_sequence_id == 0) + log_first_sequence_id = (uint64_t) sqlite3_column_int64(res, 0); + last_sequence_id = (uint64_t) sqlite3_column_int64(res, 0); + log_last_sequence_id = (uint64_t) sqlite3_column_int64(res, 0); destroy_alarm_log_entry(&alarm_log); freez(edit_command); @@ -256,6 +263,11 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d "WHERE date_submitted IS NULL AND sequence_id BETWEEN %" PRIu64 " AND %" PRIu64 ";", wc->uuid_str, first_sequence_id, last_sequence_id); db_execute(buffer_tostring(sql)); + } else { + if (log_first_sequence_id) + log_access("OG [%s (%s)]: Sent alert events, first sequence_id %"PRIu64", last sequence_id %"PRIu64, wc->node_id, wc->host ? wc->host->hostname : "N/A", log_first_sequence_id, log_last_sequence_id); + log_first_sequence_id = 0; + log_last_sequence_id = 0; } rc = sqlite3_finalize(res); @@ -274,6 +286,8 @@ void aclk_send_alarm_health_log(char *node_id) if (unlikely(!node_id)) return; + log_access("IN [%s (N/A)]: Request to send alarm health log.", node_id); + struct aclk_database_worker_config *wc = NULL; struct aclk_database_cmd cmd; memset(&cmd, 0, sizeof(cmd)); @@ -358,6 +372,7 @@ void aclk_push_alarm_health_log(struct aclk_database_worker_config *wc, struct a wc->alert_sequence_id = last_sequence; aclk_send_alarm_log_health(&alarm_log); + log_access("OG [%s (%s)]: Alarm health log sent, first sequence id %ld, last sequence id %ld.", wc->node_id, wc->host ? wc->host->hostname : "N/A", first_sequence, last_sequence); rc = sqlite3_finalize(res); if (unlikely(rc != SQLITE_OK)) @@ -381,6 +396,8 @@ void aclk_send_alarm_configuration(char *config_hash) return; } + log_access("IN [%s (%s)]: Request to send alert config %s.", wc->node_id, wc->host ? wc->host->hostname : "N/A", config_hash); + struct aclk_database_cmd cmd; memset(&cmd, 0, sizeof(cmd)); cmd.opcode = ACLK_DATABASE_PUSH_ALERT_CONFIG; @@ -489,7 +506,7 @@ int aclk_push_alert_config_event(struct aclk_database_worker_config *wc, struct } if (likely(p_alarm_config.cfg_hash)) { - debug(D_ACLK_SYNC, "Sending alert config for %s", config_hash); + log_access("OG [%s (%s)]: Sent alert config %s.", wc->node_id, wc->host ? wc->host->hostname : "N/A", config_hash); aclk_send_provide_alarm_cfg(&p_alarm_config); freez((char *) cmd.data_param); freez(p_alarm_config.cfg_hash); @@ -516,6 +533,8 @@ void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start if (unlikely(!node_id)) return; + log_access("IN [%s (N/A)]: Start streaming alerts with batch_id %"PRIu64" and start_seq_id %"PRIu64".", node_id, batch_id, start_seq_id); + uuid_t node_uuid; if (uuid_parse(node_id, node_uuid)) return; @@ -528,12 +547,12 @@ void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start rrd_unlock(); if (unlikely(!host->health_enabled)) { - info("Ignoring request to stream alert state changes, health is disabled for %s", host->machine_guid); + log_access("AC [%s (%s)]: Ignoring request to stream alert state changes, health is disabled.", node_id, wc->host ? wc->host->hostname : "N/A"); return; } if (likely(wc)) { - info("START streaming alerts for %s enabled with batch_id %"PRIu64" and start_seq_id %"PRIu64, node_id, batch_id, start_seq_id); + log_access("AC [%s (%s)]: Start streaming alerts enabled with batch_id %"PRIu64" and start_seq_id %"PRIu64".", node_id, wc->host ? wc->host->hostname : "N/A", batch_id, start_seq_id); __sync_synchronize(); wc->alerts_batch_id = batch_id; wc->alerts_start_seq_id = start_seq_id; @@ -541,7 +560,7 @@ void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start __sync_synchronize(); } else - error("ACLK synchronization thread is not active for host %s", host->hostname); + log_access("AC [%s (%s)]: ACLK synchronization thread is not active.", node_id, wc->host ? wc->host->hostname : "N/A"); #else UNUSED(node_id); @@ -609,9 +628,10 @@ void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, uint64_t sn rrd_unlock(); if (likely(wc)) { - info( - "Send alerts snapshot requested for %s with snapshot_id %" PRIu64 " and ack sequence_id %" PRIu64, - node_id, + log_access( + "IN [%s (%s)]: Request to send alerts snapshot, snapshot_id %" PRIu64 " and ack_sequence_id %" PRIu64, + wc->node_id, + wc->host ? wc->host->hostname : "N/A", snapshot_id, sequence_id); __sync_synchronize(); @@ -725,7 +745,7 @@ void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, stru UNUSED(cmd); // we perhaps we don't need this for snapshots if (unlikely(!wc->alert_updates)) { - debug(D_ACLK_SYNC, "Ignoring alert push snapshot event, updates have been turned off for node %s", wc->node_id); + log_access("AC [%s (%s)]: Ignoring alert snapshot event, updates have been turned off for this node.", wc->node_id, wc->host ? wc->host->hostname : "N/A"); return; } @@ -733,6 +753,8 @@ void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, stru if (unlikely(!claim_id)) return; + log_access("OG [%s (%s)]: Sending alerts snapshot, snapshot_id %" PRIu64, wc->node_id, wc->host ? wc->host->hostname : "N/A", wc->alerts_snapshot_id); + aclk_mark_alert_cloud_ack(wc->uuid_str, wc->alerts_ack_sequence_id); RRDHOST *host = wc->host; diff --git a/database/sqlite/sqlite_aclk_chart.c b/database/sqlite/sqlite_aclk_chart.c index dc1b02205f..7c80db09f2 100644 --- a/database/sqlite/sqlite_aclk_chart.c +++ b/database/sqlite/sqlite_aclk_chart.c @@ -302,7 +302,7 @@ void aclk_send_chart_event(struct aclk_database_worker_config *wc, struct aclk_d wc->chart_pending = 0; if (unlikely(!wc->chart_updates)) { - debug(D_ACLK_SYNC,"Ignoring chart push event, updates have been turned off for node %s", wc->node_id); + log_access("AC [%s (%s)]: Ignoring chart push event, updates have been turned off for this node.", wc->node_id, wc->host ? wc->host->hostname : "N/A"); return; } @@ -406,6 +406,7 @@ void aclk_send_chart_event(struct aclk_database_worker_config *wc, struct aclk_d db_unlock(); aclk_chart_inst_and_dim_update(payload_list, payload_list_size, is_dim, position_list, wc->batch_id); + log_access("OG [%s (%s)]: Sending charts and dimensions update, batch_id %ld, first sequence %ld, last sequence %ld", wc->node_id, wc->host ? wc->host->hostname : "N/A", wc->batch_id, first_sequence, last_sequence); wc->chart_sequence_id = last_sequence; wc->chart_timestamp = last_timestamp; } @@ -422,7 +423,7 @@ void aclk_send_chart_event(struct aclk_database_worker_config *wc, struct aclk_d else { wc->chart_payload_count = sql_get_pending_count(wc); if (!wc->chart_payload_count) - info("%s: sync of charts and dimensions done in %ld seconds", wc->host->hostname, now_realtime_sec() - wc->startup_time); + log_access("AC [%s (%s)]: Sync of charts and dimensions done in %ld seconds.", wc->node_id, wc->host ? wc->host->hostname : "N/A", now_realtime_sec() - wc->startup_time); } for (int i = 0; i <= limit; ++i) @@ -495,12 +496,12 @@ int aclk_send_chart_config(struct aclk_database_worker_config *wc, struct aclk_d } if (likely(chart_config.config_hash)) { - debug(D_ACLK_SYNC, "Sending chart config for %s", hash_id); + log_access("OG [%s (%s)]: Sending chart config for %s.", wc->node_id, wc->host ? wc->host->hostname : "N/A", hash_id); aclk_chart_config_updated(&chart_config, 1); destroy_chart_config_updated(&chart_config); } else - info("Chart config for %s not found", hash_id); + log_access("AC [%s (%s)]: Chart config for %s not found.", wc->node_id, wc->host ? wc->host->hostname : "N/A", hash_id); bind_fail: rc = sqlite3_finalize(res); @@ -518,6 +519,8 @@ void aclk_receive_chart_ack(struct aclk_database_worker_config *wc, struct aclk_ int rc; sqlite3_stmt *res = NULL; + log_access("IN [%s (%s)]: Received ack chart sequence id %ld.", wc->node_id, wc->host ? wc->host->hostname : "N/A", cmd.param1); + BUFFER *sql = buffer_create(1024); buffer_sprintf(sql, "UPDATE aclk_chart_%s SET date_updated=strftime('%%s','now') WHERE sequence_id <= @sequence_id " @@ -555,7 +558,7 @@ void aclk_receive_chart_reset(struct aclk_database_worker_config *wc, struct acl if (cmd.param1 == 1) { db_lock(); buffer_flush(sql); - info("Received full resync for %s", wc->uuid_str); + log_access("IN [%s (%s)]: Received chart full resync.", wc->node_id, wc->host ? wc->host->hostname : "N/A"); buffer_sprintf(sql, "DELETE FROM aclk_chart_payload_%s; DELETE FROM aclk_chart_%s; " \ "DELETE FROM aclk_chart_latest_%s;", wc->uuid_str, wc->uuid_str, wc->uuid_str); @@ -583,7 +586,7 @@ void aclk_receive_chart_reset(struct aclk_database_worker_config *wc, struct acl rrdhost_unlock(host); } else { - info("Restarting chart sync for %s from sequence=%"PRIu64, wc->uuid_str, cmd.param1); + log_access("AC [%s (%s)]: Restarting chart sync from sequence %"PRIu64, wc->node_id, wc->host ? wc->host->hostname : "N/A", cmd.param1); wc->chart_payload_count = sql_get_pending_count(wc); sql_get_last_chart_sequence(wc); } @@ -608,7 +611,7 @@ void aclk_get_chart_config(char **hash_id) cmd.opcode = ACLK_DATABASE_PUSH_CHART_CONFIG; for (int i = 0; hash_id[i]; ++i) { // TODO: Verify that we have a valid hash_id - debug(D_ACLK_SYNC,"Request %d for chart config with hash [%s] received", i, hash_id[i]); + log_access("IN [%s (%s)]: Request %d for chart config with hash %s received.", wc->node_id, wc->host ? wc->host->hostname : "N/A", i, hash_id[i]); cmd.data_param = (void *)strdupz(hash_id[i]); aclk_database_enq_cmd(wc, &cmd); } @@ -638,7 +641,7 @@ static void aclk_submit_param_command(char *node_id, enum aclk_database_opcode a aclk_database_enq_cmd(wc, &cmd); else { if (aclk_worker_enq_cmd(node_id, &cmd)) - error_report("ACLK synchronization thread is not active for node id %s", node_id); + log_access("AC [%s (%s)]: ACLK synchronization thread is not active.", node_id, host ? host->hostname : "N/A"); } return; } @@ -648,7 +651,7 @@ void aclk_ack_chart_sequence_id(char *node_id, uint64_t last_sequence_id) if (unlikely(!node_id)) return; - debug(D_ACLK_SYNC, "NODE %s reports last sequence id received %"PRIu64, node_id, last_sequence_id); + log_access("AC [%s (N/A)]: Node reports last sequence id received %"PRIu64, node_id, last_sequence_id); aclk_submit_param_command(node_id, ACLK_DATABASE_CHART_ACK, last_sequence_id); return; } @@ -660,8 +663,9 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at if (unlikely(!node_id)) return; - debug(D_ACLK_SYNC,"START streaming charts for node %s from sequence %"PRIu64" t=%ld, batch=%"PRIu64, node_id, + log_access("IN [%s (N/A)]: Start streaming charts from sequence %"PRIu64" t=%ld, batch=%"PRIu64, node_id, sequence_id, created_at, batch_id); + uuid_t node_uuid; if (uuid_parse(node_id, node_uuid)) return; @@ -681,9 +685,9 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at __sync_synchronize(); wc->batch_created = now_realtime_sec(); if (sequence_id > wc->chart_sequence_id || wc->chart_reset_count > 10) { - info("Requesting full resync from the cloud for node id %s " + log_access("AC [%s (%s)]: Requesting full resync from the cloud " "(reset=%d, remote_seq=%"PRIu64", local_seq=%"PRIu64")" - , wc->node_id, wc->chart_reset_count, sequence_id, wc->chart_sequence_id); + , wc->node_id, wc->host ? wc->host->hostname : "N/A", wc->chart_reset_count, sequence_id, wc->chart_sequence_id); chart_reset_t chart_reset; chart_reset.claim_id = is_agent_claimed(); if (chart_reset.claim_id) { @@ -699,8 +703,8 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at memset(&cmd, 0, sizeof(cmd)); // TODO: handle timestamp if (sequence_id < wc->chart_sequence_id || !sequence_id) { // || created_at != wc->chart_timestamp) { - info("RESET streaming charts for %s from sequence %"PRIu64 \ - " t=%ld (reset count=%d)", wc->node_id, wc->chart_sequence_id, + log_access("AC [%s (%s)]: Reset streaming charts from sequence %"PRIu64 \ + " t=%ld (reset count=%d)", wc->node_id, wc->host ? wc->host->hostname : "N/A", wc->chart_sequence_id, wc->chart_timestamp, wc->chart_reset_count); cmd.opcode = ACLK_DATABASE_RESET_CHART; cmd.param1 = sequence_id + 1; @@ -708,8 +712,8 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at aclk_database_enq_cmd(wc, &cmd); } else { - info("START streaming charts for %s enabled -- last streamed sequence %"PRIu64 \ - " t=%ld (reset count=%d)", wc->node_id, wc->chart_sequence_id, + log_access("AC [%s (%s)]: Start streaming charts enabled -- last streamed sequence %"PRIu64 \ + " t=%ld (reset count=%d)", wc->node_id, wc->host ? wc->host->hostname : "N/A", wc->chart_sequence_id, wc->chart_timestamp, wc->chart_reset_count); wc->chart_reset_count = 0; wc->chart_updates = 1; @@ -717,7 +721,7 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at } } else - error("ACLK synchronization thread is not active for host %s", host->hostname); + log_access("AC [%s (%s)]: ACLK synchronization thread is not active.", wc->node_id, wc->host ? wc->host->hostname : "N/A"); return; } host = host->next; diff --git a/database/sqlite/sqlite_aclk_node.c b/database/sqlite/sqlite_aclk_node.c index 3fb90d666c..47b7f0afd3 100644 --- a/database/sqlite/sqlite_aclk_node.c +++ b/database/sqlite/sqlite_aclk_node.c @@ -52,6 +52,7 @@ void sql_build_node_info(struct aclk_database_worker_config *wc, struct aclk_dat node_info.data.host_labels_head = labels->head; aclk_update_node_info(&node_info); + log_access("OG [%s (%s)]: Sending node info for guid [%s] (%s).", wc->node_id, wc->host ? wc->host->hostname : "N/A", wc->host_guid, wc->host == localhost ? "parent" : "child"); netdata_rwlock_unlock(&labels->labels_rwlock); rrd_unlock();