0
0
Fork 0
mirror of https://github.com/netdata/netdata.git synced 2025-04-28 06:32:30 +00:00

ACLK Child Availability Messages ()

* new ACLK messages for Claiming MVP1
This commit is contained in:
Timotej S 2020-11-26 17:26:01 +01:00 committed by GitHub
parent 4f867a58e5
commit f1db235a36
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 402 additions and 130 deletions

View file

@ -12,9 +12,6 @@ int aclk_disable_runtime = 0;
int aclk_kill_link = 0;
struct aclk_shared_state aclk_shared_state = {
.metadata_submitted = ACLK_METADATA_REQUIRED,
.agent_state = AGENT_INITIALIZING,
.last_popcorn_interrupt = 0,
.version_neg = 0,
.version_neg_wait_till = 0
};

View file

@ -10,7 +10,7 @@ extern netdata_mutex_t aclk_shared_state_mutex;
// minimum and maximum supported version of ACLK
// in this version of agent
#define ACLK_VERSION_MIN 2
#define ACLK_VERSION_MAX 2
#define ACLK_VERSION_MAX 3
// Version negotiation messages have their own versioning
// this is also used for LWT message as we set that up
@ -26,7 +26,8 @@ extern netdata_mutex_t aclk_shared_state_mutex;
#endif
// Define ACLK Feature Version Boundaries Here
#define ACLK_V_COMPRESSION 2
#define ACLK_V_COMPRESSION 2
#define ACLK_V_CHILDRENSTATE 3
typedef enum aclk_cmd {
ACLK_CMD_CLOUD,
@ -35,7 +36,9 @@ typedef enum aclk_cmd {
ACLK_CMD_CHART,
ACLK_CMD_CHARTDEL,
ACLK_CMD_ALARM,
ACLK_CMD_CLOUD_QUERY_2
ACLK_CMD_CLOUD_QUERY_2,
ACLK_CMD_CHILD_CONNECT,
ACLK_CMD_CHILD_DISCONNECT
} ACLK_CMD;
typedef enum aclk_metadata_state {
@ -45,13 +48,32 @@ typedef enum aclk_metadata_state {
} ACLK_METADATA_STATE;
typedef enum aclk_agent_state {
AGENT_INITIALIZING,
AGENT_STABLE
} ACLK_AGENT_STATE;
ACLK_HOST_INITIALIZING,
ACLK_HOST_STABLE
} ACLK_POPCORNING_STATE;
/*
 * Per-host ACLK state. Embedded in struct rrdhost as `aclk_state` and
 * accessed under the host's `aclk_state_lock`
 * (rrdhost_aclk_state_lock() / rrdhost_aclk_state_unlock()).
 */
typedef struct aclk_rrdhost_state {
char *claimed_id; // Claimed ID if host has one otherwise NULL
#ifdef ENABLE_ACLK
// per child popcorning
ACLK_POPCORNING_STATE state; // ACLK_HOST_INITIALIZING until the popcorn timer expires, then ACLK_HOST_STABLE
ACLK_METADATA_STATE metadata; // on-connect metadata progress for this host (required / command queued / sent)
time_t timestamp_created; // NOTE(review): not referenced by the code visible here - confirm intended use
time_t t_last_popcorn_update; // now_monotonic_sec() of the last popcorn timer (re)start; 0 = timer not running
#endif
} aclk_rrdhost_state;
/*
 * Popcorning predicates for a pointer to a host.
 *
 * ACLK_IS_HOST_INITIALIZING(host) - true while the host has not reached
 *                                   ACLK_HOST_STABLE yet.
 * ACLK_IS_HOST_POPCORNING(host)   - true while the host is initializing AND
 *                                   its popcorn timer is running
 *                                   (t_last_popcorn_update != 0).
 *
 * The argument is parenthesized so that any pointer expression (e.g. `p + 1`
 * or `&h`) can be passed. ACLK_IS_HOST_POPCORNING evaluates its argument
 * twice, so do not pass expressions with side effects.
 * The macros only read host->aclk_state; take rrdhost_aclk_state_lock(host)
 * around them when the host can be modified concurrently.
 */
#define ACLK_IS_HOST_INITIALIZING(host) ((host)->aclk_state.state == ACLK_HOST_INITIALIZING)
#define ACLK_IS_HOST_POPCORNING(host) (ACLK_IS_HOST_INITIALIZING(host) && (host)->aclk_state.t_last_popcorn_update)
typedef struct rrdhost RRDHOST;
extern struct aclk_shared_state {
ACLK_METADATA_STATE metadata_submitted;
ACLK_AGENT_STATE agent_state;
time_t last_popcorn_interrupt;
// optimization to avoid looping through hosts
// every time Query Thread wakes up
RRDHOST *next_popcorn_host;
// read only while ACLK connected
// protect by lock otherwise

View file

@ -512,6 +512,13 @@ cleanup:
return retval;
}
/*
 * Guard for `case` bodies of a `switch` that need a host.
 *
 * Reads a variable named `host` from the expansion site (it is not a macro
 * parameter). If `host` is NULL, logs "<x> needs host pointer" and `break`s
 * out of the enclosing switch. `x` must be a string literal because it is
 * concatenated with the rest of the message at compile time.
 *
 * The expansion is a bare `if` block on purpose of the `break`: wrapping it
 * in do { } while (0) would make the `break` leave that loop instead of the
 * switch.
 * NOTE(review): errno is cleared before error(), presumably so a stale errno
 * is not reported along with the message - confirm against error().
 */
#define ACLK_HOST_PTR_COMPULSORY(x) \
if (unlikely(!host)) { \
errno = 0; \
error(x " needs host pointer"); \
break; \
}
/*
* This function will fetch the next pending command and process it
*
@ -546,25 +553,40 @@ static int aclk_process_query(struct aclk_query_thread *t_info)
switch (this_query->cmd) {
case ACLK_CMD_ONCONNECT:
debug(D_ACLK, "EXECUTING on connect metadata command");
ACLK_SHARED_STATE_LOCK;
meta_state = aclk_shared_state.metadata_submitted;
aclk_shared_state.metadata_submitted = ACLK_METADATA_SENT;
ACLK_SHARED_STATE_UNLOCK;
aclk_send_metadata(meta_state);
ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_ONCONNECT");
#if ACLK_VERSION_MIN < ACLK_V_CHILDRENSTATE
if (host != localhost && aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) {
error("We are not allowed to send connect message in ACLK version before %d", ACLK_V_CHILDRENSTATE);
break;
}
#else
#warning "This check became unnecessary. Remove"
#endif
debug(D_ACLK, "EXECUTING on connect metadata command for host \"%s\" GUID \"%s\"",
host->hostname,
host->machine_guid);
rrdhost_aclk_state_lock(host);
meta_state = host->aclk_state.metadata;
host->aclk_state.metadata = ACLK_METADATA_SENT;
rrdhost_aclk_state_unlock(host);
aclk_send_metadata(meta_state, host);
break;
case ACLK_CMD_CHART:
ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_CHART");
debug(D_ACLK, "EXECUTING a chart update command");
if (!host)
fatal("Pointer to host compulsory");
aclk_send_single_chart(host->hostname, this_query->query);
aclk_send_single_chart(host, this_query->query);
break;
case ACLK_CMD_CHARTDEL:
ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_CHARTDEL");
debug(D_ACLK, "EXECUTING a chart delete command");
//TODO: This sends the info metadata for now
aclk_send_info_metadata(ACLK_METADATA_SENT);
aclk_send_info_metadata(ACLK_METADATA_SENT, host);
break;
case ACLK_CMD_ALARM:
@ -581,7 +603,19 @@ static int aclk_process_query(struct aclk_query_thread *t_info)
aclk_execute_query_v2(this_query);
break;
case ACLK_CMD_CHILD_CONNECT:
case ACLK_CMD_CHILD_DISCONNECT:
ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_CHILD_CONNECT/ACLK_CMD_CHILD_DISCONNECT");
debug(
D_ACLK, "Execution Child %s command",
this_query->cmd == ACLK_CMD_CHILD_CONNECT ? "connect" : "disconnect");
aclk_send_info_child_connection(host, this_query->cmd);
break;
default:
errno = 0;
error("Unknown ACLK Query Command");
break;
}
debug(D_ACLK, "Query #%ld (%s) done", query_count, this_query->topic);
@ -633,6 +667,39 @@ void aclk_query_threads_start(struct aclk_query_threads *query_threads)
}
}
/**
 * Re-evaluate the popcorning state of one host.
 *
 * A host that is ACLK_HOST_INITIALIZING with a running popcorn timer
 * (t_last_popcorn_update != 0) is promoted to ACLK_HOST_STABLE once
 * ACLK_STABLE_TIMEOUT seconds (monotonic clock) have passed since the timer
 * was last restarted; the timer is cleared on promotion. In every other case
 * the state is left untouched.
 *
 * Takes and releases the host's aclk state lock internally.
 *
 * @param host host to check
 * @return the host's popcorning state after the check
 */
ACLK_POPCORNING_STATE aclk_host_popcorn_check(RRDHOST *host)
{
    time_t elapsed = 0;
    int became_stable = 0;

    rrdhost_aclk_state_lock(host);
    ACLK_POPCORNING_STATE current = host->aclk_state.state;
    if (current == ACLK_HOST_INITIALIZING && host->aclk_state.t_last_popcorn_update) {
        elapsed = now_monotonic_sec() - host->aclk_state.t_last_popcorn_update;
        if (elapsed >= ACLK_STABLE_TIMEOUT) {
            current = ACLK_HOST_STABLE;
            host->aclk_state.state = current;
            host->aclk_state.t_last_popcorn_update = 0;
            became_stable = 1;
        }
    }
    rrdhost_aclk_state_unlock(host);

    // log only after the lock has been released
    if (became_stable)
        info("Host \"%s\" stable, ACLK popcorning finished. Last interrupt was %ld seconds ago", host->hostname, elapsed);

    return current;
}
/**
* Main query processing thread
*
@ -644,32 +711,14 @@ void aclk_query_threads_start(struct aclk_query_threads *query_threads)
void *aclk_query_main_thread(void *ptr)
{
struct aclk_query_thread *info = ptr;
time_t previous_popcorn_interrupt = 0;
while (!netdata_exit) {
ACLK_SHARED_STATE_LOCK;
if (aclk_shared_state.agent_state != AGENT_INITIALIZING) {
ACLK_SHARED_STATE_UNLOCK;
break;
}
time_t checkpoint = now_realtime_sec() - aclk_shared_state.last_popcorn_interrupt;
if (checkpoint > ACLK_STABLE_TIMEOUT) {
aclk_shared_state.agent_state = AGENT_STABLE;
ACLK_SHARED_STATE_UNLOCK;
info("AGENT stable, last collector initialization activity was %ld seconds ago", checkpoint);
if(aclk_host_popcorn_check(localhost) == ACLK_HOST_STABLE) {
#ifdef ACLK_DEBUG
_dump_collector_list();
#endif
break;
}
if (previous_popcorn_interrupt != aclk_shared_state.last_popcorn_interrupt) {
info("Waiting %ds from this moment for agent collectors to initialize." , ACLK_STABLE_TIMEOUT);
previous_popcorn_interrupt = aclk_shared_state.last_popcorn_interrupt;
}
ACLK_SHARED_STATE_UNLOCK;
sleep_usec(USEC_PER_SEC * 1);
}
@ -692,15 +741,26 @@ void *aclk_query_main_thread(void *ptr)
aclk_shared_state.version_neg = ACLK_VERSION_MIN;
aclk_set_rx_handlers(aclk_shared_state.version_neg);
}
if (unlikely(aclk_shared_state.metadata_submitted == ACLK_METADATA_REQUIRED)) {
if (unlikely(aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
ACLK_SHARED_STATE_UNLOCK;
rrdhost_aclk_state_lock(localhost);
if (unlikely(localhost->aclk_state.metadata == ACLK_METADATA_REQUIRED)) {
if (unlikely(aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
ACLK_SHARED_STATE_UNLOCK;
errno = 0;
error("ACLK failed to queue on_connect command");
sleep(1);
continue;
}
aclk_shared_state.metadata_submitted = ACLK_METADATA_CMD_QUEUED;
localhost->aclk_state.metadata = ACLK_METADATA_CMD_QUEUED;
}
rrdhost_aclk_state_unlock(localhost);
ACLK_SHARED_STATE_LOCK;
if (aclk_shared_state.next_popcorn_host && aclk_host_popcorn_check(aclk_shared_state.next_popcorn_host) == ACLK_HOST_STABLE) {
aclk_queue_query("on_connect", aclk_shared_state.next_popcorn_host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT);
aclk_shared_state.next_popcorn_host = NULL;
aclk_update_next_child_to_popcorn();
}
ACLK_SHARED_STATE_UNLOCK;

View file

@ -38,13 +38,13 @@ static inline int aclk_v2_payload_get_query(const char *payload, struct aclk_req
return 0;
}
#define HTTP_CHECK_AGENT_INITIALIZED() ACLK_SHARED_STATE_LOCK;\
if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) {\
#define HTTP_CHECK_AGENT_INITIALIZED() rrdhost_aclk_state_lock(localhost);\
if (unlikely(localhost->aclk_state.state == ACLK_HOST_INITIALIZING)) {\
debug(D_ACLK, "Ignoring \"http\" cloud request; agent not in stable state");\
ACLK_SHARED_STATE_UNLOCK;\
rrdhost_aclk_state_unlock(localhost);\
return 1;\
}\
ACLK_SHARED_STATE_UNLOCK;
rrdhost_aclk_state_unlock(localhost);
/*
* Parse the incoming payload and queue a command if valid

View file

@ -438,23 +438,136 @@ static struct _collector *_add_collector(const char *hostname, const char *plugi
#pragma endregion
#endif
inline static int aclk_popcorn_check_bump()
/* Refreshes aclk_shared_state.next_popcorn_host so that Query Threads do not
 * have to scan through all RRDHOSTs every time they wake up to check child
 * popcorn expiry.
 *
 * Walks all hosts (under rrd_rdlock), ignoring localhost and archived hosts.
 * Among the children that are currently popcorning, the one whose popcorn
 * timer was updated the longest time ago replaces the current candidate; an
 * already set candidate is kept unless a popcorning child with an older
 * timestamp is found. If no child is popcorning the candidate is reset to NULL.
 *
 * Call with ACLK_SHARED_STATE_LOCK held.
 */
void aclk_update_next_child_to_popcorn(void)
{
    RRDHOST *host;
    int found_popcorning = 0;

    rrd_rdlock();
    rrdhost_foreach_read(host) {
        if (unlikely(host == localhost || rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED)))
            continue;

        rrdhost_aclk_state_lock(host);
        if (ACLK_IS_HOST_POPCORNING(host)) {
            RRDHOST *candidate = aclk_shared_state.next_popcorn_host;

            found_popcorning = 1;
            if (unlikely(!candidate))
                aclk_shared_state.next_popcorn_host = host;
            else if (candidate->aclk_state.t_last_popcorn_update > host->aclk_state.t_last_popcorn_update)
                aclk_shared_state.next_popcorn_host = host;
        }
        rrdhost_aclk_state_unlock(host);
    }

    if (!found_popcorning)
        aclk_shared_state.next_popcorn_host = NULL;
    rrd_unlock();
}
/* Restart the popcorn timer of a host that is currently popcorning.
 *
 * When the timer value actually changes (the monotonic second differs from
 * the stored one) the restart is logged and, for child hosts, the cached
 * "next child to popcorn" is recomputed.
 *
 * Takes ACLK_SHARED_STATE_LOCK and the host's aclk state lock internally.
 *
 * Returns 1 if the host is not stable yet (initializing or popcorning),
 * 0 otherwise.
 */
