
replication fixes No 7 ()

* move global statistics workers to a separate thread
* query statistics per query source
* query statistics for ML, exporters, backfilling
* reset replication point in time every 10 seconds, instead of every 1
* fix compilation warnings
* optimize the replication queries code
* prevent long tail of replication requests (big sleeps)
* provide query statistics about replication
* optimize replication sender when most senders are full
* optimize replication_request_get_first_available()
* reset replication completion calculation

* remove workers utilization from global statistics thread
Costa Tsaousis 2022-11-28 12:22:38 +02:00 committed by GitHub
parent 1e9f2c7a2a
commit 53a13ab8e1
29 changed files with 696 additions and 242 deletions
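
The heart of the statistics change is one pattern, repeated per query source: a relaxed atomic counter set per QUERY_SOURCE, bumped once per completed query. A minimal self-contained sketch of that pattern (illustrative names, not netdata's):

#include <stdint.h>
#include <stdio.h>

typedef enum { SRC_API_DATA, SRC_HEALTH, SRC_ML, SRC_MAX } query_source_t;

// one independent counter set per source; relaxed atomics suffice,
// since nothing synchronizes-with these counters
static struct {
    uint64_t queries;
    uint64_t points_read;
} stats[SRC_MAX];

static void query_completed(query_source_t src, uint64_t points_read) {
    __atomic_fetch_add(&stats[src].queries, 1, __ATOMIC_RELAXED);
    __atomic_fetch_add(&stats[src].points_read, points_read, __ATOMIC_RELAXED);
}

int main(void) {
    query_completed(SRC_API_DATA, 360);
    query_completed(SRC_ML, 86400);
    printf("api/data queries: %llu\n",
           (unsigned long long)__atomic_load_n(&stats[SRC_API_DATA].queries, __ATOMIC_RELAXED));
    return 0;
}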

View file

@ -14,42 +14,66 @@
#define WORKER_JOB_MALLOC_TRACE 7
#if WORKER_UTILIZATION_MAX_JOB_TYPES < 8
#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 5
#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 8
#endif
bool global_statistics_enabled = true;
static struct global_statistics {
volatile uint16_t connected_clients;
uint16_t connected_clients;
volatile uint64_t web_requests;
volatile uint64_t web_usec;
volatile uint64_t web_usec_max;
volatile uint64_t bytes_received;
volatile uint64_t bytes_sent;
volatile uint64_t content_size;
volatile uint64_t compressed_content_size;
uint64_t web_requests;
uint64_t web_usec;
uint64_t web_usec_max;
uint64_t bytes_received;
uint64_t bytes_sent;
uint64_t content_size;
uint64_t compressed_content_size;
volatile uint64_t web_client_count;
uint64_t web_client_count;
volatile uint64_t rrdr_queries_made;
volatile uint64_t rrdr_db_points_read;
volatile uint64_t rrdr_result_points_generated;
uint64_t api_data_queries_made;
uint64_t api_data_db_points_read;
uint64_t api_data_result_points_generated;
volatile uint64_t sqlite3_queries_made;
volatile uint64_t sqlite3_queries_ok;
volatile uint64_t sqlite3_queries_failed;
volatile uint64_t sqlite3_queries_failed_busy;
volatile uint64_t sqlite3_queries_failed_locked;
volatile uint64_t sqlite3_rows;
volatile uint64_t sqlite3_metadata_cache_hit;
volatile uint64_t sqlite3_context_cache_hit;
volatile uint64_t sqlite3_metadata_cache_miss;
volatile uint64_t sqlite3_context_cache_miss;
volatile uint64_t sqlite3_metadata_cache_spill;
volatile uint64_t sqlite3_context_cache_spill;
volatile uint64_t sqlite3_metadata_cache_write;
volatile uint64_t sqlite3_context_cache_write;
uint64_t api_weights_queries_made;
uint64_t api_weights_db_points_read;
uint64_t api_weights_result_points_generated;
uint64_t api_badges_queries_made;
uint64_t api_badges_db_points_read;
uint64_t api_badges_result_points_generated;
uint64_t health_queries_made;
uint64_t health_db_points_read;
uint64_t health_result_points_generated;
uint64_t ml_queries_made;
uint64_t ml_db_points_read;
uint64_t ml_result_points_generated;
uint64_t exporters_queries_made;
uint64_t exporters_db_points_read;
uint64_t backfill_queries_made;
uint64_t backfill_db_points_read;
uint64_t db_points_stored_per_tier[RRD_STORAGE_TIERS];
uint64_t sqlite3_queries_made;
uint64_t sqlite3_queries_ok;
uint64_t sqlite3_queries_failed;
uint64_t sqlite3_queries_failed_busy;
uint64_t sqlite3_queries_failed_locked;
uint64_t sqlite3_rows;
uint64_t sqlite3_metadata_cache_hit;
uint64_t sqlite3_context_cache_hit;
uint64_t sqlite3_metadata_cache_miss;
uint64_t sqlite3_context_cache_miss;
uint64_t sqlite3_metadata_cache_spill;
uint64_t sqlite3_context_cache_spill;
uint64_t sqlite3_metadata_cache_write;
uint64_t sqlite3_context_cache_write;
} global_statistics = {
.connected_clients = 0,
@ -61,12 +85,34 @@ static struct global_statistics {
.compressed_content_size = 0,
.web_client_count = 1,
.rrdr_queries_made = 0,
.rrdr_db_points_read = 0,
.rrdr_result_points_generated = 0,
.api_data_queries_made = 0,
.api_data_db_points_read = 0,
.api_data_result_points_generated = 0,
};
void sqlite3_query_completed(bool success, bool busy, bool locked) {
void global_statistics_rrdset_done_chart_collection_completed(size_t *points_read_per_tier_array) {
for(size_t tier = 0; tier < storage_tiers ;tier++) {
__atomic_fetch_add(&global_statistics.db_points_stored_per_tier[tier], points_read_per_tier_array[tier], __ATOMIC_RELAXED);
points_read_per_tier_array[tier] = 0;
}
}
void global_statistics_ml_query_completed(size_t points_read) {
__atomic_fetch_add(&global_statistics.ml_queries_made, 1, __ATOMIC_RELAXED);
__atomic_fetch_add(&global_statistics.ml_db_points_read, points_read, __ATOMIC_RELAXED);
}
void global_statistics_exporters_query_completed(size_t points_read) {
__atomic_fetch_add(&global_statistics.exporters_queries_made, 1, __ATOMIC_RELAXED);
__atomic_fetch_add(&global_statistics.exporters_db_points_read, points_read, __ATOMIC_RELAXED);
}
void global_statistics_backfill_query_completed(size_t points_read) {
__atomic_fetch_add(&global_statistics.backfill_queries_made, 1, __ATOMIC_RELAXED);
__atomic_fetch_add(&global_statistics.backfill_db_points_read, points_read, __ATOMIC_RELAXED);
}
void global_statistics_sqlite3_query_completed(bool success, bool busy, bool locked) {
__atomic_fetch_add(&global_statistics.sqlite3_queries_made, 1, __ATOMIC_RELAXED);
if(success) {
@ -83,21 +129,54 @@ void sqlite3_query_completed(bool success, bool busy, bool locked) {
}
}
void sqlite3_row_completed(void) {
void global_statistics_sqlite3_row_completed(void) {
__atomic_fetch_add(&global_statistics.sqlite3_rows, 1, __ATOMIC_RELAXED);
}
void rrdr_query_completed(uint64_t db_points_read, uint64_t result_points_generated) {
__atomic_fetch_add(&global_statistics.rrdr_queries_made, 1, __ATOMIC_RELAXED);
__atomic_fetch_add(&global_statistics.rrdr_db_points_read, db_points_read, __ATOMIC_RELAXED);
__atomic_fetch_add(&global_statistics.rrdr_result_points_generated, result_points_generated, __ATOMIC_RELAXED);
void global_statistics_rrdr_query_completed(size_t queries, uint64_t db_points_read, uint64_t result_points_generated, QUERY_SOURCE query_source) {
switch(query_source) {
case QUERY_SOURCE_API_DATA:
__atomic_fetch_add(&global_statistics.api_data_queries_made, queries, __ATOMIC_RELAXED);
__atomic_fetch_add(&global_statistics.api_data_db_points_read, db_points_read, __ATOMIC_RELAXED);
__atomic_fetch_add(&global_statistics.api_data_result_points_generated, result_points_generated, __ATOMIC_RELAXED);
break;
case QUERY_SOURCE_ML:
__atomic_fetch_add(&global_statistics.ml_queries_made, queries, __ATOMIC_RELAXED);
__atomic_fetch_add(&global_statistics.ml_db_points_read, db_points_read, __ATOMIC_RELAXED);
__atomic_fetch_add(&global_statistics.ml_result_points_generated, result_points_generated, __ATOMIC_RELAXED);
break;
case QUERY_SOURCE_API_WEIGHTS:
__atomic_fetch_add(&global_statistics.api_weights_queries_made, queries, __ATOMIC_RELAXED);
__atomic_fetch_add(&global_statistics.api_weights_db_points_read, db_points_read, __ATOMIC_RELAXED);
__atomic_fetch_add(&global_statistics.api_weights_result_points_generated, result_points_generated, __ATOMIC_RELAXED);
break;
case QUERY_SOURCE_API_BADGE:
__atomic_fetch_add(&global_statistics.api_badges_queries_made, queries, __ATOMIC_RELAXED);
__atomic_fetch_add(&global_statistics.api_badges_db_points_read, db_points_read, __ATOMIC_RELAXED);
__atomic_fetch_add(&global_statistics.api_badges_result_points_generated, result_points_generated, __ATOMIC_RELAXED);
break;
case QUERY_SOURCE_HEALTH:
__atomic_fetch_add(&global_statistics.health_queries_made, queries, __ATOMIC_RELAXED);
__atomic_fetch_add(&global_statistics.health_db_points_read, db_points_read, __ATOMIC_RELAXED);
__atomic_fetch_add(&global_statistics.health_result_points_generated, result_points_generated, __ATOMIC_RELAXED);
break;
default:
case QUERY_SOURCE_UNITTEST:
case QUERY_SOURCE_UNKNOWN:
break;
}
}
void finished_web_request_statistics(uint64_t dt,
uint64_t bytes_received,
uint64_t bytes_sent,
uint64_t content_size,
uint64_t compressed_content_size) {
void global_statistics_web_request_completed(uint64_t dt,
uint64_t bytes_received,
uint64_t bytes_sent,
uint64_t content_size,
uint64_t compressed_content_size) {
uint64_t old_web_usec_max = global_statistics.web_usec_max;
while(dt > old_web_usec_max)
__atomic_compare_exchange(&global_statistics.web_usec_max, &old_web_usec_max, &dt, 1, __ATOMIC_RELAXED, __ATOMIC_RELAXED);
@ -110,16 +189,15 @@ void finished_web_request_statistics(uint64_t dt,
__atomic_fetch_add(&global_statistics.compressed_content_size, compressed_content_size, __ATOMIC_RELAXED);
}
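
The web_usec_max update above is a lock-free maximum: reload and compare-exchange until the stored value is at least dt. The same pattern, restated generically (a sketch, not a netdata API):

static void atomic_u64_max(uint64_t *target, uint64_t value) {
    uint64_t old = __atomic_load_n(target, __ATOMIC_RELAXED);
    // a failed CAS refreshes 'old'; retry only while our value is still larger
    while (value > old &&
           !__atomic_compare_exchange_n(target, &old, value, false,
                                        __ATOMIC_RELAXED, __ATOMIC_RELAXED))
        ;
}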
uint64_t web_client_connected(void) {
uint64_t global_statistics_web_client_connected(void) {
__atomic_fetch_add(&global_statistics.connected_clients, 1, __ATOMIC_RELAXED);
return __atomic_fetch_add(&global_statistics.web_client_count, 1, __ATOMIC_RELAXED);
}
void web_client_disconnected(void) {
void global_statistics_web_client_disconnected(void) {
__atomic_fetch_sub(&global_statistics.connected_clients, 1, __ATOMIC_RELAXED);
}
static inline void global_statistics_copy(struct global_statistics *gs, uint8_t options) {
gs->connected_clients = __atomic_load_n(&global_statistics.connected_clients, __ATOMIC_RELAXED);
gs->web_requests = __atomic_load_n(&global_statistics.web_requests, __ATOMIC_RELAXED);
@ -131,9 +209,33 @@ static inline void global_statistics_copy(struct global_statistics *gs, uint8_t
gs->compressed_content_size = __atomic_load_n(&global_statistics.compressed_content_size, __ATOMIC_RELAXED);
gs->web_client_count = __atomic_load_n(&global_statistics.web_client_count, __ATOMIC_RELAXED);
gs->rrdr_queries_made = __atomic_load_n(&global_statistics.rrdr_queries_made, __ATOMIC_RELAXED);
gs->rrdr_db_points_read = __atomic_load_n(&global_statistics.rrdr_db_points_read, __ATOMIC_RELAXED);
gs->rrdr_result_points_generated = __atomic_load_n(&global_statistics.rrdr_result_points_generated, __ATOMIC_RELAXED);
gs->api_data_queries_made = __atomic_load_n(&global_statistics.api_data_queries_made, __ATOMIC_RELAXED);
gs->api_data_db_points_read = __atomic_load_n(&global_statistics.api_data_db_points_read, __ATOMIC_RELAXED);
gs->api_data_result_points_generated = __atomic_load_n(&global_statistics.api_data_result_points_generated, __ATOMIC_RELAXED);
gs->api_weights_queries_made = __atomic_load_n(&global_statistics.api_weights_queries_made, __ATOMIC_RELAXED);
gs->api_weights_db_points_read = __atomic_load_n(&global_statistics.api_weights_db_points_read, __ATOMIC_RELAXED);
gs->api_weights_result_points_generated = __atomic_load_n(&global_statistics.api_weights_result_points_generated, __ATOMIC_RELAXED);
gs->api_badges_queries_made = __atomic_load_n(&global_statistics.api_badges_queries_made, __ATOMIC_RELAXED);
gs->api_badges_db_points_read = __atomic_load_n(&global_statistics.api_badges_db_points_read, __ATOMIC_RELAXED);
gs->api_badges_result_points_generated = __atomic_load_n(&global_statistics.api_badges_result_points_generated, __ATOMIC_RELAXED);
gs->health_queries_made = __atomic_load_n(&global_statistics.health_queries_made, __ATOMIC_RELAXED);
gs->health_db_points_read = __atomic_load_n(&global_statistics.health_db_points_read, __ATOMIC_RELAXED);
gs->health_result_points_generated = __atomic_load_n(&global_statistics.health_result_points_generated, __ATOMIC_RELAXED);
gs->ml_queries_made = __atomic_load_n(&global_statistics.ml_queries_made, __ATOMIC_RELAXED);
gs->ml_db_points_read = __atomic_load_n(&global_statistics.ml_db_points_read, __ATOMIC_RELAXED);
gs->ml_result_points_generated = __atomic_load_n(&global_statistics.ml_result_points_generated, __ATOMIC_RELAXED);
gs->exporters_queries_made = __atomic_load_n(&global_statistics.exporters_queries_made, __ATOMIC_RELAXED);
gs->exporters_db_points_read = __atomic_load_n(&global_statistics.exporters_db_points_read, __ATOMIC_RELAXED);
gs->backfill_queries_made = __atomic_load_n(&global_statistics.backfill_queries_made, __ATOMIC_RELAXED);
gs->backfill_db_points_read = __atomic_load_n(&global_statistics.backfill_db_points_read, __ATOMIC_RELAXED);
for(size_t tier = 0; tier < storage_tiers ;tier++)
gs->db_points_stored_per_tier[tier] = __atomic_load_n(&global_statistics.db_points_stored_per_tier[tier], __ATOMIC_RELAXED);
if(options & GLOBAL_STATS_RESET_WEB_USEC_MAX) {
uint64_t n = 0;
@ -177,6 +279,7 @@ static void global_statistics_charts(void) {
struct global_statistics gs;
struct rusage me;
struct replication_query_statistics replication = replication_get_query_statistics();
global_statistics_copy(&gs, GLOBAL_STATS_RESET_WEB_USEC_MAX);
getrusage(RUSAGE_SELF, &me);
@ -425,65 +528,184 @@ static void global_statistics_charts(void) {
// ----------------------------------------------------------------
if(gs.rrdr_queries_made) {
static RRDSET *st_rrdr_queries = NULL;
static RRDDIM *rd_queries = NULL;
{
static RRDSET *st_queries = NULL;
static RRDDIM *rd_api_data_queries = NULL;
static RRDDIM *rd_api_weights_queries = NULL;
static RRDDIM *rd_api_badges_queries = NULL;
static RRDDIM *rd_health_queries = NULL;
static RRDDIM *rd_ml_queries = NULL;
static RRDDIM *rd_exporters_queries = NULL;
static RRDDIM *rd_backfill_queries = NULL;
static RRDDIM *rd_replication_queries = NULL;
if (unlikely(!st_rrdr_queries)) {
st_rrdr_queries = rrdset_create_localhost(
if (unlikely(!st_queries)) {
st_queries = rrdset_create_localhost(
"netdata"
, "queries"
, NULL
, "queries"
, NULL
, "Netdata API Queries"
, "Netdata DB Queries"
, "queries/s"
, "netdata"
, "stats"
, 131000
, localhost->rrd_update_every
, RRDSET_TYPE_LINE
, RRDSET_TYPE_STACKED
);
rd_queries = rrddim_add(st_rrdr_queries, "queries", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
rd_api_data_queries = rrddim_add(st_queries, "/api/v1/data", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
rd_api_weights_queries = rrddim_add(st_queries, "/api/v1/weights", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
rd_api_badges_queries = rrddim_add(st_queries, "/api/v1/badge", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
rd_health_queries = rrddim_add(st_queries, "health", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
rd_ml_queries = rrddim_add(st_queries, "ml", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
rd_exporters_queries = rrddim_add(st_queries, "exporters", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
rd_backfill_queries = rrddim_add(st_queries, "backfill", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
rd_replication_queries = rrddim_add(st_queries, "replication", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
}
rrddim_set_by_pointer(st_rrdr_queries, rd_queries, (collected_number)gs.rrdr_queries_made);
rrddim_set_by_pointer(st_queries, rd_api_data_queries, (collected_number)gs.api_data_queries_made);
rrddim_set_by_pointer(st_queries, rd_api_weights_queries, (collected_number)gs.api_weights_queries_made);
rrddim_set_by_pointer(st_queries, rd_api_badges_queries, (collected_number)gs.api_badges_queries_made);
rrddim_set_by_pointer(st_queries, rd_health_queries, (collected_number)gs.health_queries_made);
rrddim_set_by_pointer(st_queries, rd_ml_queries, (collected_number)gs.ml_queries_made);
rrddim_set_by_pointer(st_queries, rd_exporters_queries, (collected_number)gs.exporters_queries_made);
rrddim_set_by_pointer(st_queries, rd_backfill_queries, (collected_number)gs.backfill_queries_made);
rrddim_set_by_pointer(st_queries, rd_replication_queries, (collected_number)replication.queries_finished);
rrdset_done(st_rrdr_queries);
rrdset_done(st_queries);
}
// ----------------------------------------------------------------
if(gs.rrdr_db_points_read || gs.rrdr_result_points_generated) {
static RRDSET *st_rrdr_points = NULL;
static RRDDIM *rd_points_read = NULL;
static RRDDIM *rd_points_generated = NULL;
{
static RRDSET *st_points_read = NULL;
static RRDDIM *rd_api_data_points_read = NULL;
static RRDDIM *rd_api_weights_points_read = NULL;
static RRDDIM *rd_api_badges_points_read = NULL;
static RRDDIM *rd_health_points_read = NULL;
static RRDDIM *rd_ml_points_read = NULL;
static RRDDIM *rd_exporters_points_read = NULL;
static RRDDIM *rd_backfill_points_read = NULL;
static RRDDIM *rd_replication_points_read = NULL;
if (unlikely(!st_rrdr_points)) {
st_rrdr_points = rrdset_create_localhost(
if (unlikely(!st_points_read)) {
st_points_read = rrdset_create_localhost(
"netdata"
, "db_points"
, "db_points_read"
, NULL
, "queries"
, NULL
, "Netdata API Points"
, "Netdata DB Points Query Read"
, "points/s"
, "netdata"
, "stats"
, 131001
, localhost->rrd_update_every
, RRDSET_TYPE_AREA
, RRDSET_TYPE_STACKED
);
rd_points_read = rrddim_add(st_rrdr_points, "read", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
rd_points_generated = rrddim_add(st_rrdr_points, "generated", NULL, -1, 1, RRD_ALGORITHM_INCREMENTAL);
rd_api_data_points_read = rrddim_add(st_points_read, "/api/v1/data", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
rd_api_weights_points_read = rrddim_add(st_points_read, "/api/v1/weights", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
rd_api_badges_points_read = rrddim_add(st_points_read, "/api/v1/badge", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
rd_health_points_read = rrddim_add(st_points_read, "health", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
rd_ml_points_read = rrddim_add(st_points_read, "ml", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
rd_exporters_points_read = rrddim_add(st_points_read, "exporters", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
rd_backfill_points_read = rrddim_add(st_points_read, "backfill", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
rd_replication_points_read = rrddim_add(st_points_read, "replication", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
}
rrddim_set_by_pointer(st_rrdr_points, rd_points_read, (collected_number)gs.rrdr_db_points_read);
rrddim_set_by_pointer(st_rrdr_points, rd_points_generated, (collected_number)gs.rrdr_result_points_generated);
rrddim_set_by_pointer(st_points_read, rd_api_data_points_read, (collected_number)gs.api_data_db_points_read);
rrddim_set_by_pointer(st_points_read, rd_api_weights_points_read, (collected_number)gs.api_weights_db_points_read);
rrddim_set_by_pointer(st_points_read, rd_api_badges_points_read, (collected_number)gs.api_badges_db_points_read);
rrddim_set_by_pointer(st_points_read, rd_health_points_read, (collected_number)gs.health_db_points_read);
rrddim_set_by_pointer(st_points_read, rd_ml_points_read, (collected_number)gs.ml_db_points_read);
rrddim_set_by_pointer(st_points_read, rd_exporters_points_read, (collected_number)gs.exporters_db_points_read);
rrddim_set_by_pointer(st_points_read, rd_backfill_points_read, (collected_number)gs.backfill_db_points_read);
rrddim_set_by_pointer(st_points_read, rd_replication_points_read, (collected_number)replication.points_read);
rrdset_done(st_rrdr_points);
rrdset_done(st_points_read);
}
// ----------------------------------------------------------------
if(gs.api_data_result_points_generated || replication.points_generated) {
static RRDSET *st_points_generated = NULL;
static RRDDIM *rd_api_data_points_generated = NULL;
static RRDDIM *rd_api_weights_points_generated = NULL;
static RRDDIM *rd_api_badges_points_generated = NULL;
static RRDDIM *rd_health_points_generated = NULL;
static RRDDIM *rd_ml_points_generated = NULL;
static RRDDIM *rd_replication_points_generated = NULL;
if (unlikely(!st_points_generated)) {
st_points_generated = rrdset_create_localhost(
"netdata"
, "db_points_results"
, NULL
, "queries"
, NULL
, "Netdata Points in Query Results"
, "points/s"
, "netdata"
, "stats"
, 131002
, localhost->rrd_update_every
, RRDSET_TYPE_STACKED
);
rd_api_data_points_generated = rrddim_add(st_points_generated, "/api/v1/data", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
rd_api_weights_points_generated = rrddim_add(st_points_generated, "/api/v1/weights", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
rd_api_badges_points_generated = rrddim_add(st_points_generated, "/api/v1/badge", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
rd_health_points_generated = rrddim_add(st_points_generated, "health", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
rd_ml_points_generated = rrddim_add(st_points_generated, "ml", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
rd_replication_points_generated = rrddim_add(st_points_generated, "replication", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
}
rrddim_set_by_pointer(st_points_generated, rd_api_data_points_generated, (collected_number)gs.api_data_result_points_generated);
rrddim_set_by_pointer(st_points_generated, rd_api_weights_points_generated, (collected_number)gs.api_weights_result_points_generated);
rrddim_set_by_pointer(st_points_generated, rd_api_badges_points_generated, (collected_number)gs.api_badges_result_points_generated);
rrddim_set_by_pointer(st_points_generated, rd_health_points_generated, (collected_number)gs.health_result_points_generated);
rrddim_set_by_pointer(st_points_generated, rd_ml_points_generated, (collected_number)gs.ml_result_points_generated);
rrddim_set_by_pointer(st_points_generated, rd_replication_points_generated, (collected_number)replication.points_generated);
rrdset_done(st_points_generated);
}
// ----------------------------------------------------------------
{
static RRDSET *st_points_stored = NULL;
static RRDDIM *rds[RRD_STORAGE_TIERS] = {};
if (unlikely(!st_points_stored)) {
st_points_stored = rrdset_create_localhost(
"netdata"
, "db_points_stored"
, NULL
, "queries"
, NULL
, "Netdata DB Points Stored"
, "points/s"
, "netdata"
, "stats"
, 131003
, localhost->rrd_update_every
, RRDSET_TYPE_STACKED
);
for(size_t tier = 0; tier < storage_tiers ;tier++) {
char buf[30 + 1];
snprintfz(buf, 30, "tier%zu", tier);
rds[tier] = rrddim_add(st_points_stored, buf, NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
}
}
for(size_t tier = 0; tier < storage_tiers ;tier++)
rrddim_set_by_pointer(st_points_stored, rds[tier], (collected_number)gs.db_points_stored_per_tier[tier]);
rrdset_done(st_points_stored);
}
// ----------------------------------------------------------------
@ -2440,6 +2662,19 @@ static void worker_utilization_finish(void) {
}
// ---------------------------------------------------------------------------------------------------------------------
// global statistics thread
static void global_statistics_register_workers(void) {
worker_register("STATS");
worker_register_job_name(WORKER_JOB_GLOBAL, "global");
worker_register_job_name(WORKER_JOB_REGISTRY, "registry");
worker_register_job_name(WORKER_JOB_WORKERS, "workers");
worker_register_job_name(WORKER_JOB_DBENGINE, "dbengine");
worker_register_job_name(WORKER_JOB_STRINGS, "strings");
worker_register_job_name(WORKER_JOB_DICTIONARIES, "dictionaries");
worker_register_job_name(WORKER_JOB_MALLOC_TRACE, "malloc_trace");
}
static void global_statistics_cleanup(void *ptr)
{
@ -2457,14 +2692,7 @@ static void global_statistics_cleanup(void *ptr)
void *global_statistics_main(void *ptr)
{
worker_register("STATS");
worker_register_job_name(WORKER_JOB_GLOBAL, "global");
worker_register_job_name(WORKER_JOB_REGISTRY, "registry");
worker_register_job_name(WORKER_JOB_WORKERS, "workers");
worker_register_job_name(WORKER_JOB_DBENGINE, "dbengine");
worker_register_job_name(WORKER_JOB_STRINGS, "strings");
worker_register_job_name(WORKER_JOB_DICTIONARIES, "dictionaries");
worker_register_job_name(WORKER_JOB_MALLOC_TRACE, "malloc_trace");
global_statistics_register_workers();
netdata_thread_cleanup_push(global_statistics_cleanup, ptr);
@ -2485,9 +2713,6 @@ void *global_statistics_main(void *ptr)
worker_is_idle();
heartbeat_next(&hb, step);
worker_is_busy(WORKER_JOB_WORKERS);
worker_utilization_charts();
worker_is_busy(WORKER_JOB_GLOBAL);
global_statistics_charts();
@ -2517,3 +2742,49 @@ void *global_statistics_main(void *ptr)
netdata_thread_cleanup_pop(1);
return NULL;
}
// ---------------------------------------------------------------------------------------------------------------------
// workers thread
static void global_statistics_workers_cleanup(void *ptr)
{
worker_unregister();
struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
info("cleaning up...");
worker_utilization_finish();
static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
}
void *global_statistics_workers_main(void *ptr)
{
global_statistics_register_workers();
netdata_thread_cleanup_push(global_statistics_workers_cleanup, ptr);
int update_every =
(int)config_get_number(CONFIG_SECTION_GLOBAL_STATISTICS, "update every", localhost->rrd_update_every);
if (update_every < localhost->rrd_update_every)
update_every = localhost->rrd_update_every;
usec_t step = update_every * USEC_PER_SEC;
heartbeat_t hb;
heartbeat_init(&hb);
while (!netdata_exit) {
worker_is_idle();
heartbeat_next(&hb, step);
worker_is_busy(WORKER_JOB_WORKERS);
worker_utilization_charts();
}
netdata_thread_cleanup_pop(1);
return NULL;
}

View file

@ -3,23 +3,27 @@
#ifndef NETDATA_GLOBAL_STATISTICS_H
#define NETDATA_GLOBAL_STATISTICS_H 1
#include "common.h"
#include "database/rrd.h"
// ----------------------------------------------------------------------------
// global statistics
void rrdr_query_completed(uint64_t db_points_read, uint64_t result_points_generated);
void sqlite3_query_completed(bool success, bool busy, bool locked);
void sqlite3_row_completed(void);
void global_statistics_ml_query_completed(size_t points_read);
void global_statistics_exporters_query_completed(size_t points_read);
void global_statistics_backfill_query_completed(size_t points_read);
void global_statistics_rrdr_query_completed(size_t queries, uint64_t db_points_read, uint64_t result_points_generated, QUERY_SOURCE query_source);
void global_statistics_sqlite3_query_completed(bool success, bool busy, bool locked);
void global_statistics_sqlite3_row_completed(void);
void global_statistics_rrdset_done_chart_collection_completed(size_t *points_read_per_tier_array);
void finished_web_request_statistics(uint64_t dt,
uint64_t bytes_received,
uint64_t bytes_sent,
uint64_t content_size,
uint64_t compressed_content_size);
void global_statistics_web_request_completed(uint64_t dt,
uint64_t bytes_received,
uint64_t bytes_sent,
uint64_t content_size,
uint64_t compressed_content_size);
uint64_t web_client_connected(void);
void web_client_disconnected(void);
uint64_t global_statistics_web_client_connected(void);
void global_statistics_web_client_disconnected(void);
extern bool global_statistics_enabled;

View file

@ -6,6 +6,7 @@ void *aclk_main(void *ptr);
void *analytics_main(void *ptr);
void *cpuidlejitter_main(void *ptr);
void *global_statistics_main(void *ptr);
void *global_statistics_workers_main(void *ptr);
void *health_main(void *ptr);
void *pluginsd_main(void *ptr);
void *service_main(void *ptr);
@ -54,6 +55,17 @@ const struct netdata_static_thread static_threads_common[] = {
.init_routine = NULL,
.start_routine = global_statistics_main
},
{
.name = "WORKERS_STATS",
.config_section = CONFIG_SECTION_PLUGINS,
.config_name = "netdata monitoring",
.env_name = "NETDATA_INTERNALS_MONITORING",
.global_variable = &global_statistics_enabled,
.enabled = 1,
.thread = NULL,
.init_routine = NULL,
.start_routine = global_statistics_workers_main
},
{
.name = "PLUGINSD",
.config_section = NULL,

View file

@ -1938,7 +1938,7 @@ static int test_dbengine_check_rrdr(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DIMS]
ONEWAYALLOC *owa = onewayalloc_create(0);
RRDR *r = rrd2rrdr_legacy(owa, st[i], points, time_start, time_end,
RRDR_GROUPING_AVERAGE, 0, RRDR_OPTION_NATURAL_POINTS,
NULL, NULL, 0, 0);
NULL, NULL, 0, 0, QUERY_SOURCE_UNITTEST);
if (!r) {
fprintf(stderr, " DB-engine unittest %s: empty RRDR on region %d ### E R R O R ###\n", rrdset_name(st[i]), current_region);
return ++errors;
@ -2076,7 +2076,7 @@ int test_dbengine(void)
ONEWAYALLOC *owa = onewayalloc_create(0);
RRDR *r = rrd2rrdr_legacy(owa, st[i], points, time_start[0] + update_every,
time_end[REGIONS - 1], RRDR_GROUPING_AVERAGE, 0,
RRDR_OPTION_NATURAL_POINTS, NULL, NULL, 0, 0);
RRDR_OPTION_NATURAL_POINTS, NULL, NULL, 0, 0, QUERY_SOURCE_UNITTEST);
if (!r) {
fprintf(stderr, " DB-engine unittest %s: empty RRDR ### E R R O R ###\n", rrdset_name(st[i]));

View file

@ -33,6 +33,16 @@ typedef struct rrddim_acquired RRDDIM_ACQUIRED;
typedef void *ml_host_t;
typedef void *ml_dimension_t;
typedef enum {
QUERY_SOURCE_UNKNOWN,
QUERY_SOURCE_API_DATA,
QUERY_SOURCE_API_BADGE,
QUERY_SOURCE_API_WEIGHTS,
QUERY_SOURCE_HEALTH,
QUERY_SOURCE_ML,
QUERY_SOURCE_UNITTEST,
} QUERY_SOURCE;
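
Each query entry point now tags itself with one of these sources, either through the QUERY_TARGET_REQUEST.query_source field added below, or by calling the accounting helpers directly. For example, a data API handler would account a finished query like this (hypothetical caller; the function and its signature are from this commit):

// one query finished: 1 query, N points read from the db, M points in the result
global_statistics_rrdr_query_completed(1, db_points_read, result_points_generated, QUERY_SOURCE_API_DATA);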
// forward declarations
struct rrddim_tier;

View file

@ -171,6 +171,7 @@ typedef struct query_target_request {
const char *group_options;
time_t resampling_time;
size_t tier;
QUERY_SOURCE query_source;
} QUERY_TARGET_REQUEST;
typedef struct query_target {

View file

@ -1046,12 +1046,14 @@ static inline usec_t rrdset_init_last_updated_time(RRDSET *st) {
return last_updated_ut;
}
static __thread size_t rrdset_done_statistics_points_stored_per_tier[RRD_STORAGE_TIERS];
static inline time_t tier_next_point_time(RRDDIM *rd, struct rrddim_tier *t, time_t now) {
time_t loop = (time_t)rd->update_every * (time_t)t->tier_grouping;
return now + loop - ((now + loop) % loop);
}
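
tier_next_point_time() aligns the next point to the tier's grouping window: loop is the tier's point interval in seconds, and the expression rounds now up to the next multiple of loop. Worked example with illustrative numbers: update_every = 1s and tier_grouping = 60 give loop = 60; for now = 125, the result is 125 + 60 - ((125 + 60) % 60) = 185 - 5 = 180, the first multiple of 60 strictly after now.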
void store_metric_at_tier(RRDDIM *rd, struct rrddim_tier *t, STORAGE_POINT sp, usec_t now_ut __maybe_unused) {
void store_metric_at_tier(RRDDIM *rd, size_t tier, struct rrddim_tier *t, STORAGE_POINT sp, usec_t now_ut __maybe_unused) {
if (unlikely(!t->next_point_time))
t->next_point_time = tier_next_point_time(rd, t, sp.end_time);
@ -1079,6 +1081,7 @@ void store_metric_at_tier(RRDDIM *rd, struct rrddim_tier *t, STORAGE_POINT sp, u
0, SN_FLAG_NONE);
}
rrdset_done_statistics_points_stored_per_tier[tier]++;
t->virtual_point.count = 0; // make the point unset
t->next_point_time = tier_next_point_time(rd, t, sp.end_time);
}
@ -1141,6 +1144,7 @@ void rrddim_store_metric(RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n,
// store the metric on tier 0
rd->tiers[0]->collect_ops->store_metric(rd->tiers[0]->db_collection_handle, point_end_time_ut, n, 0, 0, 1, 0, flags);
rrdset_done_statistics_points_stored_per_tier[0]++;
time_t now = (time_t)(point_end_time_ut / USEC_PER_SEC);
@ -1167,10 +1171,14 @@ void rrddim_store_metric(RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n,
rrddim_option_set(rd, RRDDIM_OPTION_BACKFILLED_HIGH_TIERS);
}
store_metric_at_tier(rd, t, sp, point_end_time_ut);
store_metric_at_tier(rd, tier, t, sp, point_end_time_ut);
}
}
void store_metric_collection_completed() {
global_statistics_rrdset_done_chart_collection_completed(rrdset_done_statistics_points_stored_per_tier);
}
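
store_metric_collection_completed() closes the loop with global_statistics_rrdset_done_chart_collection_completed() from the first file: each collector thread counts stored points in a __thread array and flushes the totals into the global atomic counters once per chart collection, instead of paying one atomic operation per stored point. The pattern in isolation (a sketch with assumed names and a fixed tier count):

#include <stddef.h>
#include <stdint.h>

#define TIERS 4

static __thread uint64_t thread_points_stored[TIERS]; // per-thread, lock-free by construction
static uint64_t global_points_stored[TIERS];          // shared, touched only via atomics

static void points_stored_flush(void) {
    for (size_t tier = 0; tier < TIERS; tier++) {
        __atomic_fetch_add(&global_points_stored[tier],
                           thread_points_stored[tier], __ATOMIC_RELAXED);
        thread_points_stored[tier] = 0; // reset for the next collection cycle
    }
}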
// caching of dimensions rrdset_done() and rrdset_done_interpolate() loop through
struct rda_item {
const DICTIONARY_ITEM *item;
@ -1934,6 +1942,8 @@ after_second_database_work:
rrdcontext_collected_rrdset(st);
netdata_thread_enable_cancelability();
store_metric_collection_completed();
}
time_t rrdset_set_update_every(RRDSET *st, time_t update_every) {

View file

@ -71,7 +71,7 @@ SQLITE_API int sqlite3_exec_monitored(
char **errmsg /* Error msg written here */
) {
int rc = sqlite3_exec(db, sql, callback, data, errmsg);
sqlite3_query_completed(rc == SQLITE_OK, rc == SQLITE_BUSY, rc == SQLITE_LOCKED);
global_statistics_sqlite3_query_completed(rc == SQLITE_OK, rc == SQLITE_BUSY, rc == SQLITE_LOCKED);
return rc;
}
@ -83,14 +83,14 @@ SQLITE_API int sqlite3_step_monitored(sqlite3_stmt *stmt) {
rc = sqlite3_step(stmt);
switch (rc) {
case SQLITE_DONE:
sqlite3_query_completed(1, 0, 0);
global_statistics_sqlite3_query_completed(1, 0, 0);
break;
case SQLITE_ROW:
sqlite3_row_completed();
global_statistics_sqlite3_row_completed();
break;
case SQLITE_BUSY:
case SQLITE_LOCKED:
sqlite3_query_completed(rc == SQLITE_DONE, rc == SQLITE_BUSY, rc == SQLITE_LOCKED);
global_statistics_sqlite3_query_completed(rc == SQLITE_DONE, rc == SQLITE_BUSY, rc == SQLITE_LOCKED);
usleep(SQLITE_INSERT_DELAY * USEC_PER_MS);
continue;
default:

View file

@ -122,11 +122,13 @@ NETDATA_DOUBLE exporting_calculate_value_from_stored_data(
*last_timestamp = before;
size_t points_read = 0;
size_t counter = 0;
NETDATA_DOUBLE sum = 0;
for (rd->tiers[0]->query_ops->init(rd->tiers[0]->db_metric_handle, &handle, after, before); !rd->tiers[0]->query_ops->is_finished(&handle);) {
STORAGE_POINT sp = rd->tiers[0]->query_ops->next_metric(&handle);
points_read++;
if (unlikely(storage_point_is_empty(sp))) {
// not collected
@ -137,6 +139,7 @@ NETDATA_DOUBLE exporting_calculate_value_from_stored_data(
counter += sp.count;
}
rd->tiers[0]->query_ops->finalize(&handle);
global_statistics_exporters_query_completed(points_read);
if (unlikely(!counter)) {
debug(

View file

@ -1039,7 +1039,8 @@ void *health_main(void *ptr) {
0, rc->options,
&rc->db_after,&rc->db_before,
NULL, NULL, NULL,
&value_is_null, NULL, 0, 0);
&value_is_null, NULL, 0, 0,
QUERY_SOURCE_HEALTH);
if (unlikely(ret != 200)) {
// database lookup failed

View file

@ -341,20 +341,21 @@ usec_t heartbeat_next(heartbeat_t *hb, usec_t tick) {
void sleep_usec(usec_t usec) {
// we expect microseconds (1.000.000 per second)
// but timespec is nanoseconds (1.000.000.000 per second)
struct timespec rem, req = {
struct timespec rem = { 0, 0 }, req = {
.tv_sec = (time_t) (usec / USEC_PER_SEC),
.tv_nsec = (suseconds_t) ((usec % USEC_PER_SEC) * NSEC_PER_USEC)
};
#ifdef __linux__
while ((errno = clock_nanosleep(CLOCK_REALTIME, 0, &req, &rem)) != 0) {
while (clock_nanosleep(CLOCK_REALTIME, 0, &req, &rem) != 0) {
#else
while ((errno = nanosleep(&req, &rem)) != 0) {
while (nanosleep(&req, &rem) != 0) {
#endif
if (likely(errno == EINTR)) {
req.tv_sec = rem.tv_sec;
req.tv_nsec = rem.tv_nsec;
} else {
if (likely(errno == EINTR && (rem.tv_sec || rem.tv_nsec))) {
req = rem;
rem = (struct timespec){ 0, 0 };
}
else {
#ifdef __linux__
error("Cannot clock_nanosleep(CLOCK_REALTIME) for %llu microseconds.", usec);
#else

View file

@ -199,7 +199,10 @@ static int item_check_and_acquire_advanced(DICTIONARY *dict, DICTIONARY_ITEM *it
#define item_is_not_referenced_and_can_be_removed(dict, item) (item_is_not_referenced_and_can_be_removed_advanced(dict, item) == RC_ITEM_OK)
static inline int item_is_not_referenced_and_can_be_removed_advanced(DICTIONARY *dict, DICTIONARY_ITEM *item);
static inline void pointer_index_init(DICTIONARY *dict) {
// ----------------------------------------------------------------------------
// validate each pointer is indexed once - internal checks only
static inline void pointer_index_init(DICTIONARY *dict __maybe_unused) {
#ifdef NETDATA_INTERNAL_CHECKS
netdata_mutex_init(&dict->global_pointer_registry_mutex);
#else
@ -207,7 +210,7 @@ static inline void pointer_index_init(DICTIONARY *dict) {
#endif
}
static inline void pointer_destroy_index(DICTIONARY *dict) {
static inline void pointer_destroy_index(DICTIONARY *dict __maybe_unused) {
#ifdef NETDATA_INTERNAL_CHECKS
netdata_mutex_lock(&dict->global_pointer_registry_mutex);
JudyHSFreeArray(&dict->global_pointer_registry, PJE0);
@ -216,7 +219,7 @@ static inline void pointer_destroy_index(DICTIONARY *dict) {
;
#endif
}
static inline void pointer_add(DICTIONARY *dict __maybe_unused, DICTIONARY_ITEM *item) {
static inline void pointer_add(DICTIONARY *dict __maybe_unused, DICTIONARY_ITEM *item __maybe_unused) {
#ifdef NETDATA_INTERNAL_CHECKS
netdata_mutex_lock(&dict->global_pointer_registry_mutex);
Pvoid_t *PValue = JudyHSIns(&dict->global_pointer_registry, &item, sizeof(void *), PJE0);
@ -229,7 +232,7 @@ static inline void pointer_add(DICTIONARY *dict __maybe_unused, DICTIONARY_ITEM
#endif
}
static inline void pointer_check(DICTIONARY *dict __maybe_unused, DICTIONARY_ITEM *item) {
static inline void pointer_check(DICTIONARY *dict __maybe_unused, DICTIONARY_ITEM *item __maybe_unused) {
#ifdef NETDATA_INTERNAL_CHECKS
netdata_mutex_lock(&dict->global_pointer_registry_mutex);
Pvoid_t *PValue = JudyHSGet(dict->global_pointer_registry, &item, sizeof(void *));
@ -241,7 +244,7 @@ static inline void pointer_check(DICTIONARY *dict __maybe_unused, DICTIONARY_ITE
#endif
}
static inline void pointer_del(DICTIONARY *dict __maybe_unused, DICTIONARY_ITEM *item) {
static inline void pointer_del(DICTIONARY *dict __maybe_unused, DICTIONARY_ITEM *item __maybe_unused) {
#ifdef NETDATA_INTERNAL_CHECKS
netdata_mutex_lock(&dict->global_pointer_registry_mutex);
int ret = JudyHSDel(&dict->global_pointer_registry, &item, sizeof(void *), PJE0);
@ -413,7 +416,7 @@ static inline void DICTIONARY_ENTRIES_MINUS1(DICTIONARY *dict) {
__atomic_fetch_add(&dict->stats->ops.deletes, 1, __ATOMIC_RELAXED);
__atomic_fetch_sub(&dict->stats->items.entries, 1, __ATOMIC_RELAXED);
size_t entries;
size_t entries; (void)entries;
if(unlikely(is_dictionary_single_threaded(dict))) {
dict->version++;
entries = dict->entries++;

View file

@ -278,6 +278,36 @@ int __netdata_rwlock_trywrlock(netdata_rwlock_t *rwlock) {
return ret;
}
// ----------------------------------------------------------------------------
// spinlock implementation
// https://www.youtube.com/watch?v=rmGJc9PXpuE&t=41s
void netdata_spinlock_init(SPINLOCK *spinlock) {
*spinlock = NETDATA_SPINLOCK_INITIALIZER;
}
void netdata_spinlock_lock(SPINLOCK *spinlock) {
static const struct timespec ns = { .tv_sec = 0, .tv_nsec = 1 };
bool expected = false, desired = true;
for(int i = 1;
__atomic_load_n(&spinlock->locked, __ATOMIC_RELAXED) ||
!__atomic_compare_exchange_n(&spinlock->locked, &expected, desired, false, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE)
; i++
) {
if(unlikely(i == 8)) {
i = 0;
nanosleep(&ns, NULL);
}
}
// we have the lock
}
void netdata_spinlock_unlock(SPINLOCK *spinlock) {
__atomic_store_n(&spinlock->locked, false, __ATOMIC_RELEASE);
}
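
netdata_spinlock_lock() is a test-and-test-and-set loop: it spins on a cheap relaxed load and only attempts the acquiring compare-exchange when the lock looks free, backing off with a 1ns nanosleep every 8 iterations under contention. Typical usage would look like this (a sketch; the guarded counter is hypothetical):

static SPINLOCK counter_lock = NETDATA_SPINLOCK_INITIALIZER;
static size_t counter = 0;

void counter_increment(void) {
    netdata_spinlock_lock(&counter_lock);
    counter++;                            // keep critical sections this short
    netdata_spinlock_unlock(&counter_lock);
}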
#ifdef NETDATA_TRACE_RWLOCKS
// ----------------------------------------------------------------------------

View file

@ -9,6 +9,14 @@
typedef pthread_mutex_t netdata_mutex_t;
#define NETDATA_MUTEX_INITIALIZER PTHREAD_MUTEX_INITIALIZER
typedef struct netdata_spinlock {
bool locked;
} SPINLOCK;
#define NETDATA_SPINLOCK_INITIALIZER (SPINLOCK){ .locked = false }
void netdata_spinlock_init(SPINLOCK *spinlock);
void netdata_spinlock_lock(SPINLOCK *spinlock);
void netdata_spinlock_unlock(SPINLOCK *spinlock);
#ifdef NETDATA_TRACE_RWLOCKS
typedef struct netdata_rwlock_locker {
pid_t pid;

View file

@ -140,7 +140,8 @@ void ml::updateHostAndDetectionRateCharts(RRDHOST *RH, collected_number AnomalyR
Options, "anomaly_rate",
NULL /* group options */,
0, /* timeout */
0 /* tier */
0, /* tier */
QUERY_SOURCE_ML
);
if(R) {
assert(R->d == 1 && R->n == 1 && R->rows == 1);

View file

@ -22,6 +22,7 @@ public:
void init(time_t AfterT, time_t BeforeT) {
Ops->init(RD->tiers[0]->db_metric_handle, &Handle, AfterT, BeforeT);
Initialized = true;
points_read = 0;
}
bool isFinished() {
@ -29,11 +30,15 @@ public:
}
~Query() {
if (Initialized)
if (Initialized) {
Ops->finalize(&Handle);
global_statistics_ml_query_completed(points_read);
points_read = 0;
}
}
std::pair<time_t, CalculatedNumber> nextMetric() {
points_read++;
STORAGE_POINT sp = Ops->next_metric(&Handle);
return { sp.start_time, sp.sum / sp.count };
}
@ -41,6 +46,7 @@ public:
private:
RRDDIM *RD;
bool Initialized;
size_t points_read;
struct storage_engine_query_ops *Ops;
struct storage_engine_query_handle Handle;

View file

@ -6,23 +6,36 @@
#define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 20
#define MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED 10
static struct replication_query_statistics replication_queries = {
.queries_started = 0,
.queries_finished = 0,
.points_read = 0,
.points_generated = 0,
};
struct replication_query_statistics replication_get_query_statistics(void) {
return replication_queries;
}
// ----------------------------------------------------------------------------
// sending replication replies
struct replication_dimension {
STORAGE_POINT sp;
struct storage_engine_query_handle handle;
bool enabled;
DICTIONARY *dict;
const DICTIONARY_ITEM *rda;
RRDDIM *rd;
};
static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, time_t before, bool enable_streaming, time_t wall_clock_time) {
size_t dimensions = rrdset_number_of_dimensions(st);
size_t points_read = 0, points_generated = 0;
struct storage_engine_query_ops *ops = &st->rrdhost->db[0].eng->api.query_ops;
struct {
DICTIONARY *dict;
const DICTIONARY_ITEM *rda;
RRDDIM *rd;
struct storage_engine_query_handle handle;
STORAGE_POINT sp;
bool enabled;
} data[dimensions];
struct replication_dimension data[dimensions];
memset(data, 0, sizeof(data));
if(enable_streaming && st->last_updated.tv_sec > before) {
@ -38,23 +51,23 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
{
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
if (rd_dfe.counter >= dimensions) {
if(unlikely(!rd || !rd_dfe.item || !rd->exposed))
continue;
if (unlikely(rd_dfe.counter >= dimensions)) {
internal_error(true, "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' has more dimensions than the replicated ones",
rrdhost_hostname(st->rrdhost), rrdset_id(st));
break;
}
if(rd->exposed) {
data[rd_dfe.counter].dict = rd_dfe.dict;
data[rd_dfe.counter].rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item);
data[rd_dfe.counter].rd = rd;
struct replication_dimension *d = &data[rd_dfe.counter];
ops->init(rd->tiers[0]->db_metric_handle, &data[rd_dfe.counter].handle, after, before);
d->dict = rd_dfe.dict;
d->rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item);
d->rd = rd;
data[rd_dfe.counter].enabled = true;
}
else
data[rd_dfe.counter].enabled = false;
ops->init(rd->tiers[0]->db_metric_handle, &d->handle, after, before);
d->enabled = true;
}
rrddim_foreach_done(rd);
}
@ -62,32 +75,35 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
time_t now = after + 1, actual_after = 0, actual_before = 0; (void)actual_before;
while(now <= before) {
time_t min_start_time = 0, min_end_time = 0;
for (size_t i = 0; i < dimensions && data[i].rd; i++) {
if(!data[i].enabled) continue;
for (size_t i = 0; i < dimensions ;i++) {
struct replication_dimension *d = &data[i];
if(unlikely(!d->enabled)) continue;
// fetch the first valid point for the dimension
int max_skip = 100;
while(data[i].sp.end_time < now && !ops->is_finished(&data[i].handle) && max_skip-- > 0)
data[i].sp = ops->next_metric(&data[i].handle);
while(d->sp.end_time < now && !ops->is_finished(&d->handle) && max_skip-- > 0) {
d->sp = ops->next_metric(&d->handle);
points_read++;
}
internal_error(max_skip <= 0,
"STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s/dim:%s': db does not advance the query beyond time %llu",
rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(data[i].rd), (unsigned long long) now);
rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(d->rd), (unsigned long long) now);
if(data[i].sp.end_time < now)
if(unlikely(d->sp.end_time < now || storage_point_is_unset(d->sp) || storage_point_is_empty(d->sp)))
continue;
if(!min_start_time) {
min_start_time = data[i].sp.start_time;
min_end_time = data[i].sp.end_time;
if(unlikely(!min_start_time)) {
min_start_time = d->sp.start_time;
min_end_time = d->sp.end_time;
}
else {
min_start_time = MIN(min_start_time, data[i].sp.start_time);
min_end_time = MIN(min_end_time, data[i].sp.end_time);
min_start_time = MIN(min_start_time, d->sp.start_time);
min_end_time = MIN(min_end_time, d->sp.end_time);
}
}
if(min_start_time > wall_clock_time + 1 || min_end_time > wall_clock_time + st->update_every + 1) {
if(unlikely(min_start_time > wall_clock_time + 1 || min_end_time > wall_clock_time + st->update_every + 1)) {
internal_error(true,
"STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s': db provided future start time %llu or end time %llu (now is %llu)",
rrdhost_hostname(st->rrdhost), rrdset_id(st),
@ -97,7 +113,7 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
break;
}
if(min_end_time < now) {
if(unlikely(min_end_time < now)) {
#ifdef NETDATA_LOG_REPLICATION_REQUESTS
internal_error(true,
"STREAM_SENDER REPLAY: 'host:%s/chart:%s': no data on any dimension beyond time %llu",
@ -106,10 +122,10 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
break;
}
if(min_end_time <= min_start_time)
if(unlikely(min_end_time <= min_start_time))
min_start_time = min_end_time - st->update_every;
if(!actual_after) {
if(unlikely(!actual_after)) {
actual_after = min_end_time;
actual_before = min_end_time;
}
@ -123,15 +139,19 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
);
// output the replay values for this time
for (size_t i = 0; i < dimensions && data[i].rd; i++) {
if(!data[i].enabled) continue;
for (size_t i = 0; i < dimensions ;i++) {
struct replication_dimension *d = &data[i];
if(unlikely(!d->enabled)) continue;
if(data[i].sp.start_time <= min_end_time && data[i].sp.end_time >= min_end_time)
if(likely(d->sp.start_time <= min_end_time && d->sp.end_time >= min_end_time))
buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" " NETDATA_DOUBLE_FORMAT " \"%s\"\n",
rrddim_id(data[i].rd), data[i].sp.sum, data[i].sp.flags & SN_FLAG_RESET ? "R" : "");
rrddim_id(d->rd), d->sp.sum, d->sp.flags & SN_FLAG_RESET ? "R" : "");
else
buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" NAN \"E\"\n",
rrddim_id(data[i].rd));
rrddim_id(d->rd));
points_generated++;
}
now = min_end_time + 1;
@ -157,13 +177,22 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
// release all the dictionary items acquired
// finalize the queries
for(size_t i = 0; i < dimensions && data[i].rda ;i++) {
if(!data[i].enabled) continue;
for(size_t i = 0; i < dimensions ;i++) {
struct replication_dimension *d = &data[i];
if(unlikely(!d->enabled)) continue;
ops->finalize(&data[i].handle);
dictionary_acquired_item_release(data[i].dict, data[i].rda);
ops->finalize(&d->handle);
dictionary_acquired_item_release(d->dict, d->rda);
// update global statistics
replication_queries.queries_started++;
replication_queries.queries_finished++;
}
replication_queries.points_read += points_read;
replication_queries.points_generated += points_generated;
return before;
}
@ -560,6 +589,8 @@ static struct replication_thread {
size_t sender_resets;
size_t waits;
size_t skipped_no_room_last_run;
Pvoid_t JudyL_array;
} replication_globals = {
.mutex = NETDATA_MUTEX_INITIALIZER,
@ -570,6 +601,7 @@ static struct replication_thread {
.first_time_t = 0,
.next_unique_id = 1,
.skipped_no_room = 0,
.skipped_no_room_last_run = 0,
.skipped_not_connected = 0,
.sender_resets = 0,
.waits = 0,
@ -599,6 +631,13 @@ static void replication_recursive_unlock() {
#endif
}
void replication_set_next_point_in_time(time_t after, size_t unique_id) {
replication_recursive_lock();
replication_globals.last_after = after;
replication_globals.last_unique_id = unique_id;
replication_recursive_unlock();
}
// ----------------------------------------------------------------------------
// replication sort entry management
@ -626,10 +665,9 @@ static struct replication_sort_entry *replication_sort_entry_add(struct replicat
struct replication_sort_entry *rse = replication_sort_entry_create(rq);
if(rq->after < (time_t)replication_globals.last_after) {
if(rq->after < (time_t)replication_globals.last_after && rq->sender->buffer_used_percentage <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED && !replication_globals.skipped_no_room_last_run) {
// make it find this request first
replication_globals.last_after = rq->after;
replication_globals.last_unique_id = rq->unique_id;
replication_set_next_point_in_time(rq->after, rq->unique_id);
}
replication_globals.added++;
@ -716,8 +754,9 @@ static struct replication_request replication_request_get_first_available() {
Pvoid_t *inner_judy_pptr;
replication_recursive_lock();
replication_globals.skipped_no_room_last_run = 0;
struct replication_request rq = (struct replication_request){ .found = false };
struct replication_request rq_to_return = (struct replication_request){ .found = false };
if(unlikely(!replication_globals.last_after || !replication_globals.last_unique_id)) {
@ -726,41 +765,50 @@ static struct replication_request replication_request_get_first_available() {
}
bool find_same_after = true;
while(!rq.found && (inner_judy_pptr = JudyLFirstOrNext(replication_globals.JudyL_array, &replication_globals.last_after, find_same_after))) {
while(!rq_to_return.found && (inner_judy_pptr = JudyLFirstOrNext(replication_globals.JudyL_array, &replication_globals.last_after, find_same_after))) {
Pvoid_t *our_item_pptr;
while(!rq.found && (our_item_pptr = JudyLNext(*inner_judy_pptr, &replication_globals.last_unique_id, PJE0))) {
while(!rq_to_return.found && (our_item_pptr = JudyLNext(*inner_judy_pptr, &replication_globals.last_unique_id, PJE0))) {
struct replication_sort_entry *rse = *our_item_pptr;
struct sender_state *s = rse->rq->sender;
struct replication_request *rq = rse->rq;
struct sender_state *s = rq->sender;
bool sender_is_connected =
rrdhost_flag_check(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
if(likely(s->buffer_used_percentage <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED)) {
// there is room for this request in the sender buffer
bool sender_has_been_flushed_since_this_request =
rse->rq->sender_last_flush_ut != rrdpush_sender_get_flush_time(s);
bool sender_is_connected =
rrdhost_flag_check(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
bool sender_has_room_to_spare =
s->buffer_used_percentage <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED;
bool sender_has_been_flushed_since_this_request =
rq->sender_last_flush_ut != rrdpush_sender_get_flush_time(s);
if(unlikely(!sender_is_connected || sender_has_been_flushed_since_this_request)) {
replication_globals.skipped_not_connected++;
if(replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr))
break;
if (unlikely(!sender_is_connected || sender_has_been_flushed_since_this_request)) {
// skip this request, the sender is not connected or it has reconnected
replication_globals.skipped_not_connected++;
if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr))
// we removed the item from the outer JudyL
break;
}
else {
// this request is good to execute
// copy the request to return it
rq_to_return = *rq;
rq_to_return.chart_id = string_dup(rq_to_return.chart_id);
// set the return result to found
rq_to_return.found = true;
if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr))
// we removed the item from the outer JudyL
break;
}
}
else if(sender_has_room_to_spare) {
// copy the request to return it
rq = *rse->rq;
rq.chart_id = string_dup(rq.chart_id);
// set the return result to found
rq.found = true;
if(replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr))
break;
}
else
else {
replication_globals.skipped_no_room++;
replication_globals.skipped_no_room_last_run++;
}
}
// call JudyLNext from now on
@ -771,7 +819,7 @@ static struct replication_request replication_request_get_first_available() {
}
replication_recursive_unlock();
return rq;
return rq_to_return;
}
// ----------------------------------------------------------------------------
@ -890,8 +938,7 @@ void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s) {
percentage <= MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED) {
s->replication_reached_max = false;
replication_recursive_lock();
replication_globals.last_after = 0;
replication_globals.last_unique_id = 0;
replication_set_next_point_in_time(0, 0);
replication_globals.sender_resets++;
replication_recursive_unlock();
}
@ -929,17 +976,18 @@ static void replication_main_cleanup(void *ptr) {
#define WORKER_JOB_CHECK_CONSISTENCY 15
#define ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION 10
#define SECONDS_TO_RESET_POINT_IN_TIME 10
static size_t verify_host_charts_are_streaming_now(RRDHOST *host) {
if(host->sender) {
size_t pending_requests = host->sender->replication_pending_requests;
size_t dict_entries = dictionary_entries(host->sender->replication_requests);
internal_error(
!pending_requests && dict_entries,
"REPLICATION SUMMARY: 'host:%s' reports %zu pending replication requests, but its chart replication index says there are %zu charts pending replication",
rrdhost_hostname(host), pending_requests, dict_entries);
}
internal_error(
host->sender &&
!host->sender->replication_pending_requests &&
dictionary_entries(host->sender->replication_requests) != 0,
"REPLICATION SUMMARY: 'host:%s' reports %zu pending replication requests, but its chart replication index says there are %zu charts pending replication",
rrdhost_hostname(host),
host->sender->replication_pending_requests,
dictionary_entries(host->sender->replication_requests)
);
size_t ok = 0;
size_t errors = 0;
@ -983,21 +1031,17 @@ static size_t verify_host_charts_are_streaming_now(RRDHOST *host) {
}
static void verify_all_hosts_charts_are_streaming_now(void) {
#ifdef NETDATA_INTERNAL_CHECKS
worker_is_busy(WORKER_JOB_CHECK_CONSISTENCY);
size_t errors = 0;
RRDHOST *host;
dfe_start_reentrant(rrdhost_root_index, host)
dfe_start_read(rrdhost_root_index, host)
errors += verify_host_charts_are_streaming_now(host);
dfe_done(host);
size_t executed = replication_globals.executed;
internal_error(true, "REPLICATION SUMMARY: finished, executed %zu replication requests, %zu charts pending replication", executed - replication_globals.last_executed, errors);
info("REPLICATION SUMMARY: finished, executed %zu replication requests, %zu charts pending replication", executed - replication_globals.last_executed, errors);
replication_globals.last_executed = executed;
#else
;
#endif
}
void *replication_thread_main(void *ptr __maybe_unused) {
@ -1027,7 +1071,9 @@ void *replication_thread_main(void *ptr __maybe_unused) {
time_t latest_first_time_t = 0;
long run_verification_countdown = LONG_MAX; // LONG_MAX to prevent an initial verification when no replication ever took place
bool slow = true; // control the time we sleep - it has to start with true!
usec_t last_now_mono_ut = now_monotonic_usec();
time_t replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME; // restart from the beginning every 10 seconds
while(!netdata_exit) {
@ -1036,9 +1082,21 @@ void *replication_thread_main(void *ptr __maybe_unused) {
if(unlikely(now_mono_ut - last_now_mono_ut > default_rrd_update_every * USEC_PER_SEC)) {
last_now_mono_ut = now_mono_ut;
if(replication_reset_next_point_in_time_countdown-- == 0) {
// once every SECONDS_TO_RESET_POINT_IN_TIME seconds, make it scan all the pending requests from the beginning next time
replication_set_next_point_in_time(0, 0);
replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME;
}
if(!replication_globals.pending && run_verification_countdown-- == 0) {
replication_globals.first_time_t = 0; // reset the statistics about completion percentage
// reset the statistics about completion percentage
replication_globals.first_time_t = 0;
latest_first_time_t = 0;
verify_all_hosts_charts_are_streaming_now();
run_verification_countdown = LONG_MAX;
slow = true;
}
worker_is_busy(WORKER_JOB_STATISTICS);
@ -1068,17 +1126,36 @@ void *replication_thread_main(void *ptr __maybe_unused) {
if(unlikely(!rq.found)) {
// make it scan all the pending requests next time
replication_globals.last_after = 0;
replication_globals.last_unique_id = 0;
replication_set_next_point_in_time(0, 0);
replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME;
// the timeout also defines how frequently we will traverse all the pending requests
// when the outbound buffers of all senders are full
usec_t timeout;
if(slow)
// no work to be done, wait for a request to come in
timeout = 1000 * USEC_PER_MS;
else if(replication_globals.pending > 0)
// there are pending requests waiting to be executed,
// but none could be executed at this time.
// try again after this time.
timeout = 100 * USEC_PER_MS;
else
// no pending requests, but there were requests recently (run_verification_countdown)
// so, try in a short time.
// if this is big, one chart replicating will be slow to finish (ping - pong just one chart)
timeout = 10 * USEC_PER_MS;
replication_globals.waits++;
worker_is_idle();
sleep_usec(((replication_globals.pending) ? 10 : 1000) * USEC_PER_MS);
sleep_usec(timeout);
continue;
}
run_verification_countdown = ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION;
slow = false;
// delete the request from the dictionary
worker_is_busy(WORKER_JOB_DELETE_ENTRY);

View file

@ -5,6 +5,15 @@
#include "daemon/common.h"
struct replication_query_statistics {
size_t queries_started;
size_t queries_finished;
size_t points_read;
size_t points_generated;
};
struct replication_query_statistics replication_get_query_statistics(void);
bool replicate_chart_response(RRDHOST *rh, RRDSET *rs, bool start_streaming, time_t after, time_t before);
typedef int (*send_command)(const char *txt, void *data);

View file

@ -1116,7 +1116,8 @@ int web_client_api_request_v1_badge(RRDHOST *host, struct web_client *w, char *u
points, after, before, group, group_options, 0, options,
NULL, &latest_timestamp,
NULL, NULL, NULL,
&value_is_null, NULL, 0, 0);
&value_is_null, NULL, 0, 0,
QUERY_SOURCE_API_BADGE);
// if the value cannot be calculated, show empty badge
if (ret != HTTP_RESP_OK) {

View file

@ -76,6 +76,7 @@ int rrdset2value_api_v1(
, NETDATA_DOUBLE *anomaly_rate
, time_t timeout
, size_t tier
, QUERY_SOURCE query_source
) {
int ret = HTTP_RESP_INTERNAL_SERVER_ERROR;
@ -92,7 +93,8 @@ int rrdset2value_api_v1(
dimensions,
group_options,
timeout,
tier);
tier,
query_source);
if(!r) {
if(value_is_null) *value_is_null = 1;

View file

@ -78,6 +78,7 @@ int rrdset2value_api_v1(
, NETDATA_DOUBLE *anomaly_rate
, time_t timeout
, size_t tier
, QUERY_SOURCE query_source
);
#endif /* NETDATA_RRD2JSON_H */
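
The new QUERY_SOURCE argument tags every query with its origin, so the statistics code can account for each entry point separately. Only QUERY_SOURCE_API_DATA, QUERY_SOURCE_API_BADGE and QUERY_SOURCE_API_WEIGHTS appear in this diff; the remaining members below are assumptions inferred from the per-source counters, not taken from the patch:

typedef enum {
    QUERY_SOURCE_UNKNOWN = 0,
    QUERY_SOURCE_API_DATA,     // seen in this diff (/api/v1/data)
    QUERY_SOURCE_API_BADGE,    // seen in this diff (/api/v1/badge.svg)
    QUERY_SOURCE_API_WEIGHTS,  // seen in this diff (weights / correlations)
    QUERY_SOURCE_HEALTH,       // assumed, mirroring the health_* counters
    QUERY_SOURCE_ML,           // assumed, mirroring the ml_* counters
} QUERY_SOURCE;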

View file

@ -106,7 +106,7 @@ QUERY_VALUE rrdmetric2value(RRDHOST *host,
struct rrdcontext_acquired *rca, struct rrdinstance_acquired *ria, struct rrdmetric_acquired *rma,
time_t after, time_t before,
RRDR_OPTIONS options, RRDR_GROUPING group_method, const char *group_options,
size_t tier, time_t timeout
size_t tier, time_t timeout, QUERY_SOURCE query_source
) {
QUERY_TARGET_REQUEST qtr = {
.host = host,
@ -121,6 +121,7 @@ QUERY_VALUE rrdmetric2value(RRDHOST *host,
.group_options = group_options,
.tier = tier,
.timeout = timeout,
.query_source = query_source,
};
ONEWAYALLOC *owa = onewayalloc_create(16 * 1024);

View file

@ -23,7 +23,7 @@ QUERY_VALUE rrdmetric2value(RRDHOST *host,
struct rrdcontext_acquired *rca, struct rrdinstance_acquired *ria, struct rrdmetric_acquired *rma,
time_t after, time_t before,
RRDR_OPTIONS options, RRDR_GROUPING group_method, const char *group_options,
size_t tier, time_t timeout
size_t tier, time_t timeout, QUERY_SOURCE query_source
);
NETDATA_DOUBLE rrdr2value(RRDR *r, long i, RRDR_OPTIONS options, int *all_values_are_null, NETDATA_DOUBLE *anomaly_rate);

View file

@ -1473,7 +1473,8 @@ static inline void rrd2rrdr_do_dimension(RRDR *r, size_t dim_id_in_rrdr) {
// ----------------------------------------------------------------------------
// fill the gap of a tier
extern void store_metric_at_tier(RRDDIM *rd, struct rrddim_tier *t, STORAGE_POINT sp, usec_t now_ut);
void store_metric_at_tier(RRDDIM *rd, size_t tier, struct rrddim_tier *t, STORAGE_POINT sp, usec_t now_ut);
void store_metric_collection_completed(void);
void rrdr_fill_tier_gap_from_smaller_tiers(RRDDIM *rd, size_t tier, time_t now) {
if(unlikely(tier >= storage_tiers)) return;
@ -1494,8 +1495,6 @@ void rrdr_fill_tier_gap_from_smaller_tiers(RRDDIM *rd, size_t tier, time_t now)
struct storage_engine_query_handle handle;
size_t all_points_read = 0;
// for each lower tier
for(int tr = (int)tier - 1; tr >= 0 ;tr--){
time_t smaller_tier_first_time = rd->tiers[tr]->query_ops->oldest_time(rd->tiers[tr]->db_metric_handle);
@ -1508,27 +1507,26 @@ void rrdr_fill_tier_gap_from_smaller_tiers(RRDDIM *rd, size_t tier, time_t now)
struct rrddim_tier *tmp = rd->tiers[tr];
tmp->query_ops->init(tmp->db_metric_handle, &handle, after_wanted, before_wanted);
size_t points = 0;
size_t points_read = 0;
while(!tmp->query_ops->is_finished(&handle)) {
STORAGE_POINT sp = tmp->query_ops->next_metric(&handle);
points_read++;
if(sp.end_time > latest_time_t) {
latest_time_t = sp.end_time;
store_metric_at_tier(rd, t, sp, sp.end_time * USEC_PER_SEC);
points++;
store_metric_at_tier(rd, tr, t, sp, sp.end_time * USEC_PER_SEC);
}
}
all_points_read += points;
tmp->query_ops->finalize(&handle);
store_metric_collection_completed();
global_statistics_backfill_query_completed(points_read);
//internal_error(true, "DBENGINE: backfilled chart '%s', dimension '%s', tier %d, from %ld to %ld, with %zu points from tier %d",
// rd->rrdset->name, rd->name, tier, after_wanted, before_wanted, points, tr);
}
rrdr_query_completed(all_points_read, all_points_read);
}
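
Here the generic rrdr_query_completed() accounting is replaced by global_statistics_backfill_query_completed(), so tier backfilling no longer inflates the API query counters and now counts every point read, not only the points actually stored. A plausible shape for the new hook, assuming lock-free counters updated with the GCC/Clang __atomic built-ins:

// Plausible implementation only: one query plus points_read db points,
// added atomically so the statistics thread can read without locks.
void global_statistics_backfill_query_completed(size_t points_read) {
    __atomic_fetch_add(&global_statistics.backfill_queries_made,
                       1, __ATOMIC_RELAXED);
    __atomic_fetch_add(&global_statistics.backfill_db_points_read,
                       (uint64_t)points_read, __ATOMIC_RELAXED);
}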
// ----------------------------------------------------------------------------
@ -1977,7 +1975,7 @@ RRDR *rrd2rrdr_legacy(
ONEWAYALLOC *owa,
RRDSET *st, size_t points, time_t after, time_t before,
RRDR_GROUPING group_method, time_t resampling_time, RRDR_OPTIONS options, const char *dimensions,
const char *group_options, time_t timeout, size_t tier) {
const char *group_options, time_t timeout, size_t tier, QUERY_SOURCE query_source) {
QUERY_TARGET_REQUEST qtr = {
.st = st,
@ -1991,6 +1989,7 @@ RRDR *rrd2rrdr_legacy(
.group_options = group_options,
.timeout = timeout,
.tier = tier,
.query_source = query_source,
};
return rrd2rrdr(owa, query_target_create(&qtr));
@ -2170,6 +2169,7 @@ RRDR *rrd2rrdr(ONEWAYALLOC *owa, QUERY_TARGET *qt) {
}
}
rrdr_query_completed(r->internal.db_points_read, r->internal.result_points_generated);
global_statistics_rrdr_query_completed(dimensions_used, r->internal.db_points_read,
r->internal.result_points_generated, qt->request.query_source);
return r;
}
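
With the source carried inside the QUERY_TARGET, this single call site can route the totals to the right per-source counters. A sketch of the dispatch implied by global_statistics_rrdr_query_completed() (the switch body is assumed from the counter names, not shown in the patch):

// Assumed dispatcher: route one completed query to its source's counters.
void global_statistics_rrdr_query_completed(size_t queries, uint64_t db_points_read,
                                            uint64_t result_points_generated,
                                            QUERY_SOURCE source) {
    switch (source) {
        case QUERY_SOURCE_API_DATA:
            __atomic_fetch_add(&global_statistics.api_data_queries_made, queries, __ATOMIC_RELAXED);
            __atomic_fetch_add(&global_statistics.api_data_db_points_read, db_points_read, __ATOMIC_RELAXED);
            __atomic_fetch_add(&global_statistics.api_data_result_points_generated, result_points_generated, __ATOMIC_RELAXED);
            break;
        case QUERY_SOURCE_API_WEIGHTS:
            // ... same pattern against the api_weights_* counters ...
            break;
        default:
            break;
    }
}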

View file

@ -138,7 +138,7 @@ RRDR *rrd2rrdr_legacy(
ONEWAYALLOC *owa,
RRDSET *st, size_t points, time_t after, time_t before,
RRDR_GROUPING group_method, time_t resampling_time, RRDR_OPTIONS options, const char *dimensions,
const char *group_options, time_t timeout, size_t tier);
const char *group_options, time_t timeout, size_t tier, QUERY_SOURCE query_source);
RRDR *rrd2rrdr(ONEWAYALLOC *owa, struct query_target *qt);
bool query_target_calculate_window(struct query_target *qt);

View file

@ -518,7 +518,8 @@ NETDATA_DOUBLE *rrd2rrdr_ks2(
.options = options,
.group_method = group_method,
.group_options = group_options,
.tier = tier
.tier = tier,
.query_source = QUERY_SOURCE_API_WEIGHTS,
};
RRDR *r = rrd2rrdr(owa, query_target_create(&qtr));
@ -637,7 +638,7 @@ static void rrdset_metric_correlations_volume(
options |= RRDR_OPTION_MATCH_IDS | RRDR_OPTION_ABSOLUTE | RRDR_OPTION_NATURAL_POINTS;
QUERY_VALUE baseline_average = rrdmetric2value(host, rca, ria, rma, baseline_after, baseline_before, options, group_method, group_options, tier, 0);
QUERY_VALUE baseline_average = rrdmetric2value(host, rca, ria, rma, baseline_after, baseline_before, options, group_method, group_options, tier, 0, QUERY_SOURCE_API_WEIGHTS);
merge_query_value_to_stats(&baseline_average, stats);
if(!netdata_double_isnumber(baseline_average.value)) {
@ -645,7 +646,7 @@ static void rrdset_metric_correlations_volume(
baseline_average.value = 0.0;
}
QUERY_VALUE highlight_average = rrdmetric2value(host, rca, ria, rma, after, before, options, group_method, group_options, tier, 0);
QUERY_VALUE highlight_average = rrdmetric2value(host, rca, ria, rma, after, before, options, group_method, group_options, tier, 0, QUERY_SOURCE_API_WEIGHTS);
merge_query_value_to_stats(&highlight_average, stats);
if(!netdata_double_isnumber(highlight_average.value))
@ -658,7 +659,7 @@ static void rrdset_metric_correlations_volume(
char highlight_countif_options[50 + 1];
snprintfz(highlight_countif_options, 50, "%s" NETDATA_DOUBLE_FORMAT, highlight_average.value < baseline_average.value ? "<" : ">", baseline_average.value);
QUERY_VALUE highlight_countif = rrdmetric2value(host, rca, ria, rma, after, before, options, RRDR_GROUPING_COUNTIF, highlight_countif_options, tier, 0);
QUERY_VALUE highlight_countif = rrdmetric2value(host, rca, ria, rma, after, before, options, RRDR_GROUPING_COUNTIF, highlight_countif_options, tier, 0, QUERY_SOURCE_API_WEIGHTS);
merge_query_value_to_stats(&highlight_countif, stats);
if(!netdata_double_isnumber(highlight_countif.value)) {
@ -699,7 +700,7 @@ static void rrdset_weights_anomaly_rate(
options |= RRDR_OPTION_MATCH_IDS | RRDR_OPTION_ANOMALY_BIT | RRDR_OPTION_NATURAL_POINTS;
QUERY_VALUE qv = rrdmetric2value(host, rca, ria, rma, after, before, options, group_method, group_options, tier, 0);
QUERY_VALUE qv = rrdmetric2value(host, rca, ria, rma, after, before, options, group_method, group_options, tier, 0, QUERY_SOURCE_API_WEIGHTS);
merge_query_value_to_stats(&qv, stats);
if(netdata_double_isnumber(qv.value))

View file

@ -751,6 +751,7 @@ inline int web_client_api_request_v1_data(RRDHOST *host, struct web_client *w, c
.tier = tier,
.chart_label_key = chart_label_key,
.charts_labels_filter = chart_labels_filter,
.query_source = QUERY_SOURCE_API_DATA,
};
qt = query_target_create(&qtr);

View file

@ -85,11 +85,11 @@ void web_client_request_done(struct web_client *w) {
// --------------------------------------------------------------------
// global statistics
finished_web_request_statistics(dt_usec(&tv, &w->tv_in),
w->stats_received_bytes,
w->stats_sent_bytes,
size,
sent);
global_statistics_web_request_completed(dt_usec(&tv, &w->tv_in),
w->stats_received_bytes,
w->stats_sent_bytes,
size,
sent);
w->stats_received_bytes = 0;
w->stats_sent_bytes = 0;

View file

@ -209,7 +209,7 @@ struct web_client *web_client_get_from_cache_or_allocate() {
web_clients_cache.used_count++;
// initialize it
w->id = web_client_connected();
w->id = global_statistics_web_client_connected();
w->mode = WEB_CLIENT_MODE_NORMAL;
netdata_thread_enable_cancelability();
@ -230,7 +230,7 @@ void web_client_release(struct web_client *w) {
web_server_log_connection(w, "DISCONNECTED");
web_client_request_done(w);
web_client_disconnected();
global_statistics_web_client_disconnected();
netdata_thread_disable_cancelability();
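
The web-client hooks were folded into the global_statistics_ namespace as well. A plausible pair, assuming the connected_clients and web_client_count counters from global_statistics.c; the return value feeds w->id above:

// Plausible shape only: track live connections and hand out a
// monotonically increasing client id.
uint64_t global_statistics_web_client_connected(void) {
    __atomic_add_fetch(&global_statistics.connected_clients, 1, __ATOMIC_RELAXED);
    return __atomic_add_fetch(&global_statistics.web_client_count, 1, __ATOMIC_RELAXED);
}

void global_statistics_web_client_disconnected(void) {
    __atomic_sub_fetch(&global_statistics.connected_clients, 1, __ATOMIC_RELAXED);
}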