Mirror of https://github.com/netdata/netdata.git (synced 2025-05-11)

* cleanup of logging - wip
* first working iteration
* add errno annotator
* replace old logging functions with netdata_logger()
* cleanup
* update error_limit
* fix remaining error_limit references
* work on fatal()
* started working on structured logs
* full cleanup
* default logging to files; fix all plugins initialization
* fix formatting of numbers
* cleanup and reorg
* fix coverity issues
* cleanup obsolete code
* fix formatting of numbers
* fix log rotation
* fix for older systems
* add detection of systemd journal via stderr
* finished on access.log
* remove left-over transport
* do not add empty fields to the logs
* journal get compact uuids; X-Transaction-ID header is added in web responses
* allow compiling on systems without memfd sealing
* added libnetdata/uuid directory
* move datetime formatters to libnetdata
* add missing files
* link the makefiles in libnetdata
* added uuid_parse_flexi() to parse UUIDs with and without hyphens; the web server now reads X-Transaction-ID and uses it for functions and web responses
* added stream receiver, sender, proc plugin and pluginsd log stack
* iso8601 advanced usage; line_splitter module in libnetdata; code cleanup
* add message ids to streaming inbound and outbound connections
* cleanup line_splitter between lines to avoid logging garbage; when killing children, kill them with SIGABRT if internal checks are enabled
* send SIGABRT to external plugins only if we are not shutting down
* fix cross cleanup in pluginsd parser
* fatal when there is a stack error in logs
* compile netdata with -fexceptions
* do not kill external plugins with SIGABRT
* metasync info logs to debug level
* added severity to logs
* added json output; added options per log output; added documentation; fixed issues mentioned
* allow memfd only on linux
* moved journal low level functions to journal.c/h
* move health logs to daemon.log with proper priorities
* fixed a couple of bugs; health log in journal
* updated docs
* systemd-cat-native command to push structured logs to journal from the command line
* fix makefiles
* restored NETDATA_LOG_SEVERITY_LEVEL
* fix makefiles
* systemd-cat-native can also work as the logger of Netdata scripts
* do not require a socket to systemd-journal to log-as-netdata
* alarm notify logs in native format
* properly compare log ids
* fatals log alerts; alarm-notify.sh working
* fix overflow warning
* alarm-notify.sh now logs the request (command line)
* annotate external plugin logs with the function cmd they run
* added context, component and type to alarm-notify.sh; shell sanitization removes control characters and characters that may be expanded by bash
* reformatted alarm-notify logs
* unify cgroup-network-helper.sh
* added quotes around params
* charts.d.plugin switched logging to journal native
* quotes for logfmt
* unify the status codes of streaming receivers and senders
* alarm-notify: don't log anything if there is nothing to do
* all external plugins log to stderr when running outside netdata; alarm-notify now shows an error when notification methods are needed but not available
* migrate cgroup-name.sh to new logging
* systemd-cat-native now supports messages with newlines
* socket.c logs use priority
* cleanup log field types
* inherit the systemd set INVOCATION_ID if found
* allow systemd-cat-native to send messages to a systemd-journal-remote URL
* log2journal command that can convert structured logs to journal export format
* various fixes and documentation of log2journal
* updated log2journal docs
* updated log2journal docs
* updated documentation of fields
* allow compiling without libcurl
* do not use socket as format string
* added version information to newly added tools
* updated documentation and help messages
* fix the namespace socket path
* print errno with error
* do not timeout
* updated docs
* updated docs
* updated docs
* log2journal updated docs and params
* when talking to a remote journal, systemd-cat-native batches the messages
* enable lz4 compression for systemd-cat-native when sending messages to a systemd-journal-remote
* Revert "enable lz4 compression for systemd-cat-native when sending messages to a systemd-journal-remote"
  This reverts commit b079d53c11.
* note about uncompressed traffic
* log2journal: code reorg and cleanup to make modular
* finished rewriting log2journal
* more comments
* rewriting rules support
* increased limits
* updated docs
* updated docs
* fix old log call
* use journal only when stderr is connected to journal
* update netdata.spec for libcurl, libpcre2 and log2journal
* pcre2-devel
* do not require pcre2 in centos < 8, amazonlinux < 2023, openSUSE
* build log2journal only on systems where pcre2 is available
* ignore log2journal in .gitignore
* avoid log2journal on centos 7, amazonlinux 2 and opensuse
* add pcre2-8 to static build
* undo last commit
* Bundle to static
Signed-off-by: Tasos Katsoulas <tasos@netdata.cloud>
* Add build deps for deb packages
Signed-off-by: Tasos Katsoulas <tasos@netdata.cloud>
* Add dependencies; build from source
Signed-off-by: Tasos Katsoulas <tasos@netdata.cloud>
* Test build for Amazon Linux and CentOS; expected to fail for SUSE
Signed-off-by: Tasos Katsoulas <tasos@netdata.cloud>
* fix minor oversight
Signed-off-by: Tasos Katsoulas <tasos@netdata.cloud>
* Reorg code
* Add the install from source (deps) as a TODO
* Do not enable the build on the SUSE ecosystem
Signed-off-by: Tasos Katsoulas <tasos@netdata.cloud>
---------
Signed-off-by: Tasos Katsoulas <tasos@netdata.cloud>
Co-authored-by: Tasos Katsoulas <tasos@netdata.cloud>
2036 lines | 81 KiB | C
// SPDX-License-Identifier: GPL-3.0-or-later

#include "replication.h"
#include "Judy.h"

#define STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50ULL
#define MAX_REPLICATION_MESSAGE_PERCENT_SENDER_BUFFER 25ULL
#define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50ULL
#define MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED 10ULL

#define WORKER_JOB_FIND_NEXT 1
#define WORKER_JOB_QUERYING 2
#define WORKER_JOB_DELETE_ENTRY 3
#define WORKER_JOB_FIND_CHART 4
#define WORKER_JOB_PREPARE_QUERY 5
#define WORKER_JOB_CHECK_CONSISTENCY 6
#define WORKER_JOB_BUFFER_COMMIT 7
#define WORKER_JOB_CLEANUP 8
#define WORKER_JOB_WAIT 9

// master thread worker jobs
#define WORKER_JOB_STATISTICS 10
#define WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS 11
#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM 12
#define WORKER_JOB_CUSTOM_METRIC_COMPLETION 13
#define WORKER_JOB_CUSTOM_METRIC_ADDED 14
#define WORKER_JOB_CUSTOM_METRIC_DONE 15
#define WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS 16
#define WORKER_JOB_CUSTOM_METRIC_SENDER_FULL 17

#define ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION 30
#define SECONDS_TO_RESET_POINT_IN_TIME 10
static struct replication_query_statistics replication_queries = {
    .spinlock = NETDATA_SPINLOCK_INITIALIZER,
    .queries_started = 0,
    .queries_finished = 0,
    .points_read = 0,
    .points_generated = 0,
};

struct replication_query_statistics replication_get_query_statistics(void) {
    spinlock_lock(&replication_queries.spinlock);
    struct replication_query_statistics ret = replication_queries;
    spinlock_unlock(&replication_queries.spinlock);
    return ret;
}

size_t replication_buffers_allocated = 0;

size_t replication_allocated_buffers(void) {
    return __atomic_load_n(&replication_buffers_allocated, __ATOMIC_RELAXED);
}
// ----------------------------------------------------------------------------
// sending replication replies

struct replication_dimension {
    STORAGE_POINT sp;
    struct storage_engine_query_handle handle;
    bool enabled;
    bool skip;

    DICTIONARY *dict;
    const DICTIONARY_ITEM *rda;
    RRDDIM *rd;
};

struct replication_query {
    RRDSET *st;

    struct {
        time_t first_entry_t;
        time_t last_entry_t;
    } db;

    struct { // what the parent requested
        time_t after;
        time_t before;
        bool enable_streaming;
    } request;

    struct { // what the child will do
        time_t after;
        time_t before;
        bool enable_streaming;

        bool locked_data_collection;
        bool execute;
        bool interrupted;
        STREAM_CAPABILITIES capabilities;
    } query;

    time_t wall_clock_time;

    size_t points_read;
    size_t points_generated;

    STORAGE_ENGINE_BACKEND backend;
    struct replication_request *rq;

    size_t dimensions;
    struct replication_dimension data[];
};
static struct replication_query *replication_query_prepare(
        RRDSET *st,
        time_t db_first_entry,
        time_t db_last_entry,
        time_t requested_after,
        time_t requested_before,
        bool requested_enable_streaming,
        time_t query_after,
        time_t query_before,
        bool query_enable_streaming,
        time_t wall_clock_time,
        STREAM_CAPABILITIES capabilities
) {
    size_t dimensions = rrdset_number_of_dimensions(st);
    struct replication_query *q = callocz(1, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension));
    __atomic_add_fetch(&replication_buffers_allocated, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension), __ATOMIC_RELAXED);

    q->dimensions = dimensions;
    q->st = st;

    q->db.first_entry_t = db_first_entry;
    q->db.last_entry_t = db_last_entry;

    q->request.after = requested_after,
    q->request.before = requested_before,
    q->request.enable_streaming = requested_enable_streaming,

    q->query.after = query_after;
    q->query.before = query_before;
    q->query.enable_streaming = query_enable_streaming;
    q->query.capabilities = capabilities;

    q->wall_clock_time = wall_clock_time;

    if (!q->dimensions || !q->query.after || !q->query.before) {
        q->query.execute = false;
        q->dimensions = 0;
        return q;
    }

    if(q->query.enable_streaming) {
        spinlock_lock(&st->data_collection_lock);
        q->query.locked_data_collection = true;

        if (st->last_updated.tv_sec > q->query.before) {
#ifdef NETDATA_LOG_REPLICATION_REQUESTS
            internal_error(true,
                           "STREAM_SENDER REPLAY: 'host:%s/chart:%s' "
                           "has start_streaming = true, "
                           "adjusting replication before timestamp from %llu to %llu",
                           rrdhost_hostname(st->rrdhost), rrdset_id(st),
                           (unsigned long long) q->query.before,
                           (unsigned long long) st->last_updated.tv_sec
            );
#endif
            q->query.before = MIN(st->last_updated.tv_sec, wall_clock_time);
        }
    }

    q->backend = st->rrdhost->db[0].eng->backend;

    // prepare our array of dimensions
    size_t count = 0;
    RRDDIM *rd;
    rrddim_foreach_read(rd, st) {
        if (unlikely(!rd || !rd_dfe.item || !rrddim_check_upstream_exposed(rd)))
            continue;

        if (unlikely(rd_dfe.counter >= q->dimensions)) {
            internal_error(true,
                           "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' has more dimensions than the replicated ones",
                           rrdhost_hostname(st->rrdhost), rrdset_id(st));
            break;
        }

        struct replication_dimension *d = &q->data[rd_dfe.counter];

        d->dict = rd_dfe.dict;
        d->rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item);
        d->rd = rd;

        storage_engine_query_init(q->backend, rd->tiers[0].db_metric_handle, &d->handle, q->query.after, q->query.before,
                                  q->query.locked_data_collection ? STORAGE_PRIORITY_HIGH : STORAGE_PRIORITY_LOW);
        d->enabled = true;
        d->skip = false;
        count++;
    }
    rrddim_foreach_done(rd);

    if(!count) {
        // no data for this chart

        q->query.execute = false;

        if(q->query.locked_data_collection) {
            spinlock_unlock(&st->data_collection_lock);
            q->query.locked_data_collection = false;
        }
    }
    else {
        // we have data for this chart

        q->query.execute = true;
    }

    return q;
}
static void replication_send_chart_collection_state(BUFFER *wb, RRDSET *st, STREAM_CAPABILITIES capabilities) {
    bool with_slots = (capabilities & STREAM_CAP_SLOTS) ? true : false;
    NUMBER_ENCODING integer_encoding = (capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
    RRDDIM *rd;
    rrddim_foreach_read(rd, st){
        if (!rrddim_check_upstream_exposed(rd)) continue;

        buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE, sizeof(PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE) - 1);

        if(with_slots) {
            buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
            buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdpush.sender.dim_slot);
        }

        buffer_fast_strcat(wb, " '", 2);
        buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
        buffer_fast_strcat(wb, "' ", 2);
        buffer_print_uint64_encoded(wb, integer_encoding, (usec_t) rd->collector.last_collected_time.tv_sec * USEC_PER_SEC +
                                                          (usec_t) rd->collector.last_collected_time.tv_usec);
        buffer_fast_strcat(wb, " ", 1);
        buffer_print_int64_encoded(wb, integer_encoding, rd->collector.last_collected_value);
        buffer_fast_strcat(wb, " ", 1);
        buffer_print_netdata_double_encoded(wb, integer_encoding, rd->collector.last_calculated_value);
        buffer_fast_strcat(wb, " ", 1);
        buffer_print_netdata_double_encoded(wb, integer_encoding, rd->collector.last_stored_value);
        buffer_fast_strcat(wb, "\n", 1);
    }
    rrddim_foreach_done(rd);

    buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " ", sizeof(PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE) - 1 + 1);
    buffer_print_uint64_encoded(wb, integer_encoding, (usec_t) st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t) st->last_collected_time.tv_usec);
    buffer_fast_strcat(wb, " ", 1);
    buffer_print_uint64_encoded(wb, integer_encoding, (usec_t) st->last_updated.tv_sec * USEC_PER_SEC + (usec_t) st->last_updated.tv_usec);
    buffer_fast_strcat(wb, "\n", 1);
}
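/*
 * Shape of the payload built above - an illustrative sketch using the macro
 * names rather than their literal keyword values, with the optional SLOT
 * prefix and base64 encoding omitted:
 *
 *   REPLAY_RRDDIM_STATE 'dim-id' <last_collected_ut> <last_collected_value> <last_calculated_value> <last_stored_value>
 *   ... one line per upstream-exposed dimension ...
 *   REPLAY_RRDSET_STATE <last_collected_ut> <last_updated_ut>
 */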
static void replication_query_finalize(BUFFER *wb, struct replication_query *q, bool executed) {
    size_t dimensions = q->dimensions;

    if(wb && q->query.enable_streaming)
        replication_send_chart_collection_state(wb, q->st, q->query.capabilities);

    if(q->query.locked_data_collection) {
        spinlock_unlock(&q->st->data_collection_lock);
        q->query.locked_data_collection = false;
    }

    // release all the dictionary items acquired
    // finalize the queries
    size_t queries = 0;

    for (size_t i = 0; i < dimensions; i++) {
        struct replication_dimension *d = &q->data[i];
        if (unlikely(!d->enabled)) continue;

        storage_engine_query_finalize(&d->handle);

        dictionary_acquired_item_release(d->dict, d->rda);

        // update global statistics
        queries++;
    }

    if(executed) {
        spinlock_lock(&replication_queries.spinlock);
        replication_queries.queries_started += queries;
        replication_queries.queries_finished += queries;
        replication_queries.points_read += q->points_read;
        replication_queries.points_generated += q->points_generated;

        if(q->st && q->st->rrdhost->sender) {
            struct sender_state *s = q->st->rrdhost->sender;
            s->replication.latest_completed_before_t = q->query.before;
        }

        spinlock_unlock(&replication_queries.spinlock);
    }

    __atomic_sub_fetch(&replication_buffers_allocated, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension), __ATOMIC_RELAXED);
    freez(q);
}
static void replication_query_align_to_optimal_before(struct replication_query *q) {
    if(!q->query.execute || q->query.enable_streaming)
        return;

    size_t dimensions = q->dimensions;
    time_t expanded_before = 0;

    for (size_t i = 0; i < dimensions; i++) {
        struct replication_dimension *d = &q->data[i];
        if(unlikely(!d->enabled)) continue;

        time_t new_before = storage_engine_align_to_optimal_before(&d->handle);
        if (!expanded_before || new_before < expanded_before)
            expanded_before = new_before;
    }

    if(expanded_before > q->query.before &&                                 // it is later than the original
       (expanded_before - q->query.before) / q->st->update_every < 1024 && // it is reasonable (up to a page)
       expanded_before < q->st->last_updated.tv_sec &&                     // it is not the chart's last updated time
       expanded_before < q->wall_clock_time)                               // it is not later than the wall clock time
        q->query.before = expanded_before;
}
static bool replication_query_execute(BUFFER *wb, struct replication_query *q, size_t max_msg_size) {
    replication_query_align_to_optimal_before(q);

    bool with_slots = (q->query.capabilities & STREAM_CAP_SLOTS) ? true : false;
    NUMBER_ENCODING integer_encoding = (q->query.capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
    time_t after = q->query.after;
    time_t before = q->query.before;
    size_t dimensions = q->dimensions;
    time_t wall_clock_time = q->wall_clock_time;

    bool finished_with_gap = false;
    size_t points_read = 0, points_generated = 0;

#ifdef NETDATA_LOG_REPLICATION_REQUESTS
    time_t actual_after = 0, actual_before = 0;
#endif

    time_t now = after + 1;
    time_t last_end_time_in_buffer = 0;
    while(now <= before) {
        time_t min_start_time = 0, max_start_time = 0, min_end_time = 0, max_end_time = 0, min_update_every = 0, max_update_every = 0;
        for (size_t i = 0; i < dimensions ;i++) {
            struct replication_dimension *d = &q->data[i];
            if(unlikely(!d->enabled || d->skip)) continue;

            // fetch the first valid point for the dimension
            int max_skip = 1000;
            while(d->sp.end_time_s < now && !storage_engine_query_is_finished(&d->handle) && max_skip-- >= 0) {
                d->sp = storage_engine_query_next_metric(&d->handle);
                points_read++;
            }

            if(max_skip <= 0) {
                d->skip = true;

                nd_log_limit_static_global_var(erl, 1, 0);
                nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR,
                             "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s/dim:%s': db does not advance the query "
                             "beyond time %llu (tried 1000 times to get the next point and always got back a point in the past)",
                             rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st), rrddim_id(d->rd),
                             (unsigned long long) now);

                continue;
            }

            if(unlikely(d->sp.end_time_s < now || d->sp.end_time_s < d->sp.start_time_s))
                // this dimension does not provide any data
                continue;

            time_t update_every = d->sp.end_time_s - d->sp.start_time_s;
            if(unlikely(!update_every))
                update_every = q->st->update_every;

            if(unlikely(!min_update_every))
                min_update_every = update_every;

            if(unlikely(!min_start_time))
                min_start_time = d->sp.start_time_s;

            if(unlikely(!min_end_time))
                min_end_time = d->sp.end_time_s;

            min_update_every = MIN(min_update_every, update_every);
            max_update_every = MAX(max_update_every, update_every);

            min_start_time = MIN(min_start_time, d->sp.start_time_s);
            max_start_time = MAX(max_start_time, d->sp.start_time_s);

            min_end_time = MIN(min_end_time, d->sp.end_time_s);
            max_end_time = MAX(max_end_time, d->sp.end_time_s);
        }

        if (unlikely(min_update_every != max_update_every ||
                     min_start_time != max_start_time)) {

            time_t fix_min_start_time;
            if(last_end_time_in_buffer &&
               last_end_time_in_buffer >= min_start_time &&
               last_end_time_in_buffer <= max_start_time) {
                fix_min_start_time = last_end_time_in_buffer;
            }
            else
                fix_min_start_time = min_end_time - min_update_every;

#ifdef NETDATA_INTERNAL_CHECKS
            nd_log_limit_static_global_var(erl, 1, 0);
            nd_log_limit(&erl, NDLS_DAEMON, NDLP_WARNING,
                         "REPLAY WARNING: 'host:%s/chart:%s' "
                         "misaligned dimensions, "
                         "update every (min: %ld, max: %ld), "
                         "start time (min: %ld, max: %ld), "
                         "end time (min %ld, max %ld), "
                         "now %ld, last end time sent %ld, "
                         "min start time is fixed to %ld",
                         rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st),
                         min_update_every, max_update_every,
                         min_start_time, max_start_time,
                         min_end_time, max_end_time,
                         now, last_end_time_in_buffer,
                         fix_min_start_time
            );
#endif

            min_start_time = fix_min_start_time;
        }

        if(likely(min_start_time <= now && min_end_time >= now)) {
            // we have a valid point

            if (unlikely(min_end_time == min_start_time))
                min_start_time = min_end_time - q->st->update_every;

#ifdef NETDATA_LOG_REPLICATION_REQUESTS
            if (unlikely(!actual_after))
                actual_after = min_end_time;

            actual_before = min_end_time;
#endif

            if(buffer_strlen(wb) > max_msg_size && last_end_time_in_buffer) {
                q->query.before = last_end_time_in_buffer;
                q->query.enable_streaming = false;

                internal_error(true, "REPLICATION: current buffer size %zu is more than the "
                                     "max message size %zu for chart '%s' of host '%s'. "
                                     "Interrupting replication request (%ld to %ld, %s) at %ld to %ld, %s.",
                               buffer_strlen(wb), max_msg_size, rrdset_id(q->st), rrdhost_hostname(q->st->rrdhost),
                               q->request.after, q->request.before, q->request.enable_streaming?"true":"false",
                               q->query.after, q->query.before, q->query.enable_streaming?"true":"false");

                q->query.interrupted = true;

                break;
            }
            last_end_time_in_buffer = min_end_time;

            buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN, sizeof(PLUGINSD_KEYWORD_REPLAY_BEGIN) - 1);

            if(with_slots) {
                buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
                buffer_print_uint64_encoded(wb, integer_encoding, q->st->rrdpush.sender.chart_slot);
            }

            buffer_fast_strcat(wb, " '' ", 4);
            buffer_print_uint64_encoded(wb, integer_encoding, min_start_time);
            buffer_fast_strcat(wb, " ", 1);
            buffer_print_uint64_encoded(wb, integer_encoding, min_end_time);
            buffer_fast_strcat(wb, " ", 1);
            buffer_print_uint64_encoded(wb, integer_encoding, wall_clock_time);
            buffer_fast_strcat(wb, "\n", 1);

            // output the replay values for this time
            for (size_t i = 0; i < dimensions; i++) {
                struct replication_dimension *d = &q->data[i];
                if (unlikely(!d->enabled)) continue;

                if (likely( d->sp.start_time_s <= min_end_time &&
                            d->sp.end_time_s >= min_end_time &&
                            !storage_point_is_unset(d->sp) &&
                            !storage_point_is_gap(d->sp))) {

                    buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_SET, sizeof(PLUGINSD_KEYWORD_REPLAY_SET) - 1);

                    if(with_slots) {
                        buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
                        buffer_print_uint64_encoded(wb, integer_encoding, d->rd->rrdpush.sender.dim_slot);
                    }

                    buffer_fast_strcat(wb, " \"", 2);
                    buffer_fast_strcat(wb, rrddim_id(d->rd), string_strlen(d->rd->id));
                    buffer_fast_strcat(wb, "\" ", 2);
                    buffer_print_netdata_double_encoded(wb, integer_encoding, d->sp.sum);
                    buffer_fast_strcat(wb, " ", 1);
                    buffer_print_sn_flags(wb, d->sp.flags, q->query.capabilities & STREAM_CAP_INTERPOLATED);
                    buffer_fast_strcat(wb, "\n", 1);

                    points_generated++;
                }
            }

            now = min_end_time + 1;
        }
        else if(unlikely(min_end_time < now))
            // the query does not progress
            break;
        else {
            // we have gap - all points are in the future
            now = min_start_time;

            if(min_start_time > before && !points_generated) {
                before = q->query.before = min_start_time - 1;
                finished_with_gap = true;
                break;
            }
        }
    }

#ifdef NETDATA_LOG_REPLICATION_REQUESTS
    if(actual_after) {
        char actual_after_buf[LOG_DATE_LENGTH + 1], actual_before_buf[LOG_DATE_LENGTH + 1];
        log_date(actual_after_buf, LOG_DATE_LENGTH, actual_after);
        log_date(actual_before_buf, LOG_DATE_LENGTH, actual_before);
        internal_error(true,
                       "STREAM_SENDER REPLAY: 'host:%s/chart:%s': sending data %llu [%s] to %llu [%s] (requested %llu [delta %lld] to %llu [delta %lld])",
                       rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st),
                       (unsigned long long)actual_after, actual_after_buf, (unsigned long long)actual_before, actual_before_buf,
                       (unsigned long long)after, (long long)(actual_after - after), (unsigned long long)before, (long long)(actual_before - before));
    }
    else
        internal_error(true,
                       "STREAM_SENDER REPLAY: 'host:%s/chart:%s': nothing to send (requested %llu to %llu)",
                       rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st),
                       (unsigned long long)after, (unsigned long long)before);
#endif // NETDATA_LOG_REPLICATION_REQUESTS

    q->points_read += points_read;
    q->points_generated += points_generated;

    if(last_end_time_in_buffer < before - q->st->update_every)
        finished_with_gap = true;

    return finished_with_gap;
}
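/*
 * Each iteration of the loop above emits one block per aligned timestamp -
 * an illustrative sketch with macro names, decimal encoding, no SLOT prefix:
 *
 *   REPLAY_BEGIN '' <start_time> <end_time> <wall_clock_time>
 *   REPLAY_SET "dim-id" <value> <flags>      // one line per dimension with data
 *
 * The empty quoted chart id appears to mean "continue with the chart already
 * announced by the enclosing response" - see the REPLAY_BEGIN 'chart-id'
 * emitted in replication_response_execute_and_finalize() below.
 */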
static struct replication_query *replication_response_prepare(
        RRDSET *st,
        bool requested_enable_streaming,
        time_t requested_after,
        time_t requested_before,
        STREAM_CAPABILITIES capabilities
) {
    time_t wall_clock_time = now_realtime_sec();

    if(requested_after > requested_before) {
        // flip them
        time_t t = requested_before;
        requested_before = requested_after;
        requested_after = t;
    }

    if(requested_after > wall_clock_time) {
        requested_after = 0;
        requested_before = 0;
        requested_enable_streaming = true;
    }

    if(requested_before > wall_clock_time) {
        requested_before = wall_clock_time;
        requested_enable_streaming = true;
    }

    time_t query_after = requested_after;
    time_t query_before = requested_before;
    bool query_enable_streaming = requested_enable_streaming;

    time_t db_first_entry = 0, db_last_entry = 0;
    rrdset_get_retention_of_tier_for_collected_chart(
            st, &db_first_entry, &db_last_entry, wall_clock_time, 0);

    if(requested_after == 0 && requested_before == 0 && requested_enable_streaming == true) {
        // no data requested - just enable streaming
        ;
    }
    else {
        if (query_after < db_first_entry)
            query_after = db_first_entry;

        if (query_before > db_last_entry)
            query_before = db_last_entry;

        // if the parent asked us to start streaming, then fill the rest with the data that we have
        if (requested_enable_streaming)
            query_before = db_last_entry;

        if (query_after > query_before) {
            time_t tmp = query_before;
            query_before = query_after;
            query_after = tmp;
        }

        query_enable_streaming = (requested_enable_streaming ||
                                  query_before == db_last_entry ||
                                  !requested_after ||
                                  !requested_before) ? true : false;
    }

    return replication_query_prepare(
            st,
            db_first_entry, db_last_entry,
            requested_after, requested_before, requested_enable_streaming,
            query_after, query_before, query_enable_streaming,
            wall_clock_time, capabilities);
}
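/*
 * Normalization performed above, by example (illustrative timestamps):
 *   - (after=500, before=100) is flipped to (100, 500);
 *   - a window that starts entirely in the future becomes (0, 0) with
 *     streaming enabled;
 *   - a 'before' beyond the wall clock is clamped to the wall clock and
 *     streaming is enabled;
 *   - otherwise the window is clamped to the local db retention, and
 *     streaming is enabled when the query reaches the last db entry.
 */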
void replication_response_cancel_and_finalize(struct replication_query *q) {
    replication_query_finalize(NULL, q, false);
}
static bool sender_is_still_connected_for_this_request(struct replication_request *rq);

bool replication_response_execute_and_finalize(struct replication_query *q, size_t max_msg_size) {
    bool with_slots = (q->query.capabilities & STREAM_CAP_SLOTS) ? true : false;
    NUMBER_ENCODING integer_encoding = (q->query.capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
    struct replication_request *rq = q->rq;
    RRDSET *st = q->st;
    RRDHOST *host = st->rrdhost;

    // we might want to optimize this by filling a temporary buffer
    // and copying the result to the host's buffer in order to avoid
    // holding the host's buffer lock for too long
    BUFFER *wb = sender_start(host->sender);

    buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN, sizeof(PLUGINSD_KEYWORD_REPLAY_BEGIN) - 1);

    if(with_slots) {
        buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
        buffer_print_uint64_encoded(wb, integer_encoding, q->st->rrdpush.sender.chart_slot);
    }

    buffer_fast_strcat(wb, " '", 2);
    buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id));
    buffer_fast_strcat(wb, "'\n", 2);

    bool locked_data_collection = q->query.locked_data_collection;
    q->query.locked_data_collection = false;

    bool finished_with_gap = false;
    if(q->query.execute)
        finished_with_gap = replication_query_execute(wb, q, max_msg_size);

    time_t after = q->request.after;
    time_t before = q->query.before;
    bool enable_streaming = q->query.enable_streaming;

    replication_query_finalize(wb, q, q->query.execute);
    q = NULL; // IMPORTANT: q is invalid now

    // get a fresh retention to send to the parent
    time_t wall_clock_time = now_realtime_sec();
    time_t db_first_entry, db_last_entry;
    rrdset_get_retention_of_tier_for_collected_chart(st, &db_first_entry, &db_last_entry, wall_clock_time, 0);

    // end with first/last entries we have, and the first start time and
    // last end time of the data we sent

    buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_END " ", sizeof(PLUGINSD_KEYWORD_REPLAY_END) - 1 + 1);
    buffer_print_int64_encoded(wb, integer_encoding, st->update_every);
    buffer_fast_strcat(wb, " ", 1);
    buffer_print_uint64_encoded(wb, integer_encoding, db_first_entry);
    buffer_fast_strcat(wb, " ", 1);
    buffer_print_uint64_encoded(wb, integer_encoding, db_last_entry);

    buffer_fast_strcat(wb, enable_streaming ? " true " : " false ", 7);

    buffer_print_uint64_encoded(wb, integer_encoding, after);
    buffer_fast_strcat(wb, " ", 1);
    buffer_print_uint64_encoded(wb, integer_encoding, before);
    buffer_fast_strcat(wb, " ", 1);
    buffer_print_uint64_encoded(wb, integer_encoding, wall_clock_time);
    buffer_fast_strcat(wb, "\n", 1);

    worker_is_busy(WORKER_JOB_BUFFER_COMMIT);
    sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_REPLICATION);
    worker_is_busy(WORKER_JOB_CLEANUP);

    if(enable_streaming) {
        if(sender_is_still_connected_for_this_request(rq)) {
            // enable normal streaming if we have to
            // but only if the sender buffer has not been flushed since we started

            if(rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) {
                rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
                rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
                rrdhost_sender_replicating_charts_minus_one(st->rrdhost);

                if(!finished_with_gap)
                    st->rrdpush.sender.resync_time_s = 0;

#ifdef NETDATA_LOG_REPLICATION_REQUESTS
                internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' streaming starts",
                               rrdhost_hostname(st->rrdhost), rrdset_id(st));
#endif
            }
            else
                internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' received start streaming command, but the chart is not in progress replicating",
                               rrdhost_hostname(st->rrdhost), rrdset_id(st));
        }
    }

    if(locked_data_collection)
        spinlock_unlock(&st->data_collection_lock);

    return enable_streaming;
}
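/*
 * Overall envelope of one replication response, as assembled above
 * (illustrative sketch with macro names):
 *
 *   REPLAY_BEGIN 'chart-id'
 *   ... per-point REPLAY_BEGIN '' / REPLAY_SET blocks from replication_query_execute() ...
 *   ... REPLAY_RRDDIM_STATE / REPLAY_RRDSET_STATE lines, when streaming is enabled ...
 *   REPLAY_END <update_every> <db_first_entry> <db_last_entry> <streaming> <after> <before> <wall_clock_time>
 */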
// ----------------------------------------------------------------------------
// sending replication requests

struct replication_request_details {
    struct {
        send_command callback;
        void *data;
    } caller;

    RRDHOST *host;
    RRDSET *st;

    struct {
        time_t first_entry_t;           // the first entry time the child has
        time_t last_entry_t;            // the last entry time the child has
        time_t wall_clock_time;         // the current time of the child
        bool fixed_last_entry;          // when set we set the last entry to wall clock time
    } child_db;

    struct {
        time_t first_entry_t;           // the first entry time we have
        time_t last_entry_t;            // the last entry time we have
        time_t wall_clock_time;         // the current local wall clock time
    } local_db;

    struct {
        time_t from;                    // the starting time of the entire gap we have
        time_t to;                      // the ending time of the entire gap we have
    } gap;

    struct {
        time_t after;                   // the start time we requested previously from this child
        time_t before;                  // the end time we requested previously from this child
    } last_request;

    struct {
        time_t after;                   // the start time of this replication request - the child will add 1 second
        time_t before;                  // the end time of this replication request
        bool start_streaming;           // true when we want the child to send anything remaining and start streaming - the child will overwrite 'before'
    } wanted;
};
static void replicate_log_request(struct replication_request_details *r, const char *msg) {
#ifdef NETDATA_INTERNAL_CHECKS
    internal_error(true,
#else
    nd_log_limit_static_global_var(erl, 1, 0);
    nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR,
#endif
                "REPLAY ERROR: 'host:%s/chart:%s' child sent: "
                "db from %ld to %ld%s, wall clock time %ld, "
                "last request from %ld to %ld, "
                "issue: %s - "
                "sending replication request from %ld to %ld, start streaming %s",
                rrdhost_hostname(r->st->rrdhost), rrdset_id(r->st),
                r->child_db.first_entry_t,
                r->child_db.last_entry_t, r->child_db.fixed_last_entry ? " (fixed)" : "",
                r->child_db.wall_clock_time,
                r->last_request.after,
                r->last_request.before,
                msg,
                r->wanted.after,
                r->wanted.before,
                r->wanted.start_streaming ? "true" : "false");
}
static bool send_replay_chart_cmd(struct replication_request_details *r, const char *msg, bool log) {
    RRDSET *st = r->st;

    if(log)
        replicate_log_request(r, msg);

    if(st->rrdhost->receiver && (!st->rrdhost->receiver->replication_first_time_t || r->wanted.after < st->rrdhost->receiver->replication_first_time_t))
        st->rrdhost->receiver->replication_first_time_t = r->wanted.after;

#ifdef NETDATA_LOG_REPLICATION_REQUESTS
    st->replay.log_next_data_collection = true;

    char wanted_after_buf[LOG_DATE_LENGTH + 1] = "", wanted_before_buf[LOG_DATE_LENGTH + 1] = "";

    if(r->wanted.after)
        log_date(wanted_after_buf, LOG_DATE_LENGTH, r->wanted.after);

    if(r->wanted.before)
        log_date(wanted_before_buf, LOG_DATE_LENGTH, r->wanted.before);

    internal_error(true,
                   "REPLAY: 'host:%s/chart:%s' sending replication request %ld [%s] to %ld [%s], start streaming '%s': %s: "
                   "last[%ld - %ld] child[%ld - %ld, now %ld %s] local[%ld - %ld, now %ld] gap[%ld - %ld %s] %s"
                   , rrdhost_hostname(r->host), rrdset_id(r->st)
                   , r->wanted.after, wanted_after_buf
                   , r->wanted.before, wanted_before_buf
                   , r->wanted.start_streaming ? "YES" : "NO"
                   , msg
                   , r->last_request.after, r->last_request.before
                   , r->child_db.first_entry_t, r->child_db.last_entry_t
                   , r->child_db.wall_clock_time, (r->child_db.wall_clock_time == r->local_db.wall_clock_time) ? "SAME" : (r->child_db.wall_clock_time < r->local_db.wall_clock_time) ? "BEHIND" : "AHEAD"
                   , r->local_db.first_entry_t, r->local_db.last_entry_t
                   , r->local_db.wall_clock_time
                   , r->gap.from, r->gap.to
                   , (r->gap.from == r->wanted.after) ? "FULL" : "PARTIAL"
                   , (st->replay.after != 0 || st->replay.before != 0) ? "OVERLAPPING" : ""
    );

    st->replay.start_streaming = r->wanted.start_streaming;
    st->replay.after = r->wanted.after;
    st->replay.before = r->wanted.before;
#endif // NETDATA_LOG_REPLICATION_REQUESTS

    char buffer[2048 + 1];
    snprintfz(buffer, 2048, PLUGINSD_KEYWORD_REPLAY_CHART " \"%s\" \"%s\" %llu %llu\n",
              rrdset_id(st), r->wanted.start_streaming ? "true" : "false",
              (unsigned long long)r->wanted.after, (unsigned long long)r->wanted.before);

    ssize_t ret = r->caller.callback(buffer, r->caller.data);
    if (ret < 0) {
        netdata_log_error("REPLAY ERROR: 'host:%s/chart:%s' failed to send replication request to child (error %zd)",
                          rrdhost_hostname(r->host), rrdset_id(r->st), ret);
        return false;
    }

    return true;
}
bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST *host, RRDSET *st,
                             time_t child_first_entry, time_t child_last_entry, time_t child_wall_clock_time,
                             time_t prev_first_entry_wanted, time_t prev_last_entry_wanted)
{
    struct replication_request_details r = {
            .caller = {
                    .callback = callback,
                    .data = callback_data,
            },

            .host = host,
            .st = st,

            .child_db = {
                    .first_entry_t = child_first_entry,
                    .last_entry_t = child_last_entry,
                    .wall_clock_time = child_wall_clock_time,
                    .fixed_last_entry = false,
            },

            .local_db = {
                    .first_entry_t = 0,
                    .last_entry_t = 0,
                    .wall_clock_time = now_realtime_sec(),
            },

            .last_request = {
                    .after = prev_first_entry_wanted,
                    .before = prev_last_entry_wanted,
            },

            .wanted = {
                    .after = 0,
                    .before = 0,
                    .start_streaming = true,
            },
    };

    if(r.child_db.last_entry_t > r.child_db.wall_clock_time) {
        replicate_log_request(&r, "child's db last entry > child's wall clock time");
        r.child_db.last_entry_t = r.child_db.wall_clock_time;
        r.child_db.fixed_last_entry = true;
    }

    rrdset_get_retention_of_tier_for_collected_chart(r.st, &r.local_db.first_entry_t, &r.local_db.last_entry_t, r.local_db.wall_clock_time, 0);

    // let's find the GAP we have
    if(!r.last_request.after || !r.last_request.before) {
        // there is no previous request

        if(r.local_db.last_entry_t)
            // we have some data, let's continue from the last point we have
            r.gap.from = r.local_db.last_entry_t;
        else
            // we don't have any data, the gap is the max timeframe we are allowed to replicate
            r.gap.from = r.local_db.wall_clock_time - r.host->rrdpush_seconds_to_replicate;
    }
    else {
        // we had sent a request - let's continue at the point we left it
        // for this we don't take into account the actual data in our db
        // because the child may also have gaps, and we need to get over it
        r.gap.from = r.last_request.before;
    }

    // we want all the data up to now
    r.gap.to = r.local_db.wall_clock_time;

    // The gap is now r.gap.from -> r.gap.to

    if (unlikely(!rrdhost_option_check(host, RRDHOST_OPTION_REPLICATION)))
        return send_replay_chart_cmd(&r, "empty replication request, replication is disabled", false);

    if (unlikely(!rrdset_number_of_dimensions(st)))
        return send_replay_chart_cmd(&r, "empty replication request, chart has no dimensions", false);

    if (unlikely(!r.child_db.first_entry_t || !r.child_db.last_entry_t))
        return send_replay_chart_cmd(&r, "empty replication request, child has no stored data", false);

    if (unlikely(r.child_db.first_entry_t < 0 || r.child_db.last_entry_t < 0))
        return send_replay_chart_cmd(&r, "empty replication request, child db timestamps are invalid", true);

    if (unlikely(r.child_db.first_entry_t > r.child_db.wall_clock_time))
        return send_replay_chart_cmd(&r, "empty replication request, child db first entry is after its wall clock time", true);

    if (unlikely(r.child_db.first_entry_t > r.child_db.last_entry_t))
        return send_replay_chart_cmd(&r, "empty replication request, child timings are invalid (first entry > last entry)", true);

    if (unlikely(r.local_db.last_entry_t > r.child_db.last_entry_t))
        return send_replay_chart_cmd(&r, "empty replication request, local last entry is later than the child one", false);

    // let's find what the child can provide to fill that gap

    if(r.child_db.first_entry_t > r.gap.from)
        // the child does not have all the data - let's get what it has
        r.wanted.after = r.child_db.first_entry_t;
    else
        // ok, the child can fill the entire gap we have
        r.wanted.after = r.gap.from;

    if(r.gap.to - r.wanted.after > host->rrdpush_replication_step)
        // the duration is too big for one request - let's take the first step
        r.wanted.before = r.wanted.after + host->rrdpush_replication_step;
    else
        // wow, we can do it in one request
        r.wanted.before = r.gap.to;

    // don't ask from the child more than it has
    if(r.wanted.before > r.child_db.last_entry_t)
        r.wanted.before = r.child_db.last_entry_t;

    if(r.wanted.after > r.wanted.before) {
        r.wanted.after = 0;
        r.wanted.before = 0;
        r.wanted.start_streaming = true;
        return send_replay_chart_cmd(&r, "empty replication request, wanted after computed bigger than wanted before", true);
    }

    // the child should start streaming immediately if the wanted duration is small, or we reached the last entry of the child
    r.wanted.start_streaming = (r.local_db.wall_clock_time - r.wanted.after <= host->rrdpush_replication_step ||
                                r.wanted.before >= r.child_db.last_entry_t ||
                                r.wanted.before >= r.child_db.wall_clock_time ||
                                r.wanted.before >= r.local_db.wall_clock_time);

    // the wanted timeframe is now r.wanted.after -> r.wanted.before
    // send it
    return send_replay_chart_cmd(&r, "OK", false);
}
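/*
 * Worked example of the gap computation above (illustrative numbers): if the
 * child reports db retention [100, 200] with wall clock 210, our local db
 * ends at 150 (local wall clock 210) and rrdpush_replication_step is 30,
 * then gap = [150, 210], wanted.after = 150, wanted.before = 180 and
 * start_streaming = false. The next request continues from 180, and once
 * wanted.before reaches the child's last entry (200), start_streaming
 * becomes true.
 */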
// ----------------------------------------------------------------------------
// replication thread

// replication request in sender DICTIONARY
// used for de-duplicating the requests
struct replication_request {
    struct sender_state *sender;        // the sender we should put the reply at
    STRING *chart_id;                   // the chart of the request
    time_t after;                       // the start time of the query (maybe zero) key for sorting (JudyL)
    time_t before;                      // the end time of the query (maybe zero)

    usec_t sender_last_flush_ut;        // the timestamp of the sender, at the time we indexed this request
    Word_t unique_id;                   // auto-increment, later requests have bigger

    bool start_streaming;               // true, when the parent wants to send the rest of the data (before is overwritten) and enable normal streaming
    bool indexed_in_judy;               // true when the request is indexed in judy
    bool not_indexed_buffer_full;       // true when the request is not indexed because the sender is full
    bool not_indexed_preprocessing;     // true when the request is not indexed, but it is pending in preprocessing

    // prepare ahead members - preprocessing
    bool found;                         // used as a result boolean for the find call
    bool executed;                      // used to detect if we have skipped requests while preprocessing
    RRDSET *st;                         // caching of the chart during preprocessing
    struct replication_query *q;        // the preprocessing query initialization
};

// replication sort entry in JudyL array
// used for sorting all requests, across all nodes
struct replication_sort_entry {
    struct replication_request *rq;

    size_t unique_id;                   // used as a key to identify the sort entry - we never access its contents
};

#define MAX_REPLICATION_THREADS 20 // + 1 for the main thread

// the global variables for the replication thread
static struct replication_thread {
    ARAL *aral_rse;

    SPINLOCK spinlock;

    struct {
        size_t pending;                 // number of requests pending in the queue

        // statistics
        size_t added;                   // number of requests added to the queue
        size_t removed;                 // number of requests removed from the queue
        size_t pending_no_room;         // number of requests skipped, because the sender has no room for responses
        size_t senders_full;            // number of times a sender was found with a full buffer
        size_t sender_resets;           // number of times a sender reset our last position in the queue
        time_t first_time_t;            // the minimum 'after' we encountered

        struct {
            Word_t after;
            Word_t unique_id;
            Pvoid_t JudyL_array;
        } queue;

    } unsafe;                           // protected from replication_recursive_lock()

    struct {
        Word_t unique_id;               // the last unique id we gave to a request (auto-increment, starting from 1)
        size_t executed;                // the number of replication requests executed
        size_t latest_first_time;       // the 'after' timestamp of the last request we executed
        size_t memory;                  // the total memory allocated by replication
    } atomic;                           // access should be with atomic operations

    struct {
        size_t last_executed;           // caching of the atomic.executed to report number of requests executed since last time

        netdata_thread_t **threads_ptrs;
        size_t threads;
    } main_thread;                      // access is allowed only by the main thread

} replication_globals = {
        .aral_rse = NULL,
        .spinlock = NETDATA_SPINLOCK_INITIALIZER,
        .unsafe = {
                .pending = 0,

                .added = 0,
                .removed = 0,
                .pending_no_room = 0,
                .sender_resets = 0,
                .senders_full = 0,

                .first_time_t = 0,

                .queue = {
                        .after = 0,
                        .unique_id = 0,
                        .JudyL_array = NULL,
                },
        },
        .atomic = {
                .unique_id = 0,
                .executed = 0,
                .latest_first_time = 0,
                .memory = 0,
        },
        .main_thread = {
                .last_executed = 0,
                .threads = 0,
                .threads_ptrs = NULL,
        },
};
size_t replication_allocated_memory(void) {
    return __atomic_load_n(&replication_globals.atomic.memory, __ATOMIC_RELAXED);
}

#define replication_set_latest_first_time(t) __atomic_store_n(&replication_globals.atomic.latest_first_time, t, __ATOMIC_RELAXED)
#define replication_get_latest_first_time() __atomic_load_n(&replication_globals.atomic.latest_first_time, __ATOMIC_RELAXED)
static inline bool replication_recursive_lock_mode(char mode) {
    static __thread int recursions = 0;

    if(mode == 'L') { // (L)ock
        if(++recursions == 1)
            spinlock_lock(&replication_globals.spinlock);
    }
    else if(mode == 'U') { // (U)nlock
        if(--recursions == 0)
            spinlock_unlock(&replication_globals.spinlock);
    }
    else if(mode == 'C') { // (C)heck
        if(recursions > 0)
            return true;
        else
            return false;
    }
    else
        fatal("REPLICATION: unknown lock mode '%c'", mode);

#ifdef NETDATA_INTERNAL_CHECKS
    if(recursions < 0)
        fatal("REPLICATION: recursions is %d", recursions);
#endif

    return true;
}

#define replication_recursive_lock() replication_recursive_lock_mode('L')
#define replication_recursive_unlock() replication_recursive_lock_mode('U')
#define fatal_when_replication_is_not_locked_for_me() do { \
    if(!replication_recursive_lock_mode('C')) \
        fatal("REPLICATION: reached %s, but replication is not locked by this thread.", __FUNCTION__); \
} while(0)
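/*
 * Usage sketch (illustrative, not part of the module's public API): the lock
 * is recursive per thread, so nested critical sections are safe.
 *
 *   replication_recursive_lock();
 *   replication_recursive_lock();                    // nested - no deadlock
 *   fatal_when_replication_is_not_locked_for_me();   // passes here
 *   replication_recursive_unlock();
 *   replication_recursive_unlock();                  // last unlock releases the spinlock
 */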
void replication_set_next_point_in_time(time_t after, size_t unique_id) {
    replication_recursive_lock();
    replication_globals.unsafe.queue.after = after;
    replication_globals.unsafe.queue.unique_id = unique_id;
    replication_recursive_unlock();
}
// ----------------------------------------------------------------------------
// replication sort entry management

static inline struct replication_sort_entry *replication_sort_entry_create(struct replication_request *rq) {
    struct replication_sort_entry *rse = aral_mallocz(replication_globals.aral_rse);
    __atomic_add_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED);

    rrdpush_sender_pending_replication_requests_plus_one(rq->sender);

    // copy the request
    rse->rq = rq;
    rse->unique_id = __atomic_add_fetch(&replication_globals.atomic.unique_id, 1, __ATOMIC_SEQ_CST);

    // save the unique id into the request, to be able to delete it later
    rq->unique_id = rse->unique_id;
    rq->indexed_in_judy = false;
    rq->not_indexed_buffer_full = false;
    rq->not_indexed_preprocessing = false;
    return rse;
}

static void replication_sort_entry_destroy(struct replication_sort_entry *rse) {
    aral_freez(replication_globals.aral_rse, rse);
    __atomic_sub_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED);
}
static void replication_sort_entry_add(struct replication_request *rq) {
    if(unlikely(rrdpush_sender_replication_buffer_full_get(rq->sender))) {
        rq->indexed_in_judy = false;
        rq->not_indexed_buffer_full = true;
        rq->not_indexed_preprocessing = false;
        replication_recursive_lock();
        replication_globals.unsafe.pending_no_room++;
        replication_recursive_unlock();
        return;
    }

    // cache this, because it will be changed
    bool decrement_no_room = rq->not_indexed_buffer_full;

    struct replication_sort_entry *rse = replication_sort_entry_create(rq);

    replication_recursive_lock();

    if(decrement_no_room)
        replication_globals.unsafe.pending_no_room--;

//    if(rq->after < (time_t)replication_globals.protected.queue.after &&
//       rq->sender->buffer_used_percentage <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED &&
//       !replication_globals.protected.skipped_no_room_since_last_reset) {
//
//        // make it find this request first
//        replication_set_next_point_in_time(rq->after, rq->unique_id);
//    }

    replication_globals.unsafe.added++;
    replication_globals.unsafe.pending++;

    Pvoid_t *inner_judy_ptr;

    // find the outer judy entry, using after as key
    size_t mem_before_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array);
    inner_judy_ptr = JudyLIns(&replication_globals.unsafe.queue.JudyL_array, (Word_t) rq->after, PJE0);
    size_t mem_after_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array);
    if(unlikely(!inner_judy_ptr || inner_judy_ptr == PJERR))
        fatal("REPLICATION: corrupted outer judyL");

    // add it to the inner judy, using unique_id as key
    size_t mem_before_inner_judyl = JudyLMemUsed(*inner_judy_ptr);
    Pvoid_t *item = JudyLIns(inner_judy_ptr, rq->unique_id, PJE0);
    size_t mem_after_inner_judyl = JudyLMemUsed(*inner_judy_ptr);
    if(unlikely(!item || item == PJERR))
        fatal("REPLICATION: corrupted inner judyL");

    *item = rse;
    rq->indexed_in_judy = true;
    rq->not_indexed_buffer_full = false;
    rq->not_indexed_preprocessing = false;

    if(!replication_globals.unsafe.first_time_t || rq->after < replication_globals.unsafe.first_time_t)
        replication_globals.unsafe.first_time_t = rq->after;

    replication_recursive_unlock();

    __atomic_add_fetch(&replication_globals.atomic.memory, (mem_after_inner_judyl - mem_before_inner_judyl) + (mem_after_outer_judyl - mem_before_outer_judyl), __ATOMIC_RELAXED);
}
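/*
 * Indexing summary for the queue built above: the outer JudyL is keyed by the
 * request's 'after' timestamp, and each value is an inner JudyL keyed by the
 * auto-increment unique_id. A full walk therefore visits requests ordered by
 * oldest 'after' first, FIFO within the same timestamp.
 */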
static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sort_entry *rse, Pvoid_t **inner_judy_ppptr, bool preprocessing) {
    fatal_when_replication_is_not_locked_for_me();

    bool inner_judy_deleted = false;

    replication_globals.unsafe.removed++;
    replication_globals.unsafe.pending--;

    rrdpush_sender_pending_replication_requests_minus_one(rse->rq->sender);

    rse->rq->indexed_in_judy = false;
    rse->rq->not_indexed_preprocessing = preprocessing;

    size_t memory_saved = 0;

    // delete it from the inner judy
    size_t mem_before_inner_judyl = JudyLMemUsed(**inner_judy_ppptr);
    JudyLDel(*inner_judy_ppptr, rse->rq->unique_id, PJE0);
    size_t mem_after_inner_judyl = JudyLMemUsed(**inner_judy_ppptr);
    memory_saved = mem_before_inner_judyl - mem_after_inner_judyl;

    // if no items left, delete it from the outer judy
    if(**inner_judy_ppptr == NULL) {
        size_t mem_before_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array);
        JudyLDel(&replication_globals.unsafe.queue.JudyL_array, rse->rq->after, PJE0);
        size_t mem_after_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array);
        memory_saved += mem_before_outer_judyl - mem_after_outer_judyl;
        inner_judy_deleted = true;
    }

    // free memory
    replication_sort_entry_destroy(rse);

    __atomic_sub_fetch(&replication_globals.atomic.memory, memory_saved, __ATOMIC_RELAXED);

    return inner_judy_deleted;
}
static void replication_sort_entry_del(struct replication_request *rq, bool buffer_full) {
    Pvoid_t *inner_judy_pptr;
    struct replication_sort_entry *rse_to_delete = NULL;

    replication_recursive_lock();
    if(rq->indexed_in_judy) {

        inner_judy_pptr = JudyLGet(replication_globals.unsafe.queue.JudyL_array, rq->after, PJE0);
        if (inner_judy_pptr) {
            Pvoid_t *our_item_pptr = JudyLGet(*inner_judy_pptr, rq->unique_id, PJE0);
            if (our_item_pptr) {
                rse_to_delete = *our_item_pptr;
                replication_sort_entry_unlink_and_free_unsafe(rse_to_delete, &inner_judy_pptr, false);

                if(buffer_full) {
                    replication_globals.unsafe.pending_no_room++;
                    rq->not_indexed_buffer_full = true;
                }
            }
        }

        if (!rse_to_delete)
            fatal("REPLAY: 'host:%s/chart:%s' Cannot find sort entry to delete for time %ld.",
                  rrdhost_hostname(rq->sender->host), string2str(rq->chart_id), rq->after);
    }

    replication_recursive_unlock();
}
static struct replication_request replication_request_get_first_available() {
    Pvoid_t *inner_judy_pptr;

    replication_recursive_lock();

    struct replication_request rq_to_return = (struct replication_request){ .found = false };

    if(unlikely(!replication_globals.unsafe.queue.after || !replication_globals.unsafe.queue.unique_id)) {
        replication_globals.unsafe.queue.after = 0;
        replication_globals.unsafe.queue.unique_id = 0;
    }

    Word_t started_after = replication_globals.unsafe.queue.after;

    size_t round = 0;
    while(!rq_to_return.found) {
        round++;

        if(round > 2)
            break;

        if(round == 2) {
            if(started_after == 0)
                break;

            replication_globals.unsafe.queue.after = 0;
            replication_globals.unsafe.queue.unique_id = 0;
        }

        bool find_same_after = true;
        while (!rq_to_return.found && (inner_judy_pptr = JudyLFirstThenNext(replication_globals.unsafe.queue.JudyL_array, &replication_globals.unsafe.queue.after, &find_same_after))) {
            Pvoid_t *our_item_pptr;

            if(unlikely(round == 2 && replication_globals.unsafe.queue.after > started_after))
                break;

            while (!rq_to_return.found && (our_item_pptr = JudyLNext(*inner_judy_pptr, &replication_globals.unsafe.queue.unique_id, PJE0))) {
                struct replication_sort_entry *rse = *our_item_pptr;
                struct replication_request *rq = rse->rq;

                // copy the request to return it
                rq_to_return = *rq;
                rq_to_return.chart_id = string_dup(rq_to_return.chart_id);

                // set the return result to found
                rq_to_return.found = true;

                if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr, true))
                    // we removed the item from the outer JudyL
                    break;
            }

            // prepare for the next iteration on the outer loop
            replication_globals.unsafe.queue.unique_id = 0;
        }
    }

    replication_recursive_unlock();
    return rq_to_return;
}
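/*
 * Scan strategy of the function above, in short: round 1 resumes from the
 * last (after, unique_id) position stored in the queue state; if nothing is
 * found, round 2 restarts from the beginning of the JudyL array and stops
 * once it passes the point where round 1 started.
 */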
// ----------------------------------------------------------------------------
// replication request management

static void replication_request_react_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value __maybe_unused, void *sender_state __maybe_unused) {
    struct sender_state *s = sender_state; (void)s;
    struct replication_request *rq = value;

    // IMPORTANT:
    // We use the react instead of the insert callback
    // because we want the item to be atomically visible
    // to our replication thread, immediately after.

    // If we put this at the insert callback, the item is not guaranteed
    // to be atomically visible to others, so the replication thread
    // may see the replication sort entry, but fail to find the dictionary item
    // related to it.

    replication_sort_entry_add(rq);

    // this request is about a unique chart for this sender
    rrdpush_sender_replicating_charts_plus_one(s);
}
static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *old_value, void *new_value, void *sender_state) {
    struct sender_state *s = sender_state; (void)s;
    struct replication_request *rq = old_value; (void)rq;
    struct replication_request *rq_new = new_value;

    replication_recursive_lock();

    if(!rq->indexed_in_judy && rq->not_indexed_buffer_full && !rq->not_indexed_preprocessing) {
        // we can replace this command
        internal_error(
                true,
                "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' replacing duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
                rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), dictionary_acquired_item_name(item),
                (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false",
                (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false");

        rq->after = rq_new->after;
        rq->before = rq_new->before;
        rq->start_streaming = rq_new->start_streaming;
    }
    else if(!rq->indexed_in_judy && !rq->not_indexed_preprocessing) {
        replication_sort_entry_add(rq);
        internal_error(
                true,
                "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' adding duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
                rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), dictionary_acquired_item_name(item),
                (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false",
                (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false");
    }
    else {
        internal_error(
                true,
                "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' ignoring duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
                rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host),
                dictionary_acquired_item_name(item),
                (unsigned long long) rq->after, (unsigned long long) rq->before, rq->start_streaming ? "true" : "false",
                (unsigned long long) rq_new->after, (unsigned long long) rq_new->before, rq_new->start_streaming ? "true" : "false");
    }

    replication_recursive_unlock();

    string_freez(rq_new->chart_id);
    return false;
}
static void replication_request_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *sender_state __maybe_unused) {
|
|
struct replication_request *rq = value;
|
|
|
|
// this request is about a unique chart for this sender
|
|
rrdpush_sender_replicating_charts_minus_one(rq->sender);
|
|
|
|
if(rq->indexed_in_judy)
|
|
replication_sort_entry_del(rq, false);
|
|
|
|
else if(rq->not_indexed_buffer_full) {
|
|
replication_recursive_lock();
|
|
replication_globals.unsafe.pending_no_room--;
|
|
replication_recursive_unlock();
|
|
}
|
|
|
|
string_freez(rq->chart_id);
|
|
}
|
|
|
|
static bool sender_is_still_connected_for_this_request(struct replication_request *rq) {
|
|
return rq->sender_last_flush_ut == rrdpush_sender_get_flush_time(rq->sender);
|
|
};
|
|
|
|
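
// execute a single replication request: find the chart, prepare the query
// (unless the pipeline prepared it already) and stream the response to the
// sender; frees the request's resources before returning.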
static bool replication_execute_request(struct replication_request *rq, bool workers) {
    bool ret = false;

    if(!rq->st) {
        if(likely(workers))
            worker_is_busy(WORKER_JOB_FIND_CHART);

        rq->st = rrdset_find(rq->sender->host, string2str(rq->chart_id));
    }

    if(!rq->st) {
        internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' not found",
                       rrdhost_hostname(rq->sender->host), string2str(rq->chart_id));

        goto cleanup;
    }

    netdata_thread_disable_cancelability();

    if(!rq->q) {
        if(likely(workers))
            worker_is_busy(WORKER_JOB_PREPARE_QUERY);

        rq->q = replication_response_prepare(
                rq->st,
                rq->start_streaming,
                rq->after,
                rq->before,
                rq->sender->capabilities);
    }

    if(likely(workers))
        worker_is_busy(WORKER_JOB_QUERYING);

    // send the replication data
    rq->q->rq = rq;
    replication_response_execute_and_finalize(
            rq->q, (size_t)((unsigned long long)rq->sender->host->sender->buffer->max_size * MAX_REPLICATION_MESSAGE_PERCENT_SENDER_BUFFER / 100ULL));

    rq->q = NULL;
    netdata_thread_enable_cancelability();

    __atomic_add_fetch(&replication_globals.atomic.executed, 1, __ATOMIC_RELAXED);

    ret = true;

cleanup:
    if(rq->q) {
        replication_response_cancel_and_finalize(rq->q);
        rq->q = NULL;
    }

    string_freez(rq->chart_id);
    worker_is_idle();
    return ret;
}

// ----------------------------------------------------------------------------
// public API
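
// add a replication request for a chart; if the parent asks us to start
// streaming immediately and the sender buffer has enough room, the request is
// executed inline, otherwise it is queued in the sender's dictionary to be
// picked up by the replication threads.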
void replication_add_request(struct sender_state *sender, const char *chart_id, time_t after, time_t before, bool start_streaming) {
    struct replication_request rq = {
            .sender = sender,
            .chart_id = string_strdupz(chart_id),
            .after = after,
            .before = before,
            .start_streaming = start_streaming,
            .sender_last_flush_ut = rrdpush_sender_get_flush_time(sender),
            .indexed_in_judy = false,
            .not_indexed_buffer_full = false,
            .not_indexed_preprocessing = false,
    };

    if(!sender->replication.oldest_request_after_t || rq.after < sender->replication.oldest_request_after_t)
        sender->replication.oldest_request_after_t = rq.after;

    if(start_streaming && rrdpush_sender_get_buffer_used_percent(sender) <= STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED)
        replication_execute_request(&rq, false);

    else
        dictionary_set(sender->replication.requests, chart_id, &rq, sizeof(struct replication_request));
}

void replication_sender_delete_pending_requests(struct sender_state *sender) {
    // allow the dictionary destructor to go faster on locks
    dictionary_flush(sender->replication.requests);
}

void replication_init_sender(struct sender_state *sender) {
    sender->replication.requests = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE,
                                                              NULL, sizeof(struct replication_request));

    dictionary_register_react_callback(sender->replication.requests, replication_request_react_callback, sender);
    dictionary_register_conflict_callback(sender->replication.requests, replication_request_conflict_callback, sender);
    dictionary_register_delete_callback(sender->replication.requests, replication_request_delete_callback, sender);
}

void replication_cleanup_sender(struct sender_state *sender) {
    // allow the dictionary destructor to go faster on locks
    replication_recursive_lock();
    dictionary_destroy(sender->replication.requests);
    replication_recursive_unlock();
}
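
// apply hysteresis on the sender buffer usage: when it crosses above
// MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED, all indexed requests of this sender
// are unlinked from the pending queue; when it drops below
// MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED, they are added back.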
void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s) {
    size_t available = cbuffer_available_size_unsafe(s->host->sender->buffer);
    size_t percentage = (s->buffer->max_size - available) * 100 / s->buffer->max_size;

    if(unlikely(percentage > MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED && !rrdpush_sender_replication_buffer_full_get(s))) {
        rrdpush_sender_replication_buffer_full_set(s, true);

        struct replication_request *rq;
        dfe_start_read(s->replication.requests, rq) {
            if(rq->indexed_in_judy)
                replication_sort_entry_del(rq, true);
        }
        dfe_done(rq);

        replication_recursive_lock();
        replication_globals.unsafe.senders_full++;
        replication_recursive_unlock();
    }
    else if(unlikely(percentage < MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED && rrdpush_sender_replication_buffer_full_get(s))) {
        rrdpush_sender_replication_buffer_full_set(s, false);

        struct replication_request *rq;
        dfe_start_read(s->replication.requests, rq) {
            if(!rq->indexed_in_judy && (rq->not_indexed_buffer_full || rq->not_indexed_preprocessing))
                replication_sort_entry_add(rq);
        }
        dfe_done(rq);

        replication_recursive_lock();
        replication_globals.unsafe.senders_full--;
        replication_globals.unsafe.sender_resets++;
        // replication_set_next_point_in_time(0, 0);
        replication_recursive_unlock();
    }

    rrdpush_sender_set_buffer_used_percent(s, percentage);
}

// ----------------------------------------------------------------------------
// replication thread
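
// consistency check: count the charts of this host that are not in the
// expected 'replication finished' state, logging the offenders.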
static size_t verify_host_charts_are_streaming_now(RRDHOST *host) {
    internal_error(
            host->sender &&
            !rrdpush_sender_pending_replication_requests(host->sender) &&
            dictionary_entries(host->sender->replication.requests) != 0,
            "REPLICATION SUMMARY: 'host:%s' reports %zu pending replication requests, but its chart replication index says there are %zu charts pending replication",
            rrdhost_hostname(host),
            rrdpush_sender_pending_replication_requests(host->sender),
            dictionary_entries(host->sender->replication.requests)
            );

    size_t ok = 0;
    size_t errors = 0;

    RRDSET *st;
    rrdset_foreach_read(st, host) {
        RRDSET_FLAGS flags = rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS | RRDSET_FLAG_SENDER_REPLICATION_FINISHED);

        bool is_error = false;

        if(!flags) {
            internal_error(
                    true,
                    "REPLICATION SUMMARY: 'host:%s/chart:%s' is neither IN PROGRESS nor FINISHED",
                    rrdhost_hostname(host), rrdset_id(st)
            );
            is_error = true;
        }

        if(!(flags & RRDSET_FLAG_SENDER_REPLICATION_FINISHED) || (flags & RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) {
            internal_error(
                    true,
                    "REPLICATION SUMMARY: 'host:%s/chart:%s' is IN PROGRESS although replication is finished",
                    rrdhost_hostname(host), rrdset_id(st)
            );
            is_error = true;
        }

        if(is_error)
            errors++;
        else
            ok++;
    }
    rrdset_foreach_done(st);

    internal_error(errors,
                   "REPLICATION SUMMARY: 'host:%s' finished replicating %zu charts, but %zu charts are still in progress although replication finished",
                   rrdhost_hostname(host), ok, errors);

    return errors;
}

static void verify_all_hosts_charts_are_streaming_now(void) {
    worker_is_busy(WORKER_JOB_CHECK_CONSISTENCY);

    size_t errors = 0;
    RRDHOST *host;
    dfe_start_read(rrdhost_root_index, host)
        errors += verify_host_charts_are_streaming_now(host);
    dfe_done(host);

    size_t executed = __atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED);
    netdata_log_info("REPLICATION SUMMARY: finished, executed %zu replication requests, %zu charts pending replication",
                     executed - replication_globals.main_thread.last_executed, errors);
    replication_globals.main_thread.last_executed = executed;
}
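
// register this thread with the workers registry; the master (main replication
// thread) additionally registers the replication statistics as custom worker metrics.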
static void replication_initialize_workers(bool master) {
    worker_register("REPLICATION");
    worker_register_job_name(WORKER_JOB_FIND_NEXT, "find next");
    worker_register_job_name(WORKER_JOB_QUERYING, "querying");
    worker_register_job_name(WORKER_JOB_DELETE_ENTRY, "dict delete");
    worker_register_job_name(WORKER_JOB_FIND_CHART, "find chart");
    worker_register_job_name(WORKER_JOB_PREPARE_QUERY, "prepare query");
    worker_register_job_name(WORKER_JOB_CHECK_CONSISTENCY, "check consistency");
    worker_register_job_name(WORKER_JOB_BUFFER_COMMIT, "commit");
    worker_register_job_name(WORKER_JOB_CLEANUP, "cleanup");
    worker_register_job_name(WORKER_JOB_WAIT, "wait");

    if(master) {
        worker_register_job_name(WORKER_JOB_STATISTICS, "statistics");
        worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, "pending requests", "requests", WORKER_METRIC_ABSOLUTE);
        worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, "no room requests", "requests", WORKER_METRIC_ABSOLUTE);
        worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, "completion", "%", WORKER_METRIC_ABSOLUTE);
        worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, "added requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL);
        worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_DONE, "finished requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL);
        worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS, "sender resets", "resets/s", WORKER_METRIC_INCREMENTAL_TOTAL);
        worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_FULL, "senders full", "senders", WORKER_METRIC_ABSOLUTE);
    }
}

#define REQUEST_OK (0)
#define REQUEST_QUEUE_EMPTY (-1)
#define REQUEST_CHART_NOT_FOUND (-2)
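
// per-thread pipeline of replication requests: queries are prepared ahead of
// time in a small ring of slots, so the preparation of the next requests can
// overlap with the execution of the current one.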
static __thread struct replication_thread_pipeline {
    int max_requests_ahead;
    struct replication_request *rqs;
    int rqs_last_executed, rqs_last_prepared;
    size_t queue_rounds;
} rtp = {
    .max_requests_ahead = 0,
    .rqs = NULL,
    .rqs_last_executed = 0,
    .rqs_last_prepared = 0,
    .queue_rounds = 0,
};
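
// cancel any queries that were prepared but not executed and release the
// pipeline; called when a replication thread exits.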
static void replication_pipeline_cancel_and_cleanup(void) {
    if(!rtp.rqs)
        return;

    struct replication_request *rq;
    size_t cancelled = 0;

    do {
        if (++rtp.rqs_last_executed >= rtp.max_requests_ahead)
            rtp.rqs_last_executed = 0;

        rq = &rtp.rqs[rtp.rqs_last_executed];

        if (rq->q) {
            internal_fatal(rq->executed, "REPLAY FATAL: query has already been executed!");
            internal_fatal(!rq->found, "REPLAY FATAL: orphan q in rq");

            replication_response_cancel_and_finalize(rq->q);
            rq->q = NULL;
            cancelled++;
        }

        rq->executed = true;
        rq->found = false;

    } while (rtp.rqs_last_executed != rtp.rqs_last_prepared);

    internal_error(true, "REPLICATION: cancelled %zu inflight queries", cancelled);

    freez(rtp.rqs);
    rtp.rqs = NULL;
    rtp.max_requests_ahead = 0;
    rtp.rqs_last_executed = 0;
    rtp.rqs_last_prepared = 0;
    rtp.queue_rounds = 0;
}
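
// returns REQUEST_OK, REQUEST_QUEUE_EMPTY or REQUEST_CHART_NOT_FOUND:
// first it fills the pipeline with prepared queries, then it picks the
// oldest usable request and executes it.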
static int replication_pipeline_execute_next(void) {
    struct replication_request *rq;

    if(unlikely(!rtp.rqs)) {
        rtp.max_requests_ahead = (int)get_netdata_cpus() / 2;

        if(rtp.max_requests_ahead > libuv_worker_threads * 2)
            rtp.max_requests_ahead = libuv_worker_threads * 2;

        if(rtp.max_requests_ahead < 2)
            rtp.max_requests_ahead = 2;

        rtp.rqs = callocz(rtp.max_requests_ahead, sizeof(struct replication_request));
        __atomic_add_fetch(&replication_buffers_allocated, rtp.max_requests_ahead * sizeof(struct replication_request), __ATOMIC_RELAXED);
    }

    // fill the queue
    do {
        if(++rtp.rqs_last_prepared >= rtp.max_requests_ahead) {
            rtp.rqs_last_prepared = 0;
            rtp.queue_rounds++;
        }

        internal_fatal(rtp.rqs[rtp.rqs_last_prepared].q,
                       "REPLAY FATAL: slot is used by query that has not been executed!");

        worker_is_busy(WORKER_JOB_FIND_NEXT);
        rtp.rqs[rtp.rqs_last_prepared] = replication_request_get_first_available();
        rq = &rtp.rqs[rtp.rqs_last_prepared];

        if(rq->found) {
            if (!rq->st) {
                worker_is_busy(WORKER_JOB_FIND_CHART);
                rq->st = rrdset_find(rq->sender->host, string2str(rq->chart_id));
            }

            if (rq->st && !rq->q) {
                worker_is_busy(WORKER_JOB_PREPARE_QUERY);
                rq->q = replication_response_prepare(
                        rq->st,
                        rq->start_streaming,
                        rq->after,
                        rq->before,
                        rq->sender->capabilities);
            }

            rq->executed = false;
        }

    } while(rq->found && rtp.rqs_last_prepared != rtp.rqs_last_executed);

    // pick the first usable
    do {
        if (++rtp.rqs_last_executed >= rtp.max_requests_ahead)
            rtp.rqs_last_executed = 0;

        rq = &rtp.rqs[rtp.rqs_last_executed];

        if(rq->found) {
            internal_fatal(rq->executed, "REPLAY FATAL: query has already been executed!");

            if (rq->sender_last_flush_ut != rrdpush_sender_get_flush_time(rq->sender)) {
                // the sender has reconnected since this request was queued,
                // we can safely throw it away, since the parent will resend it
                replication_response_cancel_and_finalize(rq->q);
                rq->executed = true;
                rq->found = false;
                rq->q = NULL;
            }
            else if (rrdpush_sender_replication_buffer_full_get(rq->sender)) {
                // the sender buffer is full, so we can ignore this request,
                // it has already been marked as 'preprocessed' in the dictionary,
                // and the sender will put it back in when there is
                // enough room in the buffer for processing replication requests
                replication_response_cancel_and_finalize(rq->q);
                rq->executed = true;
                rq->found = false;
                rq->q = NULL;
            }
            else {
                // we can execute this,
                // delete it from the dictionary
                worker_is_busy(WORKER_JOB_DELETE_ENTRY);
                dictionary_del(rq->sender->replication.requests, string2str(rq->chart_id));
            }
        }
        else
            internal_fatal(rq->q, "REPLAY FATAL: slot status says slot is empty, but it has a pending query!");

    } while(!rq->found && rtp.rqs_last_executed != rtp.rqs_last_prepared);

    if(unlikely(!rq->found)) {
        worker_is_idle();
        return REQUEST_QUEUE_EMPTY;
    }

    replication_set_latest_first_time(rq->after);

    bool chart_found = replication_execute_request(rq, true);
    rq->executed = true;
    rq->found = false;
    rq->q = NULL;

    if(unlikely(!chart_found)) {
        worker_is_idle();
        return REQUEST_CHART_NOT_FOUND;
    }

    worker_is_idle();
    return REQUEST_OK;
}

static void replication_worker_cleanup(void *ptr __maybe_unused) {
    replication_pipeline_cancel_and_cleanup();
    worker_unregister();
}

static void *replication_worker_thread(void *ptr) {
    replication_initialize_workers(false);

    netdata_thread_cleanup_push(replication_worker_cleanup, ptr);

    while(service_running(SERVICE_REPLICATION)) {
        if(unlikely(replication_pipeline_execute_next() == REQUEST_QUEUE_EMPTY)) {
            sender_thread_buffer_free();
            worker_is_busy(WORKER_JOB_WAIT);
            worker_is_idle();
            sleep_usec(1 * USEC_PER_SEC);
        }
    }

    netdata_thread_cleanup_pop(1);
    return NULL;
}

static void replication_main_cleanup(void *ptr) {
    struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
    static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;

    replication_pipeline_cancel_and_cleanup();

    int threads = (int)replication_globals.main_thread.threads;
    for(int i = 0; i < threads ;i++) {
        netdata_thread_join(*replication_globals.main_thread.threads_ptrs[i], NULL);
        freez(replication_globals.main_thread.threads_ptrs[i]);
        __atomic_sub_fetch(&replication_buffers_allocated, sizeof(netdata_thread_t), __ATOMIC_RELAXED);
    }
    freez(replication_globals.main_thread.threads_ptrs);
    replication_globals.main_thread.threads_ptrs = NULL;
    __atomic_sub_fetch(&replication_buffers_allocated, threads * sizeof(netdata_thread_t), __ATOMIC_RELAXED);

    aral_destroy(replication_globals.aral_rse);
    replication_globals.aral_rse = NULL;

    // custom code
    worker_unregister();

    static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
}
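
// allocate the ARAL used for replication sort entries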
void replication_initialize(void) {
    replication_globals.aral_rse = aral_create("rse", sizeof(struct replication_sort_entry),
                                               0, 65536, aral_by_size_statistics(),
                                               NULL, NULL, false, false);
}
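
// the main replication thread: it spawns the extra worker threads (when the
// 'replication threads' config option is more than 1), maintains the
// replication statistics and executes replication requests like any worker.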
void *replication_thread_main(void *ptr __maybe_unused) {
    replication_initialize_workers(true);

    int threads = config_get_number(CONFIG_SECTION_DB, "replication threads", 1);
    if(threads < 1 || threads > MAX_REPLICATION_THREADS) {
        netdata_log_error("replication threads given %d is invalid, resetting to 1", threads);
        threads = 1;
    }

    if(--threads) {
        replication_globals.main_thread.threads = threads;
        replication_globals.main_thread.threads_ptrs = mallocz(threads * sizeof(netdata_thread_t *));
        __atomic_add_fetch(&replication_buffers_allocated, threads * sizeof(netdata_thread_t), __ATOMIC_RELAXED);

        for(int i = 0; i < threads ;i++) {
            char tag[NETDATA_THREAD_TAG_MAX + 1];
            snprintfz(tag, NETDATA_THREAD_TAG_MAX, "REPLAY[%d]", i + 2);
            replication_globals.main_thread.threads_ptrs[i] = mallocz(sizeof(netdata_thread_t));
            __atomic_add_fetch(&replication_buffers_allocated, sizeof(netdata_thread_t), __ATOMIC_RELAXED);
            netdata_thread_create(replication_globals.main_thread.threads_ptrs[i], tag,
                                  NETDATA_THREAD_OPTION_JOINABLE, replication_worker_thread, NULL);
        }
    }

    netdata_thread_cleanup_push(replication_main_cleanup, ptr);

    // start from 100% completed
    worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, 100.0);

    long run_verification_countdown = LONG_MAX; // LONG_MAX to prevent an initial verification when no replication ever took place
    bool slow = true; // controls the time we sleep - it has to start as true!
    usec_t last_now_mono_ut = now_monotonic_usec();
    time_t replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME; // restart from the beginning every 10 seconds

    size_t last_executed = 0;
    size_t last_sender_resets = 0;

    while(service_running(SERVICE_REPLICATION)) {

        // statistics
        usec_t now_mono_ut = now_monotonic_usec();
        if(unlikely(now_mono_ut - last_now_mono_ut > default_rrd_update_every * USEC_PER_SEC)) {
            last_now_mono_ut = now_mono_ut;

            worker_is_busy(WORKER_JOB_STATISTICS);
            replication_recursive_lock();

            size_t current_executed = __atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED);
            if(last_executed != current_executed) {
                run_verification_countdown = ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION;
                last_executed = current_executed;
                slow = false;
            }

            if(replication_reset_next_point_in_time_countdown-- == 0) {
                // periodically, make it scan all the pending requests from the beginning next time
                replication_set_next_point_in_time(0, 0);
                // replication_globals.protected.skipped_no_room_since_last_reset = 0;
                replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME;
            }

            if(--run_verification_countdown == 0) {
                if (!replication_globals.unsafe.pending && !replication_globals.unsafe.pending_no_room) {
                    // reset the statistics about completion percentage
                    replication_globals.unsafe.first_time_t = 0;
                    replication_set_latest_first_time(0);

                    verify_all_hosts_charts_are_streaming_now();

                    run_verification_countdown = LONG_MAX;
                    slow = true;
                }
                else
                    run_verification_countdown = ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION;
            }

            time_t latest_first_time_t = replication_get_latest_first_time();
            if(latest_first_time_t && replication_globals.unsafe.pending) {
                // completion percentage statistics
                time_t now = now_realtime_sec();
                time_t total = now - replication_globals.unsafe.first_time_t;
                time_t done = latest_first_time_t - replication_globals.unsafe.first_time_t;
                worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION,
                                  (NETDATA_DOUBLE) done * 100.0 / (NETDATA_DOUBLE) total);
            }
            else
                worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, 100.0);

            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, (NETDATA_DOUBLE)replication_globals.unsafe.pending);
            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, (NETDATA_DOUBLE)replication_globals.unsafe.added);
            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_DONE, (NETDATA_DOUBLE)__atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED));
            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, (NETDATA_DOUBLE)replication_globals.unsafe.pending_no_room);
            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS, (NETDATA_DOUBLE)replication_globals.unsafe.sender_resets);
            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_FULL, (NETDATA_DOUBLE)replication_globals.unsafe.senders_full);

            replication_recursive_unlock();
            worker_is_idle();
        }

        if(unlikely(replication_pipeline_execute_next() == REQUEST_QUEUE_EMPTY)) {

            worker_is_busy(WORKER_JOB_WAIT);
            replication_recursive_lock();

            // the timeout also defines how frequently we will traverse all the pending requests
            // when the outbound buffers of all senders are full
            usec_t timeout;
            if(slow) {
                // no work to be done, wait for a request to come in
                timeout = 1000 * USEC_PER_MS;
                sender_thread_buffer_free();
            }

            else if(replication_globals.unsafe.pending > 0) {
                if(replication_globals.unsafe.sender_resets == last_sender_resets)
                    timeout = 1000 * USEC_PER_MS;

                else {
                    // there are pending requests waiting to be executed,
                    // but none could be executed at this time.
                    // try again after this time.
                    timeout = 100 * USEC_PER_MS;
                }

                last_sender_resets = replication_globals.unsafe.sender_resets;
            }
            else {
                // no requests pending, but there were requests recently (run_verification_countdown),
                // so retry shortly; if this timeout is big, replicating a single chart will be
                // slow to finish (we would just ping-pong the requests of one chart)
                timeout = 10 * USEC_PER_MS;
                last_sender_resets = replication_globals.unsafe.sender_resets;
            }

            replication_recursive_unlock();

            worker_is_idle();
            sleep_usec(timeout);

            // make it scan all the pending requests next time
            replication_set_next_point_in_time(0, 0);
            replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME;

            continue;
        }
    }

    netdata_thread_cleanup_pop(1);
    return NULL;
}