static int aclk_popcorn_check_bump(RRDHOST *host)
{
    const time_t now = now_monotonic_sec();
    int timer_moved = 0;

    ACLK_SHARED_STATE_LOCK;
    rrdhost_aclk_state_lock(host);

    const int initializing = ACLK_IS_HOST_INITIALIZING(host);

    if (unlikely(ACLK_IS_HOST_POPCORNING(host))) {
        if (host->aclk_state.t_last_popcorn_update != now) {
            timer_moved = 1;
            info("Restarting ACLK popcorn timer for host \"%s\" with GUID \"%s\"", host->hostname, host->machine_guid);
        }
        host->aclk_state.t_last_popcorn_update = now;
    }

    rrdhost_aclk_state_unlock(host);

    // only children take part in the next_popcorn_host bookkeeping
    if (timer_moved && host != localhost)
        aclk_update_next_child_to_popcorn();

    ACLK_SHARED_STATE_UNLOCK;
    return initializing;
}
/* Thread-safe read of the host's popcorning state.
 * Returns non-zero while the host is still ACLK_HOST_INITIALIZING.
 */
inline static int aclk_host_initializing(RRDHOST *host)
{
    int initializing;

    rrdhost_aclk_state_lock(host);
    initializing = (host->aclk_state.state == ACLK_HOST_INITIALIZING);
    rrdhost_aclk_state_unlock(host);

    return initializing;
}
/* Start the popcorn timer for a host.
 *
 * Puts the host into ACLK_HOST_INITIALIZING, marks its metadata as
 * ACLK_METADATA_REQUIRED and stamps the popcorn timer with the current
 * monotonic second. For child hosts the cached "next child to popcorn" is
 * recomputed afterwards.
 *
 * localhost may only popcorn while it is still initializing; a later attempt
 * is logged as an error and ignored.
 *
 * Takes ACLK_SHARED_STATE_LOCK and the host's aclk state lock internally.
 */
