0
0
Fork 0
mirror of https://github.com/netdata/netdata.git synced 2025-04-13 17:19:11 +00:00

fixed bug in streaming sender read()

This commit is contained in:
Costa Tsaousis 2024-12-06 02:53:22 +02:00 committed by GitHub
parent 77a3cdde85
commit cb731f303f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 59 additions and 50 deletions

View file

@ -455,7 +455,7 @@ static PGD *rrdeng_alloc_new_page_data(struct rrdeng_collect_handle *handle, use
if(slots < 3)
slots = 3;
size_t size = slots * CTX_POINT_SIZE_BYTES(ctx);
size_t size = slots * CTX_POINT_SIZE_BYTES(ctx); (void)size;
// internal_error(true, "PAGE ALLOC %zu bytes (%zu max)", size, max_size);

View file

@ -417,7 +417,7 @@ static void stream_receiver_remove(struct stream_thread *sth, struct receiver_st
}
// process poll() events for streaming receivers
void stream_receive_process_poll_events(struct stream_thread *sth, struct receiver_state *rpt, nd_poll_event_t events __maybe_unused, time_t now_s) {
void stream_receive_process_poll_events(struct stream_thread *sth, struct receiver_state *rpt, nd_poll_event_t events __maybe_unused, usec_t now_ut) {
PARSER *parser = __atomic_load_n(&rpt->thread.parser, __ATOMIC_RELAXED);
ND_LOG_STACK lgs[] = {
ND_LOG_FIELD_TXT(NDF_SRC_IP, rpt->client_ip),
@ -439,7 +439,7 @@ void stream_receive_process_poll_events(struct stream_thread *sth, struct receiv
return;
}
rpt->last_msg_t = now_s;
rpt->last_msg_t = (time_t)(now_ut / USEC_PER_SEC);
if(rpt->thread.compressed.enabled) {
worker_is_busy(WORKER_STREAM_JOB_SOCKET_RECEIVE);

View file

@ -55,7 +55,8 @@ struct sender_state {
size_t bytes_uncompressed;
// the current buffer statistics
// these SHOULD ALWAYS BE CALCULATED ON EVERY sender_unlock() IF THE BUFFER WAS MODIFIED
// these SHOULD ALWAYS BE CALCULATED ON EVERY stream_sender_unlock() IF THE BUFFER WAS MODIFIED
// stream_sender_lock() IS REQUIRED TO READ/WRITE THESE
size_t bytes_outstanding;
size_t bytes_available;
NETDATA_DOUBLE buffer_ratio;
@ -65,6 +66,8 @@ struct sender_state {
size_t bytes_sent;
size_t bytes_sent_by_type[STREAM_TRAFFIC_TYPE_MAX];
usec_t last_traffic_ut;
struct pollfd_meta meta;
} thread;
@ -73,7 +76,6 @@ struct sender_state {
} connector;
char connected_to[CONNECTED_TO_SIZE + 1]; // We don't know which proxy we connect to, passed back from socket.c
time_t last_traffic_seen_t;
time_t last_state_since_t; // the timestamp of the last state (online/offline) change
struct {

View file

@ -7,13 +7,13 @@ static void stream_sender_move_running_to_connector_or_remove(struct stream_thre
// --------------------------------------------------------------------------------------------------------------------
static void stream_sender_cbuffer_recreate_timed_unsafe(struct sender_state *s, time_t now_s, bool force) {
static __thread time_t last_reset_time_s = 0;
static void stream_sender_cbuffer_recreate_timed_unsafe(struct sender_state *s, usec_t now_ut, bool force) {
static __thread usec_t last_reset_time_ut = 0;
if(!force && now_s - last_reset_time_s < 300)
if(!force && now_ut - last_reset_time_ut < 300 * USEC_PER_SEC)
return;
last_reset_time_s = now_s;
last_reset_time_ut = now_ut;
s->sbuf.recreates++; // we increase even if we don't do it, to have sender_start() recreate its buffers
@ -30,7 +30,7 @@ static void rrdpush_sender_cbuffer_flush(RRDHOST *host) {
// flush the output buffer from any data it may have
cbuffer_flush(host->sender->sbuf.cb);
stream_sender_cbuffer_recreate_timed_unsafe(host->sender, now_monotonic_sec(), true);
stream_sender_cbuffer_recreate_timed_unsafe(host->sender, now_monotonic_usec(), true);
stream_sender_unlock(host->sender);
}
@ -76,7 +76,7 @@ void stream_sender_on_connect(struct sender_state *s) {
rrdpush_sender_charts_and_replication_reset(s);
rrdpush_sender_cbuffer_flush(s->host);
s->last_traffic_seen_t = now_monotonic_sec();
s->thread.last_traffic_ut = now_monotonic_usec();
s->rbuf.read_len = 0;
s->sbuf.cb->read = 0;
s->sbuf.cb->write = 0;
@ -370,23 +370,27 @@ static void stream_sender_move_running_to_connector_or_remove(struct stream_thre
stream_connector_requeue(s);
}
void stream_sender_check_all_nodes_from_poll(struct stream_thread *sth) {
void stream_sender_check_all_nodes_from_poll(struct stream_thread *sth, usec_t now_ut) {
internal_fatal(sth->tid != gettid_cached(), "Function %s() should only be used by the dispatcher thread", __FUNCTION__ );
usec_t now_ut = now_monotonic_usec();
time_t now_s = (time_t)(now_ut / USEC_PER_SEC);
size_t bytes_uncompressed = 0;
size_t bytes_compressed = 0;
NETDATA_DOUBLE buffer_ratio = 0.0;
NETDATA_DOUBLE overall_buffer_ratio = 0.0;
Word_t idx = 0;
for(struct sender_state *s = SENDERS_FIRST(&sth->snd.senders, &idx);
s;
s = SENDERS_NEXT(&sth->snd.senders, &idx)) {
// If the TCP window never opened, then something is wrong, restart connection
if(unlikely(now_s - s->last_traffic_seen_t > stream_send.parents.timeout_s &&
stream_sender_lock(s);
size_t outstanding = cbuffer_next_unsafe(s->sbuf.cb, NULL);
NETDATA_DOUBLE buffer_ratio = s->thread.buffer_ratio;
stream_sender_unlock(s);
if (buffer_ratio > overall_buffer_ratio)
overall_buffer_ratio = buffer_ratio;
if(unlikely(s->thread.last_traffic_ut + stream_send.parents.timeout_s * USEC_PER_SEC < now_ut &&
!stream_sender_pending_replication_requests(s) &&
!stream_sender_replicating_charts(s)
)) {
@ -404,28 +408,30 @@ void stream_sender_check_all_nodes_from_poll(struct stream_thread *sth) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT);
char since[RFC3339_MAX_LENGTH];
rfc3339_datetime_ut(since, sizeof(since), s->thread.last_traffic_ut, 2, false);
char pending[64];
size_snprintf(pending, sizeof(pending), outstanding, "B", false);
nd_log(NDLS_DAEMON, NDLP_ERR,
"STREAM SEND[%zu] %s [send to %s]: could not send metrics for %ld seconds - closing connection - "
"we have sent %zu bytes on this connection via %zu send attempts.",
"STREAM SEND[%zu] %s [send to %s]: could not send data for %ld seconds - closing connection - "
"we have sent %zu bytes in %zu operations, it is idle since: %s, and we have %s pending to send "
"(buffer is used %.2f%%).",
sth->id, rrdhost_hostname(s->host), s->connected_to, stream_send.parents.timeout_s,
s->thread.bytes_sent, s->thread.sends);
s->thread.bytes_sent, s->thread.sends, since, pending, buffer_ratio);
stream_sender_move_running_to_connector_or_remove(sth, s, STREAM_HANDSHAKE_DISCONNECT_SOCKET_TIMEOUT, true);
continue;
}
stream_sender_lock(s);
{
bytes_compressed += s->thread.bytes_compressed;
bytes_uncompressed += s->thread.bytes_uncompressed;
uint64_t outstanding = s->thread.bytes_outstanding;
if (s->thread.buffer_ratio > buffer_ratio)
buffer_ratio = s->thread.buffer_ratio;
bytes_compressed += s->thread.bytes_compressed;
bytes_uncompressed += s->thread.bytes_uncompressed;
if(!nd_poll_upd(sth->run.ndpl, s->sock.fd, ND_POLL_READ | (outstanding ? ND_POLL_WRITE : 0), &s->thread.meta))
internal_fatal(true, "Failed to update sender socket in nd_poll()");
}
stream_sender_unlock(s);
if(!nd_poll_upd(sth->run.ndpl, s->sock.fd, ND_POLL_READ | (outstanding ? ND_POLL_WRITE : 0), &s->thread.meta))
nd_log(NDLS_DAEMON, NDLP_ERR,
"STREAM SEND[%zu] %s [send to %s]: failed to update nd_poll().",
sth->id, rrdhost_hostname(s->host), s->connected_to);
}
if (bytes_compressed && bytes_uncompressed) {
@ -435,10 +441,10 @@ void stream_sender_check_all_nodes_from_poll(struct stream_thread *sth) {
worker_set_metric(WORKER_SENDER_JOB_BYTES_UNCOMPRESSED, (NETDATA_DOUBLE)bytes_uncompressed);
worker_set_metric(WORKER_SENDER_JOB_BYTES_COMPRESSED, (NETDATA_DOUBLE)bytes_compressed);
worker_set_metric(WORKER_SENDER_JOB_BUFFER_RATIO, buffer_ratio);
worker_set_metric(WORKER_SENDER_JOB_BUFFER_RATIO, overall_buffer_ratio);
}
void stream_sender_process_poll_events(struct stream_thread *sth, struct sender_state *s, nd_poll_event_t events, time_t now_s) {
void stream_sender_process_poll_events(struct stream_thread *sth, struct sender_state *s, nd_poll_event_t events, usec_t now_ut) {
internal_fatal(sth->tid != gettid_cached(), "Function %s() should only be used by the dispatcher thread", __FUNCTION__ );
ND_LOG_STACK lgs[] = {
@ -490,16 +496,18 @@ void stream_sender_process_poll_events(struct stream_thread *sth, struct sender_
if (likely(bytes > 0)) {
cbuffer_remove_unsafe(s->sbuf.cb, bytes);
stream_sender_thread_data_sent_data_unsafe(s, bytes);
s->last_traffic_seen_t = now_s;
s->thread.last_traffic_ut = now_ut;
sth->snd.bytes_sent += bytes;
if(!s->thread.bytes_outstanding) {
// we sent them all - remove POLLOUT
// we sent them all - remove ND_POLL_WRITE
if(!nd_poll_upd(sth->run.ndpl, s->sock.fd, ND_POLL_READ, &s->thread.meta))
internal_fatal(true, "Failed to update sender socket in nd_poll()");
nd_log(NDLS_DAEMON, NDLP_ERR,
"STREAM SEND[%zu] %s [send to %s]: failed to update nd_poll().",
sth->id, rrdhost_hostname(s->host), s->connected_to);
// recreate the circular buffer if we have to
stream_sender_cbuffer_recreate_timed_unsafe(s, now_s, false);
stream_sender_cbuffer_recreate_timed_unsafe(s, now_ut, false);
}
}
else if (bytes < 0 && errno != EWOULDBLOCK && errno != EAGAIN && errno != EINTR)
@ -519,14 +527,14 @@ void stream_sender_process_poll_events(struct stream_thread *sth, struct sender_
}
}
if(events & POLLIN) {
if(events & ND_POLL_READ) {
// we can receive data from this socket
worker_is_busy(WORKER_STREAM_JOB_SOCKET_RECEIVE);
ssize_t bytes = nd_sock_revc_nowait(&s->sock, s->rbuf.b + s->rbuf.read_len, sizeof(s->rbuf.b) - s->rbuf.read_len - 1);
if (bytes > 0) {
s->rbuf.read_len += bytes;
s->last_traffic_seen_t = now_s;
s->thread.last_traffic_ut = now_ut;
sth->snd.bytes_received += bytes;
}
else if (bytes == 0 || errno == ECONNRESET) {

View file

@ -177,7 +177,7 @@ static void stream_thread_messages_resize_unsafe(struct stream_thread *sth) {
// --------------------------------------------------------------------------------------------------------------------
static bool stream_thread_process_poll_slot(struct stream_thread *sth, nd_poll_result_t *ev, time_t now_s, size_t *replay_entries) {
static bool stream_thread_process_poll_slot(struct stream_thread *sth, nd_poll_result_t *ev, usec_t now_ut, size_t *replay_entries) {
struct pollfd_meta *m = ev->data;
internal_fatal(!m, "Failed to get meta from event");
@ -185,7 +185,7 @@ static bool stream_thread_process_poll_slot(struct stream_thread *sth, nd_poll_r
case POLLFD_TYPE_SENDER: {
struct sender_state *s = m->s;
internal_fatal(SENDERS_GET(&sth->snd.senders, (Word_t)s) == NULL, "Sender is not found in the senders list");
stream_sender_process_poll_events(sth, s, ev->events, now_s);
stream_sender_process_poll_events(sth, s, ev->events, now_ut);
*replay_entries += dictionary_entries(s->replication.requests);
break;
}
@ -193,7 +193,7 @@ static bool stream_thread_process_poll_slot(struct stream_thread *sth, nd_poll_r
case POLLFD_TYPE_RECEIVER: {
struct receiver_state *rpt = m->rpt;
internal_fatal(RECEIVERS_GET(&sth->rcv.receivers, (Word_t)rpt) == NULL, "Receiver is not found in the receiver list");
stream_receive_process_poll_events(sth, rpt, ev->events, now_s);
stream_receive_process_poll_events(sth, rpt, ev->events, now_ut);
break;
}
@ -356,7 +356,7 @@ void *stream_thread(void *ptr) {
// periodically check the entire list of nodes
// this detects unresponsive parents too (timeout)
stream_sender_check_all_nodes_from_poll(sth);
stream_sender_check_all_nodes_from_poll(sth, now_ut);
worker_set_metric(WORKER_SENDER_JOB_MESSAGES, (NETDATA_DOUBLE)(sth->messages.processed));
worker_set_metric(WORKER_STREAM_METRIC_NODES, (NETDATA_DOUBLE)sth->nodes_count);
@ -390,12 +390,11 @@ void *stream_thread(void *ptr) {
continue;
}
time_t now_s = now_monotonic_sec();
if(nd_thread_signaled_to_cancel() || !service_running(SERVICE_STREAMING))
break;
exit_thread = stream_thread_process_poll_slot(sth, &ev, now_s, &replay_entries);
now_ut = now_monotonic_usec();
exit_thread = stream_thread_process_poll_slot(sth, &ev, now_ut, &replay_entries);
}
// dequeue

View file

@ -173,13 +173,13 @@ extern struct stream_thread_globals stream_thread_globals;
void stream_sender_move_queue_to_running_unsafe(struct stream_thread *sth);
void stream_receiver_move_queue_to_running_unsafe(struct stream_thread *sth);
void stream_sender_check_all_nodes_from_poll(struct stream_thread *sth);
void stream_sender_check_all_nodes_from_poll(struct stream_thread *sth, usec_t now_ut);
void stream_receiver_add_to_queue(struct receiver_state *rpt);
void stream_sender_add_to_connector_queue(struct rrdhost *host);
void stream_sender_process_poll_events(struct stream_thread *sth, struct sender_state *s, nd_poll_event_t events, time_t now_s);
void stream_receive_process_poll_events(struct stream_thread *sth, struct receiver_state *rpt, nd_poll_event_t events, time_t now_s);
void stream_sender_process_poll_events(struct stream_thread *sth, struct sender_state *s, nd_poll_event_t events, usec_t now_ut);
void stream_receive_process_poll_events(struct stream_thread *sth, struct receiver_state *rpt, nd_poll_event_t events, usec_t now_ut);
void stream_sender_cleanup(struct stream_thread *sth);
void stream_receiver_cleanup(struct stream_thread *sth);