0
0
Fork 0
mirror of https://github.com/netdata/netdata.git synced 2025-04-28 14:42:31 +00:00

Configurable storage engine for Netdata agents: step 1 ()

* rrd: move API structures out of rrddim_volatile

In C, unlike C++, it's not possible to reference a nested structure
from outside this structure.

Since we later want to use rrddim_query_ops and rrddim_collect_ops
separately from rrddim_volatile, move these nested structures out.

* rrd: use opaque handle types for different memory modes
This commit is contained in:
Adrien Béraud 2022-05-03 04:34:15 -04:00 committed by GitHub
parent 5850810715
commit d92890b5f1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 112 additions and 110 deletions

View file

@ -34,6 +34,22 @@ struct rrdengine_instance;
#define RRDENG_FILE_NUMBER_SCAN_TMPL "%1u-%10u"
#define RRDENG_FILE_NUMBER_PRINT_TMPL "%1.1u-%10.10u"
struct rrdeng_collect_handle {
struct rrdeng_page_descr *descr, *prev_descr;
unsigned long page_correlation_id;
struct rrdengine_instance *ctx;
// set to 1 when this dimension is not page aligned with the other dimensions in the chart
uint8_t unaligned_page;
};
struct rrdeng_query_handle {
struct rrdeng_page_descr *descr;
struct rrdengine_instance *ctx;
struct pg_cache_page_index *page_index;
time_t next_page_time;
time_t now;
unsigned position;
};
typedef enum {
RRDENGINE_STATUS_UNINITIALIZED = 0,

View file

@ -126,12 +126,13 @@ void rrdeng_store_metric_init(RRDDIM *rd)
struct pg_cache_page_index *page_index;
ctx = get_rrdeng_ctx_from_host(rd->rrdset->rrdhost);
handle = &rd->state->handle.rrdeng;
handle->ctx = ctx;
handle = callocz(1, sizeof(struct rrdeng_collect_handle));
handle->ctx = ctx;
handle->descr = NULL;
handle->prev_descr = NULL;
handle->unaligned_page = 0;
rd->state->handle = (STORAGE_COLLECT_HANDLE *)handle;
page_index = rd->state->page_index;
uv_rwlock_wrlock(&page_index->lock);
@ -162,7 +163,7 @@ void rrdeng_store_metric_flush_current_page(RRDDIM *rd)
struct rrdengine_instance *ctx;
struct rrdeng_page_descr *descr;
handle = &rd->state->handle.rrdeng;
handle = (struct rrdeng_collect_handle *)rd->state->handle;
ctx = handle->ctx;
if (unlikely(!ctx))
return;
@ -211,14 +212,13 @@ void rrdeng_store_metric_flush_current_page(RRDDIM *rd)
void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number number)
{
struct rrdeng_collect_handle *handle;
struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)rd->state->handle;
struct rrdengine_instance *ctx;
struct page_cache *pg_cache;
struct rrdeng_page_descr *descr;
storage_number *page;
uint8_t must_flush_unaligned_page = 0, perfect_page_alignment = 0;
handle = &rd->state->handle.rrdeng;
ctx = handle->ctx;
pg_cache = &ctx->pg_cache;
descr = handle->descr;
@ -301,7 +301,7 @@ int rrdeng_store_metric_finalize(RRDDIM *rd)
struct pg_cache_page_index *page_index;
uint8_t can_delete_metric = 0;
handle = &rd->state->handle.rrdeng;
handle = (struct rrdeng_collect_handle *)rd->state->handle;
ctx = handle->ctx;
page_index = rd->state->page_index;
rrdeng_store_metric_flush_current_page(rd);
@ -314,6 +314,7 @@ int rrdeng_store_metric_finalize(RRDDIM *rd)
can_delete_metric = 1;
}
uv_rwlock_wrunlock(&page_index->lock);
freez(handle);
return can_delete_metric;
}
@ -535,12 +536,14 @@ void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_hand
ctx = get_rrdeng_ctx_from_host(rd->rrdset->rrdhost);
rrdimm_handle->start_time = start_time;
rrdimm_handle->end_time = end_time;
handle = &rrdimm_handle->rrdeng;
handle = calloc(1, sizeof(struct rrdeng_query_handle));
handle->next_page_time = start_time;
handle->now = start_time;
handle->position = 0;
handle->ctx = ctx;
handle->descr = NULL;
rrdimm_handle->handle = (STORAGE_QUERY_HANDLE *)handle;
pages_nr = pg_cache_preload(ctx, rd->state->rrdeng_uuid, start_time * USEC_PER_SEC, end_time * USEC_PER_SEC,
NULL, &handle->page_index);
if (unlikely(NULL == handle->page_index || 0 == pages_nr))
@ -551,7 +554,7 @@ void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_hand
/* Returns the metric and sets its timestamp into current_time */
storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle, time_t *current_time)
{
struct rrdeng_query_handle *handle;
struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle;
struct rrdengine_instance *ctx;
struct rrdeng_page_descr *descr;
storage_number *page, ret;
@ -559,7 +562,6 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle
usec_t next_page_time = 0, current_position_time, page_end_time = 0;
uint32_t page_length;
handle = &rrdimm_handle->rrdeng;
if (unlikely(INVALID_TIME == handle->next_page_time)) {
return SN_EMPTY_SLOT;
}
@ -641,9 +643,7 @@ no_more_metrics:
int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle)
{
struct rrdeng_query_handle *handle;
handle = &rrdimm_handle->rrdeng;
struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle;
return (INVALID_TIME == handle->next_page_time);
}
@ -652,13 +652,10 @@ int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle)
*/
void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle)
{
struct rrdeng_query_handle *handle;
struct rrdengine_instance *ctx;
struct rrdeng_page_descr *descr;
struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle;
struct rrdengine_instance *ctx = handle->ctx;
struct rrdeng_page_descr *descr = handle->descr;
handle = &rrdimm_handle->rrdeng;
ctx = handle->ctx;
descr = handle->descr;
if (descr) {
#ifdef NETDATA_INTERNAL_CHECKS
rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1);

View file

@ -326,53 +326,56 @@ struct rrddim {
};
// ----------------------------------------------------------------------------
// iterator state for RRD dimension data collection
union rrddim_collect_handle {
struct {
long slot;
long entries;
} slotted; // state the legacy code uses
#ifdef ENABLE_DBENGINE
struct rrdeng_collect_handle {
struct rrdeng_page_descr *descr, *prev_descr;
unsigned long page_correlation_id;
struct rrdengine_instance *ctx;
// set to 1 when this dimension is not page aligned with the other dimensions in the chart
uint8_t unaligned_page;
} rrdeng; // state the database engine uses
#endif
};
// engine-specific iterator state for dimension data collection
typedef struct storage_collect_handle STORAGE_COLLECT_HANDLE;
// ----------------------------------------------------------------------------
// engine-specific iterator state for dimension data queries
typedef struct storage_query_handle STORAGE_QUERY_HANDLE;
// ----------------------------------------------------------------------------
// iterator state for RRD dimension data queries
#ifdef ENABLE_DBENGINE
struct rrdeng_query_handle {
struct rrdeng_page_descr *descr;
struct rrdengine_instance *ctx;
struct pg_cache_page_index *page_index;
time_t next_page_time;
time_t now;
unsigned position;
};
#endif
struct rrddim_query_handle {
RRDDIM *rd;
time_t start_time;
time_t end_time;
union {
struct {
long slot;
long last_slot;
uint8_t finished;
} slotted; // state the legacy code uses
#ifdef ENABLE_DBENGINE
struct rrdeng_query_handle rrdeng; // state the database engine uses
#endif
};
STORAGE_QUERY_HANDLE* handle;
};
// ------------------------------------------------------------------------
// function pointers that handle data collection
struct rrddim_collect_ops {
// an initialization function to run before starting collection
void (*init)(RRDDIM *rd);
// run this to store each metric into the database
void (*store_metric)(RRDDIM *rd, usec_t point_in_time, storage_number number);
// an finalization function to run after collection is over
// returns 1 if it's safe to delete the dimension
int (*finalize)(RRDDIM *rd);
};
// function pointers that handle database queries
struct rrddim_query_ops {
// run this before starting a series of next_metric() database queries
void (*init)(RRDDIM *rd, struct rrddim_query_handle *handle, time_t start_time, time_t end_time);
// run this to load each metric number from the database
storage_number (*next_metric)(struct rrddim_query_handle *handle, time_t *current_time);
// run this to test if the series of next_metric() database queries is finished
int (*is_finished)(struct rrddim_query_handle *handle);
// run this after finishing a series of load_metric() database queries
void (*finalize)(struct rrddim_query_handle *handle);
// get the timestamp of the last entry of this metric
time_t (*latest_time)(RRDDIM *rd);
// get the timestamp of the first entry of this metric
time_t (*oldest_time)(RRDDIM *rd);
};
// ----------------------------------------------------------------------------
// volatile state per RRD dimension
@ -385,42 +388,9 @@ struct rrddim_volatile {
int aclk_live_status;
#endif
uuid_t metric_uuid; // global UUID for this metric (unique_across hosts)
union rrddim_collect_handle handle;
// ------------------------------------------------------------------------
// function pointers that handle data collection
struct rrddim_collect_ops {
// an initialization function to run before starting collection
void (*init)(RRDDIM *rd);
// run this to store each metric into the database
void (*store_metric)(RRDDIM *rd, usec_t point_in_time, storage_number number);
// an finalization function to run after collection is over
// returns 1 if it's safe to delete the dimension
int (*finalize)(RRDDIM *rd);
} collect_ops;
// function pointers that handle database queries
struct rrddim_query_ops {
// run this before starting a series of next_metric() database queries
void (*init)(RRDDIM *rd, struct rrddim_query_handle *handle, time_t start_time, time_t end_time);
// run this to load each metric number from the database
storage_number (*next_metric)(struct rrddim_query_handle *handle, time_t *current_time);
// run this to test if the series of next_metric() database queries is finished
int (*is_finished)(struct rrddim_query_handle *handle);
// run this after finishing a series of load_metric() database queries
void (*finalize)(struct rrddim_query_handle *handle);
// get the timestamp of the last entry of this metric
time_t (*latest_time)(RRDDIM *rd);
// get the timestamp of the first entry of this metric
time_t (*oldest_time)(RRDDIM *rd);
} query_ops;
STORAGE_COLLECT_HANDLE* handle;
struct rrddim_collect_ops collect_ops;
struct rrddim_query_ops query_ops;
ml_dimension_t ml_dimension;
};
@ -435,6 +405,19 @@ struct rrdset_volatile {
bool is_ar_chart;
};
// RRDDIM legacy data collection structures
struct mem_collect_handle {
long slot;
long entries;
};
struct mem_query_handle {
long slot;
long last_slot;
uint8_t finished;
};
// ----------------------------------------------------------------------------
// these loop macros make sure the linked list is accessed with the right lock

View file

@ -99,6 +99,7 @@ inline int rrddim_set_divisor(RRDSET *st, RRDDIM *rd, collected_number divisor)
// RRDDIM legacy data collection functions
static void rrddim_collect_init(RRDDIM *rd) {
rd->state->handle = callocz(1, sizeof(struct mem_collect_handle));
rd->values[rd->rrdset->current_entry] = SN_EMPTY_SLOT;
}
static void rrddim_collect_store_metric(RRDDIM *rd, usec_t point_in_time, storage_number number) {
@ -107,11 +108,11 @@ static void rrddim_collect_store_metric(RRDDIM *rd, usec_t point_in_time, storag
rd->values[rd->rrdset->current_entry] = number;
}
static int rrddim_collect_finalize(RRDDIM *rd) {
(void)rd;
freez(rd->state->handle);
return 0;
}
// ----------------------------------------------------------------------------
// RRDDIM legacy database query functions
@ -119,34 +120,37 @@ static void rrddim_query_init(RRDDIM *rd, struct rrddim_query_handle *handle, ti
handle->rd = rd;
handle->start_time = start_time;
handle->end_time = end_time;
handle->slotted.slot = rrdset_time2slot(rd->rrdset, start_time);
handle->slotted.last_slot = rrdset_time2slot(rd->rrdset, end_time);
handle->slotted.finished = 0;
struct mem_query_handle* mem_handle = callocz(1, sizeof(struct mem_query_handle));
mem_handle->slot = rrdset_time2slot(rd->rrdset, start_time);
mem_handle->last_slot = rrdset_time2slot(rd->rrdset, end_time);
mem_handle->finished = 0;
handle->handle = (STORAGE_QUERY_HANDLE *)mem_handle;
}
static storage_number rrddim_query_next_metric(struct rrddim_query_handle *handle, time_t *current_time) {
RRDDIM *rd = handle->rd;
struct mem_query_handle* mem_handle = (struct mem_query_handle*)handle->handle;
long entries = rd->rrdset->entries;
long slot = handle->slotted.slot;
long slot = mem_handle->slot;
(void)current_time;
if (unlikely(handle->slotted.slot == handle->slotted.last_slot))
handle->slotted.finished = 1;
if (unlikely(mem_handle->slot == mem_handle->last_slot))
mem_handle->finished = 1;
storage_number n = rd->values[slot++];
if(unlikely(slot >= entries)) slot = 0;
handle->slotted.slot = slot;
mem_handle->slot = slot;
return n;
}
static int rrddim_query_is_finished(struct rrddim_query_handle *handle) {
return handle->slotted.finished;
struct mem_query_handle* mem_handle = (struct mem_query_handle*)handle->handle;
return mem_handle->finished;
}
static void rrddim_query_finalize(struct rrddim_query_handle *handle) {
(void)handle;
freez(handle->handle);
return;
}

View file

@ -45,7 +45,7 @@ private:
RRDDIM *RD;
RRDDIM *AnomalyRateRD;
struct rrddim_volatile::rrddim_query_ops *Ops;
struct rrddim_query_ops *Ops;
std::string ID;
};

View file

@ -40,7 +40,7 @@ public:
private:
RRDDIM *RD;
struct rrddim_volatile::rrddim_query_ops *Ops;
struct rrddim_query_ops *Ops;
struct rrddim_query_handle Handle;
};

