0
0
Fork 0
mirror of https://github.com/netdata/netdata.git synced 2025-04-17 03:02:41 +00:00

replication fixes 9 ()

* replication fixes 9

* no room metric is now absolute

* decrement senders full and flip the actions on sender full and sender available

* finer timings

* execute all requests, unconditionally, once they are in the replication queue; worker charts are now sorted

* remove left-over debug message

* log verification of replication completion only when there are no pending requests any more

* use up to 50% of the sender buffer for replication responses
This commit is contained in:
Costa Tsaousis 2022-12-02 11:03:57 +02:00 committed by GitHub
parent 99857f8fb2
commit b53dccd588
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 136 additions and 99 deletions

View file

@ -2322,7 +2322,7 @@ static void workers_utilization_update_chart(struct worker_utilization *wu) {
snprintf(context, RRD_ID_LENGTH_MAX, "netdata.workers.%s.value.%s", wu->name_lowercase, job_name_sanitized);
char title[1000 + 1];
snprintf(title, 1000, "Netdata Workers %s Value of %s", wu->name_lowercase, string2str(wu->per_job_type[i].name));
snprintf(title, 1000, "Netdata Workers %s value of %s", wu->name_lowercase, string2str(wu->per_job_type[i].name));
wu->per_job_type[i].st = rrdset_create_localhost(
"netdata"
@ -2334,7 +2334,7 @@ static void workers_utilization_update_chart(struct worker_utilization *wu) {
, (wu->per_job_type[i].units)?string2str(wu->per_job_type[i].units):"value"
, "netdata"
, "stats"
, wu->priority + 5
, wu->priority + 5 + i
, localhost->rrd_update_every
, RRDSET_TYPE_LINE
);
@ -2378,7 +2378,7 @@ static void workers_utilization_update_chart(struct worker_utilization *wu) {
snprintf(context, RRD_ID_LENGTH_MAX, "netdata.workers.%s.rate.%s", wu->name_lowercase, job_name_sanitized);
char title[1000 + 1];
snprintf(title, 1000, "Netdata Workers %s Rate of %s", wu->name_lowercase, string2str(wu->per_job_type[i].name));
snprintf(title, 1000, "Netdata Workers %s rate of %s", wu->name_lowercase, string2str(wu->per_job_type[i].name));
wu->per_job_type[i].st = rrdset_create_localhost(
"netdata"
@ -2390,7 +2390,7 @@ static void workers_utilization_update_chart(struct worker_utilization *wu) {
, (wu->per_job_type[i].units)?string2str(wu->per_job_type[i].units):"rate"
, "netdata"
, "stats"
, wu->priority + 5
, wu->priority + 5 + i
, localhost->rrd_update_every
, RRDSET_TYPE_LINE
);

View file

@ -3253,8 +3253,6 @@ static void rrdcontext_garbage_collect_single_host(RRDHOST *host, bool worker_jo
"RRDCONTEXT: context '%s' of host '%s', deleted from rrdmetrics dictionary.",
string2str(rc->id),
rrdhost_hostname(host));
fprintf(stderr, "RRDCONTEXT: deleted context '%s'", string2str(rc->id));
}
// the item is referenced in the dictionary

View file

@ -4,7 +4,7 @@
#include "Judy.h"
#define STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50
#define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 20
#define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50
#define MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED 10
#define WORKER_JOB_FIND_NEXT 1
@ -14,17 +14,17 @@
#define WORKER_JOB_CHECK_CONSISTENCY 5
#define WORKER_JOB_BUFFER_COMMIT 6
#define WORKER_JOB_CLEANUP 7
#define WORKER_JOB_WAIT 8
// master thread worker jobs
#define WORKER_JOB_STATISTICS 8
#define WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS 9
#define WORKER_JOB_CUSTOM_METRIC_COMPLETION 10
#define WORKER_JOB_CUSTOM_METRIC_ADDED 11
#define WORKER_JOB_CUSTOM_METRIC_DONE 12
#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NOT_CONNECTED 13
#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM 14
#define WORKER_JOB_CUSTOM_METRIC_WAITS 15
#define WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS 16
#define WORKER_JOB_STATISTICS 9
#define WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS 10
#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM 11
#define WORKER_JOB_CUSTOM_METRIC_COMPLETION 12
#define WORKER_JOB_CUSTOM_METRIC_ADDED 13
#define WORKER_JOB_CUSTOM_METRIC_DONE 14
#define WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS 15
#define WORKER_JOB_CUSTOM_METRIC_SENDER_FULL 16
#define ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION 30
#define SECONDS_TO_RESET_POINT_IN_TIME 10
@ -591,6 +591,7 @@ struct replication_request {
Word_t unique_id; // auto-increment, later requests have bigger
bool found; // used as a result boolean for the find call
bool indexed_in_judy; // true when the request is indexed in judy
bool not_indexed_buffer_full; // true when the request is not indexed because the sender is full
};
// replication sort entry in JudyL array
@ -605,7 +606,7 @@ struct replication_sort_entry {
// the global variables for the replication thread
static struct replication_thread {
netdata_mutex_t mutex;
SPINLOCK spinlock;
struct {
size_t pending; // number of requests pending in the queue
@ -614,9 +615,8 @@ static struct replication_thread {
// statistics
size_t added; // number of requests added to the queue
size_t removed; // number of requests removed from the queue
size_t skipped_not_connected; // number of requests skipped, because the sender is not connected to a parent
size_t skipped_no_room; // number of requests skipped, because the sender has no room for responses
// size_t skipped_no_room_since_last_reset;
size_t pending_no_room; // number of requests skipped, because the sender has no room for responses
size_t senders_full; // number of times a sender reset our last position in the queue
size_t sender_resets; // number of times a sender reset our last position in the queue
time_t first_time_t; // the minimum 'after' we encountered
@ -634,7 +634,6 @@ static struct replication_thread {
} atomic; // access should be with atomic operations
struct {
size_t waits;
size_t last_executed; // caching of the atomic.executed to report number of requests executed since last time
netdata_thread_t **threads_ptrs;
@ -642,17 +641,16 @@ static struct replication_thread {
} main_thread; // access is allowed only by the main thread
} replication_globals = {
.mutex = NETDATA_MUTEX_INITIALIZER,
.spinlock = NETDATA_SPINLOCK_INITIALIZER,
.unsafe = {
.pending = 0,
.unique_id = 0,
.added = 0,
.removed = 0,
.skipped_not_connected = 0,
.skipped_no_room = 0,
// .skipped_no_room_since_last_reset = 0,
.pending_no_room = 0,
.sender_resets = 0,
.senders_full = 0,
.first_time_t = 0,
@ -667,7 +665,6 @@ static struct replication_thread {
.latest_first_time = 0,
},
.main_thread = {
.waits = 0,
.last_executed = 0,
.threads = 0,
.threads_ptrs = NULL,
@ -682,11 +679,11 @@ static inline bool replication_recursive_lock_mode(char mode) {
if(mode == 'L') { // (L)ock
if(++recursions == 1)
netdata_mutex_lock(&replication_globals.mutex);
netdata_spinlock_lock(&replication_globals.spinlock);
}
else if(mode == 'U') { // (U)nlock
if(--recursions == 0)
netdata_mutex_unlock(&replication_globals.mutex);
netdata_spinlock_unlock(&replication_globals.spinlock);
}
else if(mode == 'C') { // (C)heck
if(recursions > 0)
@ -736,6 +733,7 @@ static struct replication_sort_entry *replication_sort_entry_create_unsafe(struc
// save the unique id into the request, to be able to delete it later
rq->unique_id = rse->unique_id;
rq->indexed_in_judy = false;
rq->not_indexed_buffer_full = false;
return rse;
}
@ -743,9 +741,20 @@ static void replication_sort_entry_destroy(struct replication_sort_entry *rse) {
freez(rse);
}
static struct replication_sort_entry *replication_sort_entry_add(struct replication_request *rq) {
static void replication_sort_entry_add(struct replication_request *rq) {
replication_recursive_lock();
if(rrdpush_sender_replication_buffer_full_get(rq->sender)) {
rq->indexed_in_judy = false;
rq->not_indexed_buffer_full = true;
replication_globals.unsafe.pending_no_room++;
replication_recursive_unlock();
return;
}
if(rq->not_indexed_buffer_full)
replication_globals.unsafe.pending_no_room--;
struct replication_sort_entry *rse = replication_sort_entry_create_unsafe(rq);
// if(rq->after < (time_t)replication_globals.protected.queue.after &&
@ -770,13 +779,12 @@ static struct replication_sort_entry *replication_sort_entry_add(struct replicat
Pvoid_t *item = JudyLIns(inner_judy_ptr, rq->unique_id, PJE0);
*item = rse;
rq->indexed_in_judy = true;
rq->not_indexed_buffer_full = false;
if(!replication_globals.unsafe.first_time_t || rq->after < replication_globals.unsafe.first_time_t)
replication_globals.unsafe.first_time_t = rq->after;
replication_recursive_unlock();
return rse;
}
static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sort_entry *rse, Pvoid_t **inner_judy_ppptr) {
@ -806,7 +814,7 @@ static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sor
return inner_judy_deleted;
}
static void replication_sort_entry_del(struct replication_request *rq) {
static void replication_sort_entry_del(struct replication_request *rq, bool buffer_full) {
Pvoid_t *inner_judy_pptr;
struct replication_sort_entry *rse_to_delete = NULL;
@ -819,6 +827,11 @@ static void replication_sort_entry_del(struct replication_request *rq) {
if (our_item_pptr) {
rse_to_delete = *our_item_pptr;
replication_sort_entry_unlink_and_free_unsafe(rse_to_delete, &inner_judy_pptr);
if(buffer_full) {
replication_globals.unsafe.pending_no_room++;
rq->not_indexed_buffer_full = true;
}
}
}
@ -877,44 +890,17 @@ static struct replication_request replication_request_get_first_available() {
while (!rq_to_return.found && (our_item_pptr = JudyLNext(*inner_judy_pptr, &replication_globals.unsafe.queue.unique_id, PJE0))) {
struct replication_sort_entry *rse = *our_item_pptr;
struct replication_request *rq = rse->rq;
struct sender_state *s = rq->sender;
if (likely(rrdpush_sender_get_buffer_used_percent(s) <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED)) {
// there is room for this request in the sender buffer
// copy the request to return it
rq_to_return = *rq;
rq_to_return.chart_id = string_dup(rq_to_return.chart_id);
bool sender_is_connected =
rrdhost_flag_check(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
// set the return result to found
rq_to_return.found = true;
bool sender_has_been_flushed_since_this_request =
rq->sender_last_flush_ut != rrdpush_sender_get_flush_time(s);
if (unlikely(!sender_is_connected || sender_has_been_flushed_since_this_request)) {
// skip this request, the sender is not connected, or it has reconnected
replication_globals.unsafe.skipped_not_connected++;
if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr))
// we removed the item from the outer JudyL
break;
}
else {
// this request is good to execute
// copy the request to return it
rq_to_return = *rq;
rq_to_return.chart_id = string_dup(rq_to_return.chart_id);
// set the return result to found
rq_to_return.found = true;
if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr))
// we removed the item from the outer JudyL
break;
}
}
else {
replication_globals.unsafe.skipped_no_room++;
// replication_globals.protected.skipped_no_room_since_last_reset++;
}
if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr))
// we removed the item from the outer JudyL
break;
}
// call JudyLNext from now on
@ -959,7 +945,20 @@ static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item __
replication_recursive_lock();
if(!rq->indexed_in_judy) {
if(!rq->indexed_in_judy && rq->not_indexed_buffer_full) {
// we can replace this command
internal_error(
true,
"STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' replacing duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), dictionary_acquired_item_name(item),
(unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false",
(unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false");
rq->after = rq_new->after;
rq->before = rq_new->before;
rq->start_streaming = rq_new->start_streaming;
}
else if(!rq->indexed_in_judy) {
replication_sort_entry_add(rq);
internal_error(
true,
@ -991,7 +990,13 @@ static void replication_request_delete_callback(const DICTIONARY_ITEM *item __ma
rrdpush_sender_replicating_charts_minus_one(rq->sender);
if(rq->indexed_in_judy)
replication_sort_entry_del(rq);
replication_sort_entry_del(rq, false);
else if(rq->not_indexed_buffer_full) {
replication_recursive_lock();
replication_globals.unsafe.pending_no_room--;
replication_recursive_unlock();
}
string_freez(rq->chart_id);
}
@ -1046,6 +1051,7 @@ static bool replication_execute_request(struct replication_request *rq, bool wor
cleanup:
string_freez(rq->chart_id);
worker_is_idle();
return ret;
}
@ -1060,6 +1066,8 @@ void replication_add_request(struct sender_state *sender, const char *chart_id,
.before = before,
.start_streaming = start_streaming,
.sender_last_flush_ut = rrdpush_sender_get_flush_time(sender),
.indexed_in_judy = false,
.not_indexed_buffer_full = false,
};
if(start_streaming && rrdpush_sender_get_buffer_used_percent(sender) <= STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED)
@ -1094,15 +1102,36 @@ void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s) {
size_t available = cbuffer_available_size_unsafe(s->host->sender->buffer);
size_t percentage = (s->buffer->max_size - available) * 100 / s->buffer->max_size;
if(percentage > MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED)
s->replication.unsafe.reached_max = true;
if(unlikely(percentage > MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED && !rrdpush_sender_replication_buffer_full_get(s))) {
rrdpush_sender_replication_buffer_full_set(s, true);
struct replication_request *rq;
dfe_start_read(s->replication.requests, rq) {
if(rq->indexed_in_judy && !rq->not_indexed_buffer_full) {
replication_sort_entry_del(rq, true);
}
}
dfe_done(rq);
if(s->replication.unsafe.reached_max &&
percentage <= MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED) {
s->replication.unsafe.reached_max = false;
replication_recursive_lock();
// replication_set_next_point_in_time(0, 0);
replication_globals.unsafe.senders_full++;
replication_recursive_unlock();
}
else if(unlikely(percentage < MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED && rrdpush_sender_replication_buffer_full_get(s))) {
rrdpush_sender_replication_buffer_full_set(s, false);
struct replication_request *rq;
dfe_start_read(s->replication.requests, rq) {
if(!rq->indexed_in_judy && rq->not_indexed_buffer_full) {
replication_sort_entry_add(rq);
}
}
dfe_done(rq);
replication_recursive_lock();
replication_globals.unsafe.senders_full--;
replication_globals.unsafe.sender_resets++;
// replication_set_next_point_in_time(0, 0);
replication_recursive_unlock();
}
@ -1188,17 +1217,17 @@ static void replication_initialize_workers(bool master) {
worker_register_job_name(WORKER_JOB_CHECK_CONSISTENCY, "check consistency");
worker_register_job_name(WORKER_JOB_BUFFER_COMMIT, "commit");
worker_register_job_name(WORKER_JOB_CLEANUP, "cleanup");
worker_register_job_name(WORKER_JOB_WAIT, "wait");
if(master) {
worker_register_job_name(WORKER_JOB_STATISTICS, "statistics");
worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, "pending requests", "requests", WORKER_METRIC_ABSOLUTE);
worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, "no room requests", "requests", WORKER_METRIC_ABSOLUTE);
worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, "completion", "%", WORKER_METRIC_ABSOLUTE);
worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, "added requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL);
worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_DONE, "finished requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL);
worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NOT_CONNECTED, "not connected requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL);
worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, "no room requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL);
worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS, "sender resets", "resets/s", WORKER_METRIC_INCREMENTAL_TOTAL);
worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_WAITS, "waits", "waits/s", WORKER_METRIC_INCREMENTAL_TOTAL);
worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_FULL, "senders full", "senders", WORKER_METRIC_ABSOLUTE);
}
}
@ -1210,8 +1239,10 @@ static int replication_execute_next_pending_request(void) {
worker_is_busy(WORKER_JOB_FIND_NEXT);
struct replication_request rq = replication_request_get_first_available();
if(unlikely(!rq.found))
if(unlikely(!rq.found)) {
worker_is_idle();
return REQUEST_QUEUE_EMPTY;
}
// delete the request from the dictionary
worker_is_busy(WORKER_JOB_DELETE_ENTRY);
@ -1221,9 +1252,12 @@ static int replication_execute_next_pending_request(void) {
replication_set_latest_first_time(rq.after);
if(unlikely(!replication_execute_request(&rq, true)))
if(unlikely(!replication_execute_request(&rq, true))) {
worker_is_idle();
return REQUEST_CHART_NOT_FOUND;
}
worker_is_idle();
return REQUEST_OK;
}
@ -1238,6 +1272,7 @@ static void *replication_worker_thread(void *ptr) {
while(!netdata_exit) {
if(unlikely(replication_execute_next_pending_request() == REQUEST_QUEUE_EMPTY)) {
worker_is_busy(WORKER_JOB_WAIT);
worker_is_idle();
sleep_usec(1 * USEC_PER_SEC);
}
@ -1305,6 +1340,7 @@ void *replication_thread_main(void *ptr __maybe_unused) {
if(unlikely(now_mono_ut - last_now_mono_ut > default_rrd_update_every * USEC_PER_SEC)) {
last_now_mono_ut = now_mono_ut;
worker_is_busy(WORKER_JOB_STATISTICS);
replication_recursive_lock();
size_t current_executed = __atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED);
@ -1321,19 +1357,21 @@ void *replication_thread_main(void *ptr __maybe_unused) {
replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME;
}
if(!replication_globals.unsafe.pending && --run_verification_countdown == 0) {
// reset the statistics about completion percentage
replication_globals.unsafe.first_time_t = 0;
replication_set_latest_first_time(0);
if(--run_verification_countdown == 0) {
if (!replication_globals.unsafe.pending && !replication_globals.unsafe.pending_no_room) {
// reset the statistics about completion percentage
replication_globals.unsafe.first_time_t = 0;
replication_set_latest_first_time(0);
verify_all_hosts_charts_are_streaming_now();
verify_all_hosts_charts_are_streaming_now();
run_verification_countdown = LONG_MAX;
slow = true;
run_verification_countdown = LONG_MAX;
slow = true;
}
else
run_verification_countdown = ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION;
}
worker_is_busy(WORKER_JOB_STATISTICS);
time_t latest_first_time_t = replication_get_latest_first_time();
if(latest_first_time_t && replication_globals.unsafe.pending) {
// completion percentage statistics
@ -1349,15 +1387,17 @@ void *replication_thread_main(void *ptr __maybe_unused) {
worker_set_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, (NETDATA_DOUBLE)replication_globals.unsafe.pending);
worker_set_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, (NETDATA_DOUBLE)replication_globals.unsafe.added);
worker_set_metric(WORKER_JOB_CUSTOM_METRIC_DONE, (NETDATA_DOUBLE)__atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED));
worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NOT_CONNECTED, (NETDATA_DOUBLE)replication_globals.unsafe.skipped_not_connected);
worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, (NETDATA_DOUBLE)replication_globals.unsafe.skipped_no_room);
worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, (NETDATA_DOUBLE)replication_globals.unsafe.pending_no_room);
worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS, (NETDATA_DOUBLE)replication_globals.unsafe.sender_resets);
worker_set_metric(WORKER_JOB_CUSTOM_METRIC_WAITS, (NETDATA_DOUBLE)replication_globals.main_thread.waits);
worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_FULL, (NETDATA_DOUBLE)replication_globals.unsafe.senders_full);
replication_recursive_unlock();
worker_is_idle();
}
if(unlikely(replication_execute_next_pending_request() == REQUEST_QUEUE_EMPTY)) {
worker_is_busy(WORKER_JOB_WAIT);
replication_recursive_lock();
// the timeout also defines now frequently we will traverse all the pending requests
@ -1388,7 +1428,6 @@ void *replication_thread_main(void *ptr __maybe_unused) {
last_sender_resets = replication_globals.unsafe.sender_resets;
}
replication_globals.main_thread.waits++;
replication_recursive_unlock();
worker_is_idle();

View file

@ -168,12 +168,9 @@ struct sender_state {
struct {
size_t pending_requests; // the currently outstanding replication requests
size_t charts_replicating; // the number of unique charts having pending replication requests (on every request one is added and is removed when we finish it - it does not track completion of the replication for this chart)
bool reached_max; // true when the sender buffer should not get more replication responses
} atomic;
struct {
bool reached_max; // used to avoid resetting the replication thread too frequently
} unsafe; // protected by sender mutex
} replication;
struct {
@ -182,10 +179,13 @@ struct sender_state {
} atomic;
};
#define rrdpush_sender_set_buffer_used_percent(sender, value) __atomic_store_n(&((sender)->atomic.buffer_used_percentage), value, __ATOMIC_RELAXED);
#define rrdpush_sender_replication_buffer_full_set(sender, value) __atomic_store_n(&((sender)->replication.atomic.reached_max), value, __ATOMIC_SEQ_CST)
#define rrdpush_sender_replication_buffer_full_get(sender) __atomic_load_n(&((sender)->replication.atomic.reached_max), __ATOMIC_SEQ_CST)
#define rrdpush_sender_set_buffer_used_percent(sender, value) __atomic_store_n(&((sender)->atomic.buffer_used_percentage), value, __ATOMIC_RELAXED)
#define rrdpush_sender_get_buffer_used_percent(sender) __atomic_load_n(&((sender)->atomic.buffer_used_percentage), __ATOMIC_RELAXED)
#define rrdpush_sender_set_flush_time(sender) __atomic_store_n(&((sender)->atomic.last_flush_time_ut), now_realtime_usec(), __ATOMIC_RELAXED);
#define rrdpush_sender_set_flush_time(sender) __atomic_store_n(&((sender)->atomic.last_flush_time_ut), now_realtime_usec(), __ATOMIC_RELAXED)
#define rrdpush_sender_get_flush_time(sender) __atomic_load_n(&((sender)->atomic.last_flush_time_ut), __ATOMIC_RELAXED)
#define rrdpush_sender_replicating_charts(sender) __atomic_load_n(&((sender)->replication.atomic.charts_replicating), __ATOMIC_RELAXED)