static void aclk_start_host_popcorning(RRDHOST *host)
{
    // seconds, same type as aclk_state.t_last_popcorn_update (was usec_t, a microsecond type)
    time_t now = now_monotonic_sec();

    info("Starting ACLK popcorn timer for host \"%s\" with GUID \"%s\"", host->hostname, host->machine_guid);

    ACLK_SHARED_STATE_LOCK;
    rrdhost_aclk_state_lock(host);

    if (host == localhost && !ACLK_IS_HOST_INITIALIZING(host)) {
        errno = 0;
        error("Localhost is allowed to do popcorning only once after startup!");
        rrdhost_aclk_state_unlock(host);
        ACLK_SHARED_STATE_UNLOCK;
        return;
    }

    host->aclk_state.state = ACLK_HOST_INITIALIZING;
    host->aclk_state.metadata = ACLK_METADATA_REQUIRED;
    host->aclk_state.t_last_popcorn_update = now;
    rrdhost_aclk_state_unlock(host);

    // only children take part in the next_popcorn_host bookkeeping
    if (host != localhost)
        aclk_update_next_child_to_popcorn();

    ACLK_SHARED_STATE_UNLOCK;
}
static void aclk_stop_host_popcorning(RRDHOST *host)
{
ACLK_SHARED_STATE_LOCK;
if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) {
aclk_shared_state.last_popcorn_interrupt = now_realtime_sec();
rrdhost_aclk_state_lock(host);
if (!ACLK_IS_HOST_POPCORNING(host)) {
rrdhost_aclk_state_unlock(host);
ACLK_SHARED_STATE_UNLOCK;
return 1;
return;
}
info("Host Disconnected before ACLK popcorning finished. Canceling. Host \"%s\" GUID:\"%s\"", host->hostname, host->machine_guid);
host->aclk_state.t_last_popcorn_update = 0;
host->aclk_state.metadata = ACLK_METADATA_REQUIRED;
rrdhost_aclk_state_unlock(host);
if(host == aclk_shared_state.next_popcorn_host) {
aclk_shared_state.next_popcorn_host = NULL;
aclk_update_next_child_to_popcorn();
}
ACLK_SHARED_STATE_UNLOCK;
return 0;
}
/*
* Add a new collector to the list
* If it exists, update the chart count
*/
void aclk_add_collector(const char *hostname, const char *plugin_name, const char *module_name)
void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
{
struct _collector *tmp_collector;
if (unlikely(!netdata_ready)) {
@ -463,7 +576,7 @@ void aclk_add_collector(const char *hostname, const char *plugin_name, const cha
COLLECTOR_LOCK;
tmp_collector = _add_collector(hostname, plugin_name, module_name);
tmp_collector = _add_collector(host->hostname, plugin_name, module_name);
if (unlikely(tmp_collector->count != 1)) {
COLLECTOR_UNLOCK;
@ -472,10 +585,10 @@ void aclk_add_collector(const char *hostname, const char *plugin_name, const cha
COLLECTOR_UNLOCK;
if(aclk_popcorn_check_bump())
if(aclk_popcorn_check_bump(host))
return;
if (unlikely(aclk_queue_query("collector", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
if (unlikely(aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
debug(D_ACLK, "ACLK failed to queue on_connect command on collector addition");
}
@ -487,7 +600,7 @@ void aclk_add_collector(const char *hostname, const char *plugin_name, const cha
* This function will release the memory used and schedule
* a cloud update
*/
void aclk_del_collector(const char *hostname, const char *plugin_name, const char *module_name)
void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
{
struct _collector *tmp_collector;
if (unlikely(!netdata_ready)) {
@ -496,7 +609,7 @@ void aclk_del_collector(const char *hostname, const char *plugin_name, const cha
COLLECTOR_LOCK;
tmp_collector = _del_collector(hostname, plugin_name, module_name);
tmp_collector = _del_collector(host->hostname, plugin_name, module_name);
if (unlikely(!tmp_collector || tmp_collector->count)) {
COLLECTOR_UNLOCK;
@ -511,10 +624,10 @@ void aclk_del_collector(const char *hostname, const char *plugin_name, const cha
_free_collector(tmp_collector);
if (aclk_popcorn_check_bump())
if (aclk_popcorn_check_bump(host))
return;
if (unlikely(aclk_queue_query("collector", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
if (unlikely(aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
debug(D_ACLK, "ACLK failed to queue on_connect command on collector deletion");
}
@ -895,6 +1008,7 @@ void *aclk_main(void *ptr)
struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
struct aclk_query_threads query_threads;
struct aclk_stats_thread *stats_thread = NULL;
time_t last_periodic_query_wakeup = 0;
query_threads.thread_list = NULL;
@ -941,7 +1055,8 @@ void *aclk_main(void *ptr)
config_set_number(CONFIG_SECTION_CLOUD, "query thread count", query_threads.count);
}
aclk_shared_state.last_popcorn_interrupt = now_realtime_sec(); // without mutex here because threads are not yet started
//start localhost popcorning
aclk_start_host_popcorning(localhost);
aclk_stats_enabled = config_get_boolean(CONFIG_SECTION_CLOUD, "statistics", CONFIG_BOOLEAN_YES);
if (aclk_stats_enabled) {
@ -1051,6 +1166,14 @@ void *aclk_main(void *ptr)
if (unlikely(!query_threads.thread_list)) {
aclk_query_threads_start(&query_threads);
}
time_t now = now_monotonic_sec();
if(aclk_connected && last_periodic_query_wakeup < now) {
// to make `aclk_queue_query()` param `run_after` work
// also makes per child popcorning work
last_periodic_query_wakeup = now;
QUERY_THREAD_WAKEUP;
}
} // forever
exited:
// Wakeup query thread to cleanup
@ -1200,9 +1323,9 @@ void aclk_disconnect()
aclk_stats_upd_online(0);
aclk_subscribed = 0;
ACLK_SHARED_STATE_LOCK;
aclk_shared_state.metadata_submitted = ACLK_METADATA_REQUIRED;
ACLK_SHARED_STATE_UNLOCK;
rrdhost_aclk_state_lock(localhost);
localhost->aclk_state.metadata = ACLK_METADATA_REQUIRED;
rrdhost_aclk_state_unlock(localhost);
aclk_connected = 0;
aclk_connecting = 0;
aclk_force_reconnect = 1;
@ -1294,7 +1417,7 @@ void aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted)
* /api/v1/info
* charts
*/
int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted)
int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted, RRDHOST *host)
{
BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
@ -1315,11 +1438,11 @@ int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted)
buffer_strcat(local_buffer, ",\n\t\"payload\": ");
buffer_sprintf(local_buffer, "{\n\t \"info\" : ");
web_client_api_request_v1_info_fill_buffer(localhost, local_buffer);
web_client_api_request_v1_info_fill_buffer(host, local_buffer);
debug(D_ACLK, "Metadata %s with info has %zu bytes", msg_id, local_buffer->len);
buffer_sprintf(local_buffer, ", \n\t \"charts\" : ");
charts2json(localhost, local_buffer, 1, 0);
charts2json(host, local_buffer, 1, 0);
buffer_sprintf(local_buffer, "\n}\n}");
debug(D_ACLK, "Metadata %s with chart has %zu bytes", msg_id, local_buffer->len);
@ -1330,6 +1453,66 @@ int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted)
return 0;
}
/*
 * Send a "child_connect" or "child_disconnect" message for a child host on
 * the ACLK metadata topic.
 *
 * The payload carries the child's machine GUID and its claim id (JSON null
 * when the child has no claimed_id).
 *
 * Must only be called once the negotiated ACLK version is at least
 * ACLK_V_CHILDRENSTATE; calling it earlier is treated as a fatal error.
 *
 * @param host child host the message is about
 * @param cmd  ACLK_CMD_CHILD_CONNECT selects "child_connect", anything else
 *             selects "child_disconnect"
 * @return always 0
 */
int aclk_send_info_child_connection(RRDHOST *host, ACLK_CMD cmd)
{
    // validate before allocating anything
    if(aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE)
        fatal("This function should not be called if ACLK version is less than %d (current %d)", ACLK_V_CHILDRENSTATE, aclk_shared_state.version_neg);

    int is_connect = (cmd == ACLK_CMD_CHILD_CONNECT);

    BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
    local_buffer->contenttype = CT_APPLICATION_JSON;

    // report the message that is really being sent (used to always say "Disconnect")
    debug(D_ACLK, "Sending Child %s", is_connect ? "Connect" : "Disconnect");

    char *msg_id = create_uuid();

    aclk_create_header(local_buffer, is_connect ? "child_connect" : "child_disconnect", msg_id, 0, 0, aclk_shared_state.version_neg);

    buffer_strcat(local_buffer, ",\"payload\":");
    buffer_sprintf(local_buffer, "{\"guid\":\"%s\",\"claim_id\":", host->machine_guid);

    // claimed_id can be replaced concurrently, read it under the host's aclk state lock
    rrdhost_aclk_state_lock(host);
    if(host->aclk_state.claimed_id)
        buffer_sprintf(local_buffer, "\"%s\"}}", host->aclk_state.claimed_id);
    else
        buffer_strcat(local_buffer, "null}}");
    rrdhost_aclk_state_unlock(host);

    aclk_send_message(ACLK_METADATA_TOPIC, local_buffer->buffer, msg_id);

    freez(msg_id);
    buffer_free(local_buffer);
    return 0;
}
/*
 * Entry point used when a child host connects to or disconnects from this
 * agent.
 *
 * Nothing happens while the negotiated ACLK version is older than
 * ACLK_V_CHILDRENSTATE or while localhost itself is still initializing.
 * Otherwise:
 *  - ACLK_CMD_CHILD_CONNECT starts popcorning for the child and queues an
 *    "add_child" query,
 *  - ACLK_CMD_CHILD_DISCONNECT stops popcorning for the child and queues a
 *    "del_child" query,
 *  - any other command is logged as an error.
 */
void aclk_host_state_update(RRDHOST *host, ACLK_CMD cmd)
{
#if ACLK_VERSION_MIN < ACLK_V_CHILDRENSTATE
    if (aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE)
        return;
#else
#warning "This check became unnecessary. Remove"
#endif

    if (unlikely(aclk_host_initializing(localhost)))
        return;

    if (cmd == ACLK_CMD_CHILD_CONNECT) {
        debug(D_ACLK, "Child Connected %s %s.", host->hostname, host->machine_guid);
        aclk_start_host_popcorning(host);
        aclk_queue_query("add_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_CONNECT);
    }
    else if (cmd == ACLK_CMD_CHILD_DISCONNECT) {
        debug(D_ACLK, "Child Disconnected %s %s.", host->hostname, host->machine_guid);
        aclk_stop_host_popcorning(host);
        aclk_queue_query("del_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_DISCONNECT);
    }
    else {
        error("Unknown command for aclk_host_state_update %d.", (int)cmd);
    }
}
void aclk_send_stress_test(size_t size)
{
char *buffer = mallocz(size);
@ -1351,11 +1534,12 @@ void aclk_send_stress_test(size_t size)
// Send info metadata message to the cloud if the link is established
// or on request
int aclk_send_metadata(ACLK_METADATA_STATE state)
int aclk_send_metadata(ACLK_METADATA_STATE state, RRDHOST *host)
{
aclk_send_info_metadata(state, host);
aclk_send_info_metadata(state);
aclk_send_alarm_metadata(state);
if(host == localhost)
aclk_send_alarm_metadata(state);
return 0;
}
@ -1373,15 +1557,10 @@ void aclk_single_update_enable()
// Triggered by a health reload, sends the alarm metadata
void aclk_alarm_reload()
{
ACLK_SHARED_STATE_LOCK;
if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) {
ACLK_SHARED_STATE_UNLOCK;
if (unlikely(aclk_host_initializing(localhost)))
return;
}
ACLK_SHARED_STATE_UNLOCK;
if (unlikely(aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
if (unlikely(aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
if (likely(aclk_connected)) {
errno = 0;
error("ACLK failed to queue on_connect command on alarm reload");
@ -1390,17 +1569,11 @@ void aclk_alarm_reload()
}
//rrd_stats_api_v1_chart(RRDSET *st, BUFFER *buf)
int aclk_send_single_chart(char *hostname, char *chart)
int aclk_send_single_chart(RRDHOST *host, char *chart)
{
RRDHOST *target_host;
target_host = rrdhost_find_by_hostname(hostname, 0);
if (!target_host)
return 1;
RRDSET *st = rrdset_find(target_host, chart);
RRDSET *st = rrdset_find(host, chart);
if (!st)
st = rrdset_find_byname(target_host, chart);
st = rrdset_find_byname(host, chart);
if (!st) {
info("FAILED to find chart %s", chart);
return 1;
@ -1437,13 +1610,16 @@ int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd)
if (!netdata_cloud_setting)
return 0;
if (host != localhost)
if (aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE && host != localhost)
return 0;
if (aclk_host_initializing(localhost))
return 0;
if (unlikely(aclk_disable_single_updates))
return 0;
if (aclk_popcorn_check_bump())
if (aclk_popcorn_check_bump(host))
return 0;
if (unlikely(aclk_queue_query("_chart", host, NULL, chart_name, 0, 1, aclk_cmd))) {
@ -1467,12 +1643,8 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
if (host != localhost)
return 0;
ACLK_SHARED_STATE_LOCK;
if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) {
ACLK_SHARED_STATE_UNLOCK;
if(unlikely(aclk_host_initializing(localhost)))
return 0;
}
ACLK_SHARED_STATE_UNLOCK;
/*
* Check if individual updates have been disabled

View file

@ -66,24 +66,28 @@ int cloud_to_agent_parse(JSON_ENTRY *e);
void aclk_disconnect();
void aclk_connect();
int aclk_send_metadata(ACLK_METADATA_STATE state);
int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted);
int aclk_send_metadata(ACLK_METADATA_STATE state, RRDHOST *host);
int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted, RRDHOST *host);
void aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted);
int aclk_wait_for_initialization();
char *create_publish_base_topic();
int aclk_send_single_chart(char *host, char *chart);
int aclk_send_single_chart(RRDHOST *host, char *chart);
int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd);
int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae);
void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us, int version);
int aclk_handle_cloud_message(char *payload);
void aclk_add_collector(const char *hostname, const char *plugin_name, const char *module_name);
void aclk_del_collector(const char *hostname, const char *plugin_name, const char *module_name);
void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
void aclk_alarm_reload();
unsigned long int aclk_reconnect_delay(int mode);
extern void health_alarm_entry2json_nolock(BUFFER *wb, ALARM_ENTRY *ae, RRDHOST *host);
void aclk_single_update_enable();
void aclk_single_update_disable();
void aclk_host_state_update(RRDHOST *host, ACLK_CMD cmd);
int aclk_send_info_child_connection(RRDHOST *host, ACLK_CMD cmd);
void aclk_update_next_child_to_popcorn(void);
#endif //NETDATA_AGENT_CLOUD_LINK_H

View file

@ -34,9 +34,9 @@ static char *claiming_errors[] = {
char *is_agent_claimed()
{
char *result;
netdata_mutex_lock(&localhost->claimed_id_lock);
result = (localhost->claimed_id == NULL) ? NULL : strdupz(localhost->claimed_id);
netdata_mutex_unlock(&localhost->claimed_id_lock);
rrdhost_aclk_state_lock(localhost);
result = (localhost->aclk_state.claimed_id == NULL) ? NULL : strdupz(localhost->aclk_state.claimed_id);
rrdhost_aclk_state_unlock(localhost);
return result;
}
@ -134,10 +134,10 @@ void load_claiming_state(void)
netdata_cloud_setting = 0;
#else
uuid_t uuid;
netdata_mutex_lock(&localhost->claimed_id_lock);
if (localhost->claimed_id) {
freez(localhost->claimed_id);
localhost->claimed_id = NULL;
rrdhost_aclk_state_lock(localhost);
if (localhost->aclk_state.claimed_id) {
freez(localhost->aclk_state.claimed_id);
localhost->aclk_state.claimed_id = NULL;
}
if (aclk_connected)
{
@ -159,8 +159,8 @@ void load_claiming_state(void)
freez(claimed_id);
claimed_id = NULL;
}
localhost->claimed_id = claimed_id;
netdata_mutex_unlock(&localhost->claimed_id_lock);
localhost->aclk_state.claimed_id = claimed_id;
rrdhost_aclk_state_unlock(localhost);
if (!claimed_id) {
info("Unable to load '%s', setting state to AGENT_UNCLAIMED", filename);
return;

View file

@ -33,6 +33,7 @@ struct pg_cache_page_index;
#include "rrdcalc.h"
#include "rrdcalctemplate.h"
#include "../streaming/rrdpush.h"
#include "../aclk/aclk_common.h"
struct context_param {
RRDDIM *rd;
@ -825,8 +826,8 @@ struct rrdhost {
struct netdata_ssl stream_ssl; //Structure used to encrypt the stream
#endif
netdata_mutex_t claimed_id_lock;
char *claimed_id; // Claimed ID if host has one otherwise NULL
netdata_mutex_t aclk_state_lock;
aclk_rrdhost_state aclk_state;
struct rrdhost *next;
};
@ -836,6 +837,9 @@ extern RRDHOST *localhost;
#define rrdhost_wrlock(host) netdata_rwlock_wrlock(&((host)->rrdhost_rwlock))
#define rrdhost_unlock(host) netdata_rwlock_unlock(&((host)->rrdhost_rwlock))
#define rrdhost_aclk_state_lock(host) netdata_mutex_lock(&((host)->aclk_state_lock))
#define rrdhost_aclk_state_unlock(host) netdata_mutex_unlock(&((host)->aclk_state_lock))
// ----------------------------------------------------------------------------
// these loop macros make sure the linked list is accessed with the right lock

View file

@ -189,7 +189,7 @@ RRDHOST *rrdhost_create(const char *hostname,
netdata_rwlock_init(&host->rrdhost_rwlock);
netdata_rwlock_init(&host->labels_rwlock);
netdata_mutex_init(&host->claimed_id_lock);
netdata_mutex_init(&host->aclk_state_lock);
host->system_info = system_info;
@ -870,8 +870,8 @@ void rrdhost_free(RRDHOST *host) {
// ------------------------------------------------------------------------
// free it
pthread_mutex_destroy(&host->claimed_id_lock);
freez(host->claimed_id);
pthread_mutex_destroy(&host->aclk_state_lock);
freez(host->aclk_state.claimed_id);
freez((void *)host->tags);
free_host_labels(host->labels);
freez((void *)host->os);

View file

@ -442,7 +442,7 @@ void rrdset_delete_custom(RRDSET *st, int db_rotated) {
recursively_delete_dir(st->cache_dir, "left-over chart");
#ifdef ENABLE_ACLK
if ((netdata_cloud_setting) && (db_rotated || RRD_MEMORY_MODE_DBENGINE != st->rrd_memory_mode)) {
aclk_del_collector(st->rrdhost->hostname, st->plugin_name, st->module_name);
aclk_del_collector(st->rrdhost, st->plugin_name, st->module_name);
aclk_update_chart(st->rrdhost, st->id, ACLK_CMD_CHARTDEL);
}
#endif
@ -626,14 +626,14 @@ RRDSET *rrdset_create_custom(
#ifdef ENABLE_ACLK
if (netdata_cloud_setting) {
if (mark_rebuild & META_CHART_ACTIVATED) {
aclk_add_collector(host->hostname, st->plugin_name, st->module_name);
aclk_add_collector(host, st->plugin_name, st->module_name);
}
else {
if (mark_rebuild & (META_PLUGIN_UPDATED | META_MODULE_UPDATED)) {
aclk_del_collector(
host->hostname, mark_rebuild & META_PLUGIN_UPDATED ? old_plugin : st->plugin_name,
host, mark_rebuild & META_PLUGIN_UPDATED ? old_plugin : st->plugin_name,
mark_rebuild & META_MODULE_UPDATED ? old_module : st->module_name);
aclk_add_collector(host->hostname, st->plugin_name, st->module_name);
aclk_add_collector(host, st->plugin_name, st->module_name);
}
}
aclk_update_chart(host, st->id, ACLK_CMD_CHART);
@ -933,7 +933,7 @@ RRDSET *rrdset_create_custom(
rrdhost_unlock(host);
#ifdef ENABLE_ACLK
if (netdata_cloud_setting) {
aclk_add_collector(host->hostname, plugin, module);
aclk_add_collector(host, plugin, module);
aclk_update_chart(host, st->id, ACLK_CMD_CHART);
}
#endif

View file

@ -126,11 +126,11 @@ PARSER_RC streaming_claimed_id(char **words, void *user, PLUGINSD_ACTION *plugin
return PARSER_RC_OK; //the message is OK problem must be somewehere else
}
netdata_mutex_lock(&host->claimed_id_lock);
if (host->claimed_id)
freez(host->claimed_id);
host->claimed_id = strcmp(words[2], "NULL") ? strdupz(words[2]) : NULL;
netdata_mutex_unlock(&host->claimed_id_lock);
rrdhost_aclk_state_lock(host);
if (host->aclk_state.claimed_id)
freez(host->aclk_state.claimed_id);
host->aclk_state.claimed_id = strcmp(words[2], "NULL") ? strdupz(words[2]) : NULL;
rrdhost_aclk_state_unlock(host);
rrdpush_claimed_id(host);
@ -440,6 +440,12 @@ static int rrdpush_receive(struct receiver_state *rpt)
cd.version = rpt->stream_version;
#ifdef ENABLE_ACLK
// in case we have cloud connection we inform cloud
// new slave connected
if (netdata_cloud_setting)
aclk_host_state_update(rpt->host, ACLK_CMD_CHILD_CONNECT);
#endif
size_t count = streaming_parser(rpt, &cd, fp);
@ -448,6 +454,13 @@ static int rrdpush_receive(struct receiver_state *rpt)
error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).", rpt->hostname, rpt->client_ip,
rpt->client_port, count);
#ifdef ENABLE_ACLK
// in case we have cloud connection we inform cloud
// a slave disconnected
if (netdata_cloud_setting)
aclk_host_state_update(rpt->host, ACLK_CMD_CHILD_DISCONNECT);
#endif
// During a shutdown there is cleanup code in rrdhost that will cancel the sender thread
if (!netdata_exit && rpt->host) {
rrd_rdlock();

View file

@ -374,11 +374,11 @@ void rrdpush_claimed_id(RRDHOST *host)
return;
sender_start(host->sender);
netdata_mutex_lock(&host->claimed_id_lock);
rrdhost_aclk_state_lock(host);
buffer_sprintf(host->sender->build, "CLAIMED_ID %s %s\n", host->machine_guid, (host->claimed_id ? host->claimed_id : "NULL") );
buffer_sprintf(host->sender->build, "CLAIMED_ID %s %s\n", host->machine_guid, (host->aclk_state.claimed_id ? host->aclk_state.claimed_id : "NULL") );
netdata_mutex_unlock(&host->claimed_id_lock);
rrdhost_aclk_state_unlock(host);
sender_commit(host->sender);
// signal the sender there are more data

View file

@ -841,12 +841,12 @@ static inline void web_client_api_request_v1_info_mirrored_hosts(BUFFER *wb) {
(host->receiver || host == localhost) ? "true" : "false");
netdata_mutex_unlock(&host->receiver_lock);
netdata_mutex_lock(&host->claimed_id_lock);
if (host->claimed_id)
buffer_sprintf(wb, "\"%s\" }", host->claimed_id);
rrdhost_aclk_state_lock(host);
if (host->aclk_state.claimed_id)
buffer_sprintf(wb, "\"%s\" }", host->aclk_state.claimed_id);
else
buffer_strcat(wb, "null }");
netdata_mutex_unlock(&host->claimed_id_lock);
rrdhost_aclk_state_unlock(host);
count++;
}