mirror of
https://github.com/netdata/netdata.git
synced 2025-04-16 18:37:50 +00:00
Variable Granularity support for data collection (#6430)
* Variable Granularity support for data collection in the dbengine. * Variable Granularity support for data collection in the daemon. * Added tests to validate the data being queried after having been collected by changing data collection interval * Fix memory corruption * Updated database engine documentation about data collection frequency behaviour
This commit is contained in:
parent
dab0eeeea4
commit
2a06960117
15 changed files with 1631 additions and 321 deletions
|
@ -131,7 +131,8 @@ calculated_number backend_calculate_value_from_stored_data(
|
|||
}
|
||||
*/
|
||||
for(rd->state->query_ops.init(rd, &handle, after, before) ; !rd->state->query_ops.is_finished(&handle) ; ) {
|
||||
n = rd->state->query_ops.next_metric(&handle);
|
||||
time_t curr_t;
|
||||
n = rd->state->query_ops.next_metric(&handle, &curr_t);
|
||||
|
||||
if(unlikely(!does_storage_number_exist(n))) {
|
||||
// not collected
|
||||
|
|
|
@ -1581,34 +1581,12 @@ static inline void rrddim_set_by_pointer_fake_time(RRDDIM *rd, collected_number
|
|||
if(unlikely(v > rd->collected_value_max)) rd->collected_value_max = v;
|
||||
}
|
||||
|
||||
int test_dbengine(void)
|
||||
static RRDHOST *dbengine_rrdhost_find_or_create(char *name)
|
||||
{
|
||||
const int CHARTS = 128;
|
||||
const int DIMS = 16; /* That gives us 2048 metrics */
|
||||
const int POINTS = 16384; /* This produces 128MiB of metric data */
|
||||
const int QUERY_BATCH = 4096;
|
||||
uint8_t same;
|
||||
int i, j, k, c, errors;
|
||||
RRDHOST *host = NULL;
|
||||
RRDSET *st[CHARTS];
|
||||
RRDDIM *rd[CHARTS][DIMS];
|
||||
char name[101];
|
||||
time_t time_now;
|
||||
collected_number last;
|
||||
struct rrddim_query_handle handle;
|
||||
calculated_number value, expected;
|
||||
storage_number n;
|
||||
|
||||
error_log_limit_unlimited();
|
||||
fprintf(stderr, "\nRunning DB-engine test\n");
|
||||
|
||||
default_rrd_memory_mode = RRD_MEMORY_MODE_DBENGINE;
|
||||
|
||||
debug(D_RRDHOST, "Initializing localhost with hostname 'unittest-dbengine'");
|
||||
host = rrdhost_find_or_create(
|
||||
"unittest-dbengine"
|
||||
, "unittest-dbengine"
|
||||
, "unittest-dbengine"
|
||||
return rrdhost_find_or_create(
|
||||
name
|
||||
, name
|
||||
, name
|
||||
, os_type
|
||||
, netdata_configured_timezone
|
||||
, config_get(CONFIG_SECTION_BACKEND, "host tags", "")
|
||||
|
@ -1624,15 +1602,33 @@ int test_dbengine(void)
|
|||
, default_rrdpush_send_charts_matching
|
||||
, NULL
|
||||
);
|
||||
if (NULL == host)
|
||||
return 1;
|
||||
}
|
||||
|
||||
// costants for test_dbengine
|
||||
static const int CHARTS = 64;
|
||||
static const int DIMS = 16; // That gives us 64 * 16 = 1024 metrics
|
||||
#define REGIONS (3) // 3 regions of update_every
|
||||
// first region update_every is 2, second is 3, third is 1
|
||||
static const int REGION_UPDATE_EVERY[REGIONS] = {2, 3, 1};
|
||||
static const int REGION_POINTS[REGIONS] = {
|
||||
16384, // This produces 64MiB of metric data for the first region: update_every = 2
|
||||
16384, // This produces 64MiB of metric data for the second region: update_every = 3
|
||||
16384, // This produces 64MiB of metric data for the third region: update_every = 1
|
||||
};
|
||||
static const int QUERY_BATCH = 4096;
|
||||
|
||||
static void test_dbengine_create_charts(RRDHOST *host, RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DIMS],
|
||||
int update_every)
|
||||
{
|
||||
int i, j;
|
||||
char name[101];
|
||||
|
||||
for (i = 0 ; i < CHARTS ; ++i) {
|
||||
snprintfz(name, 100, "dbengine-chart-%d", i);
|
||||
|
||||
// create the chart
|
||||
st[i] = rrdset_create(host, "netdata", name, name, "netdata", NULL, "Unit Testing", "a value", "unittest",
|
||||
NULL, 1, 1, RRDSET_TYPE_LINE);
|
||||
NULL, 1, update_every, RRDSET_TYPE_LINE);
|
||||
rrdset_flag_set(st[i], RRDSET_FLAG_DEBUG);
|
||||
rrdset_flag_set(st[i], RRDSET_FLAG_STORE_FIRST);
|
||||
for (j = 0 ; j < DIMS ; ++j) {
|
||||
|
@ -1642,50 +1638,103 @@ int test_dbengine(void)
|
|||
}
|
||||
}
|
||||
|
||||
// Initialize DB with the very first entries
|
||||
for (i = 0 ; i < CHARTS ; ++i) {
|
||||
for (j = 0 ; j < DIMS ; ++j) {
|
||||
rd[i][j]->last_collected_time.tv_sec =
|
||||
st[i]->last_collected_time.tv_sec = st[i]->last_updated.tv_sec = 2 * API_RELATIVE_TIME_MAX - 1;
|
||||
rd[i][j]->last_collected_time.tv_usec =
|
||||
st[i]->last_collected_time.tv_usec = st[i]->last_updated.tv_usec = 0;
|
||||
}
|
||||
}
|
||||
for (i = 0 ; i < CHARTS ; ++i) {
|
||||
st[i]->usec_since_last_update = USEC_PER_SEC;
|
||||
|
||||
for (j = 0; j < DIMS; ++j) {
|
||||
rrddim_set_by_pointer_fake_time(rd[i][j], 69, 2 * API_RELATIVE_TIME_MAX); // set first value to 69
|
||||
}
|
||||
rrdset_done(st[i]);
|
||||
}
|
||||
// Fluh pages for subsequent real values
|
||||
for (i = 0 ; i < CHARTS ; ++i) {
|
||||
for (j = 0; j < DIMS; ++j) {
|
||||
rrdeng_store_metric_flush_current_page(rd[i][j]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Feeds the database region with test data, returns last timestamp of region
|
||||
static time_t test_dbengine_create_metrics(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DIMS],
|
||||
int current_region, time_t time_start)
|
||||
{
|
||||
time_t time_now;
|
||||
int i, j, c, update_every;
|
||||
collected_number next;
|
||||
|
||||
update_every = REGION_UPDATE_EVERY[current_region];
|
||||
time_now = time_start + update_every;
|
||||
// feed it with the test data
|
||||
time_now = 1;
|
||||
last = 0;
|
||||
for (i = 0 ; i < CHARTS ; ++i) {
|
||||
for (j = 0 ; j < DIMS ; ++j) {
|
||||
rd[i][j]->last_collected_time.tv_sec =
|
||||
st[i]->last_collected_time.tv_sec = st[i]->last_updated.tv_sec = time_now;
|
||||
rd[i][j]->last_collected_time.tv_usec =
|
||||
st[i]->last_collected_time.tv_usec = st[i]->last_updated.tv_usec = 0;
|
||||
st[i]->last_collected_time.tv_usec = st[i]->last_updated.tv_usec = 0;
|
||||
}
|
||||
}
|
||||
for(c = 0; c < POINTS ; ++c) {
|
||||
++time_now; // time_now = c + 2
|
||||
for (c = 0; c < REGION_POINTS[current_region] ; ++c) {
|
||||
time_now += update_every; // time_now = start + (c + 2) * update_every
|
||||
for (i = 0 ; i < CHARTS ; ++i) {
|
||||
st[i]->usec_since_last_update = USEC_PER_SEC;
|
||||
st[i]->usec_since_last_update = USEC_PER_SEC * update_every;
|
||||
|
||||
for (j = 0; j < DIMS; ++j) {
|
||||
last = i * DIMS * POINTS + j * POINTS + c;
|
||||
rrddim_set_by_pointer_fake_time(rd[i][j], last, time_now);
|
||||
next = i * DIMS * REGION_POINTS[current_region] + j * REGION_POINTS[current_region] + c;
|
||||
rrddim_set_by_pointer_fake_time(rd[i][j], next, time_now);
|
||||
}
|
||||
rrdset_done(st[i]);
|
||||
}
|
||||
}
|
||||
return time_now; //time_end
|
||||
}
|
||||
|
||||
// check the result
|
||||
// Checks the metric data for the given region, returns number of errors
|
||||
static int test_dbengine_check_metrics(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DIMS],
|
||||
int current_region, time_t time_start)
|
||||
{
|
||||
uint8_t same;
|
||||
time_t time_now, time_retrieved;
|
||||
int i, j, k, c, errors, update_every;
|
||||
collected_number last;
|
||||
calculated_number value, expected;
|
||||
storage_number n;
|
||||
struct rrddim_query_handle handle;
|
||||
|
||||
update_every = REGION_UPDATE_EVERY[current_region];
|
||||
errors = 0;
|
||||
|
||||
for(c = 0; c < POINTS ; c += QUERY_BATCH) {
|
||||
time_now = c + 2;
|
||||
// check the result
|
||||
for (c = 0; c < REGION_POINTS[current_region] ; c += QUERY_BATCH) {
|
||||
time_now = time_start + (c + 2) * update_every;
|
||||
for (i = 0 ; i < CHARTS ; ++i) {
|
||||
for (j = 0; j < DIMS; ++j) {
|
||||
rd[i][j]->state->query_ops.init(rd[i][j], &handle, time_now, time_now + QUERY_BATCH);
|
||||
rd[i][j]->state->query_ops.init(rd[i][j], &handle, time_now, time_now + QUERY_BATCH * update_every);
|
||||
for (k = 0; k < QUERY_BATCH; ++k) {
|
||||
last = i * DIMS * POINTS + j * POINTS + c + k;
|
||||
last = i * DIMS * REGION_POINTS[current_region] + j * REGION_POINTS[current_region] + c + k;
|
||||
expected = unpack_storage_number(pack_storage_number((calculated_number)last, SN_EXISTS));
|
||||
|
||||
n = rd[i][j]->state->query_ops.next_metric(&handle);
|
||||
n = rd[i][j]->state->query_ops.next_metric(&handle, &time_retrieved);
|
||||
value = unpack_storage_number(n);
|
||||
|
||||
same = (calculated_number_round(value * 10000000.0) == calculated_number_round(expected * 10000000.0)) ? 1 : 0;
|
||||
if(!same) {
|
||||
fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, expecting value "
|
||||
CALCULATED_NUMBER_FORMAT ", found " CALCULATED_NUMBER_FORMAT ", ### E R R O R ###\n",
|
||||
st[i]->name, rd[i][j]->name, (unsigned long)time_now + k, expected, value);
|
||||
st[i]->name, rd[i][j]->name, (unsigned long)time_now + k * update_every, expected, value);
|
||||
errors++;
|
||||
}
|
||||
if(time_retrieved != time_now + k * update_every) {
|
||||
fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, found timestamp %lu ### E R R O R ###\n",
|
||||
st[i]->name, rd[i][j]->name, (unsigned long)time_now + k * update_every, (unsigned long)time_retrieved);
|
||||
errors++;
|
||||
}
|
||||
}
|
||||
|
@ -1693,7 +1742,184 @@ int test_dbengine(void)
|
|||
}
|
||||
}
|
||||
}
|
||||
return errors;
|
||||
}
|
||||
|
||||
// Check rrdr transformations
|
||||
static int test_dbengine_check_rrdr(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DIMS],
|
||||
int current_region, time_t time_start, time_t time_end)
|
||||
{
|
||||
uint8_t same;
|
||||
time_t time_now, time_retrieved;
|
||||
int i, j, errors, update_every;
|
||||
long c;
|
||||
collected_number last;
|
||||
calculated_number value, expected;
|
||||
|
||||
errors = 0;
|
||||
update_every = REGION_UPDATE_EVERY[current_region];
|
||||
long points = (time_end - time_start) / update_every - 1;
|
||||
for (i = 0 ; i < CHARTS ; ++i) {
|
||||
RRDR *r = rrd2rrdr(st[i], points, time_start + update_every, time_end, RRDR_GROUPING_AVERAGE, 0, 0, NULL);
|
||||
if (!r) {
|
||||
fprintf(stderr, " DB-engine unittest %s: empty RRDR ### E R R O R ###\n", st[i]->name);
|
||||
return ++errors;
|
||||
} else {
|
||||
assert(r->st == st[i]);
|
||||
for (c = 0; c != rrdr_rows(r) ; ++c) {
|
||||
RRDDIM *d;
|
||||
time_now = time_start + (c + 2) * update_every;
|
||||
time_retrieved = r->t[c];
|
||||
|
||||
// for each dimension
|
||||
for (j = 0, d = r->st->dimensions ; d && j < r->d ; ++j, d = d->next) {
|
||||
calculated_number *cn = &r->v[ c * r->d ];
|
||||
value = cn[j];
|
||||
assert(rd[i][j] == d);
|
||||
|
||||
last = i * DIMS * REGION_POINTS[current_region] + j * REGION_POINTS[current_region] + c;
|
||||
expected = unpack_storage_number(pack_storage_number((calculated_number)last, SN_EXISTS));
|
||||
|
||||
same = (calculated_number_round(value * 10000000.0) == calculated_number_round(expected * 10000000.0)) ? 1 : 0;
|
||||
if(!same) {
|
||||
fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, expecting value "
|
||||
CALCULATED_NUMBER_FORMAT ", RRDR found " CALCULATED_NUMBER_FORMAT ", ### E R R O R ###\n",
|
||||
st[i]->name, rd[i][j]->name, (unsigned long)time_now, expected, value);
|
||||
errors++;
|
||||
}
|
||||
if(time_retrieved != time_now) {
|
||||
fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, found RRDR timestamp %lu ### E R R O R ###\n",
|
||||
st[i]->name, rd[i][j]->name, (unsigned long)time_now, (unsigned long)time_retrieved);
|
||||
errors++;
|
||||
}
|
||||
}
|
||||
}
|
||||
rrdr_free(r);
|
||||
}
|
||||
}
|
||||
return errors;
|
||||
}
|
||||
|
||||
int test_dbengine(void)
|
||||
{
|
||||
int i, j, errors, update_every, current_region;
|
||||
RRDHOST *host = NULL;
|
||||
RRDSET *st[CHARTS];
|
||||
RRDDIM *rd[CHARTS][DIMS];
|
||||
time_t time_start[REGIONS], time_end[REGIONS];
|
||||
|
||||
error_log_limit_unlimited();
|
||||
fprintf(stderr, "\nRunning DB-engine test\n");
|
||||
|
||||
default_rrd_memory_mode = RRD_MEMORY_MODE_DBENGINE;
|
||||
|
||||
debug(D_RRDHOST, "Initializing localhost with hostname 'unittest-dbengine'");
|
||||
host = dbengine_rrdhost_find_or_create("unittest-dbengine");
|
||||
if (NULL == host)
|
||||
return 1;
|
||||
|
||||
current_region = 0; // this is the first region of data
|
||||
update_every = REGION_UPDATE_EVERY[current_region]; // set data collection frequency to 2 seconds
|
||||
test_dbengine_create_charts(host, st, rd, update_every);
|
||||
|
||||
time_start[current_region] = 2 * API_RELATIVE_TIME_MAX;
|
||||
time_end[current_region] = test_dbengine_create_metrics(st,rd, current_region, time_start[current_region]);
|
||||
|
||||
errors = test_dbengine_check_metrics(st, rd, current_region, time_start[current_region]);
|
||||
if (errors)
|
||||
goto error_out;
|
||||
|
||||
current_region = 1; //this is the second region of data
|
||||
update_every = REGION_UPDATE_EVERY[current_region]; // set data collection frequency to 3 seconds
|
||||
// Align pages for frequency change
|
||||
for (i = 0 ; i < CHARTS ; ++i) {
|
||||
st[i]->update_every = update_every;
|
||||
for (j = 0; j < DIMS; ++j) {
|
||||
rrdeng_store_metric_flush_current_page(rd[i][j]);
|
||||
}
|
||||
}
|
||||
|
||||
time_start[current_region] = time_end[current_region - 1] + update_every;
|
||||
if (0 != time_start[current_region] % update_every) // align to update_every
|
||||
time_start[current_region] += update_every - time_start[current_region] % update_every;
|
||||
time_end[current_region] = test_dbengine_create_metrics(st,rd, current_region, time_start[current_region]);
|
||||
|
||||
errors = test_dbengine_check_metrics(st, rd, current_region, time_start[current_region]);
|
||||
if (errors)
|
||||
goto error_out;
|
||||
|
||||
current_region = 2; //this is the third region of data
|
||||
update_every = REGION_UPDATE_EVERY[current_region]; // set data collection frequency to 1 seconds
|
||||
// Align pages for frequency change
|
||||
for (i = 0 ; i < CHARTS ; ++i) {
|
||||
st[i]->update_every = update_every;
|
||||
for (j = 0; j < DIMS; ++j) {
|
||||
rrdeng_store_metric_flush_current_page(rd[i][j]);
|
||||
}
|
||||
}
|
||||
|
||||
time_start[current_region] = time_end[current_region - 1] + update_every;
|
||||
if (0 != time_start[current_region] % update_every) // align to update_every
|
||||
time_start[current_region] += update_every - time_start[current_region] % update_every;
|
||||
time_end[current_region] = test_dbengine_create_metrics(st,rd, current_region, time_start[current_region]);
|
||||
|
||||
errors = test_dbengine_check_metrics(st, rd, current_region, time_start[current_region]);
|
||||
if (errors)
|
||||
goto error_out;
|
||||
|
||||
for (current_region = 0 ; current_region < REGIONS ; ++current_region) {
|
||||
errors = test_dbengine_check_rrdr(st, rd, current_region, time_start[current_region], time_end[current_region]);
|
||||
if (errors)
|
||||
goto error_out;
|
||||
}
|
||||
|
||||
current_region = 1;
|
||||
update_every = REGION_UPDATE_EVERY[current_region]; // use the maximum update_every = 3
|
||||
errors = 0;
|
||||
long points = (time_end[REGIONS - 1] - time_start[0]) / update_every - 1; // cover all time regions with RRDR
|
||||
long point_offset = (time_start[current_region] - time_start[0]) / update_every;
|
||||
for (i = 0 ; i < CHARTS ; ++i) {
|
||||
RRDR *r = rrd2rrdr(st[i], points, time_start[0] + update_every, time_end[REGIONS - 1], RRDR_GROUPING_AVERAGE, 0, 0, NULL);
|
||||
if (!r) {
|
||||
fprintf(stderr, " DB-engine unittest %s: empty RRDR ### E R R O R ###\n", st[i]->name);
|
||||
++errors;
|
||||
} else {
|
||||
long c;
|
||||
|
||||
assert(r->st == st[i]);
|
||||
// test current region values only, since they must be left unchanged
|
||||
for (c = point_offset ; c < point_offset + rrdr_rows(r) / REGIONS / 2 ; ++c) {
|
||||
RRDDIM *d;
|
||||
time_t time_now = time_start[current_region] + (c - point_offset + 2) * update_every;
|
||||
time_t time_retrieved = r->t[c];
|
||||
|
||||
// for each dimension
|
||||
for(j = 0, d = r->st->dimensions ; d && j < r->d ; ++j, d = d->next) {
|
||||
calculated_number *cn = &r->v[ c * r->d ];
|
||||
calculated_number value = cn[j];
|
||||
assert(rd[i][j] == d);
|
||||
|
||||
collected_number last = i * DIMS * REGION_POINTS[current_region] + j * REGION_POINTS[current_region] + c - point_offset;
|
||||
calculated_number expected = unpack_storage_number(pack_storage_number((calculated_number)last, SN_EXISTS));
|
||||
|
||||
uint8_t same = (calculated_number_round(value * 10000000.0) == calculated_number_round(expected * 10000000.0)) ? 1 : 0;
|
||||
if(!same) {
|
||||
fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, expecting value "
|
||||
CALCULATED_NUMBER_FORMAT ", RRDR found " CALCULATED_NUMBER_FORMAT ", ### E R R O R ###\n",
|
||||
st[i]->name, rd[i][j]->name, (unsigned long)time_now, expected, value);
|
||||
errors++;
|
||||
}
|
||||
if(time_retrieved != time_now) {
|
||||
fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, found RRDR timestamp %lu ### E R R O R ###\n",
|
||||
st[i]->name, rd[i][j]->name, (unsigned long)time_now, (unsigned long)time_retrieved);
|
||||
errors++;
|
||||
}
|
||||
}
|
||||
}
|
||||
rrdr_free(r);
|
||||
}
|
||||
}
|
||||
error_out:
|
||||
rrdeng_exit(host->rrdeng_ctx);
|
||||
rrd_wrlock();
|
||||
rrdhost_delete_charts(host);
|
||||
|
@ -1704,43 +1930,25 @@ int test_dbengine(void)
|
|||
|
||||
void generate_dbengine_dataset(unsigned history_seconds)
|
||||
{
|
||||
const int DIMS = 128;
|
||||
const int DSET_DIMS = 128;
|
||||
const uint64_t EXPECTED_COMPRESSION_RATIO = 94;
|
||||
int j;
|
||||
int j, update_every = 1;
|
||||
RRDHOST *host = NULL;
|
||||
RRDSET *st;
|
||||
RRDDIM *rd[DIMS];
|
||||
RRDDIM *rd[DSET_DIMS];
|
||||
char name[101];
|
||||
time_t time_current, time_present;
|
||||
|
||||
default_rrd_memory_mode = RRD_MEMORY_MODE_DBENGINE;
|
||||
default_rrdeng_page_cache_mb = 128;
|
||||
/* Worst case for uncompressible data */
|
||||
default_rrdeng_disk_quota_mb = (((uint64_t)DIMS) * sizeof(storage_number) * history_seconds) / (1024 * 1024);
|
||||
// Worst case for uncompressible data
|
||||
default_rrdeng_disk_quota_mb = (((uint64_t)DSET_DIMS) * sizeof(storage_number) * history_seconds) / (1024 * 1024);
|
||||
default_rrdeng_disk_quota_mb -= default_rrdeng_disk_quota_mb * EXPECTED_COMPRESSION_RATIO / 100;
|
||||
|
||||
error_log_limit_unlimited();
|
||||
debug(D_RRDHOST, "Initializing localhost with hostname 'dbengine-dataset'");
|
||||
|
||||
host = rrdhost_find_or_create(
|
||||
"dbengine-dataset"
|
||||
, "dbengine-dataset"
|
||||
, "dbengine-dataset"
|
||||
, os_type
|
||||
, netdata_configured_timezone
|
||||
, config_get(CONFIG_SECTION_BACKEND, "host tags", "")
|
||||
, program_name
|
||||
, program_version
|
||||
, default_rrd_update_every
|
||||
, default_rrd_history_entries
|
||||
, RRD_MEMORY_MODE_DBENGINE
|
||||
, default_health_enabled
|
||||
, default_rrdpush_enabled
|
||||
, default_rrdpush_destination
|
||||
, default_rrdpush_api_key
|
||||
, default_rrdpush_send_charts_matching
|
||||
, NULL
|
||||
);
|
||||
host = dbengine_rrdhost_find_or_create("dbengine-dataset");
|
||||
if (NULL == host)
|
||||
return;
|
||||
|
||||
|
@ -1748,8 +1956,8 @@ void generate_dbengine_dataset(unsigned history_seconds)
|
|||
|
||||
// create the chart
|
||||
st = rrdset_create(host, "example", "random", "random", "example", NULL, "random", "random", "random",
|
||||
NULL, 1, 1, RRDSET_TYPE_LINE);
|
||||
for (j = 0 ; j < DIMS ; ++j) {
|
||||
NULL, 1, update_every, RRDSET_TYPE_LINE);
|
||||
for (j = 0 ; j < DSET_DIMS ; ++j) {
|
||||
snprintfz(name, 100, "random%d", j);
|
||||
|
||||
rd[j] = rrddim_add(st, name, NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
|
||||
|
@ -1758,7 +1966,7 @@ void generate_dbengine_dataset(unsigned history_seconds)
|
|||
time_present = now_realtime_sec();
|
||||
// feed it with the test data
|
||||
time_current = time_present - history_seconds;
|
||||
for (j = 0 ; j < DIMS ; ++j) {
|
||||
for (j = 0 ; j < DSET_DIMS ; ++j) {
|
||||
rd[j]->last_collected_time.tv_sec =
|
||||
st->last_collected_time.tv_sec = st->last_updated.tv_sec = time_current;
|
||||
rd[j]->last_collected_time.tv_usec =
|
||||
|
@ -1767,7 +1975,7 @@ void generate_dbengine_dataset(unsigned history_seconds)
|
|||
for( ; time_current < time_present; ++time_current) {
|
||||
st->usec_since_last_update = USEC_PER_SEC;
|
||||
|
||||
for (j = 0; j < DIMS; ++j) {
|
||||
for (j = 0; j < DSET_DIMS; ++j) {
|
||||
rrddim_set_by_pointer_fake_time(rd[j], (time_current + j) % 128, time_current);
|
||||
}
|
||||
rrdset_done(st);
|
||||
|
|
|
@ -47,6 +47,8 @@ Currently Netdata supports 6 memory modes:
|
|||
database. There is some amount of RAM dedicated to data caching and indexing and the rest of
|
||||
the data reside compressed on disk. The number of history entries is not fixed in this case,
|
||||
but depends on the configured disk space and the effective compression ratio of the data stored.
|
||||
This is the **only mode** that supports changing the data collection update frequency
|
||||
(`update_every`) **without losing** the previously stored metrics.
|
||||
For more details see [here](engine/).
|
||||
|
||||
You can select the memory mode by editing `netdata.conf` and setting:
|
||||
|
|
|
@ -4,6 +4,8 @@ The Database Engine works like a traditional
|
|||
database. There is some amount of RAM dedicated to data caching and indexing and the rest of
|
||||
the data reside compressed on disk. The number of history entries is not fixed in this case,
|
||||
but depends on the configured disk space and the effective compression ratio of the data stored.
|
||||
This is the **only mode** that supports changing the data collection update frequency
|
||||
(`update_every`) **without losing** the previously stored metrics.
|
||||
|
||||
## Files
|
||||
|
||||
|
|
|
@ -419,6 +419,35 @@ static inline int is_point_in_time_in_page(struct rrdeng_page_descr *descr, usec
|
|||
return (point_in_time >= descr->start_time && point_in_time <= descr->end_time);
|
||||
}
|
||||
|
||||
/* The caller must hold the page index lock */
|
||||
static inline struct rrdeng_page_descr *
|
||||
find_first_page_in_time_range(struct pg_cache_page_index *page_index, usec_t start_time, usec_t end_time)
|
||||
{
|
||||
struct rrdeng_page_descr *descr = NULL;
|
||||
Pvoid_t *PValue;
|
||||
Word_t Index;
|
||||
|
||||
Index = (Word_t)(start_time / USEC_PER_SEC);
|
||||
PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0);
|
||||
if (likely(NULL != PValue)) {
|
||||
descr = *PValue;
|
||||
if (is_page_in_time_range(descr, start_time, end_time)) {
|
||||
return descr;
|
||||
}
|
||||
}
|
||||
|
||||
Index = (Word_t)(start_time / USEC_PER_SEC);
|
||||
PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0);
|
||||
if (likely(NULL != PValue)) {
|
||||
descr = *PValue;
|
||||
if (is_page_in_time_range(descr, start_time, end_time)) {
|
||||
return descr;
|
||||
}
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* Update metric oldest and latest timestamps efficiently when adding new values */
|
||||
void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_descr *descr)
|
||||
{
|
||||
|
@ -510,23 +539,12 @@ void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index
|
|||
uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
|
||||
}
|
||||
|
||||
/*
|
||||
* Searches for a page and triggers disk I/O if necessary and possible.
|
||||
* Does not get a reference.
|
||||
* Returns page index pointer for given metric UUID.
|
||||
*/
|
||||
struct pg_cache_page_index *
|
||||
pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time)
|
||||
usec_t pg_cache_oldest_time_in_range(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time)
|
||||
{
|
||||
struct page_cache *pg_cache = &ctx->pg_cache;
|
||||
struct rrdeng_page_descr *descr = NULL, *preload_array[PAGE_CACHE_MAX_PRELOAD_PAGES];
|
||||
struct page_cache_descr *pg_cache_descr = NULL;
|
||||
int i, j, k, count, found;
|
||||
unsigned long flags;
|
||||
struct rrdeng_page_descr *descr = NULL;
|
||||
Pvoid_t *PValue;
|
||||
struct pg_cache_page_index *page_index;
|
||||
Word_t Index;
|
||||
uint8_t failed_to_reserve;
|
||||
|
||||
uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
|
||||
PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
|
||||
|
@ -535,45 +553,130 @@ struct pg_cache_page_index *
|
|||
}
|
||||
uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
|
||||
if (NULL == PValue) {
|
||||
debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__);
|
||||
return NULL;
|
||||
return INVALID_TIME;
|
||||
}
|
||||
|
||||
uv_rwlock_rdlock(&page_index->lock);
|
||||
/* Find first page in range */
|
||||
found = 0;
|
||||
Index = (Word_t)(start_time / USEC_PER_SEC);
|
||||
PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0);
|
||||
if (likely(NULL != PValue)) {
|
||||
descr = *PValue;
|
||||
if (is_page_in_time_range(descr, start_time, end_time)) {
|
||||
found = 1;
|
||||
}
|
||||
}
|
||||
if (!found) {
|
||||
Index = (Word_t)(start_time / USEC_PER_SEC);
|
||||
PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0);
|
||||
if (likely(NULL != PValue)) {
|
||||
descr = *PValue;
|
||||
if (is_page_in_time_range(descr, start_time, end_time)) {
|
||||
found = 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!found) {
|
||||
descr = find_first_page_in_time_range(page_index, start_time, end_time);
|
||||
if (NULL == descr) {
|
||||
uv_rwlock_rdunlock(&page_index->lock);
|
||||
return INVALID_TIME;
|
||||
}
|
||||
uv_rwlock_rdunlock(&page_index->lock);
|
||||
return descr->start_time;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return page information for the first page before point_in_time that satisfies the filter.
|
||||
* @param ctx DB context
|
||||
* @param page_index page index of a metric
|
||||
* @param point_in_time the pages that are searched must be older than this timestamp
|
||||
* @param filter decides if the page satisfies the caller's criteria
|
||||
* @param page_info the result of the search is set in this pointer
|
||||
*/
|
||||
void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_cache_page_index *page_index,
|
||||
usec_t point_in_time, pg_cache_page_info_filter_t *filter,
|
||||
struct rrdeng_page_info *page_info)
|
||||
{
|
||||
struct page_cache *pg_cache = &ctx->pg_cache;
|
||||
struct rrdeng_page_descr *descr = NULL;
|
||||
Pvoid_t *PValue;
|
||||
Word_t Index;
|
||||
|
||||
(void)pg_cache;
|
||||
assert(NULL != page_index);
|
||||
|
||||
Index = (Word_t)(point_in_time / USEC_PER_SEC);
|
||||
uv_rwlock_rdlock(&page_index->lock);
|
||||
do {
|
||||
PValue = JudyLPrev(page_index->JudyL_array, &Index, PJE0);
|
||||
descr = unlikely(NULL == PValue) ? NULL : *PValue;
|
||||
} while (descr != NULL && !filter(descr));
|
||||
if (unlikely(NULL == descr)) {
|
||||
page_info->page_length = 0;
|
||||
page_info->start_time = INVALID_TIME;
|
||||
page_info->end_time = INVALID_TIME;
|
||||
} else {
|
||||
page_info->page_length = descr->page_length;
|
||||
page_info->start_time = descr->start_time;
|
||||
page_info->end_time = descr->end_time;
|
||||
}
|
||||
uv_rwlock_rdunlock(&page_index->lock);
|
||||
}
|
||||
/**
|
||||
* Searches for pages in a time range and triggers disk I/O if necessary and possible.
|
||||
* Does not get a reference.
|
||||
* @param ctx DB context
|
||||
* @param id UUID
|
||||
* @param start_time inclusive starting time in usec
|
||||
* @param end_time inclusive ending time in usec
|
||||
* @param page_info_arrayp It allocates (*page_arrayp) and populates it with information of pages that overlap
|
||||
* with the time range [start_time,end_time]. The caller must free (*page_info_arrayp) with freez().
|
||||
* If page_info_arrayp is set to NULL nothing was allocated.
|
||||
* @param ret_page_indexp Sets the page index pointer (*ret_page_indexp) for the given UUID.
|
||||
* @return the number of pages that overlap with the time range [start_time,end_time].
|
||||
*/
|
||||
unsigned pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time,
|
||||
struct rrdeng_page_info **page_info_arrayp, struct pg_cache_page_index **ret_page_indexp)
|
||||
{
|
||||
struct page_cache *pg_cache = &ctx->pg_cache;
|
||||
struct rrdeng_page_descr *descr = NULL, *preload_array[PAGE_CACHE_MAX_PRELOAD_PAGES];
|
||||
struct page_cache_descr *pg_cache_descr = NULL;
|
||||
unsigned i, j, k, preload_count, count, page_info_array_max_size;
|
||||
unsigned long flags;
|
||||
Pvoid_t *PValue;
|
||||
struct pg_cache_page_index *page_index;
|
||||
Word_t Index;
|
||||
uint8_t failed_to_reserve;
|
||||
|
||||
assert(NULL != ret_page_indexp);
|
||||
|
||||
uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
|
||||
PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
|
||||
if (likely(NULL != PValue)) {
|
||||
*ret_page_indexp = page_index = *PValue;
|
||||
}
|
||||
uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
|
||||
if (NULL == PValue) {
|
||||
debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__);
|
||||
return page_index;
|
||||
*ret_page_indexp = NULL;
|
||||
return 0;
|
||||
}
|
||||
|
||||
for (count = 0 ;
|
||||
descr != NULL && is_page_in_time_range(descr, start_time, end_time);
|
||||
uv_rwlock_rdlock(&page_index->lock);
|
||||
descr = find_first_page_in_time_range(page_index, start_time, end_time);
|
||||
if (NULL == descr) {
|
||||
uv_rwlock_rdunlock(&page_index->lock);
|
||||
debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__);
|
||||
*ret_page_indexp = NULL;
|
||||
return 0;
|
||||
} else {
|
||||
Index = (Word_t)(descr->start_time / USEC_PER_SEC);
|
||||
}
|
||||
if (page_info_arrayp) {
|
||||
page_info_array_max_size = PAGE_CACHE_MAX_PRELOAD_PAGES * sizeof(struct rrdeng_page_info);
|
||||
*page_info_arrayp = mallocz(page_info_array_max_size);
|
||||
}
|
||||
|
||||
for (count = 0, preload_count = 0 ;
|
||||
descr != NULL && is_page_in_time_range(descr, start_time, end_time) ;
|
||||
PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0),
|
||||
descr = unlikely(NULL == PValue) ? NULL : *PValue) {
|
||||
/* Iterate all pages in range */
|
||||
|
||||
if (unlikely(0 == descr->page_length))
|
||||
continue;
|
||||
if (page_info_arrayp) {
|
||||
if (unlikely(count >= page_info_array_max_size / sizeof(struct rrdeng_page_info))) {
|
||||
page_info_array_max_size += PAGE_CACHE_MAX_PRELOAD_PAGES * sizeof(struct rrdeng_page_info);
|
||||
*page_info_arrayp = reallocz(*page_info_arrayp, page_info_array_max_size);
|
||||
}
|
||||
(*page_info_arrayp)[count].start_time = descr->start_time;
|
||||
(*page_info_arrayp)[count].end_time = descr->end_time;
|
||||
(*page_info_arrayp)[count].page_length = descr->page_length;
|
||||
}
|
||||
++count;
|
||||
|
||||
rrdeng_page_descr_mutex_lock(ctx, descr);
|
||||
pg_cache_descr = descr->pg_cache_descr;
|
||||
flags = pg_cache_descr->flags;
|
||||
|
@ -586,8 +689,8 @@ struct pg_cache_page_index *
|
|||
}
|
||||
}
|
||||
if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) {
|
||||
preload_array[count++] = descr;
|
||||
if (PAGE_CACHE_MAX_PRELOAD_PAGES == count) {
|
||||
preload_array[preload_count++] = descr;
|
||||
if (PAGE_CACHE_MAX_PRELOAD_PAGES == preload_count) {
|
||||
rrdeng_page_descr_mutex_unlock(ctx, descr);
|
||||
break;
|
||||
}
|
||||
|
@ -598,7 +701,7 @@ struct pg_cache_page_index *
|
|||
uv_rwlock_rdunlock(&page_index->lock);
|
||||
|
||||
failed_to_reserve = 0;
|
||||
for (i = 0 ; i < count && !failed_to_reserve ; ++i) {
|
||||
for (i = 0 ; i < preload_count && !failed_to_reserve ; ++i) {
|
||||
struct rrdeng_cmd cmd;
|
||||
struct rrdeng_page_descr *next;
|
||||
|
||||
|
@ -614,7 +717,7 @@ struct pg_cache_page_index *
|
|||
cmd.read_extent.page_cache_descr[0] = descr;
|
||||
/* don't use this page again */
|
||||
preload_array[i] = NULL;
|
||||
for (j = 0, k = 1 ; j < count ; ++j) {
|
||||
for (j = 0, k = 1 ; j < preload_count ; ++j) {
|
||||
next = preload_array[j];
|
||||
if (NULL == next) {
|
||||
continue;
|
||||
|
@ -635,7 +738,7 @@ struct pg_cache_page_index *
|
|||
}
|
||||
if (failed_to_reserve) {
|
||||
debug(D_RRDENGINE, "%s: Failed to reserve enough memory, canceling I/O.", __func__);
|
||||
for (i = 0 ; i < count ; ++i) {
|
||||
for (i = 0 ; i < preload_count ; ++i) {
|
||||
descr = preload_array[i];
|
||||
if (NULL == descr) {
|
||||
continue;
|
||||
|
@ -643,11 +746,15 @@ struct pg_cache_page_index *
|
|||
pg_cache_put(ctx, descr);
|
||||
}
|
||||
}
|
||||
if (!count) {
|
||||
if (!preload_count) {
|
||||
/* no such page */
|
||||
debug(D_RRDENGINE, "%s: No page was eligible to attempt preload.", __func__);
|
||||
}
|
||||
return page_index;
|
||||
if (unlikely(0 == count && page_info_arrayp)) {
|
||||
freez(*page_info_arrayp);
|
||||
*page_info_arrayp = NULL;
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -757,6 +864,105 @@ struct rrdeng_page_descr *
|
|||
return descr;
|
||||
}
|
||||
|
||||
/*
|
||||
* Searches for the first page between start_time and end_time and gets a reference.
|
||||
* start_time and end_time are inclusive.
|
||||
* If index is NULL lookup by UUID (id).
|
||||
*/
|
||||
struct rrdeng_page_descr *
|
||||
pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id,
|
||||
usec_t start_time, usec_t end_time)
|
||||
{
|
||||
struct page_cache *pg_cache = &ctx->pg_cache;
|
||||
struct rrdeng_page_descr *descr = NULL;
|
||||
struct page_cache_descr *pg_cache_descr = NULL;
|
||||
unsigned long flags;
|
||||
Pvoid_t *PValue;
|
||||
struct pg_cache_page_index *page_index;
|
||||
uint8_t page_not_in_cache;
|
||||
|
||||
if (unlikely(NULL == index)) {
|
||||
uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
|
||||
PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
|
||||
if (likely(NULL != PValue)) {
|
||||
page_index = *PValue;
|
||||
}
|
||||
uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
|
||||
if (NULL == PValue) {
|
||||
return NULL;
|
||||
}
|
||||
} else {
|
||||
page_index = index;
|
||||
}
|
||||
pg_cache_reserve_pages(ctx, 1);
|
||||
|
||||
page_not_in_cache = 0;
|
||||
uv_rwlock_rdlock(&page_index->lock);
|
||||
while (1) {
|
||||
descr = find_first_page_in_time_range(page_index, start_time, end_time);
|
||||
if (NULL == descr || 0 == descr->page_length) {
|
||||
/* non-empty page not found */
|
||||
uv_rwlock_rdunlock(&page_index->lock);
|
||||
|
||||
pg_cache_release_pages(ctx, 1);
|
||||
return NULL;
|
||||
}
|
||||
rrdeng_page_descr_mutex_lock(ctx, descr);
|
||||
pg_cache_descr = descr->pg_cache_descr;
|
||||
flags = pg_cache_descr->flags;
|
||||
if ((flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 0)) {
|
||||
/* success */
|
||||
rrdeng_page_descr_mutex_unlock(ctx, descr);
|
||||
debug(D_RRDENGINE, "%s: Page was found in memory.", __func__);
|
||||
break;
|
||||
}
|
||||
if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) {
|
||||
struct rrdeng_cmd cmd;
|
||||
|
||||
uv_rwlock_rdunlock(&page_index->lock);
|
||||
|
||||
cmd.opcode = RRDENG_READ_PAGE;
|
||||
cmd.read_page.page_cache_descr = descr;
|
||||
rrdeng_enq_cmd(&ctx->worker_config, &cmd);
|
||||
|
||||
debug(D_RRDENGINE, "%s: Waiting for page to be asynchronously read from disk:", __func__);
|
||||
if(unlikely(debug_flags & D_RRDENGINE))
|
||||
print_page_cache_descr(descr);
|
||||
while (!(pg_cache_descr->flags & RRD_PAGE_POPULATED)) {
|
||||
pg_cache_wait_event_unsafe(descr);
|
||||
}
|
||||
/* success */
|
||||
/* Downgrade exclusive reference to allow other readers */
|
||||
pg_cache_descr->flags &= ~RRD_PAGE_LOCKED;
|
||||
pg_cache_wake_up_waiters_unsafe(descr);
|
||||
rrdeng_page_descr_mutex_unlock(ctx, descr);
|
||||
rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
|
||||
return descr;
|
||||
}
|
||||
uv_rwlock_rdunlock(&page_index->lock);
|
||||
debug(D_RRDENGINE, "%s: Waiting for page to be unlocked:", __func__);
|
||||
if(unlikely(debug_flags & D_RRDENGINE))
|
||||
print_page_cache_descr(descr);
|
||||
if (!(flags & RRD_PAGE_POPULATED))
|
||||
page_not_in_cache = 1;
|
||||
pg_cache_wait_event_unsafe(descr);
|
||||
rrdeng_page_descr_mutex_unlock(ctx, descr);
|
||||
|
||||
/* reset scan to find again */
|
||||
uv_rwlock_rdlock(&page_index->lock);
|
||||
}
|
||||
uv_rwlock_rdunlock(&page_index->lock);
|
||||
|
||||
if (!(flags & RRD_PAGE_DIRTY))
|
||||
pg_cache_replaceQ_set_hot(ctx, descr);
|
||||
pg_cache_release_pages(ctx, 1);
|
||||
if (page_not_in_cache)
|
||||
rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
|
||||
else
|
||||
rrd_stat_atomic_add(&ctx->stats.pg_cache_hits, 1);
|
||||
return descr;
|
||||
}
|
||||
|
||||
struct pg_cache_page_index *create_page_index(uuid_t *id)
|
||||
{
|
||||
struct pg_cache_page_index *page_index;
|
||||
|
|
|
@ -48,9 +48,6 @@ struct page_cache_descr {
|
|||
* number of descriptor users | DESTROY | LOCKED | ALLOCATED |
|
||||
*/
|
||||
struct rrdeng_page_descr {
|
||||
uint32_t page_length;
|
||||
usec_t start_time;
|
||||
usec_t end_time;
|
||||
uuid_t *id; /* never changes */
|
||||
struct extent_info *extent;
|
||||
|
||||
|
@ -59,8 +56,25 @@ struct rrdeng_page_descr {
|
|||
|
||||
/* Compare-And-Swap target for page cache descriptor allocation algorithm */
|
||||
volatile unsigned long pg_cache_descr_state;
|
||||
|
||||
/* page information */
|
||||
usec_t start_time;
|
||||
usec_t end_time;
|
||||
uint32_t page_length;
|
||||
};
|
||||
|
||||
#define PAGE_INFO_SCRATCH_SZ (8)
|
||||
struct rrdeng_page_info {
|
||||
uint8_t scratch[PAGE_INFO_SCRATCH_SZ]; /* scratch area to be used by page-cache users */
|
||||
|
||||
usec_t start_time;
|
||||
usec_t end_time;
|
||||
uint32_t page_length;
|
||||
};
|
||||
|
||||
/* returns 1 for success, 0 for failure */
|
||||
typedef int pg_cache_page_info_filter_t(struct rrdeng_page_descr *);
|
||||
|
||||
#define PAGE_CACHE_MAX_PRELOAD_PAGES (256)
|
||||
|
||||
/* maps time ranges to pages */
|
||||
|
@ -149,11 +163,20 @@ extern void pg_cache_put(struct rrdengine_instance *ctx, struct rrdeng_page_desc
|
|||
extern void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index *index,
|
||||
struct rrdeng_page_descr *descr);
|
||||
extern void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty);
|
||||
extern struct pg_cache_page_index *
|
||||
pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time);
|
||||
extern usec_t pg_cache_oldest_time_in_range(struct rrdengine_instance *ctx, uuid_t *id,
|
||||
usec_t start_time, usec_t end_time);
|
||||
extern void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_cache_page_index *page_index,
|
||||
usec_t point_in_time, pg_cache_page_info_filter_t *filter,
|
||||
struct rrdeng_page_info *page_info);
|
||||
extern unsigned
|
||||
pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time,
|
||||
struct rrdeng_page_info **page_info_arrayp, struct pg_cache_page_index **ret_page_indexp);
|
||||
extern struct rrdeng_page_descr *
|
||||
pg_cache_lookup(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id,
|
||||
usec_t point_in_time);
|
||||
extern struct rrdeng_page_descr *
|
||||
pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id,
|
||||
usec_t start_time, usec_t end_time);
|
||||
extern struct pg_cache_page_index *create_page_index(uuid_t *id);
|
||||
extern void init_page_cache(struct rrdengine_instance *ctx);
|
||||
extern void free_page_cache(struct rrdengine_instance *ctx);
|
||||
|
|
|
@ -24,6 +24,9 @@ void sanity_check(void)
|
|||
|
||||
/* page count must fit in 8 bits */
|
||||
BUILD_BUG_ON(MAX_PAGES_PER_EXTENT > 255);
|
||||
|
||||
/* page info scratch space must be able to hold 2 32-bit integers */
|
||||
BUILD_BUG_ON(sizeof(((struct rrdeng_page_info *)0)->scratch) < 2 * sizeof(uint32_t));
|
||||
}
|
||||
|
||||
void read_extent_cb(uv_fs_t* req)
|
||||
|
|
|
@ -218,6 +218,208 @@ void rrdeng_store_metric_finalize(RRDDIM *rd)
|
|||
}
|
||||
}
|
||||
|
||||
/* Returns 1 if the data collection interval is well defined, 0 otherwise */
|
||||
static int metrics_with_known_interval(struct rrdeng_page_descr *descr)
|
||||
{
|
||||
unsigned page_entries;
|
||||
|
||||
if (unlikely(INVALID_TIME == descr->start_time || INVALID_TIME == descr->end_time))
|
||||
return 0;
|
||||
page_entries = descr->page_length / sizeof(storage_number);
|
||||
if (likely(page_entries > 1)) {
|
||||
return 1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static inline uint32_t *pginfo_to_dt(struct rrdeng_page_info *page_info)
|
||||
{
|
||||
return (uint32_t *)&page_info->scratch[0];
|
||||
}
|
||||
|
||||
static inline uint32_t *pginfo_to_points(struct rrdeng_page_info *page_info)
|
||||
{
|
||||
return (uint32_t *)&page_info->scratch[sizeof(uint32_t)];
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculates the regions of different data collection intervals in a netdata chart in the time range
|
||||
* [start_time,end_time]. This call takes the netdata chart read lock.
|
||||
* @param st the netdata chart whose data collection interval boundaries are calculated.
|
||||
* @param start_time inclusive starting time in usec
|
||||
* @param end_time inclusive ending time in usec
|
||||
* @param region_info_arrayp It allocates (*region_info_arrayp) and populates it with information of regions of a
|
||||
* reference dimension that that have different data collection intervals and overlap with the time range
|
||||
* [start_time,end_time]. The caller must free (*region_info_arrayp) with freez(). If region_info_arrayp is set
|
||||
* to NULL nothing was allocated.
|
||||
* @param max_intervalp is derefenced and set to be the largest data collection interval of all regions.
|
||||
* @return number of regions with different data collection intervals.
|
||||
*/
|
||||
unsigned rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t end_time,
|
||||
struct rrdeng_region_info **region_info_arrayp, unsigned *max_intervalp)
|
||||
{
|
||||
struct pg_cache_page_index *page_index;
|
||||
struct rrdengine_instance *ctx;
|
||||
unsigned pages_nr;
|
||||
RRDDIM *rd_iter, *rd;
|
||||
struct rrdeng_page_info *page_info_array, *curr, *prev, *old_prev;
|
||||
unsigned i, j, page_entries, region_points, page_points, regions, max_interval;
|
||||
time_t now;
|
||||
usec_t dt, current_position_time, max_time = 0, min_time, curr_time, first_valid_time_in_page;
|
||||
struct rrdeng_region_info *region_info_array;
|
||||
uint8_t is_first_region_initialized;
|
||||
|
||||
ctx = st->rrdhost->rrdeng_ctx;
|
||||
regions = 1;
|
||||
*max_intervalp = max_interval = 0;
|
||||
region_info_array = NULL;
|
||||
*region_info_arrayp = NULL;
|
||||
page_info_array = NULL;
|
||||
|
||||
rrdset_rdlock(st);
|
||||
for(rd_iter = st->dimensions, rd = NULL, min_time = (usec_t)-1 ; rd_iter ; rd_iter = rd_iter->next) {
|
||||
/*
|
||||
* Choose oldest dimension as reference. This is not equivalent to the union of all dimensions
|
||||
* but it is a best effort approximation with a bias towards older metrics in a chart. It
|
||||
* matches netdata behaviour in the sense that dimensions are generally aligned in a chart
|
||||
* and older dimensions contain more information about the time range. It does not work well
|
||||
* for metrics that have recently stopped being collected.
|
||||
*/
|
||||
curr_time = pg_cache_oldest_time_in_range(ctx, rd_iter->state->rrdeng_uuid,
|
||||
start_time * USEC_PER_SEC, end_time * USEC_PER_SEC);
|
||||
if (INVALID_TIME != curr_time && curr_time < min_time) {
|
||||
rd = rd_iter;
|
||||
min_time = curr_time;
|
||||
}
|
||||
}
|
||||
rrdset_unlock(st);
|
||||
if (NULL == rd) {
|
||||
return 1;
|
||||
}
|
||||
pages_nr = pg_cache_preload(ctx, rd->state->rrdeng_uuid, start_time * USEC_PER_SEC, end_time * USEC_PER_SEC,
|
||||
&page_info_array, &page_index);
|
||||
if (pages_nr) {
|
||||
/* conservative allocation, will reduce the size later if necessary */
|
||||
region_info_array = mallocz(sizeof(*region_info_array) * pages_nr);
|
||||
}
|
||||
is_first_region_initialized = 0;
|
||||
region_points = 0;
|
||||
|
||||
/* pages loop */
|
||||
for (i = 0, curr = NULL, prev = NULL ; i < pages_nr ; ++i) {
|
||||
old_prev = prev;
|
||||
prev = curr;
|
||||
curr = &page_info_array[i];
|
||||
*pginfo_to_points(curr) = 0; /* initialize to invalid page */
|
||||
*pginfo_to_dt(curr) = 0; /* no known data collection interval yet */
|
||||
if (unlikely(INVALID_TIME == curr->start_time || INVALID_TIME == curr->end_time)) {
|
||||
info("Ignoring page with invalid timestamp.");
|
||||
prev = old_prev;
|
||||
continue;
|
||||
}
|
||||
page_entries = curr->page_length / sizeof(storage_number);
|
||||
assert(0 != page_entries);
|
||||
if (likely(1 != page_entries)) {
|
||||
dt = (curr->end_time - curr->start_time) / (page_entries - 1);
|
||||
*pginfo_to_dt(curr) = ROUND_USEC_TO_SEC(dt);
|
||||
if (unlikely(0 == *pginfo_to_dt(curr)))
|
||||
*pginfo_to_dt(curr) = 1;
|
||||
} else {
|
||||
dt = 0;
|
||||
}
|
||||
for (j = 0, page_points = 0 ; j < page_entries ; ++j) {
|
||||
uint8_t is_metric_out_of_order, is_metric_earlier_than_range;
|
||||
|
||||
is_metric_earlier_than_range = 0;
|
||||
is_metric_out_of_order = 0;
|
||||
|
||||
current_position_time = curr->start_time + j * dt;
|
||||
now = current_position_time / USEC_PER_SEC;
|
||||
if (now > end_time) { /* there will be no more pages in the time range */
|
||||
break;
|
||||
}
|
||||
if (now < start_time)
|
||||
is_metric_earlier_than_range = 1;
|
||||
if (unlikely(current_position_time < max_time)) /* just went back in time */
|
||||
is_metric_out_of_order = 1;
|
||||
if (is_metric_earlier_than_range || unlikely(is_metric_out_of_order)) {
|
||||
if (unlikely(is_metric_out_of_order))
|
||||
info("Ignoring metric with out of order timestamp.");
|
||||
continue; /* next entry */
|
||||
}
|
||||
/* here is a valid metric */
|
||||
++page_points;
|
||||
region_info_array[regions - 1].points = ++region_points;
|
||||
max_time = current_position_time;
|
||||
if (1 == page_points)
|
||||
first_valid_time_in_page = current_position_time;
|
||||
if (unlikely(!is_first_region_initialized)) {
|
||||
assert(1 == regions);
|
||||
/* this is the first region */
|
||||
region_info_array[0].start_time = current_position_time;
|
||||
is_first_region_initialized = 1;
|
||||
}
|
||||
}
|
||||
*pginfo_to_points(curr) = page_points;
|
||||
if (0 == page_points) {
|
||||
prev = old_prev;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (unlikely(0 == dt)) { /* unknown data collection interval */
|
||||
assert(1 == page_points);
|
||||
|
||||
if (likely(NULL != prev)) { /* get interval from previous page */
|
||||
*pginfo_to_dt(curr) = *pginfo_to_dt(prev);
|
||||
} else { /* there is no previous page in the query */
|
||||
struct rrdeng_page_info db_page_info;
|
||||
|
||||
/* go to database */
|
||||
pg_cache_get_filtered_info_prev(ctx, page_index, curr->start_time,
|
||||
metrics_with_known_interval, &db_page_info);
|
||||
if (unlikely(db_page_info.start_time == INVALID_TIME || db_page_info.end_time == INVALID_TIME ||
|
||||
0 == db_page_info.page_length)) { /* nothing in the database, default to update_every */
|
||||
*pginfo_to_dt(curr) = rd->update_every;
|
||||
} else {
|
||||
unsigned db_entries;
|
||||
usec_t db_dt;
|
||||
|
||||
db_entries = db_page_info.page_length / sizeof(storage_number);
|
||||
db_dt = (db_page_info.end_time - db_page_info.start_time) / (db_entries - 1);
|
||||
*pginfo_to_dt(curr) = ROUND_USEC_TO_SEC(db_dt);
|
||||
if (unlikely(0 == *pginfo_to_dt(curr)))
|
||||
*pginfo_to_dt(curr) = 1;
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
if (likely(prev) && unlikely(*pginfo_to_dt(curr) != *pginfo_to_dt(prev))) {
|
||||
info("Data collection interval change detected in query: %"PRIu32" -> %"PRIu32,
|
||||
*pginfo_to_dt(prev), *pginfo_to_dt(curr));
|
||||
region_info_array[regions++ - 1].points -= page_points;
|
||||
region_info_array[regions - 1].points = region_points = page_points;
|
||||
region_info_array[regions - 1].start_time = first_valid_time_in_page;
|
||||
}
|
||||
if (*pginfo_to_dt(curr) > max_interval)
|
||||
max_interval = *pginfo_to_dt(curr);
|
||||
region_info_array[regions - 1].update_every = *pginfo_to_dt(curr);
|
||||
}
|
||||
if (page_info_array)
|
||||
freez(page_info_array);
|
||||
if (region_info_array) {
|
||||
if (likely(is_first_region_initialized)) {
|
||||
/* free unnecessary memory */
|
||||
region_info_array = reallocz(region_info_array, sizeof(*region_info_array) * regions);
|
||||
*region_info_arrayp = region_info_array;
|
||||
*max_intervalp = max_interval;
|
||||
} else {
|
||||
/* empty result */
|
||||
freez(region_info_array);
|
||||
}
|
||||
}
|
||||
return regions;
|
||||
}
|
||||
|
||||
/*
|
||||
* Gets a handle for loading metrics from the database.
|
||||
* The handle must be released with rrdeng_load_metric_final().
|
||||
|
@ -226,80 +428,108 @@ void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_hand
|
|||
{
|
||||
struct rrdeng_query_handle *handle;
|
||||
struct rrdengine_instance *ctx;
|
||||
unsigned pages_nr;
|
||||
|
||||
ctx = rd->rrdset->rrdhost->rrdeng_ctx;
|
||||
rrdimm_handle->start_time = start_time;
|
||||
rrdimm_handle->end_time = end_time;
|
||||
handle = &rrdimm_handle->rrdeng;
|
||||
handle->next_page_time = start_time;
|
||||
handle->now = start_time;
|
||||
handle->dt = rd->rrdset->update_every;
|
||||
handle->position = 0;
|
||||
handle->ctx = ctx;
|
||||
handle->descr = NULL;
|
||||
handle->page_index = pg_cache_preload(ctx, rd->state->rrdeng_uuid,
|
||||
start_time * USEC_PER_SEC, end_time * USEC_PER_SEC);
|
||||
pages_nr = pg_cache_preload(ctx, rd->state->rrdeng_uuid, start_time * USEC_PER_SEC, end_time * USEC_PER_SEC,
|
||||
NULL, &handle->page_index);
|
||||
if (unlikely(NULL == handle->page_index || 0 == pages_nr))
|
||||
/* there are no metrics to load */
|
||||
handle->next_page_time = INVALID_TIME;
|
||||
}
|
||||
|
||||
storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle)
|
||||
/* Returns the metric and sets its timestamp into current_time */
|
||||
storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle, time_t *current_time)
|
||||
{
|
||||
struct rrdeng_query_handle *handle;
|
||||
struct rrdengine_instance *ctx;
|
||||
struct rrdeng_page_descr *descr;
|
||||
storage_number *page, ret;
|
||||
unsigned position;
|
||||
usec_t point_in_time;
|
||||
unsigned position, entries;
|
||||
usec_t next_page_time, current_position_time;
|
||||
|
||||
handle = &rrdimm_handle->rrdeng;
|
||||
if (unlikely(INVALID_TIME == handle->now)) {
|
||||
if (unlikely(INVALID_TIME == handle->next_page_time)) {
|
||||
return SN_EMPTY_SLOT;
|
||||
}
|
||||
ctx = handle->ctx;
|
||||
point_in_time = handle->now * USEC_PER_SEC;
|
||||
descr = handle->descr;
|
||||
|
||||
if (unlikely(NULL == handle->page_index)) {
|
||||
ret = SN_EMPTY_SLOT;
|
||||
goto out;
|
||||
if (unlikely(NULL == (descr = handle->descr))) {
|
||||
/* it's the first call */
|
||||
next_page_time = handle->next_page_time * USEC_PER_SEC;
|
||||
}
|
||||
position = handle->position + 1;
|
||||
|
||||
if (unlikely(NULL == descr ||
|
||||
point_in_time < descr->start_time ||
|
||||
point_in_time > descr->end_time)) {
|
||||
position >= (descr->page_length / sizeof(storage_number)))) {
|
||||
/* We need to get a new page */
|
||||
if (descr) {
|
||||
/* Drop old page's reference */
|
||||
handle->next_page_time = (descr->end_time / USEC_PER_SEC) + 1;
|
||||
if (unlikely(handle->next_page_time > rrdimm_handle->end_time)) {
|
||||
goto no_more_metrics;
|
||||
}
|
||||
next_page_time = handle->next_page_time * USEC_PER_SEC;
|
||||
#ifdef NETDATA_INTERNAL_CHECKS
|
||||
rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1);
|
||||
#endif
|
||||
pg_cache_put(ctx, descr);
|
||||
handle->descr = NULL;
|
||||
}
|
||||
descr = pg_cache_lookup(ctx, handle->page_index, &handle->page_index->id, point_in_time);
|
||||
descr = pg_cache_lookup_next(ctx, handle->page_index, &handle->page_index->id,
|
||||
next_page_time, rrdimm_handle->end_time * USEC_PER_SEC);
|
||||
if (NULL == descr) {
|
||||
ret = SN_EMPTY_SLOT;
|
||||
goto out;
|
||||
goto no_more_metrics;
|
||||
}
|
||||
#ifdef NETDATA_INTERNAL_CHECKS
|
||||
rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, 1);
|
||||
#endif
|
||||
handle->descr = descr;
|
||||
}
|
||||
if (unlikely(INVALID_TIME == descr->start_time ||
|
||||
INVALID_TIME == descr->end_time)) {
|
||||
ret = SN_EMPTY_SLOT;
|
||||
goto out;
|
||||
if (unlikely(INVALID_TIME == descr->start_time ||
|
||||
INVALID_TIME == descr->end_time)) {
|
||||
goto no_more_metrics;
|
||||
}
|
||||
if (unlikely(descr->start_time != descr->end_time && next_page_time > descr->start_time)) {
|
||||
/* we're in the middle of the page somewhere */
|
||||
entries = descr->page_length / sizeof(storage_number);
|
||||
position = ((uint64_t)(next_page_time - descr->start_time)) * entries /
|
||||
(descr->end_time - descr->start_time + 1);
|
||||
} else {
|
||||
position = 0;
|
||||
}
|
||||
}
|
||||
page = descr->pg_cache_descr->page;
|
||||
if (unlikely(descr->start_time == descr->end_time)) {
|
||||
ret = page[0];
|
||||
goto out;
|
||||
}
|
||||
position = ((uint64_t)(point_in_time - descr->start_time)) * (descr->page_length / sizeof(storage_number)) /
|
||||
(descr->end_time - descr->start_time + 1);
|
||||
ret = page[position];
|
||||
entries = descr->page_length / sizeof(storage_number);
|
||||
if (entries > 1) {
|
||||
usec_t dt;
|
||||
|
||||
out:
|
||||
handle->now += handle->dt;
|
||||
if (unlikely(handle->now > rrdimm_handle->end_time)) {
|
||||
handle->now = INVALID_TIME;
|
||||
dt = (descr->end_time - descr->start_time) / (entries - 1);
|
||||
current_position_time = descr->start_time + position * dt;
|
||||
} else {
|
||||
current_position_time = descr->start_time;
|
||||
}
|
||||
handle->position = position;
|
||||
handle->now = current_position_time / USEC_PER_SEC;
|
||||
/* assert(handle->now >= rrdimm_handle->start_time && handle->now <= rrdimm_handle->end_time);
|
||||
The above assertion is an approximation and needs to take update_every into account */
|
||||
if (unlikely(handle->now >= rrdimm_handle->end_time)) {
|
||||
/* next calls will not load any more metrics */
|
||||
handle->next_page_time = INVALID_TIME;
|
||||
}
|
||||
*current_time = handle->now;
|
||||
return ret;
|
||||
|
||||
no_more_metrics:
|
||||
handle->next_page_time = INVALID_TIME;
|
||||
return SN_EMPTY_SLOT;
|
||||
}
|
||||
|
||||
int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle)
|
||||
|
@ -307,7 +537,7 @@ int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle)
|
|||
struct rrdeng_query_handle *handle;
|
||||
|
||||
handle = &rrdimm_handle->rrdeng;
|
||||
return (INVALID_TIME == handle->now);
|
||||
return (INVALID_TIME == handle->next_page_time);
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
|
@ -15,6 +15,12 @@
|
|||
extern int default_rrdeng_page_cache_mb;
|
||||
extern int default_rrdeng_disk_quota_mb;
|
||||
|
||||
struct rrdeng_region_info {
|
||||
time_t start_time;
|
||||
int update_every;
|
||||
unsigned points;
|
||||
};
|
||||
|
||||
extern void *rrdeng_create_page(struct rrdengine_instance *ctx, uuid_t *id, struct rrdeng_page_descr **ret_descr);
|
||||
extern void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr,
|
||||
Word_t page_correlation_id);
|
||||
|
@ -25,9 +31,12 @@ extern void rrdeng_store_metric_init(RRDDIM *rd);
|
|||
extern void rrdeng_store_metric_flush_current_page(RRDDIM *rd);
|
||||
extern void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number number);
|
||||
extern void rrdeng_store_metric_finalize(RRDDIM *rd);
|
||||
extern unsigned
|
||||
rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t end_time,
|
||||
struct rrdeng_region_info **region_info_arrayp, unsigned *max_intervalp);
|
||||
extern void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_handle,
|
||||
time_t start_time, time_t end_time);
|
||||
extern storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle);
|
||||
extern storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle, time_t *current_time);
|
||||
extern int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle);
|
||||
extern void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle);
|
||||
extern time_t rrdeng_metric_latest_time(RRDDIM *rd);
|
||||
|
|
|
@ -23,6 +23,8 @@ struct rrdeng_page_descr;
|
|||
#define ALIGN_BYTES_FLOOR(x) (((x) / RRDENG_BLOCK_SIZE) * RRDENG_BLOCK_SIZE)
|
||||
#define ALIGN_BYTES_CEILING(x) ((((x) + RRDENG_BLOCK_SIZE - 1) / RRDENG_BLOCK_SIZE) * RRDENG_BLOCK_SIZE)
|
||||
|
||||
#define ROUND_USEC_TO_SEC(x) (((x) + USEC_PER_SEC / 2 - 1) / USEC_PER_SEC)
|
||||
|
||||
typedef uintptr_t rrdeng_stats_t;
|
||||
|
||||
#ifdef __ATOMIC_RELAXED
|
||||
|
|
|
@ -273,8 +273,9 @@ struct rrddim_query_handle {
|
|||
struct rrdeng_page_descr *descr;
|
||||
struct rrdengine_instance *ctx;
|
||||
struct pg_cache_page_index *page_index;
|
||||
time_t now; //TODO: remove now to implement next point iteration
|
||||
time_t dt; //TODO: remove dt to implement next point iteration
|
||||
time_t next_page_time;
|
||||
time_t now;
|
||||
unsigned position;
|
||||
} rrdeng; // state the database engine uses
|
||||
#endif
|
||||
};
|
||||
|
@ -307,7 +308,7 @@ struct rrddim_volatile {
|
|||
void (*init)(RRDDIM *rd, struct rrddim_query_handle *handle, time_t start_time, time_t end_time);
|
||||
|
||||
// run this to load each metric number from the database
|
||||
storage_number (*next_metric)(struct rrddim_query_handle *handle);
|
||||
storage_number (*next_metric)(struct rrddim_query_handle *handle, time_t *current_time);
|
||||
|
||||
// run this to test if the series of next_metric() database queries is finished
|
||||
int (*is_finished)(struct rrddim_query_handle *handle);
|
||||
|
|
|
@ -118,11 +118,12 @@ static void rrddim_query_init(RRDDIM *rd, struct rrddim_query_handle *handle, ti
|
|||
handle->slotted.finished = 0;
|
||||
}
|
||||
|
||||
static storage_number rrddim_query_next_metric(struct rrddim_query_handle *handle) {
|
||||
static storage_number rrddim_query_next_metric(struct rrddim_query_handle *handle, time_t *current_time) {
|
||||
RRDDIM *rd = handle->rd;
|
||||
long entries = rd->rrdset->entries;
|
||||
long slot = handle->slotted.slot;
|
||||
|
||||
(void)current_time;
|
||||
if (unlikely(handle->slotted.slot == handle->slotted.last_slot))
|
||||
handle->slotted.finished = 1;
|
||||
storage_number n = rd->values[slot++];
|
||||
|
|
|
@ -46,9 +46,12 @@ calculated_number grouping_flush_average(RRDR *r, RRDR_VALUE_FLAGS *rrdr_value_
|
|||
*rrdr_value_options_ptr |= RRDR_VALUE_EMPTY;
|
||||
}
|
||||
else {
|
||||
if(unlikely(r->internal.resampling_group != 1))
|
||||
value = g->sum / r->internal.resampling_divisor;
|
||||
else
|
||||
if(unlikely(r->internal.resampling_group != 1)) {
|
||||
if (unlikely(r->result_options & RRDR_RESULT_OPTION_VARIABLE_STEP))
|
||||
value = g->sum / g->count / r->internal.resampling_divisor;
|
||||
else
|
||||
value = g->sum / r->internal.resampling_divisor;
|
||||
} else
|
||||
value = g->sum / g->count;
|
||||
}
|
||||
|
||||
|
|
File diff suppressed because it is too large
Load diff
|
@ -6,7 +6,7 @@
|
|||
#include "libnetdata/libnetdata.h"
|
||||
|
||||
typedef enum rrdr_options {
|
||||
RRDR_OPTION_NONZERO = 0x00000001, // don't output dimensions will just zero values
|
||||
RRDR_OPTION_NONZERO = 0x00000001, // don't output dimensions with just zero values
|
||||
RRDR_OPTION_REVERSED = 0x00000002, // output the rows in reverse order (oldest to newest)
|
||||
RRDR_OPTION_ABSOLUTE = 0x00000004, // values positive, for DATASOURCE_SSV before summing
|
||||
RRDR_OPTION_MIN2MAX = 0x00000008, // when adding dimensions, use max - min, instead of sum
|
||||
|
@ -18,7 +18,7 @@ typedef enum rrdr_options {
|
|||
RRDR_OPTION_JSON_WRAP = 0x00000200, // wrap the response in a JSON header with info about the result
|
||||
RRDR_OPTION_LABEL_QUOTES = 0x00000400, // in CSV output, wrap header labels in double quotes
|
||||
RRDR_OPTION_PERCENTAGE = 0x00000800, // give values as percentage of total
|
||||
RRDR_OPTION_NOT_ALIGNED = 0x00001000, // do not align charts for persistant timeframes
|
||||
RRDR_OPTION_NOT_ALIGNED = 0x00001000, // do not align charts for persistent timeframes
|
||||
RRDR_OPTION_DISPLAY_ABS = 0x00002000, // for badges, display the absolute value, but calculate colors with sign
|
||||
RRDR_OPTION_MATCH_IDS = 0x00004000, // when filtering dimensions, match only IDs
|
||||
RRDR_OPTION_MATCH_NAMES = 0x00008000, // when filtering dimensions, match only names
|
||||
|
@ -40,8 +40,11 @@ typedef enum rrdr_dimension_flag {
|
|||
|
||||
// RRDR result options
|
||||
typedef enum rrdr_result_flags {
|
||||
RRDR_RESULT_OPTION_ABSOLUTE = 0x00000001, // the query uses absolute time-frames (can be cached by browsers and proxies)
|
||||
RRDR_RESULT_OPTION_RELATIVE = 0x00000002, // the query uses relative time-frames (should not to be cached by browsers and proxies)
|
||||
RRDR_RESULT_OPTION_ABSOLUTE = 0x00000001, // the query uses absolute time-frames
|
||||
// (can be cached by browsers and proxies)
|
||||
RRDR_RESULT_OPTION_RELATIVE = 0x00000002, // the query uses relative time-frames
|
||||
// (should not to be cached by browsers and proxies)
|
||||
RRDR_RESULT_OPTION_VARIABLE_STEP = 0x00000004, // the query uses variable-step time-frames
|
||||
} RRDR_RESULT_FLAGS;
|
||||
|
||||
typedef struct rrdresult {
|
||||
|
|
Loading…
Add table
Reference in a new issue