diff --git a/CMakeLists.txt b/CMakeLists.txt index aa49294e85..d21ea89c4a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1135,6 +1135,7 @@ endfunction() set(ACLK_NG_PROTO_DEFS aclk/aclk-schemas/proto/aclk/v1/lib.proto + aclk/aclk-schemas/proto/agent/v1/disconnect.proto aclk/aclk-schemas/proto/agent/v1/connection.proto aclk/aclk-schemas/proto/alarm/v1/config.proto aclk/aclk-schemas/proto/alarm/v1/stream.proto diff --git a/Makefile.am b/Makefile.am index 0fef853e1b..61213ca8d9 100644 --- a/Makefile.am +++ b/Makefile.am @@ -680,6 +680,7 @@ ACLK_NG_FILES += \ ACLK_NG_PROTO_DEFINITIONS = \ aclk/aclk-schemas/proto/aclk/v1/lib.proto \ + aclk/aclk-schemas/proto/agent/v1/disconnect.proto \ aclk/aclk-schemas/proto/agent/v1/connection.proto \ aclk/aclk-schemas/proto/alarm/v1/config.proto \ aclk/aclk-schemas/proto/alarm/v1/stream.proto \ @@ -710,6 +711,8 @@ ACLK_NG_PROTO_BUILT_FILES = aclk/aclk-schemas/proto/agent/v1/connection.pb.cc \ aclk/aclk-schemas/proto/chart/v1/config.pb.h \ aclk/aclk-schemas/proto/aclk/v1/lib.pb.cc \ aclk/aclk-schemas/proto/aclk/v1/lib.pb.h \ + aclk/aclk-schemas/proto/agent/v1/disconnect.pb.cc \ + aclk/aclk-schemas/proto/agent/v1/disconnect.pb.h \ aclk/aclk-schemas/proto/alarm/v1/config.pb.cc \ aclk/aclk-schemas/proto/alarm/v1/config.pb.h \ aclk/aclk-schemas/proto/alarm/v1/stream.pb.cc \ @@ -754,6 +757,10 @@ aclk/aclk-schemas/proto/aclk/v1/lib.pb.cc \ aclk/aclk-schemas/proto/aclk/v1/lib.pb.h: aclk/aclk-schemas/proto/aclk/v1/lib.proto $(PROTOC) -I=aclk/aclk-schemas --cpp_out=$(builddir)/aclk/aclk-schemas $^ +aclk/aclk-schemas/proto/agent/v1/disconnect.pb.cc \ +aclk/aclk-schemas/proto/agent/v1/disconnect.pb.h: aclk/aclk-schemas/proto/agent/v1/disconnect.proto + $(PROTOC) -I=aclk/aclk-schemas --cpp_out=$(builddir)/aclk/aclk-schemas $^ + aclk/aclk-schemas/proto/alarm/v1/config.pb.cc \ aclk/aclk-schemas/proto/alarm/v1/config.pb.h: aclk/aclk-schemas/proto/alarm/v1/config.proto $(PROTOC) -I=aclk/aclk-schemas --cpp_out=$(builddir)/aclk/aclk-schemas $^ diff --git a/aclk/aclk-schemas b/aclk/aclk-schemas index ff110970c0..72d0a600dc 160000 --- a/aclk/aclk-schemas +++ b/aclk/aclk-schemas @@ -1 +1 @@ -Subproject commit ff110970c006170b01b51a15bf6cdc219ce1dcf5 +Subproject commit 72d0a600dccf965b939a1ae4f7818d0ac896842a diff --git a/aclk/aclk.c b/aclk/aclk.c index 191470c938..c7cab3ae06 100644 --- a/aclk/aclk.c +++ b/aclk/aclk.c @@ -24,6 +24,7 @@ #define ACLK_STABLE_TIMEOUT 3 // Minimum delay to mark AGENT as stable int aclk_pubacks_per_conn = 0; // How many PubAcks we got since MQTT conn est. +int disconnect_req = 0; int aclk_alert_reloaded = 1; //1 on startup, and again on health_reload @@ -218,7 +219,7 @@ static void msg_callback(const char *topic, const void *msg, size_t msglen, int error("Received message on unexpected topic %s", topic); if (aclk_shared_state.mqtt_shutdown_msg_id > 0) { - error("Link is shutting down. Ignoring message."); + error("Link is shutting down. Ignoring incoming message."); return; } @@ -234,7 +235,7 @@ static void msg_callback_new(const char *topic, const void *msg, size_t msglen, debug(D_ACLK, "Got Message From Broker Topic \"%s\" QOS %d", topic, qos); if (aclk_shared_state.mqtt_shutdown_msg_id > 0) { - error("Link is shutting down. Ignoring message."); + error("Link is shutting down. Ignoring incoming message."); return; } @@ -293,6 +294,8 @@ static int read_query_thread_count() return threads; } +void aclk_graceful_disconnect(mqtt_wss_client client); + /* Keeps connection alive and handles all network comms. * Returns on error or when netdata is shutting down. * @param client instance of mqtt_wss_client @@ -310,6 +313,15 @@ static int handle_connection(mqtt_wss_client client) return 1; } + if (disconnect_req) { + disconnect_req = 0; + aclk_graceful_disconnect(client); + aclk_queue_unlock(); + aclk_shared_state.mqtt_shutdown_msg_id = -1; + aclk_shared_state.mqtt_shutdown_msg_rcvd = 0; + return 1; + } + // mqtt_wss_service will return faster than in one second // if there is enough work to do time_t now = now_monotonic_sec(); diff --git a/aclk/aclk.h b/aclk/aclk.h index 8425279077..e5944c04bc 100644 --- a/aclk/aclk.h +++ b/aclk/aclk.h @@ -12,6 +12,8 @@ extern time_t aclk_block_until; +extern int disconnect_req; + extern aclk_env_t *aclk_env; void *aclk_main(void *ptr); diff --git a/aclk/aclk_query_queue.c b/aclk/aclk_query_queue.c index 152f7539b3..18b4783ee6 100644 --- a/aclk/aclk_query_queue.c +++ b/aclk/aclk_query_queue.c @@ -193,3 +193,10 @@ void aclk_queue_lock(void) aclk_query_queue.block_push = 1; ACLK_QUEUE_UNLOCK; } + +void aclk_queue_unlock(void) +{ + ACLK_QUEUE_LOCK; + aclk_query_queue.block_push = 0; + ACLK_QUEUE_UNLOCK; +} diff --git a/aclk/aclk_query_queue.h b/aclk/aclk_query_queue.h index 46963a878c..db63544330 100644 --- a/aclk/aclk_query_queue.h +++ b/aclk/aclk_query_queue.h @@ -91,6 +91,7 @@ aclk_query_t aclk_queue_pop(void); void aclk_queue_flush(void); void aclk_queue_lock(void); +void aclk_queue_unlock(void); #define QUEUE_IF_PAYLOAD_PRESENT(query) \ if (likely(query->data.bin_payload.payload)) { \ diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c index 6e85564de5..5ab893578d 100644 --- a/aclk/aclk_rx_msgs.c +++ b/aclk/aclk_rx_msgs.c @@ -416,6 +416,24 @@ void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t destroy_send_alarm_snapshot(sas); return; } + if (!strcmp(message_type, "DisconnectReq")) { + struct disconnect_cmd *cmd = parse_disconnect_cmd(msg, msg_len); + if (!cmd) + return; + if (cmd->permaban) { + error ("Cloud Banned This Agent!"); + aclk_disable_runtime = 1; + } + info ("Cloud requested disconnect (EC=%u, \"%s\")", (unsigned int)cmd->error_code, cmd->error_description); + if (cmd->reconnect_after_s > 0) { + aclk_block_until = now_monotonic_sec() + cmd->reconnect_after_s; + info ("Cloud asks not to reconnect for %u seconds. We shall honor that request", (unsigned int)cmd->reconnect_after_s); + } + disconnect_req = 1; + freez(cmd->error_description); + freez(cmd); + return; + } error ("Unknown new cloud arch message type received \"%s\"", message_type); } #endif diff --git a/aclk/schema-wrappers/connection.cc b/aclk/schema-wrappers/connection.cc index 9dbc0e6b5c..e3bbfe31f3 100644 --- a/aclk/schema-wrappers/connection.cc +++ b/aclk/schema-wrappers/connection.cc @@ -1,6 +1,7 @@ // SPDX-License-Identifier: GPL-3.0-or-later #include "proto/agent/v1/connection.pb.h" +#include "proto/agent/v1/disconnect.pb.h" #include "connection.h" #include "schema_wrapper_utils.h" @@ -8,15 +9,17 @@ #include <sys/time.h> #include <stdlib.h> +using namespace agent::v1; + char *generate_update_agent_connection(size_t *len, const update_agent_connection_t *data) { - agent::v1::UpdateAgentConnection connupd; + UpdateAgentConnection connupd; connupd.set_claim_id(data->claim_id); connupd.set_reachable(data->reachable); connupd.set_session_id(data->session_id); - connupd.set_update_source((data->lwt) ? agent::v1::CONNECTION_UPDATE_SOURCE_LWT : agent::v1::CONNECTION_UPDATE_SOURCE_AGENT); + connupd.set_update_source((data->lwt) ? CONNECTION_UPDATE_SOURCE_LWT : CONNECTION_UPDATE_SOURCE_AGENT); struct timeval tv; gettimeofday(&tv, NULL); @@ -32,3 +35,29 @@ char *generate_update_agent_connection(size_t *len, const update_agent_connectio return msg; } + +struct disconnect_cmd *parse_disconnect_cmd(const char *data, size_t len) { + DisconnectReq req; + struct disconnect_cmd *res; + + if (!req.ParseFromArray(data, len)) + return NULL; + + res = (struct disconnect_cmd *)calloc(1, sizeof(struct disconnect_cmd)); + + if (!res) + return NULL; + + res->reconnect_after_s = req.reconnect_after_seconds(); + res->permaban = req.permaban(); + res->error_code = req.error_code(); + if (req.error_description().c_str()) { + res->error_description = strdup(req.error_description().c_str()); + if (!res->error_description) { + free(res); + return NULL; + } + } + + return res; +} diff --git a/aclk/schema-wrappers/connection.h b/aclk/schema-wrappers/connection.h index ac661c9542..8c223869a8 100644 --- a/aclk/schema-wrappers/connection.h +++ b/aclk/schema-wrappers/connection.h @@ -27,6 +27,15 @@ typedef struct { char *generate_update_agent_connection(size_t *len, const update_agent_connection_t *data); +struct disconnect_cmd { + uint64_t reconnect_after_s; + int permaban; + uint32_t error_code; + char *error_description; +}; + +struct disconnect_cmd *parse_disconnect_cmd(const char *data, size_t len); + #ifdef __cplusplus } #endif