View file

@ -580,9 +580,10 @@ static inline void do_dimension_fixedstep(
// read the value from the database
//storage_number n = rd->values[slot];
#ifdef NETDATA_INTERNAL_CHECKS
struct mem_query_handle* mem_handle = (struct mem_query_handle*)handle.handle;
if ((rd->rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE) &&
(rrdset_time2slot(st, now) != (long unsigned)handle.slotted.slot)) {
error("INTERNAL CHECK: Unaligned query for %s, database slot: %lu, expected slot: %lu", rd->id, (long unsigned)handle.slotted.slot, rrdset_time2slot(st, now));
(rrdset_time2slot(st, now) != (long unsigned)(mem_handle->slot))) {
error("INTERNAL CHECK: Unaligned query for %s, database slot: %lu, expected slot: %lu", rd->id, (long unsigned)mem_handle->slot, rrdset_time2slot(st, now));
}
#endif
db_now = now; // this is needed to set db_now in case the next_metric implementation does not set it
@ -601,8 +602,9 @@ static inline void do_dimension_fixedstep(
calculated_number value = NAN;
if(likely(now >= db_now && does_storage_number_exist(n))) {
#if defined(NETDATA_INTERNAL_CHECKS) && defined(ENABLE_DBENGINE)
if ((rd->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) && (now != handle.rrdeng.now)) {
error("INTERNAL CHECK: Unaligned query for %s, database time: %ld, expected time: %ld", rd->id, (long)handle.rrdeng.now, (long)now);
struct rrdeng_query_handle* rrd_handle = (struct rrdeng_query_handle*)handle.handle;
if ((rd->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) && (now != rrd_handle->now)) {
error("INTERNAL CHECK: Unaligned query for %s, database time: %ld, expected time: %ld", rd->id, (long)rrd_handle->now, (long)now);
}
#endif
if (options & RRDR_OPTION_ANOMALY_BIT)