diff --git a/src/database/engine/rrdengineapi.c b/src/database/engine/rrdengineapi.c index 4951577d95..cdcf17326b 100755 --- a/src/database/engine/rrdengineapi.c +++ b/src/database/engine/rrdengineapi.c @@ -455,7 +455,7 @@ static PGD *rrdeng_alloc_new_page_data(struct rrdeng_collect_handle *handle, use if(slots < 3) slots = 3; - size_t size = slots * CTX_POINT_SIZE_BYTES(ctx); + size_t size = slots * CTX_POINT_SIZE_BYTES(ctx); (void)size; // internal_error(true, "PAGE ALLOC %zu bytes (%zu max)", size, max_size); diff --git a/src/streaming/stream-receiver.c b/src/streaming/stream-receiver.c index d5bfcd96e4..ea9ae6e065 100644 --- a/src/streaming/stream-receiver.c +++ b/src/streaming/stream-receiver.c @@ -417,7 +417,7 @@ static void stream_receiver_remove(struct stream_thread *sth, struct receiver_st } // process poll() events for streaming receivers -void stream_receive_process_poll_events(struct stream_thread *sth, struct receiver_state *rpt, nd_poll_event_t events __maybe_unused, time_t now_s) { +void stream_receive_process_poll_events(struct stream_thread *sth, struct receiver_state *rpt, nd_poll_event_t events __maybe_unused, usec_t now_ut) { PARSER *parser = __atomic_load_n(&rpt->thread.parser, __ATOMIC_RELAXED); ND_LOG_STACK lgs[] = { ND_LOG_FIELD_TXT(NDF_SRC_IP, rpt->client_ip), @@ -439,7 +439,7 @@ void stream_receive_process_poll_events(struct stream_thread *sth, struct receiv return; } - rpt->last_msg_t = now_s; + rpt->last_msg_t = (time_t)(now_ut / USEC_PER_SEC); if(rpt->thread.compressed.enabled) { worker_is_busy(WORKER_STREAM_JOB_SOCKET_RECEIVE); diff --git a/src/streaming/stream-sender-internals.h b/src/streaming/stream-sender-internals.h index 07cd71ee9d..cb372d1f06 100644 --- a/src/streaming/stream-sender-internals.h +++ b/src/streaming/stream-sender-internals.h @@ -55,7 +55,8 @@ struct sender_state { size_t bytes_uncompressed; // the current buffer statistics - // these SHOULD ALWAYS BE CALCULATED ON EVERY sender_unlock() IF THE BUFFER WAS MODIFIED + // these SHOULD ALWAYS BE CALCULATED ON EVERY stream_sender_unlock() IF THE BUFFER WAS MODIFIED + // stream_sender_lock() IS REQUIRED TO READ/WRITE THESE size_t bytes_outstanding; size_t bytes_available; NETDATA_DOUBLE buffer_ratio; @@ -65,6 +66,8 @@ struct sender_state { size_t bytes_sent; size_t bytes_sent_by_type[STREAM_TRAFFIC_TYPE_MAX]; + usec_t last_traffic_ut; + struct pollfd_meta meta; } thread; @@ -73,7 +76,6 @@ struct sender_state { } connector; char connected_to[CONNECTED_TO_SIZE + 1]; // We don't know which proxy we connect to, passed back from socket.c - time_t last_traffic_seen_t; time_t last_state_since_t; // the timestamp of the last state (online/offline) change struct { diff --git a/src/streaming/stream-sender.c b/src/streaming/stream-sender.c index 928e06f842..d69f3d491b 100644 --- a/src/streaming/stream-sender.c +++ b/src/streaming/stream-sender.c @@ -7,13 +7,13 @@ static void stream_sender_move_running_to_connector_or_remove(struct stream_thre // -------------------------------------------------------------------------------------------------------------------- -static void stream_sender_cbuffer_recreate_timed_unsafe(struct sender_state *s, time_t now_s, bool force) { - static __thread time_t last_reset_time_s = 0; +static void stream_sender_cbuffer_recreate_timed_unsafe(struct sender_state *s, usec_t now_ut, bool force) { + static __thread usec_t last_reset_time_ut = 0; - if(!force && now_s - last_reset_time_s < 300) + if(!force && now_ut - last_reset_time_ut < 300 * USEC_PER_SEC) return; - last_reset_time_s = now_s; + last_reset_time_ut = now_ut; s->sbuf.recreates++; // we increase even if we don't do it, to have sender_start() recreate its buffers @@ -30,7 +30,7 @@ static void rrdpush_sender_cbuffer_flush(RRDHOST *host) { // flush the output buffer from any data it may have cbuffer_flush(host->sender->sbuf.cb); - stream_sender_cbuffer_recreate_timed_unsafe(host->sender, now_monotonic_sec(), true); + stream_sender_cbuffer_recreate_timed_unsafe(host->sender, now_monotonic_usec(), true); stream_sender_unlock(host->sender); } @@ -76,7 +76,7 @@ void stream_sender_on_connect(struct sender_state *s) { rrdpush_sender_charts_and_replication_reset(s); rrdpush_sender_cbuffer_flush(s->host); - s->last_traffic_seen_t = now_monotonic_sec(); + s->thread.last_traffic_ut = now_monotonic_usec(); s->rbuf.read_len = 0; s->sbuf.cb->read = 0; s->sbuf.cb->write = 0; @@ -370,23 +370,27 @@ static void stream_sender_move_running_to_connector_or_remove(struct stream_thre stream_connector_requeue(s); } -void stream_sender_check_all_nodes_from_poll(struct stream_thread *sth) { +void stream_sender_check_all_nodes_from_poll(struct stream_thread *sth, usec_t now_ut) { internal_fatal(sth->tid != gettid_cached(), "Function %s() should only be used by the dispatcher thread", __FUNCTION__ ); - usec_t now_ut = now_monotonic_usec(); - time_t now_s = (time_t)(now_ut / USEC_PER_SEC); - size_t bytes_uncompressed = 0; size_t bytes_compressed = 0; - NETDATA_DOUBLE buffer_ratio = 0.0; + NETDATA_DOUBLE overall_buffer_ratio = 0.0; Word_t idx = 0; for(struct sender_state *s = SENDERS_FIRST(&sth->snd.senders, &idx); s; s = SENDERS_NEXT(&sth->snd.senders, &idx)) { - // If the TCP window never opened, then something is wrong, restart connection - if(unlikely(now_s - s->last_traffic_seen_t > stream_send.parents.timeout_s && + stream_sender_lock(s); + size_t outstanding = cbuffer_next_unsafe(s->sbuf.cb, NULL); + NETDATA_DOUBLE buffer_ratio = s->thread.buffer_ratio; + stream_sender_unlock(s); + + if (buffer_ratio > overall_buffer_ratio) + overall_buffer_ratio = buffer_ratio; + + if(unlikely(s->thread.last_traffic_ut + stream_send.parents.timeout_s * USEC_PER_SEC < now_ut && !stream_sender_pending_replication_requests(s) && !stream_sender_replicating_charts(s) )) { @@ -404,28 +408,30 @@ void stream_sender_check_all_nodes_from_poll(struct stream_thread *sth) { worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT); + char since[RFC3339_MAX_LENGTH]; + rfc3339_datetime_ut(since, sizeof(since), s->thread.last_traffic_ut, 2, false); + + char pending[64]; + size_snprintf(pending, sizeof(pending), outstanding, "B", false); + nd_log(NDLS_DAEMON, NDLP_ERR, - "STREAM SEND[%zu] %s [send to %s]: could not send metrics for %ld seconds - closing connection - " - "we have sent %zu bytes on this connection via %zu send attempts.", + "STREAM SEND[%zu] %s [send to %s]: could not send data for %ld seconds - closing connection - " + "we have sent %zu bytes in %zu operations, it is idle since: %s, and we have %s pending to send " + "(buffer is used %.2f%%).", sth->id, rrdhost_hostname(s->host), s->connected_to, stream_send.parents.timeout_s, - s->thread.bytes_sent, s->thread.sends); + s->thread.bytes_sent, s->thread.sends, since, pending, buffer_ratio); stream_sender_move_running_to_connector_or_remove(sth, s, STREAM_HANDSHAKE_DISCONNECT_SOCKET_TIMEOUT, true); continue; } - stream_sender_lock(s); - { - bytes_compressed += s->thread.bytes_compressed; - bytes_uncompressed += s->thread.bytes_uncompressed; - uint64_t outstanding = s->thread.bytes_outstanding; - if (s->thread.buffer_ratio > buffer_ratio) - buffer_ratio = s->thread.buffer_ratio; + bytes_compressed += s->thread.bytes_compressed; + bytes_uncompressed += s->thread.bytes_uncompressed; - if(!nd_poll_upd(sth->run.ndpl, s->sock.fd, ND_POLL_READ | (outstanding ? ND_POLL_WRITE : 0), &s->thread.meta)) - internal_fatal(true, "Failed to update sender socket in nd_poll()"); - } - stream_sender_unlock(s); + if(!nd_poll_upd(sth->run.ndpl, s->sock.fd, ND_POLL_READ | (outstanding ? ND_POLL_WRITE : 0), &s->thread.meta)) + nd_log(NDLS_DAEMON, NDLP_ERR, + "STREAM SEND[%zu] %s [send to %s]: failed to update nd_poll().", + sth->id, rrdhost_hostname(s->host), s->connected_to); } if (bytes_compressed && bytes_uncompressed) { @@ -435,10 +441,10 @@ void stream_sender_check_all_nodes_from_poll(struct stream_thread *sth) { worker_set_metric(WORKER_SENDER_JOB_BYTES_UNCOMPRESSED, (NETDATA_DOUBLE)bytes_uncompressed); worker_set_metric(WORKER_SENDER_JOB_BYTES_COMPRESSED, (NETDATA_DOUBLE)bytes_compressed); - worker_set_metric(WORKER_SENDER_JOB_BUFFER_RATIO, buffer_ratio); + worker_set_metric(WORKER_SENDER_JOB_BUFFER_RATIO, overall_buffer_ratio); } -void stream_sender_process_poll_events(struct stream_thread *sth, struct sender_state *s, nd_poll_event_t events, time_t now_s) { +void stream_sender_process_poll_events(struct stream_thread *sth, struct sender_state *s, nd_poll_event_t events, usec_t now_ut) { internal_fatal(sth->tid != gettid_cached(), "Function %s() should only be used by the dispatcher thread", __FUNCTION__ ); ND_LOG_STACK lgs[] = { @@ -490,16 +496,18 @@ void stream_sender_process_poll_events(struct stream_thread *sth, struct sender_ if (likely(bytes > 0)) { cbuffer_remove_unsafe(s->sbuf.cb, bytes); stream_sender_thread_data_sent_data_unsafe(s, bytes); - s->last_traffic_seen_t = now_s; + s->thread.last_traffic_ut = now_ut; sth->snd.bytes_sent += bytes; if(!s->thread.bytes_outstanding) { - // we sent them all - remove POLLOUT + // we sent them all - remove ND_POLL_WRITE if(!nd_poll_upd(sth->run.ndpl, s->sock.fd, ND_POLL_READ, &s->thread.meta)) - internal_fatal(true, "Failed to update sender socket in nd_poll()"); + nd_log(NDLS_DAEMON, NDLP_ERR, + "STREAM SEND[%zu] %s [send to %s]: failed to update nd_poll().", + sth->id, rrdhost_hostname(s->host), s->connected_to); // recreate the circular buffer if we have to - stream_sender_cbuffer_recreate_timed_unsafe(s, now_s, false); + stream_sender_cbuffer_recreate_timed_unsafe(s, now_ut, false); } } else if (bytes < 0 && errno != EWOULDBLOCK && errno != EAGAIN && errno != EINTR) @@ -519,14 +527,14 @@ void stream_sender_process_poll_events(struct stream_thread *sth, struct sender_ } } - if(events & POLLIN) { + if(events & ND_POLL_READ) { // we can receive data from this socket worker_is_busy(WORKER_STREAM_JOB_SOCKET_RECEIVE); ssize_t bytes = nd_sock_revc_nowait(&s->sock, s->rbuf.b + s->rbuf.read_len, sizeof(s->rbuf.b) - s->rbuf.read_len - 1); if (bytes > 0) { s->rbuf.read_len += bytes; - s->last_traffic_seen_t = now_s; + s->thread.last_traffic_ut = now_ut; sth->snd.bytes_received += bytes; } else if (bytes == 0 || errno == ECONNRESET) { diff --git a/src/streaming/stream-thread.c b/src/streaming/stream-thread.c index cdbbb57c76..2f4e25e132 100644 --- a/src/streaming/stream-thread.c +++ b/src/streaming/stream-thread.c @@ -177,7 +177,7 @@ static void stream_thread_messages_resize_unsafe(struct stream_thread *sth) { // -------------------------------------------------------------------------------------------------------------------- -static bool stream_thread_process_poll_slot(struct stream_thread *sth, nd_poll_result_t *ev, time_t now_s, size_t *replay_entries) { +static bool stream_thread_process_poll_slot(struct stream_thread *sth, nd_poll_result_t *ev, usec_t now_ut, size_t *replay_entries) { struct pollfd_meta *m = ev->data; internal_fatal(!m, "Failed to get meta from event"); @@ -185,7 +185,7 @@ static bool stream_thread_process_poll_slot(struct stream_thread *sth, nd_poll_r case POLLFD_TYPE_SENDER: { struct sender_state *s = m->s; internal_fatal(SENDERS_GET(&sth->snd.senders, (Word_t)s) == NULL, "Sender is not found in the senders list"); - stream_sender_process_poll_events(sth, s, ev->events, now_s); + stream_sender_process_poll_events(sth, s, ev->events, now_ut); *replay_entries += dictionary_entries(s->replication.requests); break; } @@ -193,7 +193,7 @@ static bool stream_thread_process_poll_slot(struct stream_thread *sth, nd_poll_r case POLLFD_TYPE_RECEIVER: { struct receiver_state *rpt = m->rpt; internal_fatal(RECEIVERS_GET(&sth->rcv.receivers, (Word_t)rpt) == NULL, "Receiver is not found in the receiver list"); - stream_receive_process_poll_events(sth, rpt, ev->events, now_s); + stream_receive_process_poll_events(sth, rpt, ev->events, now_ut); break; } @@ -356,7 +356,7 @@ void *stream_thread(void *ptr) { // periodically check the entire list of nodes // this detects unresponsive parents too (timeout) - stream_sender_check_all_nodes_from_poll(sth); + stream_sender_check_all_nodes_from_poll(sth, now_ut); worker_set_metric(WORKER_SENDER_JOB_MESSAGES, (NETDATA_DOUBLE)(sth->messages.processed)); worker_set_metric(WORKER_STREAM_METRIC_NODES, (NETDATA_DOUBLE)sth->nodes_count); @@ -390,12 +390,11 @@ void *stream_thread(void *ptr) { continue; } - time_t now_s = now_monotonic_sec(); - if(nd_thread_signaled_to_cancel() || !service_running(SERVICE_STREAMING)) break; - exit_thread = stream_thread_process_poll_slot(sth, &ev, now_s, &replay_entries); + now_ut = now_monotonic_usec(); + exit_thread = stream_thread_process_poll_slot(sth, &ev, now_ut, &replay_entries); } // dequeue diff --git a/src/streaming/stream-thread.h b/src/streaming/stream-thread.h index 8f6c8a9dd9..511a31fe98 100644 --- a/src/streaming/stream-thread.h +++ b/src/streaming/stream-thread.h @@ -173,13 +173,13 @@ extern struct stream_thread_globals stream_thread_globals; void stream_sender_move_queue_to_running_unsafe(struct stream_thread *sth); void stream_receiver_move_queue_to_running_unsafe(struct stream_thread *sth); -void stream_sender_check_all_nodes_from_poll(struct stream_thread *sth); +void stream_sender_check_all_nodes_from_poll(struct stream_thread *sth, usec_t now_ut); void stream_receiver_add_to_queue(struct receiver_state *rpt); void stream_sender_add_to_connector_queue(struct rrdhost *host); -void stream_sender_process_poll_events(struct stream_thread *sth, struct sender_state *s, nd_poll_event_t events, time_t now_s); -void stream_receive_process_poll_events(struct stream_thread *sth, struct receiver_state *rpt, nd_poll_event_t events, time_t now_s); +void stream_sender_process_poll_events(struct stream_thread *sth, struct sender_state *s, nd_poll_event_t events, usec_t now_ut); +void stream_receive_process_poll_events(struct stream_thread *sth, struct receiver_state *rpt, nd_poll_event_t events, usec_t now_ut); void stream_sender_cleanup(struct stream_thread *sth); void stream_receiver_cleanup(struct stream_thread *sth);