diff --git a/aclk/aclk_common.c b/aclk/aclk_common.c index c949d4c8c5..82c5e68eeb 100644 --- a/aclk/aclk_common.c +++ b/aclk/aclk_common.c @@ -12,9 +12,6 @@ int aclk_disable_runtime = 0; int aclk_kill_link = 0; struct aclk_shared_state aclk_shared_state = { - .metadata_submitted = ACLK_METADATA_REQUIRED, - .agent_state = AGENT_INITIALIZING, - .last_popcorn_interrupt = 0, .version_neg = 0, .version_neg_wait_till = 0 }; diff --git a/aclk/aclk_common.h b/aclk/aclk_common.h index 6c749daff3..015e4b8bbd 100644 --- a/aclk/aclk_common.h +++ b/aclk/aclk_common.h @@ -10,7 +10,7 @@ extern netdata_mutex_t aclk_shared_state_mutex; // minimum and maximum supported version of ACLK // in this version of agent #define ACLK_VERSION_MIN 2 -#define ACLK_VERSION_MAX 2 +#define ACLK_VERSION_MAX 3 // Version negotiation messages have they own versioning // this is also used for LWT message as we set that up @@ -26,7 +26,8 @@ extern netdata_mutex_t aclk_shared_state_mutex; #endif // Define ACLK Feature Version Boundaries Here -#define ACLK_V_COMPRESSION 2 +#define ACLK_V_COMPRESSION 2 +#define ACLK_V_CHILDRENSTATE 3 typedef enum aclk_cmd { ACLK_CMD_CLOUD, @@ -35,7 +36,9 @@ typedef enum aclk_cmd { ACLK_CMD_CHART, ACLK_CMD_CHARTDEL, ACLK_CMD_ALARM, - ACLK_CMD_CLOUD_QUERY_2 + ACLK_CMD_CLOUD_QUERY_2, + ACLK_CMD_CHILD_CONNECT, + ACLK_CMD_CHILD_DISCONNECT } ACLK_CMD; typedef enum aclk_metadata_state { @@ -45,13 +48,32 @@ typedef enum aclk_metadata_state { } ACLK_METADATA_STATE; typedef enum aclk_agent_state { - AGENT_INITIALIZING, - AGENT_STABLE -} ACLK_AGENT_STATE; + ACLK_HOST_INITIALIZING, + ACLK_HOST_STABLE +} ACLK_POPCORNING_STATE; + +typedef struct aclk_rrdhost_state { + char *claimed_id; // Claimed ID if host has one otherwise NULL + +#ifdef ENABLE_ACLK + // per child popcorning + ACLK_POPCORNING_STATE state; + ACLK_METADATA_STATE metadata; + + time_t timestamp_created; + time_t t_last_popcorn_update; +#endif +} aclk_rrdhost_state; + +#define ACLK_IS_HOST_INITIALIZING(host) (host->aclk_state.state == ACLK_HOST_INITIALIZING) +#define ACLK_IS_HOST_POPCORNING(host) (ACLK_IS_HOST_INITIALIZING(host) && host->aclk_state.t_last_popcorn_update) + +typedef struct rrdhost RRDHOST; + extern struct aclk_shared_state { - ACLK_METADATA_STATE metadata_submitted; - ACLK_AGENT_STATE agent_state; - time_t last_popcorn_interrupt; + // optimization to avoid looping trough hosts + // every time Query Thread wakes up + RRDHOST *next_popcorn_host; // read only while ACLK connected // protect by lock otherwise diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c index 5e4b879512..be58c2209b 100644 --- a/aclk/aclk_query.c +++ b/aclk/aclk_query.c @@ -512,6 +512,13 @@ cleanup: return retval; } +#define ACLK_HOST_PTR_COMPULSORY(x) \ + if (unlikely(!host)) { \ + errno = 0; \ + error(x " needs host pointer"); \ + break; \ + } + /* * This function will fetch the next pending command and process it * @@ -546,25 +553,40 @@ static int aclk_process_query(struct aclk_query_thread *t_info) switch (this_query->cmd) { case ACLK_CMD_ONCONNECT: - debug(D_ACLK, "EXECUTING on connect metadata command"); - ACLK_SHARED_STATE_LOCK; - meta_state = aclk_shared_state.metadata_submitted; - aclk_shared_state.metadata_submitted = ACLK_METADATA_SENT; - ACLK_SHARED_STATE_UNLOCK; - aclk_send_metadata(meta_state); + ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_ONCONNECT"); +#if ACLK_VERSION_MIN < ACLK_V_CHILDRENSTATE + if (host != localhost && aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) { + error("We are not allowed to send connect message in ACLK version before %d", ACLK_V_CHILDRENSTATE); + break; + } +#else +#warning "This check became unnecessary. Remove" +#endif + + debug(D_ACLK, "EXECUTING on connect metadata command for host \"%s\" GUID \"%s\"", + host->hostname, + host->machine_guid); + + rrdhost_aclk_state_lock(host); + meta_state = host->aclk_state.metadata; + host->aclk_state.metadata = ACLK_METADATA_SENT; + rrdhost_aclk_state_unlock(host); + aclk_send_metadata(meta_state, host); break; case ACLK_CMD_CHART: + ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_CHART"); + debug(D_ACLK, "EXECUTING a chart update command"); - if (!host) - fatal("Pointer to host compulsory"); - aclk_send_single_chart(host->hostname, this_query->query); + aclk_send_single_chart(host, this_query->query); break; case ACLK_CMD_CHARTDEL: + ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_CHARTDEL"); + debug(D_ACLK, "EXECUTING a chart delete command"); //TODO: This send the info metadata for now - aclk_send_info_metadata(ACLK_METADATA_SENT); + aclk_send_info_metadata(ACLK_METADATA_SENT, host); break; case ACLK_CMD_ALARM: @@ -581,7 +603,19 @@ static int aclk_process_query(struct aclk_query_thread *t_info) aclk_execute_query_v2(this_query); break; + case ACLK_CMD_CHILD_CONNECT: + case ACLK_CMD_CHILD_DISCONNECT: + ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_CHILD_CONNECT/ACLK_CMD_CHILD_DISCONNECT"); + + debug( + D_ACLK, "Execution Child %s command", + this_query->cmd == ACLK_CMD_CHILD_CONNECT ? "connect" : "disconnect"); + aclk_send_info_child_connection(host, this_query->cmd); + break; + default: + errno = 0; + error("Unknown ACLK Query Command"); break; } debug(D_ACLK, "Query #%ld (%s) done", query_count, this_query->topic); @@ -633,6 +667,39 @@ void aclk_query_threads_start(struct aclk_query_threads *query_threads) } } +/** + * Checks and updates popcorning state of rrdhost + * returns actual/updated popcorning state + */ + +ACLK_POPCORNING_STATE aclk_host_popcorn_check(RRDHOST *host) +{ + rrdhost_aclk_state_lock(host); + ACLK_POPCORNING_STATE ret = host->aclk_state.state; + if (host->aclk_state.state != ACLK_HOST_INITIALIZING){ + rrdhost_aclk_state_unlock(host); + return ret; + } + + if (!host->aclk_state.t_last_popcorn_update){ + rrdhost_aclk_state_unlock(host); + return ret; + } + + time_t t_diff = now_monotonic_sec() - host->aclk_state.t_last_popcorn_update; + + if (t_diff >= ACLK_STABLE_TIMEOUT) { + host->aclk_state.state = ACLK_HOST_STABLE; + host->aclk_state.t_last_popcorn_update = 0; + rrdhost_aclk_state_unlock(host); + info("Host \"%s\" stable, ACLK popcorning finished. Last interrupt was %ld seconds ago", host->hostname, t_diff); + return ACLK_HOST_STABLE; + } + + rrdhost_aclk_state_unlock(host); + return ret; +} + /** * Main query processing thread * @@ -644,32 +711,14 @@ void aclk_query_threads_start(struct aclk_query_threads *query_threads) void *aclk_query_main_thread(void *ptr) { struct aclk_query_thread *info = ptr; - time_t previous_popcorn_interrupt = 0; while (!netdata_exit) { - ACLK_SHARED_STATE_LOCK; - if (aclk_shared_state.agent_state != AGENT_INITIALIZING) { - ACLK_SHARED_STATE_UNLOCK; - break; - } - - time_t checkpoint = now_realtime_sec() - aclk_shared_state.last_popcorn_interrupt; - - if (checkpoint > ACLK_STABLE_TIMEOUT) { - aclk_shared_state.agent_state = AGENT_STABLE; - ACLK_SHARED_STATE_UNLOCK; - info("AGENT stable, last collector initialization activity was %ld seconds ago", checkpoint); + if(aclk_host_popcorn_check(localhost) == ACLK_HOST_STABLE) { #ifdef ACLK_DEBUG _dump_collector_list(); #endif break; } - - if (previous_popcorn_interrupt != aclk_shared_state.last_popcorn_interrupt) { - info("Waiting %ds from this moment for agent collectors to initialize." , ACLK_STABLE_TIMEOUT); - previous_popcorn_interrupt = aclk_shared_state.last_popcorn_interrupt; - } - ACLK_SHARED_STATE_UNLOCK; sleep_usec(USEC_PER_SEC * 1); } @@ -692,15 +741,26 @@ void *aclk_query_main_thread(void *ptr) aclk_shared_state.version_neg = ACLK_VERSION_MIN; aclk_set_rx_handlers(aclk_shared_state.version_neg); } - if (unlikely(aclk_shared_state.metadata_submitted == ACLK_METADATA_REQUIRED)) { - if (unlikely(aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) { + ACLK_SHARED_STATE_UNLOCK; + + rrdhost_aclk_state_lock(localhost); + if (unlikely(localhost->aclk_state.metadata == ACLK_METADATA_REQUIRED)) { + if (unlikely(aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) { ACLK_SHARED_STATE_UNLOCK; errno = 0; error("ACLK failed to queue on_connect command"); sleep(1); continue; } - aclk_shared_state.metadata_submitted = ACLK_METADATA_CMD_QUEUED; + localhost->aclk_state.metadata = ACLK_METADATA_CMD_QUEUED; + } + rrdhost_aclk_state_unlock(localhost); + + ACLK_SHARED_STATE_LOCK; + if (aclk_shared_state.next_popcorn_host && aclk_host_popcorn_check(aclk_shared_state.next_popcorn_host) == ACLK_HOST_STABLE) { + aclk_queue_query("on_connect", aclk_shared_state.next_popcorn_host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT); + aclk_shared_state.next_popcorn_host = NULL; + aclk_update_next_child_to_popcorn(); } ACLK_SHARED_STATE_UNLOCK; diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c index 85b0d74ac9..b90f60d9a8 100644 --- a/aclk/aclk_rx_msgs.c +++ b/aclk/aclk_rx_msgs.c @@ -38,13 +38,13 @@ static inline int aclk_v2_payload_get_query(const char *payload, struct aclk_req return 0; } -#define HTTP_CHECK_AGENT_INITIALIZED() ACLK_SHARED_STATE_LOCK;\ - if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) {\ +#define HTTP_CHECK_AGENT_INITIALIZED() rrdhost_aclk_state_lock(localhost);\ + if (unlikely(localhost->aclk_state.state == ACLK_HOST_INITIALIZING)) {\ debug(D_ACLK, "Ignoring \"http\" cloud request; agent not in stable state");\ - ACLK_SHARED_STATE_UNLOCK;\ + rrdhost_aclk_state_unlock(localhost);\ return 1;\ }\ - ACLK_SHARED_STATE_UNLOCK; + rrdhost_aclk_state_unlock(localhost); /* * Parse the incoming payload and queue a command if valid diff --git a/aclk/agent_cloud_link.c b/aclk/agent_cloud_link.c index d19ee27fde..3c28e1e98a 100644 --- a/aclk/agent_cloud_link.c +++ b/aclk/agent_cloud_link.c @@ -438,23 +438,136 @@ static struct _collector *_add_collector(const char *hostname, const char *plugi #pragma endregion #endif -inline static int aclk_popcorn_check_bump() +/* Avoids the need to scan trough all RRDHOSTS + * every time any Query Thread Wakes Up + * (every time we need to check child popcorn expiry) + * call with ACLK_SHARED_STATE_LOCK held + */ +void aclk_update_next_child_to_popcorn(void) +{ + RRDHOST *host; + int any = 0; + + rrd_rdlock(); + rrdhost_foreach_read(host) { + if (unlikely(host == localhost || rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED))) + continue; + + rrdhost_aclk_state_lock(host); + if (!ACLK_IS_HOST_POPCORNING(host)) { + rrdhost_aclk_state_unlock(host); + continue; + } + + any = 1; + + if (unlikely(!aclk_shared_state.next_popcorn_host)) { + aclk_shared_state.next_popcorn_host = host; + rrdhost_aclk_state_unlock(host); + continue; + } + + if (aclk_shared_state.next_popcorn_host->aclk_state.t_last_popcorn_update > host->aclk_state.t_last_popcorn_update) + aclk_shared_state.next_popcorn_host = host; + + rrdhost_aclk_state_unlock(host); + } + if(!any) + aclk_shared_state.next_popcorn_host = NULL; + + rrd_unlock(); +} + +/* If popcorning bump timer. + * If popcorning or initializing (host not stable) return 1 + * Otherwise return 0 + */ +static int aclk_popcorn_check_bump(RRDHOST *host) +{ + time_t now = now_monotonic_sec(); + int updated = 0, ret; + ACLK_SHARED_STATE_LOCK; + rrdhost_aclk_state_lock(host); + + ret = ACLK_IS_HOST_INITIALIZING(host); + if (unlikely(ACLK_IS_HOST_POPCORNING(host))) { + if(now != host->aclk_state.t_last_popcorn_update) { + updated = 1; + info("Restarting ACLK popcorn timer for host \"%s\" with GUID \"%s\"", host->hostname, host->machine_guid); + } + host->aclk_state.t_last_popcorn_update = now; + rrdhost_aclk_state_unlock(host); + + if (host != localhost && updated) + aclk_update_next_child_to_popcorn(); + + ACLK_SHARED_STATE_UNLOCK; + return ret; + } + + rrdhost_aclk_state_unlock(host); + ACLK_SHARED_STATE_UNLOCK; + return ret; +} + +inline static int aclk_host_initializing(RRDHOST *host) +{ + rrdhost_aclk_state_lock(host); + int ret = ACLK_IS_HOST_INITIALIZING(host); + rrdhost_aclk_state_unlock(host); + return ret; +} + +static void aclk_start_host_popcorning(RRDHOST *host) +{ + usec_t now = now_monotonic_sec(); + info("Starting ACLK popcorn timer for host \"%s\" with GUID \"%s\"", host->hostname, host->machine_guid); + ACLK_SHARED_STATE_LOCK; + rrdhost_aclk_state_lock(host); + if (host == localhost && !ACLK_IS_HOST_INITIALIZING(host)) { + errno = 0; + error("Localhost is allowed to do popcorning only once after startup!"); + rrdhost_aclk_state_unlock(host); + ACLK_SHARED_STATE_UNLOCK; + return; + } + + host->aclk_state.state = ACLK_HOST_INITIALIZING; + host->aclk_state.metadata = ACLK_METADATA_REQUIRED; + host->aclk_state.t_last_popcorn_update = now; + rrdhost_aclk_state_unlock(host); + if (host != localhost) + aclk_update_next_child_to_popcorn(); + ACLK_SHARED_STATE_UNLOCK; +} + +static void aclk_stop_host_popcorning(RRDHOST *host) { ACLK_SHARED_STATE_LOCK; - if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) { - aclk_shared_state.last_popcorn_interrupt = now_realtime_sec(); + rrdhost_aclk_state_lock(host); + if (!ACLK_IS_HOST_POPCORNING(host)) { + rrdhost_aclk_state_unlock(host); ACLK_SHARED_STATE_UNLOCK; - return 1; + return; + } + + info("Host Disconnected before ACLK popcorning finished. Canceling. Host \"%s\" GUID:\"%s\"", host->hostname, host->machine_guid); + host->aclk_state.t_last_popcorn_update = 0; + host->aclk_state.metadata = ACLK_METADATA_REQUIRED; + rrdhost_aclk_state_unlock(host); + + if(host == aclk_shared_state.next_popcorn_host) { + aclk_shared_state.next_popcorn_host = NULL; + aclk_update_next_child_to_popcorn(); } ACLK_SHARED_STATE_UNLOCK; - return 0; } /* * Add a new collector to the list * If it exists, update the chart count */ -void aclk_add_collector(const char *hostname, const char *plugin_name, const char *module_name) +void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name) { struct _collector *tmp_collector; if (unlikely(!netdata_ready)) { @@ -463,7 +576,7 @@ void aclk_add_collector(const char *hostname, const char *plugin_name, const cha COLLECTOR_LOCK; - tmp_collector = _add_collector(hostname, plugin_name, module_name); + tmp_collector = _add_collector(host->hostname, plugin_name, module_name); if (unlikely(tmp_collector->count != 1)) { COLLECTOR_UNLOCK; @@ -472,10 +585,10 @@ void aclk_add_collector(const char *hostname, const char *plugin_name, const cha COLLECTOR_UNLOCK; - if(aclk_popcorn_check_bump()) + if(aclk_popcorn_check_bump(host)) return; - if (unlikely(aclk_queue_query("collector", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) + if (unlikely(aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) debug(D_ACLK, "ACLK failed to queue on_connect command on collector addition"); } @@ -487,7 +600,7 @@ void aclk_add_collector(const char *hostname, const char *plugin_name, const cha * This function will release the memory used and schedule * a cloud update */ -void aclk_del_collector(const char *hostname, const char *plugin_name, const char *module_name) +void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name) { struct _collector *tmp_collector; if (unlikely(!netdata_ready)) { @@ -496,7 +609,7 @@ void aclk_del_collector(const char *hostname, const char *plugin_name, const cha COLLECTOR_LOCK; - tmp_collector = _del_collector(hostname, plugin_name, module_name); + tmp_collector = _del_collector(host->hostname, plugin_name, module_name); if (unlikely(!tmp_collector || tmp_collector->count)) { COLLECTOR_UNLOCK; @@ -511,10 +624,10 @@ void aclk_del_collector(const char *hostname, const char *plugin_name, const cha _free_collector(tmp_collector); - if (aclk_popcorn_check_bump()) + if (aclk_popcorn_check_bump(host)) return; - if (unlikely(aclk_queue_query("collector", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) + if (unlikely(aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) debug(D_ACLK, "ACLK failed to queue on_connect command on collector deletion"); } @@ -895,6 +1008,7 @@ void *aclk_main(void *ptr) struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; struct aclk_query_threads query_threads; struct aclk_stats_thread *stats_thread = NULL; + time_t last_periodic_query_wakeup = 0; query_threads.thread_list = NULL; @@ -941,7 +1055,8 @@ void *aclk_main(void *ptr) config_set_number(CONFIG_SECTION_CLOUD, "query thread count", query_threads.count); } - aclk_shared_state.last_popcorn_interrupt = now_realtime_sec(); // without mutex here because threads are not yet started + //start localhost popcorning + aclk_start_host_popcorning(localhost); aclk_stats_enabled = config_get_boolean(CONFIG_SECTION_CLOUD, "statistics", CONFIG_BOOLEAN_YES); if (aclk_stats_enabled) { @@ -1051,6 +1166,14 @@ void *aclk_main(void *ptr) if (unlikely(!query_threads.thread_list)) { aclk_query_threads_start(&query_threads); } + + time_t now = now_monotonic_sec(); + if(aclk_connected && last_periodic_query_wakeup < now) { + // to make `aclk_queue_query()` param `run_after` work + // also makes per child popcorning work + last_periodic_query_wakeup = now; + QUERY_THREAD_WAKEUP; + } } // forever exited: // Wakeup query thread to cleanup @@ -1200,9 +1323,9 @@ void aclk_disconnect() aclk_stats_upd_online(0); aclk_subscribed = 0; - ACLK_SHARED_STATE_LOCK; - aclk_shared_state.metadata_submitted = ACLK_METADATA_REQUIRED; - ACLK_SHARED_STATE_UNLOCK; + rrdhost_aclk_state_lock(localhost); + localhost->aclk_state.metadata = ACLK_METADATA_REQUIRED; + rrdhost_aclk_state_unlock(localhost); aclk_connected = 0; aclk_connecting = 0; aclk_force_reconnect = 1; @@ -1294,7 +1417,7 @@ void aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted) * /api/v1/info * charts */ -int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted) +int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted, RRDHOST *host) { BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); @@ -1315,11 +1438,11 @@ int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted) buffer_strcat(local_buffer, ",\n\t\"payload\": "); buffer_sprintf(local_buffer, "{\n\t \"info\" : "); - web_client_api_request_v1_info_fill_buffer(localhost, local_buffer); + web_client_api_request_v1_info_fill_buffer(host, local_buffer); debug(D_ACLK, "Metadata %s with info has %zu bytes", msg_id, local_buffer->len); buffer_sprintf(local_buffer, ", \n\t \"charts\" : "); - charts2json(localhost, local_buffer, 1, 0); + charts2json(host, local_buffer, 1, 0); buffer_sprintf(local_buffer, "\n}\n}"); debug(D_ACLK, "Metadata %s with chart has %zu bytes", msg_id, local_buffer->len); @@ -1330,6 +1453,66 @@ int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted) return 0; } +int aclk_send_info_child_connection(RRDHOST *host, ACLK_CMD cmd) +{ + BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE); + local_buffer->contenttype = CT_APPLICATION_JSON; + + if(aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) + fatal("This function should not be called if ACLK version is less than %d (current %d)", ACLK_V_CHILDRENSTATE, aclk_shared_state.version_neg); + + debug(D_ACLK, "Sending Child Disconnect"); + + char *msg_id = create_uuid(); + + aclk_create_header(local_buffer, cmd == ACLK_CMD_CHILD_CONNECT ? "child_connect" : "child_disconnect", msg_id, 0, 0, aclk_shared_state.version_neg); + + buffer_strcat(local_buffer, ",\"payload\":"); + + buffer_sprintf(local_buffer, "{\"guid\":\"%s\",\"claim_id\":", host->machine_guid); + rrdhost_aclk_state_lock(host); + if(host->aclk_state.claimed_id) + buffer_sprintf(local_buffer, "\"%s\"}}", host->aclk_state.claimed_id); + else + buffer_strcat(local_buffer, "null}}"); + + rrdhost_aclk_state_unlock(host); + + aclk_send_message(ACLK_METADATA_TOPIC, local_buffer->buffer, msg_id); + + freez(msg_id); + buffer_free(local_buffer); + return 0; +} + +void aclk_host_state_update(RRDHOST *host, ACLK_CMD cmd) +{ +#if ACLK_VERSION_MIN < ACLK_V_CHILDRENSTATE + if (aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) + return; +#else +#warning "This check became unnecessary. Remove" +#endif + + if (unlikely(aclk_host_initializing(localhost))) + return; + + switch (cmd) { + case ACLK_CMD_CHILD_CONNECT: + debug(D_ACLK, "Child Connected %s %s.", host->hostname, host->machine_guid); + aclk_start_host_popcorning(host); + aclk_queue_query("add_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_CONNECT); + break; + case ACLK_CMD_CHILD_DISCONNECT: + debug(D_ACLK, "Child Disconnected %s %s.", host->hostname, host->machine_guid); + aclk_stop_host_popcorning(host); + aclk_queue_query("del_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_DISCONNECT); + break; + default: + error("Unknown command for aclk_host_state_update %d.", (int)cmd); + } +} + void aclk_send_stress_test(size_t size) { char *buffer = mallocz(size); @@ -1351,11 +1534,12 @@ void aclk_send_stress_test(size_t size) // Send info metadata message to the cloud if the link is established // or on request -int aclk_send_metadata(ACLK_METADATA_STATE state) +int aclk_send_metadata(ACLK_METADATA_STATE state, RRDHOST *host) { + aclk_send_info_metadata(state, host); - aclk_send_info_metadata(state); - aclk_send_alarm_metadata(state); + if(host == localhost) + aclk_send_alarm_metadata(state); return 0; } @@ -1373,15 +1557,10 @@ void aclk_single_update_enable() // Trigged by a health reload, sends the alarm metadata void aclk_alarm_reload() { - - ACLK_SHARED_STATE_LOCK; - if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) { - ACLK_SHARED_STATE_UNLOCK; + if (unlikely(aclk_host_initializing(localhost))) return; - } - ACLK_SHARED_STATE_UNLOCK; - if (unlikely(aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) { + if (unlikely(aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) { if (likely(aclk_connected)) { errno = 0; error("ACLK failed to queue on_connect command on alarm reload"); @@ -1390,17 +1569,11 @@ void aclk_alarm_reload() } //rrd_stats_api_v1_chart(RRDSET *st, BUFFER *buf) -int aclk_send_single_chart(char *hostname, char *chart) +int aclk_send_single_chart(RRDHOST *host, char *chart) { - RRDHOST *target_host; - - target_host = rrdhost_find_by_hostname(hostname, 0); - if (!target_host) - return 1; - - RRDSET *st = rrdset_find(target_host, chart); + RRDSET *st = rrdset_find(host, chart); if (!st) - st = rrdset_find_byname(target_host, chart); + st = rrdset_find_byname(host, chart); if (!st) { info("FAILED to find chart %s", chart); return 1; @@ -1437,13 +1610,16 @@ int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd) if (!netdata_cloud_setting) return 0; - if (host != localhost) + if (aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE && host != localhost) + return 0; + + if (aclk_host_initializing(localhost)) return 0; if (unlikely(aclk_disable_single_updates)) return 0; - if (aclk_popcorn_check_bump()) + if (aclk_popcorn_check_bump(host)) return 0; if (unlikely(aclk_queue_query("_chart", host, NULL, chart_name, 0, 1, aclk_cmd))) { @@ -1467,12 +1643,8 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae) if (host != localhost) return 0; - ACLK_SHARED_STATE_LOCK; - if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) { - ACLK_SHARED_STATE_UNLOCK; + if(unlikely(aclk_host_initializing(localhost))) return 0; - } - ACLK_SHARED_STATE_UNLOCK; /* * Check if individual updates have been disabled diff --git a/aclk/agent_cloud_link.h b/aclk/agent_cloud_link.h index b6bb74c2f4..b224a45433 100644 --- a/aclk/agent_cloud_link.h +++ b/aclk/agent_cloud_link.h @@ -66,24 +66,28 @@ int cloud_to_agent_parse(JSON_ENTRY *e); void aclk_disconnect(); void aclk_connect(); -int aclk_send_metadata(ACLK_METADATA_STATE state); -int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted); +int aclk_send_metadata(ACLK_METADATA_STATE state, RRDHOST *host); +int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted, RRDHOST *host); void aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted); int aclk_wait_for_initialization(); char *create_publish_base_topic(); -int aclk_send_single_chart(char *host, char *chart); +int aclk_send_single_chart(RRDHOST *host, char *chart); int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd); int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae); void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us, int version); int aclk_handle_cloud_message(char *payload); -void aclk_add_collector(const char *hostname, const char *plugin_name, const char *module_name); -void aclk_del_collector(const char *hostname, const char *plugin_name, const char *module_name); +void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name); +void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name); void aclk_alarm_reload(); unsigned long int aclk_reconnect_delay(int mode); extern void health_alarm_entry2json_nolock(BUFFER *wb, ALARM_ENTRY *ae, RRDHOST *host); void aclk_single_update_enable(); void aclk_single_update_disable(); +void aclk_host_state_update(RRDHOST *host, ACLK_CMD cmd); +int aclk_send_info_child_connection(RRDHOST *host, ACLK_CMD cmd); +void aclk_update_next_child_to_popcorn(void); + #endif //NETDATA_AGENT_CLOUD_LINK_H diff --git a/claim/claim.c b/claim/claim.c index 7846925e6d..d87f46a7f8 100644 --- a/claim/claim.c +++ b/claim/claim.c @@ -34,9 +34,9 @@ static char *claiming_errors[] = { char *is_agent_claimed() { char *result; - netdata_mutex_lock(&localhost->claimed_id_lock); - result = (localhost->claimed_id == NULL) ? NULL : strdupz(localhost->claimed_id); - netdata_mutex_unlock(&localhost->claimed_id_lock); + rrdhost_aclk_state_lock(localhost); + result = (localhost->aclk_state.claimed_id == NULL) ? NULL : strdupz(localhost->aclk_state.claimed_id); + rrdhost_aclk_state_unlock(localhost); return result; } @@ -134,10 +134,10 @@ void load_claiming_state(void) netdata_cloud_setting = 0; #else uuid_t uuid; - netdata_mutex_lock(&localhost->claimed_id_lock); - if (localhost->claimed_id) { - freez(localhost->claimed_id); - localhost->claimed_id = NULL; + rrdhost_aclk_state_lock(localhost); + if (localhost->aclk_state.claimed_id) { + freez(localhost->aclk_state.claimed_id); + localhost->aclk_state.claimed_id = NULL; } if (aclk_connected) { @@ -159,8 +159,8 @@ void load_claiming_state(void) freez(claimed_id); claimed_id = NULL; } - localhost->claimed_id = claimed_id; - netdata_mutex_unlock(&localhost->claimed_id_lock); + localhost->aclk_state.claimed_id = claimed_id; + rrdhost_aclk_state_unlock(localhost); if (!claimed_id) { info("Unable to load '%s', setting state to AGENT_UNCLAIMED", filename); return; diff --git a/database/rrd.h b/database/rrd.h index c7bdb98e6d..fd4cae81aa 100644 --- a/database/rrd.h +++ b/database/rrd.h @@ -33,6 +33,7 @@ struct pg_cache_page_index; #include "rrdcalc.h" #include "rrdcalctemplate.h" #include "../streaming/rrdpush.h" +#include "../aclk/aclk_common.h" struct context_param { RRDDIM *rd; @@ -825,8 +826,8 @@ struct rrdhost { struct netdata_ssl stream_ssl; //Structure used to encrypt the stream #endif - netdata_mutex_t claimed_id_lock; - char *claimed_id; // Claimed ID if host has one otherwise NULL + netdata_mutex_t aclk_state_lock; + aclk_rrdhost_state aclk_state; struct rrdhost *next; }; @@ -836,6 +837,9 @@ extern RRDHOST *localhost; #define rrdhost_wrlock(host) netdata_rwlock_wrlock(&((host)->rrdhost_rwlock)) #define rrdhost_unlock(host) netdata_rwlock_unlock(&((host)->rrdhost_rwlock)) +#define rrdhost_aclk_state_lock(host) netdata_mutex_lock(&((host)->aclk_state_lock)) +#define rrdhost_aclk_state_unlock(host) netdata_mutex_unlock(&((host)->aclk_state_lock)) + // ---------------------------------------------------------------------------- // these loop macros make sure the linked list is accessed with the right lock diff --git a/database/rrdhost.c b/database/rrdhost.c index 54af0390e0..c3940c29b2 100644 --- a/database/rrdhost.c +++ b/database/rrdhost.c @@ -189,7 +189,7 @@ RRDHOST *rrdhost_create(const char *hostname, netdata_rwlock_init(&host->rrdhost_rwlock); netdata_rwlock_init(&host->labels_rwlock); - netdata_mutex_init(&host->claimed_id_lock); + netdata_mutex_init(&host->aclk_state_lock); host->system_info = system_info; @@ -870,8 +870,8 @@ void rrdhost_free(RRDHOST *host) { // ------------------------------------------------------------------------ // free it - pthread_mutex_destroy(&host->claimed_id_lock); - freez(host->claimed_id); + pthread_mutex_destroy(&host->aclk_state_lock); + freez(host->aclk_state.claimed_id); freez((void *)host->tags); free_host_labels(host->labels); freez((void *)host->os); diff --git a/database/rrdset.c b/database/rrdset.c index 24e5a5d8c2..72f216f2ee 100644 --- a/database/rrdset.c +++ b/database/rrdset.c @@ -442,7 +442,7 @@ void rrdset_delete_custom(RRDSET *st, int db_rotated) { recursively_delete_dir(st->cache_dir, "left-over chart"); #ifdef ENABLE_ACLK if ((netdata_cloud_setting) && (db_rotated || RRD_MEMORY_MODE_DBENGINE != st->rrd_memory_mode)) { - aclk_del_collector(st->rrdhost->hostname, st->plugin_name, st->module_name); + aclk_del_collector(st->rrdhost, st->plugin_name, st->module_name); aclk_update_chart(st->rrdhost, st->id, ACLK_CMD_CHARTDEL); } #endif @@ -626,14 +626,14 @@ RRDSET *rrdset_create_custom( #ifdef ENABLE_ACLK if (netdata_cloud_setting) { if (mark_rebuild & META_CHART_ACTIVATED) { - aclk_add_collector(host->hostname, st->plugin_name, st->module_name); + aclk_add_collector(host, st->plugin_name, st->module_name); } else { if (mark_rebuild & (META_PLUGIN_UPDATED | META_MODULE_UPDATED)) { aclk_del_collector( - host->hostname, mark_rebuild & META_PLUGIN_UPDATED ? old_plugin : st->plugin_name, + host, mark_rebuild & META_PLUGIN_UPDATED ? old_plugin : st->plugin_name, mark_rebuild & META_MODULE_UPDATED ? old_module : st->module_name); - aclk_add_collector(host->hostname, st->plugin_name, st->module_name); + aclk_add_collector(host, st->plugin_name, st->module_name); } } aclk_update_chart(host, st->id, ACLK_CMD_CHART); @@ -933,7 +933,7 @@ RRDSET *rrdset_create_custom( rrdhost_unlock(host); #ifdef ENABLE_ACLK if (netdata_cloud_setting) { - aclk_add_collector(host->hostname, plugin, module); + aclk_add_collector(host, plugin, module); aclk_update_chart(host, st->id, ACLK_CMD_CHART); } #endif diff --git a/streaming/receiver.c b/streaming/receiver.c index 9165038cda..495a40c017 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -126,11 +126,11 @@ PARSER_RC streaming_claimed_id(char **words, void *user, PLUGINSD_ACTION *plugin return PARSER_RC_OK; //the message is OK problem must be somewehere else } - netdata_mutex_lock(&host->claimed_id_lock); - if (host->claimed_id) - freez(host->claimed_id); - host->claimed_id = strcmp(words[2], "NULL") ? strdupz(words[2]) : NULL; - netdata_mutex_unlock(&host->claimed_id_lock); + rrdhost_aclk_state_lock(host); + if (host->aclk_state.claimed_id) + freez(host->aclk_state.claimed_id); + host->aclk_state.claimed_id = strcmp(words[2], "NULL") ? strdupz(words[2]) : NULL; + rrdhost_aclk_state_unlock(host); rrdpush_claimed_id(host); @@ -440,6 +440,12 @@ static int rrdpush_receive(struct receiver_state *rpt) cd.version = rpt->stream_version; +#ifdef ENABLE_ACLK + // in case we have cloud connection we inform cloud + // new slave connected + if (netdata_cloud_setting) + aclk_host_state_update(rpt->host, ACLK_CMD_CHILD_CONNECT); +#endif size_t count = streaming_parser(rpt, &cd, fp); @@ -448,6 +454,13 @@ static int rrdpush_receive(struct receiver_state *rpt) error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).", rpt->hostname, rpt->client_ip, rpt->client_port, count); +#ifdef ENABLE_ACLK + // in case we have cloud connection we inform cloud + // new slave connected + if (netdata_cloud_setting) + aclk_host_state_update(rpt->host, ACLK_CMD_CHILD_DISCONNECT); +#endif + // During a shutdown there is cleanup code in rrdhost that will cancel the sender thread if (!netdata_exit && rpt->host) { rrd_rdlock(); diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index bdbb2fa1c9..3b813e01fe 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -374,11 +374,11 @@ void rrdpush_claimed_id(RRDHOST *host) return; sender_start(host->sender); - netdata_mutex_lock(&host->claimed_id_lock); + rrdhost_aclk_state_lock(host); - buffer_sprintf(host->sender->build, "CLAIMED_ID %s %s\n", host->machine_guid, (host->claimed_id ? host->claimed_id : "NULL") ); + buffer_sprintf(host->sender->build, "CLAIMED_ID %s %s\n", host->machine_guid, (host->aclk_state.claimed_id ? host->aclk_state.claimed_id : "NULL") ); - netdata_mutex_unlock(&host->claimed_id_lock); + rrdhost_aclk_state_unlock(host); sender_commit(host->sender); // signal the sender there are more data diff --git a/web/api/web_api_v1.c b/web/api/web_api_v1.c index 702778547c..71027552f2 100644 --- a/web/api/web_api_v1.c +++ b/web/api/web_api_v1.c @@ -841,12 +841,12 @@ static inline void web_client_api_request_v1_info_mirrored_hosts(BUFFER *wb) { (host->receiver || host == localhost) ? "true" : "false"); netdata_mutex_unlock(&host->receiver_lock); - netdata_mutex_lock(&host->claimed_id_lock); - if (host->claimed_id) - buffer_sprintf(wb, "\"%s\" }", host->claimed_id); + rrdhost_aclk_state_lock(host); + if (host->aclk_state.claimed_id) + buffer_sprintf(wb, "\"%s\" }", host->aclk_state.claimed_id); else buffer_strcat(wb, "null }"); - netdata_mutex_unlock(&host->claimed_id_lock); + rrdhost_aclk_state_unlock(host); count++; }