mirror of
https://github.com/netdata/netdata.git
synced 2025-04-14 01:29:11 +00:00
Implements cloud initiated disconnect command (#11723)
This commit is contained in:
parent
f62198716a
commit
f282633267
10 changed files with 91 additions and 5 deletions
|
@ -1135,6 +1135,7 @@ endfunction()
|
|||
|
||||
set(ACLK_NG_PROTO_DEFS
|
||||
aclk/aclk-schemas/proto/aclk/v1/lib.proto
|
||||
aclk/aclk-schemas/proto/agent/v1/disconnect.proto
|
||||
aclk/aclk-schemas/proto/agent/v1/connection.proto
|
||||
aclk/aclk-schemas/proto/alarm/v1/config.proto
|
||||
aclk/aclk-schemas/proto/alarm/v1/stream.proto
|
||||
|
|
|
@ -680,6 +680,7 @@ ACLK_NG_FILES += \
|
|||
|
||||
ACLK_NG_PROTO_DEFINITIONS = \
|
||||
aclk/aclk-schemas/proto/aclk/v1/lib.proto \
|
||||
aclk/aclk-schemas/proto/agent/v1/disconnect.proto \
|
||||
aclk/aclk-schemas/proto/agent/v1/connection.proto \
|
||||
aclk/aclk-schemas/proto/alarm/v1/config.proto \
|
||||
aclk/aclk-schemas/proto/alarm/v1/stream.proto \
|
||||
|
@ -710,6 +711,8 @@ ACLK_NG_PROTO_BUILT_FILES = aclk/aclk-schemas/proto/agent/v1/connection.pb.cc \
|
|||
aclk/aclk-schemas/proto/chart/v1/config.pb.h \
|
||||
aclk/aclk-schemas/proto/aclk/v1/lib.pb.cc \
|
||||
aclk/aclk-schemas/proto/aclk/v1/lib.pb.h \
|
||||
aclk/aclk-schemas/proto/agent/v1/disconnect.pb.cc \
|
||||
aclk/aclk-schemas/proto/agent/v1/disconnect.pb.h \
|
||||
aclk/aclk-schemas/proto/alarm/v1/config.pb.cc \
|
||||
aclk/aclk-schemas/proto/alarm/v1/config.pb.h \
|
||||
aclk/aclk-schemas/proto/alarm/v1/stream.pb.cc \
|
||||
|
@ -754,6 +757,10 @@ aclk/aclk-schemas/proto/aclk/v1/lib.pb.cc \
|
|||
aclk/aclk-schemas/proto/aclk/v1/lib.pb.h: aclk/aclk-schemas/proto/aclk/v1/lib.proto
|
||||
$(PROTOC) -I=aclk/aclk-schemas --cpp_out=$(builddir)/aclk/aclk-schemas $^
|
||||
|
||||
aclk/aclk-schemas/proto/agent/v1/disconnect.pb.cc \
|
||||
aclk/aclk-schemas/proto/agent/v1/disconnect.pb.h: aclk/aclk-schemas/proto/agent/v1/disconnect.proto
|
||||
$(PROTOC) -I=aclk/aclk-schemas --cpp_out=$(builddir)/aclk/aclk-schemas $^
|
||||
|
||||
aclk/aclk-schemas/proto/alarm/v1/config.pb.cc \
|
||||
aclk/aclk-schemas/proto/alarm/v1/config.pb.h: aclk/aclk-schemas/proto/alarm/v1/config.proto
|
||||
$(PROTOC) -I=aclk/aclk-schemas --cpp_out=$(builddir)/aclk/aclk-schemas $^
|
||||
|
|
|
@ -1 +1 @@
|
|||
Subproject commit ff110970c006170b01b51a15bf6cdc219ce1dcf5
|
||||
Subproject commit 72d0a600dccf965b939a1ae4f7818d0ac896842a
|
16
aclk/aclk.c
16
aclk/aclk.c
|
@ -24,6 +24,7 @@
|
|||
#define ACLK_STABLE_TIMEOUT 3 // Minimum delay to mark AGENT as stable
|
||||
|
||||
int aclk_pubacks_per_conn = 0; // How many PubAcks we got since MQTT conn est.
|
||||
int disconnect_req = 0;
|
||||
|
||||
int aclk_alert_reloaded = 1; //1 on startup, and again on health_reload
|
||||
|
||||
|
@ -218,7 +219,7 @@ static void msg_callback(const char *topic, const void *msg, size_t msglen, int
|
|||
error("Received message on unexpected topic %s", topic);
|
||||
|
||||
if (aclk_shared_state.mqtt_shutdown_msg_id > 0) {
|
||||
error("Link is shutting down. Ignoring message.");
|
||||
error("Link is shutting down. Ignoring incoming message.");
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -234,7 +235,7 @@ static void msg_callback_new(const char *topic, const void *msg, size_t msglen,
|
|||
debug(D_ACLK, "Got Message From Broker Topic \"%s\" QOS %d", topic, qos);
|
||||
|
||||
if (aclk_shared_state.mqtt_shutdown_msg_id > 0) {
|
||||
error("Link is shutting down. Ignoring message.");
|
||||
error("Link is shutting down. Ignoring incoming message.");
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -293,6 +294,8 @@ static int read_query_thread_count()
|
|||
return threads;
|
||||
}
|
||||
|
||||
void aclk_graceful_disconnect(mqtt_wss_client client);
|
||||
|
||||
/* Keeps connection alive and handles all network comms.
|
||||
* Returns on error or when netdata is shutting down.
|
||||
* @param client instance of mqtt_wss_client
|
||||
|
@ -310,6 +313,15 @@ static int handle_connection(mqtt_wss_client client)
|
|||
return 1;
|
||||
}
|
||||
|
||||
if (disconnect_req) {
|
||||
disconnect_req = 0;
|
||||
aclk_graceful_disconnect(client);
|
||||
aclk_queue_unlock();
|
||||
aclk_shared_state.mqtt_shutdown_msg_id = -1;
|
||||
aclk_shared_state.mqtt_shutdown_msg_rcvd = 0;
|
||||
return 1;
|
||||
}
|
||||
|
||||
// mqtt_wss_service will return faster than in one second
|
||||
// if there is enough work to do
|
||||
time_t now = now_monotonic_sec();
|
||||
|
|
|
@ -12,6 +12,8 @@
|
|||
|
||||
extern time_t aclk_block_until;
|
||||
|
||||
extern int disconnect_req;
|
||||
|
||||
extern aclk_env_t *aclk_env;
|
||||
|
||||
void *aclk_main(void *ptr);
|
||||
|
|
|
@ -193,3 +193,10 @@ void aclk_queue_lock(void)
|
|||
aclk_query_queue.block_push = 1;
|
||||
ACLK_QUEUE_UNLOCK;
|
||||
}
|
||||
|
||||
void aclk_queue_unlock(void)
|
||||
{
|
||||
ACLK_QUEUE_LOCK;
|
||||
aclk_query_queue.block_push = 0;
|
||||
ACLK_QUEUE_UNLOCK;
|
||||
}
|
||||
|
|
|
@ -91,6 +91,7 @@ aclk_query_t aclk_queue_pop(void);
|
|||
void aclk_queue_flush(void);
|
||||
|
||||
void aclk_queue_lock(void);
|
||||
void aclk_queue_unlock(void);
|
||||
|
||||
#define QUEUE_IF_PAYLOAD_PRESENT(query) \
|
||||
if (likely(query->data.bin_payload.payload)) { \
|
||||
|
|
|
@ -416,6 +416,24 @@ void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t
|
|||
destroy_send_alarm_snapshot(sas);
|
||||
return;
|
||||
}
|
||||
if (!strcmp(message_type, "DisconnectReq")) {
|
||||
struct disconnect_cmd *cmd = parse_disconnect_cmd(msg, msg_len);
|
||||
if (!cmd)
|
||||
return;
|
||||
if (cmd->permaban) {
|
||||
error ("Cloud Banned This Agent!");
|
||||
aclk_disable_runtime = 1;
|
||||
}
|
||||
info ("Cloud requested disconnect (EC=%u, \"%s\")", (unsigned int)cmd->error_code, cmd->error_description);
|
||||
if (cmd->reconnect_after_s > 0) {
|
||||
aclk_block_until = now_monotonic_sec() + cmd->reconnect_after_s;
|
||||
info ("Cloud asks not to reconnect for %u seconds. We shall honor that request", (unsigned int)cmd->reconnect_after_s);
|
||||
}
|
||||
disconnect_req = 1;
|
||||
freez(cmd->error_description);
|
||||
freez(cmd);
|
||||
return;
|
||||
}
|
||||
error ("Unknown new cloud arch message type received \"%s\"", message_type);
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
#include "proto/agent/v1/connection.pb.h"
|
||||
#include "proto/agent/v1/disconnect.pb.h"
|
||||
#include "connection.h"
|
||||
|
||||
#include "schema_wrapper_utils.h"
|
||||
|
@ -8,15 +9,17 @@
|
|||
#include <sys/time.h>
|
||||
#include <stdlib.h>
|
||||
|
||||
using namespace agent::v1;
|
||||
|
||||
char *generate_update_agent_connection(size_t *len, const update_agent_connection_t *data)
|
||||
{
|
||||
agent::v1::UpdateAgentConnection connupd;
|
||||
UpdateAgentConnection connupd;
|
||||
|
||||
connupd.set_claim_id(data->claim_id);
|
||||
connupd.set_reachable(data->reachable);
|
||||
connupd.set_session_id(data->session_id);
|
||||
|
||||
connupd.set_update_source((data->lwt) ? agent::v1::CONNECTION_UPDATE_SOURCE_LWT : agent::v1::CONNECTION_UPDATE_SOURCE_AGENT);
|
||||
connupd.set_update_source((data->lwt) ? CONNECTION_UPDATE_SOURCE_LWT : CONNECTION_UPDATE_SOURCE_AGENT);
|
||||
|
||||
struct timeval tv;
|
||||
gettimeofday(&tv, NULL);
|
||||
|
@ -32,3 +35,29 @@ char *generate_update_agent_connection(size_t *len, const update_agent_connectio
|
|||
|
||||
return msg;
|
||||
}
|
||||
|
||||
struct disconnect_cmd *parse_disconnect_cmd(const char *data, size_t len) {
|
||||
DisconnectReq req;
|
||||
struct disconnect_cmd *res;
|
||||
|
||||
if (!req.ParseFromArray(data, len))
|
||||
return NULL;
|
||||
|
||||
res = (struct disconnect_cmd *)calloc(1, sizeof(struct disconnect_cmd));
|
||||
|
||||
if (!res)
|
||||
return NULL;
|
||||
|
||||
res->reconnect_after_s = req.reconnect_after_seconds();
|
||||
res->permaban = req.permaban();
|
||||
res->error_code = req.error_code();
|
||||
if (req.error_description().c_str()) {
|
||||
res->error_description = strdup(req.error_description().c_str());
|
||||
if (!res->error_description) {
|
||||
free(res);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
|
|
@ -27,6 +27,15 @@ typedef struct {
|
|||
|
||||
char *generate_update_agent_connection(size_t *len, const update_agent_connection_t *data);
|
||||
|
||||
struct disconnect_cmd {
|
||||
uint64_t reconnect_after_s;
|
||||
int permaban;
|
||||
uint32_t error_code;
|
||||
char *error_description;
|
||||
};
|
||||
|
||||
struct disconnect_cmd *parse_disconnect_cmd(const char *data, size_t len);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
Loading…
Add table
Reference in a new issue