mirror of
https://github.com/netdata/netdata.git
synced 2025-04-17 11:12:42 +00:00
Delay child disconnect update (#18712)
* Calculate child restart time (start time + shutdown) Assume the child will return at 125% of that time Add timers to handle scheduluing of the node update events to the cloud Handle faster reconnection and reschedule timer to correct state * Remove unnecessary check * No need to get the remaining timer value (function may not be supported) * Switch message to DEBUG
This commit is contained in:
parent
fe8ea63f9e
commit
9bf07d05d0
9 changed files with 89 additions and 17 deletions
src
aclk
database/sqlite
health
plugins.d
streaming
|
@ -804,7 +804,7 @@ void *aclk_main(void *ptr)
|
|||
goto exit_full;
|
||||
|
||||
if (schedule_node_update) {
|
||||
schedule_node_info_update(localhost);
|
||||
schedule_node_state_update(localhost, 0);
|
||||
schedule_node_update = false;
|
||||
}
|
||||
|
||||
|
|
|
@ -206,8 +206,6 @@ fail:
|
|||
static void invalidate_host_last_connected(nd_uuid_t *host_uuid)
|
||||
{
|
||||
sqlite3_stmt *res = NULL;
|
||||
if (!host_uuid)
|
||||
return;
|
||||
|
||||
if (!PREPARE_STATEMENT(db_meta, SQL_INVALIDATE_HOST_LAST_CONNECTED, &res))
|
||||
return;
|
||||
|
@ -358,6 +356,27 @@ static int read_query_thread_count()
|
|||
return threads;
|
||||
}
|
||||
|
||||
static void node_update_timer_cb(uv_timer_t *handle)
|
||||
{
|
||||
struct aclk_sync_cfg_t *ahc = handle->data;
|
||||
RRDHOST *host = ahc->host;
|
||||
|
||||
spinlock_lock(&host->receiver_lock);
|
||||
int live = (host == localhost || host->receiver || !(rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN))) ? 1 : 0;
|
||||
spinlock_unlock(&host->receiver_lock);
|
||||
nd_log(NDLS_ACLK, NDLP_DEBUG,"Timer: Sending node update info for %s, LIVE = %d", rrdhost_hostname(host), live);
|
||||
aclk_host_state_update(host, live, 1);
|
||||
}
|
||||
|
||||
static void close_callback(uv_handle_t *handle, void *data __maybe_unused)
|
||||
{
|
||||
if (handle->type == UV_TIMER) {
|
||||
uv_timer_stop((uv_timer_t *)handle);
|
||||
}
|
||||
|
||||
uv_close(handle, NULL); // Automatically close and free the handle
|
||||
}
|
||||
|
||||
static void aclk_synchronization(void *arg)
|
||||
{
|
||||
struct aclk_sync_config_s *config = arg;
|
||||
|
@ -414,11 +433,36 @@ static void aclk_synchronization(void *arg)
|
|||
// NODE STATE
|
||||
case ACLK_DATABASE_NODE_STATE:;
|
||||
RRDHOST *host = cmd.param[0];
|
||||
int live = (host == localhost || host->receiver || !(rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN))) ? 1 : 0;
|
||||
struct aclk_sync_cfg_t *ahc = host->aclk_config;
|
||||
if (unlikely(!ahc))
|
||||
create_aclk_config(host, &host->host_id.uuid, &host->node_id.uuid);
|
||||
|
||||
uint64_t schedule_time = (uint64_t)(uintptr_t)cmd.param[1];
|
||||
|
||||
if (!ahc->timer_initialized) {
|
||||
int rc = uv_timer_init(loop, &ahc->timer);
|
||||
if (!rc) {
|
||||
ahc->timer_initialized = true;
|
||||
ahc->timer.data = ahc;
|
||||
}
|
||||
}
|
||||
|
||||
if (ahc->timer_initialized) {
|
||||
if (uv_is_active((uv_handle_t *)&ahc->timer))
|
||||
uv_timer_stop(&ahc->timer);
|
||||
|
||||
ahc->timer.data = ahc;
|
||||
int rc = uv_timer_start(&ahc->timer, node_update_timer_cb, schedule_time, 0);
|
||||
if (!rc)
|
||||
break; // Timer started, exit
|
||||
}
|
||||
|
||||
// This is fallback if timer fails
|
||||
spinlock_lock(&host->receiver_lock);
|
||||
int live = (host == localhost || host->receiver || !(rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN))) ? 1 : 0;
|
||||
spinlock_unlock(&host->receiver_lock);
|
||||
aclk_host_state_update(host, live, 1);
|
||||
nd_log(NDLS_ACLK, NDLP_DEBUG,"Sending node update info for %s, LIVE = %d", rrdhost_hostname(host), live);
|
||||
break;
|
||||
case ACLK_DATABASE_NODE_UNREGISTER:
|
||||
sql_unregister_node(cmd.param[0]);
|
||||
|
@ -467,6 +511,11 @@ static void aclk_synchronization(void *arg)
|
|||
uv_close((uv_handle_t *)&config->timer_req, NULL);
|
||||
|
||||
uv_close((uv_handle_t *)&config->async, NULL);
|
||||
uv_run(loop, UV_RUN_DEFAULT);
|
||||
|
||||
uv_walk(loop, (uv_walk_cb) close_callback, NULL);
|
||||
uv_run(loop, UV_RUN_DEFAULT);
|
||||
|
||||
(void) uv_loop_close(loop);
|
||||
|
||||
worker_unregister();
|
||||
|
@ -581,11 +630,12 @@ void aclk_query_init(mqtt_wss_client client) {
|
|||
queue_aclk_sync_cmd(ACLK_MQTT_WSS_CLIENT, client, NULL);
|
||||
}
|
||||
|
||||
void schedule_node_info_update(RRDHOST *host __maybe_unused)
|
||||
void schedule_node_state_update(RRDHOST *host, uint64_t delay)
|
||||
{
|
||||
if (unlikely(!host))
|
||||
if (unlikely(!aclk_sync_config.initialized || !host))
|
||||
return;
|
||||
queue_aclk_sync_cmd(ACLK_DATABASE_NODE_STATE, host, NULL);
|
||||
|
||||
queue_aclk_sync_cmd(ACLK_DATABASE_NODE_STATE, host, (void *)(uintptr_t)delay);
|
||||
}
|
||||
|
||||
void unregister_node(const char *machine_guid)
|
||||
|
|
|
@ -39,6 +39,8 @@ struct aclk_database_cmd {
|
|||
|
||||
typedef struct aclk_sync_cfg_t {
|
||||
RRDHOST *host;
|
||||
uv_timer_t timer;
|
||||
bool timer_initialized;
|
||||
int8_t send_snapshot;
|
||||
bool stream_alerts;
|
||||
int alert_count;
|
||||
|
@ -53,7 +55,7 @@ typedef struct aclk_sync_cfg_t {
|
|||
void create_aclk_config(RRDHOST *host, nd_uuid_t *host_uuid, nd_uuid_t *node_id);
|
||||
void sql_aclk_sync_init(void);
|
||||
void aclk_push_alert_config(const char *node_id, const char *config_hash);
|
||||
void schedule_node_info_update(RRDHOST *host);
|
||||
void schedule_node_state_update(RRDHOST *host, uint64_t delay);
|
||||
void unregister_node(const char *machine_guid);
|
||||
|
||||
#endif //NETDATA_SQLITE_ACLK_H
|
||||
|
|
|
@ -75,7 +75,7 @@ static inline int rrdcalc_isrunnable(RRDCALC *rc, time_t now, time_t *next_run)
|
|||
time_t needed = now + rc->config.before + rc->config.after;
|
||||
|
||||
if(needed + update_every < first || needed - update_every > last) {
|
||||
netdata_log_info(
|
||||
netdata_log_debug(D_HEALTH,
|
||||
"Health not examining alarm '%s.%s' yet (not enough data yet - we need %lu but got %lu - %lu).",
|
||||
rrdcalc_chart_name(rc),
|
||||
rrdcalc_name(rc),
|
||||
|
@ -229,7 +229,7 @@ static void health_event_loop(void) {
|
|||
"Postponing alarm checks for %"PRId32" seconds, "
|
||||
"because it seems that the system was just resumed from suspension.",
|
||||
(int32_t)health_globals.config.postpone_alarms_during_hibernation_for_seconds);
|
||||
schedule_node_info_update(localhost);
|
||||
schedule_node_state_update(localhost, 0);
|
||||
}
|
||||
|
||||
if (unlikely(silencers->all_alarms && silencers->stype == STYPE_DISABLE_ALARMS)) {
|
||||
|
|
|
@ -218,7 +218,7 @@ static inline PARSER_RC pluginsd_host_define_end(char **words __maybe_unused, si
|
|||
|
||||
rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN);
|
||||
rrdcontext_host_child_connected(host);
|
||||
schedule_node_info_update(host);
|
||||
schedule_node_state_update(host, 100);
|
||||
|
||||
return PARSER_RC_OK;
|
||||
}
|
||||
|
|
|
@ -829,8 +829,7 @@ static void rrdpush_receive(struct receiver_state *rpt)
|
|||
|
||||
// in case we have cloud connection we inform cloud
|
||||
// new child connected
|
||||
aclk_host_state_update(rpt->host, 1, 1);
|
||||
|
||||
schedule_node_state_update(rpt->host, 300);
|
||||
rrdhost_set_is_parent_label();
|
||||
|
||||
if (is_ephemeral)
|
||||
|
@ -848,14 +847,14 @@ static void rrdpush_receive(struct receiver_state *rpt)
|
|||
{
|
||||
char msg[100 + 1];
|
||||
snprintfz(msg, sizeof(msg) - 1, "disconnected (completed %zu updates)", count);
|
||||
rrdpush_receive_log_status(
|
||||
rpt, msg,
|
||||
RRDPUSH_STATUS_DISCONNECTED, NDLP_WARNING);
|
||||
rrdpush_receive_log_status(rpt, msg, RRDPUSH_STATUS_DISCONNECTED, NDLP_WARNING);
|
||||
}
|
||||
|
||||
// in case we have cloud connection we inform cloud
|
||||
// a child disconnected
|
||||
aclk_host_state_update(rpt->host, 0, 1);
|
||||
STREAM_PATH tmp = rrdhost_stream_path_fetch(rpt->host);
|
||||
uint64_t total_reboot = (tmp.start_time + tmp.shutdown_time);
|
||||
schedule_node_state_update(rpt->host, MIN((total_reboot * MAX_CHILD_DISC_TOLERANCE), MAX_CHILD_DISC_DELAY));
|
||||
|
||||
cleanup:
|
||||
;
|
||||
|
|
|
@ -10,6 +10,11 @@
|
|||
#include "database/rrd.h"
|
||||
#include "stream_capabilities.h"
|
||||
|
||||
// When a child disconnects this is the maximum we will wait
|
||||
// before we update the cloud that the child is offline
|
||||
#define MAX_CHILD_DISC_DELAY (30000)
|
||||
#define MAX_CHILD_DISC_TOLERANCE (125 / 100)
|
||||
|
||||
#define CONNECTED_TO_SIZE 100
|
||||
#define CBUFFER_INITIAL_SIZE (16 * 1024)
|
||||
#define THREAD_BUFFER_INITIAL_SIZE (CBUFFER_INITIAL_SIZE / 2)
|
||||
|
|
|
@ -101,6 +101,21 @@ static STREAM_PATH rrdhost_stream_path_self(RRDHOST *host) {
|
|||
return p;
|
||||
}
|
||||
|
||||
STREAM_PATH rrdhost_stream_path_fetch(RRDHOST *host) {
|
||||
STREAM_PATH p = { 0 };
|
||||
|
||||
spinlock_lock(&host->rrdpush.path.spinlock);
|
||||
for (size_t i = 0; i < host->rrdpush.path.used; i++) {
|
||||
STREAM_PATH *tmp_path = &host->rrdpush.path.array[i];
|
||||
if(UUIDeq(host->host_id, tmp_path->host_id)) {
|
||||
p = *tmp_path;
|
||||
break;
|
||||
}
|
||||
}
|
||||
spinlock_unlock(&host->rrdpush.path.spinlock);
|
||||
return p;
|
||||
}
|
||||
|
||||
void rrdhost_stream_path_to_json(BUFFER *wb, struct rrdhost *host, const char *key, bool add_version) {
|
||||
if(add_version)
|
||||
buffer_json_member_add_uint64(wb, "version", 1);
|
||||
|
|
|
@ -47,6 +47,7 @@ void stream_path_node_id_updated(struct rrdhost *host);
|
|||
|
||||
void stream_path_child_disconnected(struct rrdhost *host);
|
||||
void stream_path_parent_disconnected(struct rrdhost *host);
|
||||
STREAM_PATH rrdhost_stream_path_fetch(struct rrdhost *host);
|
||||
|
||||
bool stream_path_set_from_json(struct rrdhost *host, const char *json, bool from_parent);
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue