
* recreate the circular buffer from time to time
* do not update cloud url if the node id is not updated
* remove deadlock and optimize pipe size
* removed const
* finer control on randomized delays
* restore children re-connecting to parents
* handle partial pipe reads; sender_commit() now checks if the sender is still connected to avoid bombarding it with data that cannot be sent
* added commented code about optimizing the array of pollfds
* improve interactivity of sender; code cleanup
* do not use the pipe for sending messages, instead use a queue in memory (that can never be full)
* fix dictionaries families
* do not destroy aral on replication exit - it crashes the senders
* support multiple dispatchers and connectors; code cleanup
* more cleanup
* Add serde support for KMeans models.
  - Serialization/Deserialization support of KMeans models.
  - Send/receive ML models between a child/parent.
  - Fix some rare and old crash reports.
  - Reduce allocations by a couple thousand per second when training.
  - Enable ML statistics temporarily which might increase CPU consumption.
* fix ml models streaming
* up to 10 dispatchers and 2 connectors
* experiment: limit the number of receivers to the number of cores - 2
* reworked compression at the receiver to minimize read operations
* multi-core receivers
* use slot 0 on receivers
* use slot 0 on receivers
* use half the cores for receivers with a minimum of 4
* cancel receiver threads
* use offsets instead of pointers in the compressed buffer; track last reads
* fix crash on using freed decompressor; core re-org
* fix incorrect job registration
* fix send_to_plugin() for SSL
* add reason to disconnect message
* fix signaling receivers to stop
* added --dev option to netdata-installer.sh to prevent it from removing the build directory
* Fix serde of double values. NaNs and +/- infinities are encoded as strings.
* unused param
* reset max cbuffer size when it is recreated
* struct receiver_state is now private
* 1 dispatcher, 1 connector, 2/3 cores for receivers
* all replication requests are served by replication threads - never the dispatcher threads
* optimize partitions and cache lines for dbengine cache
* fix crash on receiver shutdown
* rw spinlock now prioritizes writers
* backfill all higher tiers
* extent cache to 10%
* automatic sizing of replication threads
* add more replication threads
* configure cache eviction parameters to avoid running in aggressive mode all the time
* run evictions and flushes every 100ms
* add missing initialization
* add missing initialization - again
* add evictors for all caches
* add dedicated evict thread per cache
* destroy the completion
* avoid sending too many signals to eviction threads
* alternative way to make sure there are data to evict
* measure inline cache events
* disable inline evictions and flushing for open and extent cache
* use a spinlock to avoid sending too many signals
* batch evictions are not in steps of pages
* fix wanted cache size when there are no clean entries in it
* fix wanted cache size when there are no clean entries in it
* fix wanted cache size again
* adaptive batch evictions; batch evictions first try all partitions
* move waste events to waste chart
* added evict_traversed
* evict is smaller steps
* removed obsolete code
* disabled inlining of evictions and flushing; added timings for evictions
* more detailed timings for evictions
* use inline evictors
* use aral for gorilla pages of 512 bytes, when they are loaded from disk
* use aral for all gorilla page sizes loaded from disk
* disable inlining again to test it after the memory optimization
* timings for dbengine evictions
* added timing names
* detailed timings
* detailed timings - again
* removed timings and restored inline evictions
* eviction on release only under critical pressure
* cleanup and replication tuning
* tune cache size calculation
* tune replication threads calculation
* make streaming receiver exit
* Do not allocate/copy extent data twice.
* Build/link mimalloc. Just for testing, it will be reverted.
* lower memory requirements
* Link mimalloc statically
* run replication with synchronous queries
* added missing worker jobs in sender dispatcher
* enable batch evictions in pgc
* fix sender-dispatcher workers
* set max dispatchers to 2
* increase the default replication threads
* log stream_info errors
* increase replication threads
* log the json text when we fail to parse json response of stream_info
* stream info response may come back in multiple steps
* print the socket error of stream info
* added debug to stream info socket error
* loop while content-length is smaller than the payload received
* Revert "Link mimalloc statically". This reverts commit c98e482d47.
* Revert "Build/link mimalloc". This reverts commit 8aae22a28a.
* Remove NEED_PROTOBUF
* Use mimalloc
* Revert "Use mimalloc". This reverts commit 9a68034786.
* Use mimalloc
* support 256 bytes gorilla pages, when they are loaded from disk
* added os_mem_available()
* test memory protection
* use protection only on one cache
* use the free memory of the main cache in the other caches too
* use the free memory of the main cache in the open cache too
* Batch gorilla writes by tracking the last written number. In a setup with 200 children, `perf` shows that the worst offender is the gorilla write operation, reporting ~17% overhead. With this change `perf` reports ~4% overhead and netdata's CPU consumption decreased by ~16%.
* make buffered_reader_next_line() a couple times faster
* flushing open cache
* Use re2c for the line splitting pluginsd. Function gets optimized around 3x. We should delete old code and use the re2c for the rest of the functions, but we need to keep the PR size as minimal as possible. Will do in follow up PRs.
* use cores - 1 for receivers, use only 1 sender
* move sender processing to a separate function
* Revert "Batch gorilla writes by tracking the last written number." This reverts commit 2e72a5c56d.
* Batch gorilla writes only from writers. This reapplies df79be2f01145bd79091a8934d7c80b4b3eb915b and introduces a couple changes to remove writes from readers.
* log information for buffer overflow
* fix heap use after free
* added comments to the main stream receiver loop
* 3 dispatchers
* single threaded receiver and sender
* code cleanup
* de-associate hosts from streaming threads when both the receiver and sender stop, so that each time the threads are re-balanced
* fix heap use after free
* properly get the slot number of pollfd
* fixes
* fixes
* revert worker changes
* reuse streaming threads
* backfilling should be synchronous
* remove the node last
* do not keep a pointer to relocatable buffer
* give to pgc the right page size, not less
* restore spreading metrics size across time
* use the calculated slots for gorilla pages
* accurately track gorilla page size changes
* check the sth pointer for validity
* code cleanup, files re-org and renames to reflect the new structure of streaming
* updated referenced size when the size of a page changes; removed flush spins - flushes cancelled is a waste event
* improve families in netdata statistics
* page size histogram per cache
* page size histogram per cache queue (hot, dirty, clean)
* fix heap after use in pdc.c
* rw_spinlocks: when preferring a writer yield so that the writer has the chance to get the lock
* do not balloon open and extent caches more than needed (it fragments memory and there is not enough memory for the main cache)
* fixed typo
* enable trace allocations to work
* Skip adding kmeans model when ML dimension has not been created.
* PGD is now entirely on ARAL for all types of pages
* 2 partitions for PGD
* Check for ML queue prior to pushing as well.
* merge multiple arals, to avoid wasting memory
* significantly less arals; proper calculation of gorilla efficiency
* report pgd buffers separately from pgc
* aral only for sizes less than 512 bytes
* tune aral caches
* log the functions using the streaming buffer when concurrent use is detected
* aral supporting different pages for collected pages and clean pages - an attempt to minimize fragmentation at high performance
* fix misuse of sender thread buffers
* select the right buffer, based on the receiver tid
* no more rrdpush, renamed to stream
* lower aral max page size to 16KiB - in an attempt to lower fragmentation under memory pressure
* update opcode handling
* automatic sizing of aral limiting its size to 200 items per page or 4 x system pages
* tune cache eviction strategy
* renamed global statistics to telemetry and split it into multiple files
* left over renames of global statistics to telemetry
* added heatmap to chart types
* note about re-balancing a parents cluster
* fix formatting
* added aral telemetry to find the fragmentation per aral
* experiment with a different strategy when making clean pages: always append so that the cache is being constantly rotated; aral telemetry reports utilization instead of fragmentation
* aral now takes into account waiting deallocators when it creates new pages
* split netdata-conf functions into multiple files; added dbengine use all caches and dbengine out of memory protection settings
* tune cache eviction strategy
* cache parameters cleanup
* rename mem_available to system_memory
* Fix variable type.
* Add fuzzer for pluginsd line splitter.
* use cgroup v1 and v2 to detect memory protection; log on start the detection of memory
* fixed typo
* added logs about system memory detection
* remove debug logs from system memory detection
* move the rest of dbengine config to netdata-conf
* respect streaming buffer size configured
* add workers to pgc eviction threads
* renamed worker
* fixed flip-flop in size and entries conversions
* use aral_by_size when we actually aggregate stats to aral by size
* use keyword definitions
* move opcode definitions to stream-thread.h
* swap struct pollfd slots to make sure all the sockets have an equal chance of being processed
* Revert "Add fuzzer for pluginsd line splitter." This reverts commit 454cbcf6e1.
* Revert "Use re2c for the line splitting pluginsd." This reverts commit 2b2f9d3887.
* stream thread use judy arrays instead of linked lists and pre-allocated arrays
* added comment about pfd structure on sender and receiver
* fixed logs and made the default sender timeout 5 seconds
* Spawn ML worker threads based on number of CPUs.
* Add statistics for ML allocations/deallocations.
* Add host flag to check for pending alert transitions to save
  - Remove precompiled statements
  - Offload processing of alerts in the event loop
  - Queue alert transitions to the metadata event loop to be saved
  - Run metadata checks every 5 seconds
* do not block doing socket retries when errno indicates EWOULDBLOCK; insist sending data in send_to_plugin()
* Revert "Add host flag to check for pending alert transitions to save". This reverts commit 86ade0e87e.
* fix error reasons
* Disable ML memory statistics when using mimalloc
* add reason when ml cannot acquire the dimension
* added ML memory and depending on the DICT_WITH_STATS define, add aral by size too
* do not stream ML when the parent does not have ML enabled
* nd_poll() to overcome the starvation of poll() and use epoll() under Linux
* nd_poll() optimization to minimize the number of system calls
* nd_poll() fix
* nd_poll() fix again
* make glibc release memory to the system when the system is critical in memory
* try bigger aral pages, to enable releasing memory back to the system
* Queue alert transitions to the metadata event loop (global list not per host)
  - Add host count to check for pending alert transitions to save
  - Remove precompiled statements
  - Offload processing of alerts in the event loop
  - Run metadata checks every 5 seconds
* round robin aral allocations
* fix aral round robin
* ask glibc to release memory when the allocations are aggressive
* tinysleep yields the processor instead of waiting
* run malloc_trim() more frequently
* Add reference count on alarm_entry
* selective tinysleep and processor yielding
* revert gorilla batch writes
* codacy fixes

---------

Co-authored-by: vkalintiris <vasilis@netdata.cloud>
Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
2145 lines · 81 KiB · C
// SPDX-License-Identifier: GPL-3.0-or-later
|
|
#define NETDATA_RRD_INTERNALS
|
|
|
|
#include "rrdengine.h"
|
|
#include "pdc.h"
|
|
#include "dbengine-compression.h"
|
|
|
|
rrdeng_stats_t global_io_errors = 0;
|
|
rrdeng_stats_t global_fs_errors = 0;
|
|
rrdeng_stats_t rrdeng_reserved_file_descriptors = 0;
|
|
rrdeng_stats_t global_pg_cache_over_half_dirty_events = 0;
|
|
rrdeng_stats_t global_flushing_pressure_page_deletions = 0;
|
|
|
|
unsigned rrdeng_pages_per_extent = DEFAULT_PAGES_PER_EXTENT;
|
|
|
|
#if WORKER_UTILIZATION_MAX_JOB_TYPES < (RRDENG_OPCODE_MAX + 2)
|
|
#error Please increase WORKER_UTILIZATION_MAX_JOB_TYPES to at least (RRDENG_OPCODE_MAX + 2)
|
|
#endif
|
|
|
|
struct rrdeng_cmd {
|
|
struct rrdengine_instance *ctx;
|
|
enum rrdeng_opcode opcode;
|
|
void *data;
|
|
struct completion *completion;
|
|
enum storage_priority priority;
|
|
dequeue_callback_t dequeue_cb;
|
|
|
|
struct {
|
|
struct rrdeng_cmd *prev;
|
|
struct rrdeng_cmd *next;
|
|
} queue;
|
|
};
|
|
|
|
static inline struct rrdeng_cmd rrdeng_deq_cmd(bool from_worker);
|
|
static inline void worker_dispatch_extent_read(struct rrdeng_cmd cmd, bool from_worker);
|
|
static inline void worker_dispatch_query_prep(struct rrdeng_cmd cmd, bool from_worker);
|
|
|
|
struct rrdeng_main {
|
|
uv_thread_t thread;
|
|
uv_loop_t loop;
|
|
uv_async_t async;
|
|
uv_timer_t timer;
|
|
uv_timer_t retention_timer;
|
|
pid_t tid;
|
|
bool shutdown;
|
|
|
|
size_t flushes_running;
|
|
size_t evict_main_running;
|
|
size_t evict_open_running;
|
|
size_t evict_extent_running;
|
|
size_t cleanup_running;
|
|
|
|
struct {
|
|
ARAL *ar;
|
|
|
|
struct {
|
|
SPINLOCK spinlock;
|
|
|
|
size_t waiting;
|
|
struct rrdeng_cmd *waiting_items_by_priority[STORAGE_PRIORITY_INTERNAL_MAX_DONT_USE];
|
|
size_t executed_by_priority[STORAGE_PRIORITY_INTERNAL_MAX_DONT_USE];
|
|
} unsafe;
|
|
} cmd_queue;
|
|
|
|
struct {
|
|
ARAL *ar;
|
|
|
|
struct {
|
|
size_t dispatched;
|
|
size_t executing;
|
|
} atomics;
|
|
} work_cmd;
|
|
|
|
struct {
|
|
ARAL *ar;
|
|
} handles;
|
|
|
|
struct {
|
|
ARAL *ar;
|
|
} descriptors;
|
|
|
|
struct {
|
|
ARAL *ar;
|
|
} xt_io_descr;
|
|
|
|
} rrdeng_main = {
|
|
.thread = 0,
|
|
.loop = {},
|
|
.async = {},
|
|
.timer = {},
|
|
.retention_timer = {},
|
|
.flushes_running = 0,
|
|
.evict_main_running = 0,
|
|
.cleanup_running = 0,
|
|
|
|
.cmd_queue = {
|
|
.unsafe = {
|
|
.spinlock = NETDATA_SPINLOCK_INITIALIZER,
|
|
},
|
|
}
|
|
};
|
|
|
|
static void sanity_check(void)
|
|
{
|
|
BUILD_BUG_ON(WORKER_UTILIZATION_MAX_JOB_TYPES < (RRDENG_OPCODE_MAX + 2));
|
|
|
|
/* Magic numbers must fit in the super-blocks */
|
|
BUILD_BUG_ON(strlen(RRDENG_DF_MAGIC) > RRDENG_MAGIC_SZ);
|
|
BUILD_BUG_ON(strlen(RRDENG_JF_MAGIC) > RRDENG_MAGIC_SZ);
|
|
|
|
/* Version strings must fit in the super-blocks */
|
|
BUILD_BUG_ON(strlen(RRDENG_DF_VER) > RRDENG_VER_SZ);
|
|
BUILD_BUG_ON(strlen(RRDENG_JF_VER) > RRDENG_VER_SZ);
|
|
|
|
/* Data file super-block cannot be larger than RRDENG_BLOCK_SIZE */
|
|
BUILD_BUG_ON(RRDENG_DF_SB_PADDING_SZ < 0);
|
|
|
|
BUILD_BUG_ON(sizeof(nd_uuid_t) != UUID_SZ); /* check UUID size */
|
|
|
|
/* page count must fit in 8 bits */
|
|
BUILD_BUG_ON(MAX_PAGES_PER_EXTENT > 255);
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------
|
|
// work request cache
|
|
|
|
typedef void *(*work_cb)(struct rrdengine_instance *ctx, void *data, struct completion *completion, uv_work_t* req);
|
|
typedef void (*after_work_cb)(struct rrdengine_instance *ctx, void *data, struct completion *completion, uv_work_t* req, int status);
|
|
|
|
struct rrdeng_work {
|
|
uv_work_t req;
|
|
|
|
struct rrdengine_instance *ctx;
|
|
void *data;
|
|
struct completion *completion;
|
|
|
|
work_cb work_cb;
|
|
after_work_cb after_work_cb;
|
|
enum rrdeng_opcode opcode;
|
|
};
|
|
|
|
static void work_request_init(void) {
|
|
rrdeng_main.work_cmd.ar = aral_create(
|
|
"dbengine-work-cmd",
|
|
sizeof(struct rrdeng_work),
|
|
0,
|
|
0,
|
|
NULL,
|
|
NULL, NULL, false, false
|
|
);
|
|
|
|
telemetry_aral_register(rrdeng_main.work_cmd.ar, "workers");
|
|
}
|
|
|
|
enum LIBUV_WORKERS_STATUS {
|
|
LIBUV_WORKERS_RELAXED,
|
|
LIBUV_WORKERS_STRESSED,
|
|
LIBUV_WORKERS_CRITICAL,
|
|
};
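// Pressure levels of the shared libuv worker pool: RELAXED means there is
// spare capacity, STRESSED means only the reserved worker slots are left,
// and CRITICAL means every libuv worker is already dispatched. These levels
// drive which command priorities rrdeng_deq_cmd() is allowed to dequeue.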
|
|
|
|
static inline enum LIBUV_WORKERS_STATUS work_request_full(void) {
|
|
size_t dispatched = __atomic_load_n(&rrdeng_main.work_cmd.atomics.dispatched, __ATOMIC_RELAXED);
|
|
|
|
if(dispatched >= (size_t)(libuv_worker_threads))
|
|
return LIBUV_WORKERS_CRITICAL;
|
|
|
|
else if(dispatched >= (size_t)(libuv_worker_threads - RESERVED_LIBUV_WORKER_THREADS))
|
|
return LIBUV_WORKERS_STRESSED;
|
|
|
|
return LIBUV_WORKERS_RELAXED;
|
|
}
|
|
|
|
static inline void work_done(struct rrdeng_work *work_request) {
|
|
aral_freez(rrdeng_main.work_cmd.ar, work_request);
|
|
}
|
|
|
|
static void work_standard_worker(uv_work_t *req) {
|
|
__atomic_add_fetch(&rrdeng_main.work_cmd.atomics.executing, 1, __ATOMIC_RELAXED);
|
|
|
|
register_libuv_worker_jobs();
|
|
worker_is_busy(UV_EVENT_WORKER_INIT);
|
|
|
|
struct rrdeng_work *work_request = req->data;
|
|
|
|
work_request->data = work_request->work_cb(work_request->ctx, work_request->data, work_request->completion, req);
|
|
worker_is_idle();
|
|
|
|
if(work_request->opcode == RRDENG_OPCODE_EXTENT_READ || work_request->opcode == RRDENG_OPCODE_QUERY) {
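// This libuv worker is already running, so keep draining queued extent-read
// and query-prep commands here instead of returning, saving the event loop
// from dispatching a fresh worker for each of them.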
|
|
internal_fatal(work_request->after_work_cb != NULL, "DBENGINE: opcodes with a callback should not be boosted");
|
|
|
|
while(1) {
|
|
struct rrdeng_cmd cmd = rrdeng_deq_cmd(true);
|
|
if (cmd.opcode == RRDENG_OPCODE_NOOP)
|
|
break;
|
|
|
|
worker_is_busy(UV_EVENT_WORKER_INIT);
|
|
switch (cmd.opcode) {
|
|
case RRDENG_OPCODE_EXTENT_READ:
|
|
worker_dispatch_extent_read(cmd, true);
|
|
break;
|
|
|
|
case RRDENG_OPCODE_QUERY:
|
|
worker_dispatch_query_prep(cmd, true);
|
|
break;
|
|
|
|
default:
|
|
fatal("DBENGINE: Opcode should not be executed synchronously");
|
|
break;
|
|
}
|
|
worker_is_idle();
|
|
}
|
|
}
|
|
|
|
__atomic_sub_fetch(&rrdeng_main.work_cmd.atomics.dispatched, 1, __ATOMIC_RELAXED);
|
|
__atomic_sub_fetch(&rrdeng_main.work_cmd.atomics.executing, 1, __ATOMIC_RELAXED);
|
|
|
|
// signal the event loop a worker is available
|
|
fatal_assert(0 == uv_async_send(&rrdeng_main.async));
|
|
}
|
|
|
|
static void after_work_standard_callback(uv_work_t* req, int status) {
|
|
struct rrdeng_work *work_request = req->data;
|
|
|
|
worker_is_busy(RRDENG_OPCODE_MAX + work_request->opcode);
|
|
|
|
if(work_request->after_work_cb)
|
|
work_request->after_work_cb(work_request->ctx, work_request->data, work_request->completion, req, status);
|
|
|
|
work_done(work_request);
|
|
|
|
worker_is_idle();
|
|
}
|
|
|
|
static bool work_dispatch(struct rrdengine_instance *ctx, void *data, struct completion *completion, enum rrdeng_opcode opcode, work_cb do_work_cb, after_work_cb do_after_work_cb) {
|
|
struct rrdeng_work *work_request = NULL;
|
|
|
|
internal_fatal(rrdeng_main.tid != gettid_cached(), "work_dispatch() can only be run from the event loop thread");
|
|
|
|
work_request = aral_mallocz(rrdeng_main.work_cmd.ar);
|
|
memset(work_request, 0, sizeof(struct rrdeng_work));
|
|
work_request->req.data = work_request;
|
|
work_request->ctx = ctx;
|
|
work_request->data = data;
|
|
work_request->completion = completion;
|
|
work_request->work_cb = do_work_cb;
|
|
work_request->after_work_cb = do_after_work_cb;
|
|
work_request->opcode = opcode;
|
|
|
|
if(uv_queue_work(&rrdeng_main.loop, &work_request->req, work_standard_worker, after_work_standard_callback)) {
|
|
internal_fatal(true, "DBENGINE: cannot queue work");
|
|
work_done(work_request);
|
|
return false;
|
|
}
|
|
|
|
__atomic_add_fetch(&rrdeng_main.work_cmd.atomics.dispatched, 1, __ATOMIC_RELAXED);
|
|
|
|
return true;
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------
|
|
// page descriptor cache
|
|
|
|
void page_descriptors_init(void) {
|
|
rrdeng_main.descriptors.ar = aral_create(
|
|
"dbengine-descriptors",
|
|
sizeof(struct page_descr_with_data),
|
|
0,
|
|
0,
|
|
NULL,
|
|
NULL, NULL, false, false);
|
|
|
|
telemetry_aral_register(rrdeng_main.descriptors.ar, "descriptors");
|
|
}
|
|
|
|
struct page_descr_with_data *page_descriptor_get(void) {
|
|
struct page_descr_with_data *descr = aral_mallocz(rrdeng_main.descriptors.ar);
|
|
memset(descr, 0, sizeof(struct page_descr_with_data));
|
|
return descr;
|
|
}
|
|
|
|
static inline void page_descriptor_release(struct page_descr_with_data *descr) {
|
|
aral_freez(rrdeng_main.descriptors.ar, descr);
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------
|
|
// extent io descriptor cache
|
|
|
|
static void extent_io_descriptor_init(void) {
|
|
rrdeng_main.xt_io_descr.ar = aral_create(
|
|
"dbengine-extent-io",
|
|
sizeof(struct extent_io_descriptor),
|
|
0,
|
|
0,
|
|
NULL,
|
|
NULL, NULL, false, false
|
|
);
|
|
|
|
telemetry_aral_register(rrdeng_main.xt_io_descr.ar, "extent io");
|
|
}
|
|
|
|
static struct extent_io_descriptor *extent_io_descriptor_get(void) {
|
|
struct extent_io_descriptor *xt_io_descr = aral_mallocz(rrdeng_main.xt_io_descr.ar);
|
|
memset(xt_io_descr, 0, sizeof(struct extent_io_descriptor));
|
|
return xt_io_descr;
|
|
}
|
|
|
|
static inline void extent_io_descriptor_release(struct extent_io_descriptor *xt_io_descr) {
|
|
aral_freez(rrdeng_main.xt_io_descr.ar, xt_io_descr);
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------
|
|
// query handle cache
|
|
|
|
void rrdeng_query_handle_init(void) {
|
|
rrdeng_main.handles.ar = aral_create(
|
|
"dbengine-query-handles",
|
|
sizeof(struct rrdeng_query_handle),
|
|
0,
|
|
0,
|
|
NULL,
|
|
NULL, NULL, false, false);
|
|
|
|
telemetry_aral_register(rrdeng_main.handles.ar, "query handles");
|
|
}
|
|
|
|
struct rrdeng_query_handle *rrdeng_query_handle_get(void) {
|
|
struct rrdeng_query_handle *handle = aral_mallocz(rrdeng_main.handles.ar);
|
|
memset(handle, 0, sizeof(struct rrdeng_query_handle));
|
|
return handle;
|
|
}
|
|
|
|
void rrdeng_query_handle_release(struct rrdeng_query_handle *handle) {
|
|
aral_freez(rrdeng_main.handles.ar, handle);
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------
|
|
// WAL cache
|
|
|
|
static struct {
|
|
struct {
|
|
SPINLOCK spinlock;
|
|
WAL *available_items;
|
|
size_t available;
|
|
} protected;
|
|
|
|
struct {
|
|
size_t allocated;
|
|
} atomics;
|
|
} wal_globals = {
|
|
.protected = {
|
|
.spinlock = NETDATA_SPINLOCK_INITIALIZER,
|
|
.available_items = NULL,
|
|
.available = 0,
|
|
},
|
|
.atomics = {
|
|
.allocated = 0,
|
|
},
|
|
};
|
|
|
|
static void wal_cleanup1(void) {
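// Release at most one cached WAL per call, and only while more than
// storage_tiers WALs are cached, so the pool shrinks gradually; a trylock is
// used to avoid blocking the caller when the cache is busy.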
|
|
WAL *wal = NULL;
|
|
|
|
if(!spinlock_trylock(&wal_globals.protected.spinlock))
|
|
return;
|
|
|
|
if(wal_globals.protected.available_items && wal_globals.protected.available > storage_tiers) {
|
|
wal = wal_globals.protected.available_items;
|
|
DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(wal_globals.protected.available_items, wal, cache.prev, cache.next);
|
|
wal_globals.protected.available--;
|
|
}
|
|
|
|
spinlock_unlock(&wal_globals.protected.spinlock);
|
|
|
|
if(wal) {
|
|
posix_memfree(wal->buf);
|
|
freez(wal);
|
|
__atomic_sub_fetch(&wal_globals.atomics.allocated, 1, __ATOMIC_RELAXED);
|
|
}
|
|
}
|
|
|
|
WAL *wal_get(struct rrdengine_instance *ctx, unsigned size) {
|
|
if(!size || size > RRDENG_BLOCK_SIZE)
|
|
fatal("DBENGINE: invalid WAL size requested");
|
|
|
|
WAL *wal = NULL;
|
|
|
|
spinlock_lock(&wal_globals.protected.spinlock);
|
|
|
|
if(likely(wal_globals.protected.available_items)) {
|
|
wal = wal_globals.protected.available_items;
|
|
DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(wal_globals.protected.available_items, wal, cache.prev, cache.next);
|
|
wal_globals.protected.available--;
|
|
}
|
|
|
|
uint64_t transaction_id = __atomic_fetch_add(&ctx->atomic.transaction_id, 1, __ATOMIC_RELAXED);
|
|
spinlock_unlock(&wal_globals.protected.spinlock);
|
|
|
|
if(unlikely(!wal)) {
|
|
wal = mallocz(sizeof(WAL));
|
|
wal->buf_size = RRDENG_BLOCK_SIZE;
|
|
int ret = posix_memalign((void *)&wal->buf, RRDFILE_ALIGNMENT, wal->buf_size);
|
|
if (unlikely(ret))
|
|
fatal("DBENGINE: posix_memalign:%s", strerror(ret));
|
|
__atomic_add_fetch(&wal_globals.atomics.allocated, 1, __ATOMIC_RELAXED);
|
|
}
|
|
|
|
// these need to survive
|
|
unsigned buf_size = wal->buf_size;
|
|
void *buf = wal->buf;
|
|
|
|
memset(wal, 0, sizeof(WAL));
|
|
|
|
// put them back
|
|
wal->buf_size = buf_size;
|
|
wal->buf = buf;
|
|
|
|
memset(wal->buf, 0, wal->buf_size);
|
|
|
|
wal->transaction_id = transaction_id;
|
|
wal->size = size;
|
|
|
|
return wal;
|
|
}
|
|
|
|
void wal_release(WAL *wal) {
|
|
if(unlikely(!wal)) return;
|
|
|
|
spinlock_lock(&wal_globals.protected.spinlock);
|
|
DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(wal_globals.protected.available_items, wal, cache.prev, cache.next);
|
|
wal_globals.protected.available++;
|
|
spinlock_unlock(&wal_globals.protected.spinlock);
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------
|
|
// command queue cache
|
|
|
|
static void rrdeng_cmd_queue_init(void) {
|
|
rrdeng_main.cmd_queue.ar = aral_create("dbengine-opcodes",
|
|
sizeof(struct rrdeng_cmd),
|
|
0,
|
|
0,
|
|
NULL,
|
|
NULL, NULL, false, false);
|
|
|
|
telemetry_aral_register(rrdeng_main.cmd_queue.ar, "opcodes");
|
|
}
|
|
|
|
static inline STORAGE_PRIORITY rrdeng_enq_cmd_map_opcode_to_priority(enum rrdeng_opcode opcode, STORAGE_PRIORITY priority) {
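// Clamp out-of-range priorities to best-effort and force query preparation
// commands to the internal query-prep priority, so query preparation is
// always dequeued ahead of best-effort work regardless of the caller.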
|
|
if(unlikely(priority >= STORAGE_PRIORITY_INTERNAL_MAX_DONT_USE))
|
|
priority = STORAGE_PRIORITY_BEST_EFFORT;
|
|
|
|
switch(opcode) {
|
|
case RRDENG_OPCODE_QUERY:
|
|
priority = STORAGE_PRIORITY_INTERNAL_QUERY_PREP;
|
|
break;
|
|
|
|
default:
|
|
break;
|
|
}
|
|
|
|
return priority;
|
|
}
|
|
|
|
void rrdeng_enqueue_epdl_cmd(struct rrdeng_cmd *cmd) {
|
|
epdl_cmd_queued(cmd->data, cmd);
|
|
}
|
|
|
|
void rrdeng_dequeue_epdl_cmd(struct rrdeng_cmd *cmd) {
|
|
epdl_cmd_dequeued(cmd->data);
|
|
}
|
|
|
|
void rrdeng_req_cmd(requeue_callback_t get_cmd_cb, void *data, STORAGE_PRIORITY priority) {
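// Re-prioritize a command that is still waiting in the queue: the caller
// resolves it through get_cmd_cb() under the queue lock, and the command is
// moved to the new priority list only when this raises its priority.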
|
|
spinlock_lock(&rrdeng_main.cmd_queue.unsafe.spinlock);
|
|
|
|
struct rrdeng_cmd *cmd = get_cmd_cb(data);
|
|
if(cmd) {
|
|
priority = rrdeng_enq_cmd_map_opcode_to_priority(cmd->opcode, priority);
|
|
|
|
if (cmd->priority > priority) {
|
|
DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(rrdeng_main.cmd_queue.unsafe.waiting_items_by_priority[cmd->priority], cmd, queue.prev, queue.next);
|
|
DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(rrdeng_main.cmd_queue.unsafe.waiting_items_by_priority[priority], cmd, queue.prev, queue.next);
|
|
cmd->priority = priority;
|
|
}
|
|
}
|
|
|
|
spinlock_unlock(&rrdeng_main.cmd_queue.unsafe.spinlock);
|
|
}
|
|
|
|
void rrdeng_enq_cmd(struct rrdengine_instance *ctx, enum rrdeng_opcode opcode, void *data, struct completion *completion,
|
|
enum storage_priority priority, enqueue_callback_t enqueue_cb, dequeue_callback_t dequeue_cb) {
|
|
|
|
priority = rrdeng_enq_cmd_map_opcode_to_priority(opcode, priority);
|
|
|
|
struct rrdeng_cmd *cmd = aral_mallocz(rrdeng_main.cmd_queue.ar);
|
|
memset(cmd, 0, sizeof(struct rrdeng_cmd));
|
|
cmd->ctx = ctx;
|
|
cmd->opcode = opcode;
|
|
cmd->data = data;
|
|
cmd->completion = completion;
|
|
cmd->priority = priority;
|
|
cmd->dequeue_cb = dequeue_cb;
|
|
|
|
spinlock_lock(&rrdeng_main.cmd_queue.unsafe.spinlock);
|
|
DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(rrdeng_main.cmd_queue.unsafe.waiting_items_by_priority[priority], cmd, queue.prev, queue.next);
|
|
rrdeng_main.cmd_queue.unsafe.waiting++;
|
|
if(enqueue_cb)
|
|
enqueue_cb(cmd);
|
|
spinlock_unlock(&rrdeng_main.cmd_queue.unsafe.spinlock);
|
|
|
|
fatal_assert(0 == uv_async_send(&rrdeng_main.async));
|
|
}
|
|
|
|
static inline bool rrdeng_cmd_has_waiting_opcodes_in_lower_priorities(STORAGE_PRIORITY priority, STORAGE_PRIORITY max_priority) {
|
|
for(; priority <= max_priority ; priority++)
|
|
if(rrdeng_main.cmd_queue.unsafe.waiting_items_by_priority[priority])
|
|
return true;
|
|
|
|
return false;
|
|
}
|
|
|
|
#define opcode_empty (struct rrdeng_cmd) { \
|
|
.ctx = NULL, \
|
|
.opcode = RRDENG_OPCODE_NOOP, \
|
|
.priority = STORAGE_PRIORITY_BEST_EFFORT, \
|
|
.completion = NULL, \
|
|
.data = NULL, \
|
|
}
|
|
|
|
static inline struct rrdeng_cmd rrdeng_deq_cmd(bool from_worker) {
|
|
struct rrdeng_cmd *cmd = NULL;
|
|
enum LIBUV_WORKERS_STATUS status = work_request_full();
|
|
|
|
STORAGE_PRIORITY min_priority, max_priority;
|
|
min_priority = STORAGE_PRIORITY_INTERNAL_DBENGINE;
|
|
max_priority = (status != LIBUV_WORKERS_RELAXED) ? STORAGE_PRIORITY_INTERNAL_DBENGINE : STORAGE_PRIORITY_INTERNAL_MAX_DONT_USE - 1;
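// The event loop dequeues any priority while the worker pool is relaxed, but
// restricts itself to internal dbengine commands when the pool is stressed;
// workers draining the queue only take query-prep and read work, and back
// off entirely when the pool is critical.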
|
|
|
|
if(from_worker) {
|
|
if(status == LIBUV_WORKERS_CRITICAL)
|
|
return opcode_empty;
|
|
|
|
min_priority = STORAGE_PRIORITY_INTERNAL_QUERY_PREP;
|
|
max_priority = STORAGE_PRIORITY_BEST_EFFORT;
|
|
}
|
|
|
|
// find an opcode to execute from the queue
|
|
spinlock_lock(&rrdeng_main.cmd_queue.unsafe.spinlock);
|
|
for(STORAGE_PRIORITY priority = min_priority; priority <= max_priority ; priority++) {
|
|
cmd = rrdeng_main.cmd_queue.unsafe.waiting_items_by_priority[priority];
|
|
if(cmd) {
|
|
|
|
// avoid starvation of lower priorities
|
|
if(unlikely(priority >= STORAGE_PRIORITY_HIGH &&
|
|
priority < STORAGE_PRIORITY_BEST_EFFORT &&
|
|
++rrdeng_main.cmd_queue.unsafe.executed_by_priority[priority] % 50 == 0 &&
|
|
rrdeng_cmd_has_waiting_opcodes_in_lower_priorities(priority + 1, max_priority))) {
|
|
// let the others run 2% of the requests
|
|
cmd = NULL;
|
|
continue;
|
|
}
|
|
|
|
// remove it from the queue
|
|
DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(rrdeng_main.cmd_queue.unsafe.waiting_items_by_priority[priority], cmd, queue.prev, queue.next);
|
|
rrdeng_main.cmd_queue.unsafe.waiting--;
|
|
break;
|
|
}
|
|
}
|
|
|
|
if(cmd && cmd->dequeue_cb) {
|
|
cmd->dequeue_cb(cmd);
|
|
cmd->dequeue_cb = NULL;
|
|
}
|
|
|
|
spinlock_unlock(&rrdeng_main.cmd_queue.unsafe.spinlock);
|
|
|
|
struct rrdeng_cmd ret;
|
|
if(cmd) {
|
|
// copy it, to return it
|
|
ret = *cmd;
|
|
|
|
aral_freez(rrdeng_main.cmd_queue.ar, cmd);
|
|
}
|
|
else
|
|
ret = opcode_empty;
|
|
|
|
return ret;
|
|
}
|
|
|
|
|
|
// ----------------------------------------------------------------------------
|
|
|
|
void *dbengine_extent_alloc(size_t size) {
|
|
void *extent = mallocz(size);
|
|
return extent;
|
|
}
|
|
|
|
void dbengine_extent_free(void *extent, size_t size __maybe_unused) {
|
|
freez(extent);
|
|
}
|
|
|
|
static void journalfile_extent_build(struct rrdengine_instance *ctx, struct extent_io_descriptor *xt_io_descr) {
|
|
unsigned count, payload_length, descr_size, size_bytes;
|
|
void *buf;
|
|
/* persistent structures */
|
|
struct rrdeng_df_extent_header *df_header;
|
|
struct rrdeng_jf_transaction_header *jf_header;
|
|
struct rrdeng_jf_store_data *jf_metric_data;
|
|
struct rrdeng_jf_transaction_trailer *jf_trailer;
|
|
uLong crc;
|
|
|
|
df_header = xt_io_descr->buf;
|
|
count = df_header->number_of_pages;
|
|
descr_size = sizeof(*jf_metric_data->descr) * count;
|
|
payload_length = sizeof(*jf_metric_data) + descr_size;
|
|
size_bytes = sizeof(*jf_header) + payload_length + sizeof(*jf_trailer);
|
|
|
|
xt_io_descr->wal = wal_get(ctx, size_bytes);
|
|
buf = xt_io_descr->wal->buf;
|
|
|
|
jf_header = buf;
|
|
jf_header->type = STORE_DATA;
|
|
jf_header->reserved = 0;
|
|
jf_header->id = xt_io_descr->wal->transaction_id;
|
|
jf_header->payload_length = payload_length;
|
|
|
|
jf_metric_data = buf + sizeof(*jf_header);
|
|
jf_metric_data->extent_offset = xt_io_descr->pos;
|
|
jf_metric_data->extent_size = xt_io_descr->bytes;
|
|
jf_metric_data->number_of_pages = count;
|
|
memcpy(jf_metric_data->descr, df_header->descr, descr_size);
|
|
|
|
jf_trailer = buf + sizeof(*jf_header) + payload_length;
|
|
crc = crc32(0L, Z_NULL, 0);
|
|
crc = crc32(crc, buf, sizeof(*jf_header) + payload_length);
|
|
crc32set(jf_trailer->checksum, crc);
|
|
}
|
|
|
|
static void after_extent_flushed_to_open(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) {
|
|
if(completion)
|
|
completion_mark_complete(completion);
|
|
|
|
if(ctx_is_available_for_queries(ctx))
|
|
rrdeng_enq_cmd(ctx, RRDENG_OPCODE_DATABASE_ROTATE, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL);
|
|
}
|
|
|
|
static void *extent_flushed_to_open_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) {
|
|
worker_is_busy(UV_EVENT_DBENGINE_FLUSHED_TO_OPEN);
|
|
|
|
uv_fs_t *uv_fs_request = data;
|
|
struct extent_io_descriptor *xt_io_descr = uv_fs_request->data;
|
|
struct page_descr_with_data *descr;
|
|
struct rrdengine_datafile *datafile;
|
|
unsigned i;
|
|
|
|
datafile = xt_io_descr->datafile;
|
|
|
|
bool still_running = ctx_is_available_for_queries(ctx);
|
|
|
|
for (i = 0 ; i < xt_io_descr->descr_count ; ++i) {
|
|
descr = xt_io_descr->descr_array[i];
|
|
|
|
if (likely(still_running))
|
|
pgc_open_add_hot_page(
|
|
(Word_t)ctx, descr->metric_id,
|
|
(time_t) (descr->start_time_ut / USEC_PER_SEC),
|
|
(time_t) (descr->end_time_ut / USEC_PER_SEC),
|
|
descr->update_every_s,
|
|
datafile,
|
|
xt_io_descr->pos, xt_io_descr->bytes, descr->page_length);
|
|
|
|
page_descriptor_release(descr);
|
|
}
|
|
|
|
uv_fs_req_cleanup(uv_fs_request);
|
|
posix_memfree(xt_io_descr->buf);
|
|
extent_io_descriptor_release(xt_io_descr);
|
|
|
|
spinlock_lock(&datafile->writers.spinlock);
|
|
datafile->writers.flushed_to_open_running--;
|
|
spinlock_unlock(&datafile->writers.spinlock);
|
|
|
|
if(datafile->fileno != ctx_last_fileno_get(ctx) && still_running)
|
|
// we just finished a flushing on a datafile that is not the active one
|
|
rrdeng_enq_cmd(ctx, RRDENG_OPCODE_JOURNAL_INDEX, datafile, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL);
|
|
|
|
return data;
|
|
}
|
|
|
|
// Main event loop callback
|
|
static void after_extent_write_datafile_io(uv_fs_t *uv_fs_request) {
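// The data file write completed: record any I/O error, write the journal v1
// transaction for this extent, and queue the FLUSHED_TO_OPEN opcode so the
// extent's pages get registered in the open cache.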
|
|
worker_is_busy(RRDENG_OPCODE_MAX + RRDENG_OPCODE_EXTENT_WRITE);
|
|
|
|
struct extent_io_descriptor *xt_io_descr = uv_fs_request->data;
|
|
struct rrdengine_datafile *datafile = xt_io_descr->datafile;
|
|
struct rrdengine_instance *ctx = datafile->ctx;
|
|
|
|
if (uv_fs_request->result < 0) {
|
|
ctx_io_error(ctx);
|
|
netdata_log_error("DBENGINE: %s: uv_fs_write(): %s", __func__, uv_strerror((int)uv_fs_request->result));
|
|
}
|
|
|
|
journalfile_v1_extent_write(ctx, xt_io_descr->datafile, xt_io_descr->wal, &rrdeng_main.loop);
|
|
|
|
spinlock_lock(&datafile->writers.spinlock);
|
|
datafile->writers.running--;
|
|
datafile->writers.flushed_to_open_running++;
|
|
spinlock_unlock(&datafile->writers.spinlock);
|
|
|
|
rrdeng_enq_cmd(xt_io_descr->ctx,
|
|
RRDENG_OPCODE_FLUSHED_TO_OPEN,
|
|
uv_fs_request,
|
|
xt_io_descr->completion,
|
|
STORAGE_PRIORITY_INTERNAL_DBENGINE,
|
|
NULL,
|
|
NULL);
|
|
|
|
worker_is_idle();
|
|
}
|
|
|
|
static bool datafile_is_full(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile) {
|
|
bool ret = false;
|
|
spinlock_lock(&datafile->writers.spinlock);
|
|
|
|
if(ctx_is_available_for_queries(ctx) && datafile->pos > rrdeng_target_data_file_size(ctx))
|
|
ret = true;
|
|
|
|
spinlock_unlock(&datafile->writers.spinlock);
|
|
|
|
return ret;
|
|
}
|
|
|
|
static struct rrdengine_datafile *get_datafile_to_write_extent(struct rrdengine_instance *ctx) {
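// Pin the latest datafile as a writer. If it is already beyond the target
// size, create the next datafile/journal pair under a mutex (re-checking
// after acquiring it, so concurrent flushers do not create several files),
// queue indexing of the full journal, and move the writer pin to the new
// latest datafile.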
|
|
struct rrdengine_datafile *datafile;
|
|
|
|
// get the latest datafile
|
|
uv_rwlock_rdlock(&ctx->datafiles.rwlock);
|
|
datafile = ctx->datafiles.first->prev;
|
|
// become a writer on this datafile, to prevent it from vanishing
|
|
spinlock_lock(&datafile->writers.spinlock);
|
|
datafile->writers.running++;
|
|
spinlock_unlock(&datafile->writers.spinlock);
|
|
uv_rwlock_rdunlock(&ctx->datafiles.rwlock);
|
|
|
|
if(datafile_is_full(ctx, datafile)) {
|
|
// remember the datafile we have become writers to
|
|
struct rrdengine_datafile *old_datafile = datafile;
|
|
|
|
// only 1 datafile creation at a time
|
|
static netdata_mutex_t mutex = NETDATA_MUTEX_INITIALIZER;
|
|
netdata_mutex_lock(&mutex);
|
|
|
|
// take the latest datafile again - without this, multiple threads may create multiple files
|
|
uv_rwlock_rdlock(&ctx->datafiles.rwlock);
|
|
datafile = ctx->datafiles.first->prev;
|
|
uv_rwlock_rdunlock(&ctx->datafiles.rwlock);
|
|
|
|
if(datafile_is_full(ctx, datafile) && create_new_datafile_pair(ctx, true) == 0)
|
|
rrdeng_enq_cmd(ctx, RRDENG_OPCODE_JOURNAL_INDEX, datafile, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL,
|
|
NULL);
|
|
|
|
netdata_mutex_unlock(&mutex);
|
|
|
|
// get the new latest datafile again, like above
|
|
uv_rwlock_rdlock(&ctx->datafiles.rwlock);
|
|
datafile = ctx->datafiles.first->prev;
|
|
// become a writer on this datafile, to prevent it from vanishing
|
|
spinlock_lock(&datafile->writers.spinlock);
|
|
datafile->writers.running++;
|
|
spinlock_unlock(&datafile->writers.spinlock);
|
|
uv_rwlock_rdunlock(&ctx->datafiles.rwlock);
|
|
|
|
// release the writers on the old datafile
|
|
spinlock_lock(&old_datafile->writers.spinlock);
|
|
old_datafile->writers.running--;
|
|
spinlock_unlock(&old_datafile->writers.spinlock);
|
|
}
|
|
|
|
return datafile;
|
|
}
|
|
|
|
/*
|
|
* Take a page list in a judy array and write them
|
|
*/
|
|
static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_instance *ctx, struct page_descr_with_data *base, struct completion *completion) {
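// Pack up to rrdeng_pages_per_extent dirty pages into a single extent:
// a header with per-page descriptors, the (optionally compressed) page
// payloads and a CRC32 trailer, then reserve space for it in the target
// datafile and build the matching journal transaction.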
|
|
int ret;
|
|
unsigned i, count, size_bytes, pos, real_io_size;
|
|
uint32_t uncompressed_payload_length, max_compressed_size, payload_offset;
|
|
struct page_descr_with_data *descr, *eligible_pages[MAX_PAGES_PER_EXTENT];
|
|
struct extent_io_descriptor *xt_io_descr;
|
|
Word_t Index;
|
|
uint8_t compression_algorithm = ctx->config.global_compress_alg;
|
|
struct rrdengine_datafile *datafile;
|
|
/* persistent structures */
|
|
struct rrdeng_df_extent_header *header;
|
|
struct rrdeng_df_extent_trailer *trailer;
|
|
uLong crc;
|
|
|
|
for(descr = base, Index = 0, count = 0, uncompressed_payload_length = 0;
|
|
descr && count != rrdeng_pages_per_extent;
|
|
descr = descr->link.next, Index++) {
|
|
|
|
uncompressed_payload_length += descr->page_length;
|
|
eligible_pages[count++] = descr;
|
|
|
|
}
|
|
|
|
if (!count) {
|
|
if (completion)
|
|
completion_mark_complete(completion);
|
|
|
|
__atomic_sub_fetch(&ctx->atomic.extents_currently_being_flushed, 1, __ATOMIC_RELAXED);
|
|
return NULL;
|
|
}
|
|
|
|
xt_io_descr = extent_io_descriptor_get();
|
|
xt_io_descr->ctx = ctx;
|
|
payload_offset = sizeof(*header) + count * sizeof(header->descr[0]);
|
|
max_compressed_size = dbengine_max_compressed_size(uncompressed_payload_length, compression_algorithm);
|
|
size_bytes = payload_offset + MAX(uncompressed_payload_length, max_compressed_size) + sizeof(*trailer);
|
|
ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes));
|
|
if (unlikely(ret)) {
|
|
fatal("DBENGINE: posix_memalign:%s", strerror(ret));
|
|
/* freez(xt_io_descr);*/
|
|
}
|
|
memset(xt_io_descr->buf, 0, ALIGN_BYTES_CEILING(size_bytes));
|
|
(void) memcpy(xt_io_descr->descr_array, eligible_pages, sizeof(struct page_descr_with_data *) * count);
|
|
xt_io_descr->descr_count = count;
|
|
|
|
pos = 0;
|
|
header = xt_io_descr->buf;
|
|
header->number_of_pages = count;
|
|
pos += sizeof(*header);
|
|
|
|
for (i = 0 ; i < count ; ++i) {
|
|
descr = xt_io_descr->descr_array[i];
|
|
header->descr[i].type = descr->type;
|
|
uuid_copy(*(nd_uuid_t *)header->descr[i].uuid, *descr->id);
|
|
header->descr[i].page_length = descr->page_length;
|
|
header->descr[i].start_time_ut = descr->start_time_ut;
|
|
|
|
switch (descr->type) {
|
|
case RRDENG_PAGE_TYPE_ARRAY_32BIT:
|
|
case RRDENG_PAGE_TYPE_ARRAY_TIER1:
|
|
header->descr[i].end_time_ut = descr->end_time_ut;
|
|
break;
|
|
case RRDENG_PAGE_TYPE_GORILLA_32BIT:
|
|
header->descr[i].gorilla.delta_time_s = (uint32_t) ((descr->end_time_ut - descr->start_time_ut) / USEC_PER_SEC);
|
|
header->descr[i].gorilla.entries = pgd_slots_used(descr->pgd);
|
|
break;
|
|
default:
|
|
fatal("Unknown page type: %uc", descr->type);
|
|
}
|
|
|
|
pos += sizeof(header->descr[i]);
|
|
}
|
|
|
|
// build the extent payload
|
|
for (i = 0 ; i < count ; ++i) {
|
|
descr = xt_io_descr->descr_array[i];
|
|
pgd_copy_to_extent(descr->pgd, xt_io_descr->buf + pos, descr->page_length);
|
|
pos += descr->page_length;
|
|
}
|
|
|
|
// compress the payload
|
|
size_t compressed_size =
|
|
(int)dbengine_compress(xt_io_descr->buf + payload_offset,
|
|
uncompressed_payload_length,
|
|
compression_algorithm);
|
|
|
|
internal_fatal(compressed_size > max_compressed_size, "DBENGINE: compression returned more data than the max allowed");
|
|
internal_fatal(compressed_size > uncompressed_payload_length, "DBENGINE: compression returned more data than the uncompressed extent");
|
|
|
|
if(compressed_size) {
|
|
header->compression_algorithm = compression_algorithm;
|
|
header->payload_length = compressed_size;
|
|
}
|
|
else {
|
|
// compression failed, or generated bigger pages
|
|
// so it didn't touch our uncompressed buffer
|
|
header->compression_algorithm = RRDENG_COMPRESSION_NONE;
|
|
header->payload_length = compressed_size = uncompressed_payload_length;
|
|
}
|
|
|
|
// set the correct size
|
|
size_bytes = payload_offset + compressed_size + sizeof(*trailer);
|
|
|
|
if(compression_algorithm != RRDENG_COMPRESSION_NONE) {
|
|
__atomic_add_fetch(&ctx->stats.before_compress_bytes, uncompressed_payload_length, __ATOMIC_RELAXED);
|
|
__atomic_add_fetch(&ctx->stats.after_compress_bytes, compressed_size, __ATOMIC_RELAXED);
|
|
}
|
|
|
|
real_io_size = ALIGN_BYTES_CEILING(size_bytes);
|
|
|
|
datafile = get_datafile_to_write_extent(ctx);
|
|
spinlock_lock(&datafile->writers.spinlock);
|
|
xt_io_descr->datafile = datafile;
|
|
xt_io_descr->pos = datafile->pos;
|
|
datafile->pos += real_io_size;
|
|
spinlock_unlock(&datafile->writers.spinlock);
|
|
|
|
xt_io_descr->bytes = size_bytes;
|
|
xt_io_descr->uv_fs_request.data = xt_io_descr;
|
|
xt_io_descr->completion = completion;
|
|
|
|
trailer = xt_io_descr->buf + size_bytes - sizeof(*trailer);
|
|
crc = crc32(0L, Z_NULL, 0);
|
|
crc = crc32(crc, xt_io_descr->buf, size_bytes - sizeof(*trailer));
|
|
crc32set(trailer->checksum, crc);
|
|
|
|
xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size);
|
|
journalfile_extent_build(ctx, xt_io_descr);
|
|
|
|
ctx_last_flush_fileno_set(ctx, datafile->fileno);
|
|
ctx_current_disk_space_increase(ctx, real_io_size);
|
|
ctx_io_write_op_bytes(ctx, real_io_size);
|
|
|
|
return xt_io_descr;
|
|
}
|
|
|
|
static void after_extent_write(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* uv_work_req __maybe_unused, int status __maybe_unused) {
|
|
struct extent_io_descriptor *xt_io_descr = data;
|
|
|
|
if(xt_io_descr) {
|
|
int ret = uv_fs_write(&rrdeng_main.loop,
|
|
&xt_io_descr->uv_fs_request,
|
|
xt_io_descr->datafile->file,
|
|
&xt_io_descr->iov,
|
|
1,
|
|
(int64_t) xt_io_descr->pos,
|
|
after_extent_write_datafile_io);
|
|
|
|
fatal_assert(-1 != ret);
|
|
}
|
|
}
|
|
|
|
static void *extent_write_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) {
|
|
worker_is_busy(UV_EVENT_DBENGINE_EXTENT_WRITE);
|
|
struct page_descr_with_data *base = data;
|
|
struct extent_io_descriptor *xt_io_descr = datafile_extent_build(ctx, base, completion);
|
|
return xt_io_descr;
|
|
}
|
|
|
|
static void after_database_rotate(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) {
|
|
__atomic_store_n(&ctx->atomic.now_deleting_files, false, __ATOMIC_RELAXED);
|
|
}
|
|
|
|
struct uuid_first_time_s {
|
|
nd_uuid_t *uuid;
|
|
time_t first_time_s;
|
|
METRIC *metric;
|
|
size_t pages_found;
|
|
size_t df_matched;
|
|
size_t df_index_oldest;
|
|
};
|
|
|
|
struct rrdengine_datafile *datafile_release_and_acquire_next_for_retention(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile) {
|
|
|
|
uv_rwlock_rdlock(&ctx->datafiles.rwlock);
|
|
|
|
struct rrdengine_datafile *next_datafile = datafile->next;
|
|
|
|
while(next_datafile && !datafile_acquire(next_datafile, DATAFILE_ACQUIRE_RETENTION))
|
|
next_datafile = next_datafile->next;
|
|
|
|
uv_rwlock_rdunlock(&ctx->datafiles.rwlock);
|
|
|
|
datafile_release(datafile, DATAFILE_ACQUIRE_RETENTION);
|
|
|
|
return next_datafile;
|
|
}
|
|
|
|
time_t find_uuid_first_time(
|
|
struct rrdengine_instance *ctx,
|
|
struct rrdengine_datafile *datafile,
|
|
struct uuid_first_time_s *uuid_first_entry_list,
|
|
size_t count)
|
|
{
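// Walk every remaining journal v2 file, and finally the open cache, to find
// the earliest sample still available for each metric in
// uuid_first_entry_list; returns the earliest time seen across the journals
// so the tier's global first-time can be updated.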
|
|
time_t global_first_time_s = LONG_MAX;
|
|
|
|
// acquire the datafile to work with it
|
|
uv_rwlock_rdlock(&ctx->datafiles.rwlock);
|
|
while(datafile && !datafile_acquire(datafile, DATAFILE_ACQUIRE_RETENTION))
|
|
datafile = datafile->next;
|
|
uv_rwlock_rdunlock(&ctx->datafiles.rwlock);
|
|
|
|
if (unlikely(!datafile))
|
|
return global_first_time_s;
|
|
|
|
unsigned journalfile_count = 0;
|
|
size_t binary_match = 0;
|
|
size_t not_matching_bsearches = 0;
|
|
|
|
while (datafile) {
|
|
struct journal_v2_header *j2_header = journalfile_v2_data_acquire(datafile->journalfile, NULL, 0, 0);
|
|
if (!j2_header) {
|
|
datafile = datafile_release_and_acquire_next_for_retention(ctx, datafile);
|
|
continue;
|
|
}
|
|
|
|
time_t journal_start_time_s = (time_t) (j2_header->start_time_ut / USEC_PER_SEC);
|
|
|
|
if(journal_start_time_s < global_first_time_s)
|
|
global_first_time_s = journal_start_time_s;
|
|
|
|
struct journal_metric_list *uuid_list = (struct journal_metric_list *)((uint8_t *) j2_header + j2_header->metric_offset);
|
|
struct uuid_first_time_s *uuid_original_entry;
|
|
|
|
size_t journal_metric_count = j2_header->metric_count;
|
|
|
|
for (size_t index = 0; index < count; ++index) {
|
|
uuid_original_entry = &uuid_first_entry_list[index];
|
|
|
|
// Check here if we should skip this
|
|
if (uuid_original_entry->df_matched > 3 || uuid_original_entry->pages_found > 5)
|
|
continue;
|
|
|
|
struct journal_metric_list *live_entry =
|
|
bsearch(uuid_original_entry->uuid,uuid_list,journal_metric_count,
|
|
sizeof(*uuid_list), journal_metric_uuid_compare);
|
|
|
|
if (!live_entry) {
|
|
// Not found in this journal
|
|
not_matching_bsearches++;
|
|
continue;
|
|
}
|
|
|
|
uuid_original_entry->pages_found += live_entry->entries;
|
|
uuid_original_entry->df_matched++;
|
|
|
|
time_t old_first_time_s = uuid_original_entry->first_time_s;
|
|
|
|
// Calculate first / last for this match
|
|
time_t first_time_s = live_entry->delta_start_s + journal_start_time_s;
|
|
uuid_original_entry->first_time_s = MIN(uuid_original_entry->first_time_s, first_time_s);
|
|
|
|
if (uuid_original_entry->first_time_s != old_first_time_s)
|
|
uuid_original_entry->df_index_oldest = uuid_original_entry->df_matched;
|
|
|
|
binary_match++;
|
|
}
|
|
|
|
journalfile_count++;
|
|
journalfile_v2_data_release(datafile->journalfile);
|
|
datafile = datafile_release_and_acquire_next_for_retention(ctx, datafile);
|
|
}
|
|
|
|
// Let's scan the open cache for almost exact match
|
|
size_t open_cache_count = 0;
|
|
|
|
size_t df_index[10] = { 0 };
|
|
size_t without_metric = 0;
|
|
size_t open_cache_gave_first_time_s = 0;
|
|
size_t metric_count = 0;
|
|
size_t without_retention = 0;
|
|
size_t not_needed_bsearches = 0;
|
|
|
|
for (size_t index = 0; index < count; ++index) {
|
|
struct uuid_first_time_s *uuid_first_t_entry = &uuid_first_entry_list[index];
|
|
|
|
metric_count++;
|
|
|
|
size_t idx = uuid_first_t_entry->df_index_oldest;
|
|
if(idx >= 10)
|
|
idx = 9;
|
|
|
|
df_index[idx]++;
|
|
|
|
not_needed_bsearches += uuid_first_t_entry->df_matched - uuid_first_t_entry->df_index_oldest;
|
|
|
|
if (unlikely(!uuid_first_t_entry->metric)) {
|
|
without_metric++;
|
|
continue;
|
|
}
|
|
|
|
PGC_PAGE *page = pgc_page_get_and_acquire(
|
|
open_cache, (Word_t)ctx,
|
|
(Word_t)uuid_first_t_entry->metric, 0,
|
|
PGC_SEARCH_FIRST);
|
|
|
|
if (page) {
|
|
time_t old_first_time_s = uuid_first_t_entry->first_time_s;
|
|
|
|
time_t first_time_s = pgc_page_start_time_s(page);
|
|
uuid_first_t_entry->first_time_s = MIN(uuid_first_t_entry->first_time_s, first_time_s);
|
|
pgc_page_release(open_cache, page);
|
|
open_cache_count++;
|
|
|
|
if(uuid_first_t_entry->first_time_s != old_first_time_s) {
|
|
open_cache_gave_first_time_s++;
|
|
}
|
|
}
|
|
else {
|
|
if(!uuid_first_t_entry->df_index_oldest)
|
|
without_retention++;
|
|
}
|
|
}
|
|
internal_error(true,
|
|
"DBENGINE: analyzed the retention of %zu rotated metrics of tier %d, "
|
|
"did %zu jv2 matching binary searches (%zu not matching, %zu overflown) in %u journal files, "
|
|
"%zu metrics with entries in open cache, "
|
|
"metrics first time found per datafile index ([not in jv2]:%zu, [1]:%zu, [2]:%zu, [3]:%zu, [4]:%zu, [5]:%zu, [6]:%zu, [7]:%zu, [8]:%zu, [bigger]: %zu), "
|
|
"open cache found first time %zu, "
|
|
"metrics without any remaining retention %zu, "
|
|
"metrics not in MRG %zu",
|
|
metric_count,
|
|
ctx->config.tier,
|
|
binary_match,
|
|
not_matching_bsearches,
|
|
not_needed_bsearches,
|
|
journalfile_count,
|
|
open_cache_count,
|
|
df_index[0], df_index[1], df_index[2], df_index[3], df_index[4], df_index[5], df_index[6], df_index[7], df_index[8], df_index[9],
|
|
open_cache_gave_first_time_s,
|
|
without_retention,
|
|
without_metric
|
|
);
|
|
|
|
return global_first_time_s;
|
|
}
|
|
|
|
static void update_metrics_first_time_s(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile_to_delete, struct rrdengine_datafile *first_datafile_remaining, bool worker) {
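// Before datafile_to_delete is removed, collect the metrics referenced by its
// journal, recompute their first-time from the datafiles that remain, and
// update the metrics registry; metrics left with no retention anywhere are
// deleted from the registry (or just counted, when still referenced).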
|
|
time_t global_first_time_s = LONG_MAX;
|
|
|
|
if(worker)
|
|
worker_is_busy(UV_EVENT_DBENGINE_FIND_ROTATED_METRICS);
|
|
|
|
struct rrdengine_journalfile *journalfile = datafile_to_delete->journalfile;
|
|
struct journal_v2_header *j2_header = journalfile_v2_data_acquire(journalfile, NULL, 0, 0);
|
|
|
|
if (unlikely(!j2_header)) {
|
|
if (worker)
|
|
worker_is_idle();
|
|
return;
|
|
}
|
|
|
|
__atomic_add_fetch(&rrdeng_cache_efficiency_stats.metrics_retention_started, 1, __ATOMIC_RELAXED);
|
|
|
|
struct journal_metric_list *uuid_list = (struct journal_metric_list *)((uint8_t *) j2_header + j2_header->metric_offset);
|
|
|
|
size_t count = j2_header->metric_count;
|
|
struct uuid_first_time_s *uuid_first_t_entry;
|
|
struct uuid_first_time_s *uuid_first_entry_list = callocz(count, sizeof(struct uuid_first_time_s));
|
|
|
|
size_t added = 0;
|
|
for (size_t index = 0; index < count; ++index) {
|
|
METRIC *metric = mrg_metric_get_and_acquire(main_mrg, &uuid_list[index].uuid, (Word_t) ctx);
|
|
if (!metric)
|
|
continue;
|
|
|
|
uuid_first_entry_list[added].metric = metric;
|
|
uuid_first_entry_list[added].first_time_s = LONG_MAX;
|
|
uuid_first_entry_list[added].df_matched = 0;
|
|
uuid_first_entry_list[added].df_index_oldest = 0;
|
|
uuid_first_entry_list[added].uuid = mrg_metric_uuid(main_mrg, metric);
|
|
added++;
|
|
}
|
|
|
|
netdata_log_info("DBENGINE: recalculating tier %d retention for %zu metrics starting with datafile %u",
|
|
ctx->config.tier, count, first_datafile_remaining->fileno);
|
|
|
|
journalfile_v2_data_release(journalfile);
|
|
|
|
// Update the first time / last time for all metrics we plan to delete
|
|
|
|
if(worker)
|
|
worker_is_busy(UV_EVENT_DBENGINE_FIND_REMAINING_RETENTION);
|
|
|
|
global_first_time_s = find_uuid_first_time(ctx, first_datafile_remaining, uuid_first_entry_list, added);
|
|
|
|
if(worker)
|
|
worker_is_busy(UV_EVENT_DBENGINE_POPULATE_MRG);
|
|
|
|
netdata_log_info("DBENGINE: updating tier %d metrics registry retention for %zu metrics",
|
|
ctx->config.tier, added);
|
|
|
|
size_t deleted_metrics = 0, zero_retention_referenced = 0, zero_disk_retention = 0, zero_disk_but_live = 0;
|
|
for (size_t index = 0; index < added; ++index) {
|
|
uuid_first_t_entry = &uuid_first_entry_list[index];
|
|
if (likely(uuid_first_t_entry->first_time_s != LONG_MAX)) {
|
|
|
|
time_t old_first_time_s = mrg_metric_get_first_time_s(main_mrg, uuid_first_t_entry->metric);
|
|
|
|
bool changed = mrg_metric_set_first_time_s_if_bigger(main_mrg, uuid_first_t_entry->metric, uuid_first_t_entry->first_time_s);
|
|
if (changed) {
|
|
uint32_t update_every_s = mrg_metric_get_update_every_s(main_mrg, uuid_first_t_entry->metric);
|
|
if (update_every_s && old_first_time_s && uuid_first_t_entry->first_time_s > old_first_time_s) {
|
|
uint64_t remove_samples = (uuid_first_t_entry->first_time_s - old_first_time_s) / update_every_s;
|
|
__atomic_sub_fetch(&ctx->atomic.samples, remove_samples, __ATOMIC_RELAXED);
|
|
}
|
|
}
|
|
mrg_metric_release(main_mrg, uuid_first_t_entry->metric);
|
|
}
|
|
else {
|
|
zero_disk_retention++;
|
|
|
|
// there is no retention for this metric
|
|
bool has_retention = mrg_metric_zero_disk_retention(main_mrg, uuid_first_t_entry->metric);
|
|
if (!has_retention) {
|
|
time_t first_time_s = mrg_metric_get_first_time_s(main_mrg, uuid_first_t_entry->metric);
|
|
time_t last_time_s = mrg_metric_get_latest_time_s(main_mrg, uuid_first_t_entry->metric);
|
|
time_t update_every_s = mrg_metric_get_update_every_s(main_mrg, uuid_first_t_entry->metric);
|
|
if (update_every_s && first_time_s && last_time_s) {
|
|
uint64_t remove_samples = (last_time_s - first_time_s) / update_every_s;
|
|
__atomic_sub_fetch(&ctx->atomic.samples, remove_samples, __ATOMIC_RELAXED);
|
|
}
|
|
|
|
bool deleted = mrg_metric_release_and_delete(main_mrg, uuid_first_t_entry->metric);
|
|
if(deleted)
|
|
deleted_metrics++;
|
|
else
|
|
zero_retention_referenced++;
|
|
}
|
|
else {
|
|
zero_disk_but_live++;
|
|
mrg_metric_release(main_mrg, uuid_first_t_entry->metric);
|
|
}
|
|
}
|
|
}
|
|
freez(uuid_first_entry_list);
|
|
|
|
internal_error(zero_disk_retention,
|
|
"DBENGINE: deleted %zu metrics, zero retention but referenced %zu (out of %zu total, of which %zu have main cache retention) zero on-disk retention tier %d metrics from metrics registry",
|
|
deleted_metrics, zero_retention_referenced, zero_disk_retention, zero_disk_but_live, ctx->config.tier);
|
|
|
|
if(global_first_time_s != LONG_MAX)
|
|
__atomic_store_n(&ctx->atomic.first_time_s, global_first_time_s, __ATOMIC_RELAXED);
|
|
|
|
if(worker)
|
|
worker_is_idle();
|
|
}
|
|
|
|
void datafile_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile, bool update_retention, bool worker) {
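// Rotation path: optionally fix up retention first (while the journal is
// still readable), wait until every user releases the datafile, then unlink
// both the journal and data files and return the freed bytes to the quota.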
|
|
if(worker)
|
|
worker_is_busy(UV_EVENT_DBENGINE_DATAFILE_DELETE_WAIT);
|
|
|
|
bool datafile_got_for_deletion = datafile_acquire_for_deletion(datafile, false);
|
|
|
|
if (update_retention)
|
|
update_metrics_first_time_s(ctx, datafile, datafile->next, worker);
|
|
|
|
while (!datafile_got_for_deletion) {
|
|
if(worker)
|
|
worker_is_busy(UV_EVENT_DBENGINE_DATAFILE_DELETE_WAIT);
|
|
|
|
datafile_got_for_deletion = datafile_acquire_for_deletion(datafile, false);
|
|
|
|
if (!datafile_got_for_deletion) {
|
|
netdata_log_info("DBENGINE: waiting for data file '%s/"
|
|
DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION
|
|
"' to be available for deletion, "
|
|
"it is in use currently by %u users.",
|
|
ctx->config.dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno, datafile->users.lockers);
|
|
|
|
__atomic_add_fetch(&rrdeng_cache_efficiency_stats.datafile_deletion_spin, 1, __ATOMIC_RELAXED);
|
|
sleep_usec(1 * USEC_PER_SEC);
|
|
}
|
|
}
|
|
|
|
__atomic_add_fetch(&rrdeng_cache_efficiency_stats.datafile_deletion_started, 1, __ATOMIC_RELAXED);
|
|
netdata_log_info("DBENGINE: deleting data file '%s/"
|
|
DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION
|
|
"'.",
|
|
ctx->config.dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno);
|
|
|
|
if(worker)
|
|
worker_is_busy(UV_EVENT_DBENGINE_DATAFILE_DELETE);
|
|
|
|
struct rrdengine_journalfile *journal_file;
|
|
unsigned deleted_bytes, journal_file_bytes, datafile_bytes;
|
|
int ret;
|
|
char path[RRDENG_PATH_MAX];
|
|
|
|
uv_rwlock_wrlock(&ctx->datafiles.rwlock);
|
|
datafile_list_delete_unsafe(ctx, datafile);
|
|
uv_rwlock_wrunlock(&ctx->datafiles.rwlock);
|
|
|
|
journal_file = datafile->journalfile;
|
|
datafile_bytes = datafile->pos;
|
|
journal_file_bytes = journalfile_current_size(journal_file);
|
|
deleted_bytes = journalfile_v2_data_size_get(journal_file);
|
|
|
|
netdata_log_info("DBENGINE: deleting data and journal files to maintain disk quota");
|
|
ret = journalfile_destroy_unsafe(journal_file, datafile);
|
|
if (!ret) {
|
|
journalfile_v1_generate_path(datafile, path, sizeof(path));
|
|
netdata_log_info("DBENGINE: deleted journal file \"%s\".", path);
|
|
journalfile_v2_generate_path(datafile, path, sizeof(path));
|
|
netdata_log_info("DBENGINE: deleted journal file \"%s\".", path);
|
|
deleted_bytes += journal_file_bytes;
|
|
}
|
|
ret = destroy_data_file_unsafe(datafile);
|
|
if (!ret) {
|
|
generate_datafilepath(datafile, path, sizeof(path));
|
|
netdata_log_info("DBENGINE: deleted data file \"%s\".", path);
|
|
deleted_bytes += datafile_bytes;
|
|
}
|
|
freez(journal_file);
|
|
freez(datafile);
|
|
|
|
ctx_current_disk_space_decrease(ctx, deleted_bytes);
|
|
netdata_log_info("DBENGINE: reclaimed %u bytes of disk space.", deleted_bytes);
|
|
}
|
|
|
|
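// Rotate the database of this tier by deleting its oldest datafile.
// If the tier still exceeds its disk space or retention cap afterwards,
// another rotation is queued.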
static void *database_rotate_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) {
    datafile_delete(ctx, ctx->datafiles.first, ctx_is_available_for_queries(ctx), true);

    if (rrdeng_ctx_tier_cap_exceeded(ctx))
        rrdeng_enq_cmd(ctx, RRDENG_OPCODE_DATABASE_ROTATE, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL);

    rrdcontext_db_rotation();

    return data;
}

static void after_flush_all_hot_and_dirty_pages_of_section(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) {
    ;
}

static void *flush_all_hot_and_dirty_pages_of_section_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) {
    worker_is_busy(UV_EVENT_DBENGINE_QUIESCE);
    pgc_flush_all_hot_and_dirty_pages(main_cache, (Word_t)ctx);
    completion_mark_complete(&ctx->quiesce.completion);
    return data;
}

static void after_populate_mrg(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) {
    ;
}

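// Populate the metrics registry (MRG) with the retention found in the journal
// files. Each worker repeatedly claims an unpopulated datafile (guarded by the
// datafile's populate_mrg spinlock) until no unpopulated datafile is left, then
// signals the completion.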
static void *populate_mrg_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) {
    worker_is_busy(UV_EVENT_DBENGINE_POPULATE_MRG);

    do {
        struct rrdengine_datafile *datafile = NULL;

        // find a datafile to work
        uv_rwlock_rdlock(&ctx->datafiles.rwlock);
        for(datafile = ctx->datafiles.first; datafile ; datafile = datafile->next) {
            if(!spinlock_trylock(&datafile->populate_mrg.spinlock))
                continue;

            if(datafile->populate_mrg.populated) {
                spinlock_unlock(&datafile->populate_mrg.spinlock);
                continue;
            }

            // we have the spinlock and it is not populated
            break;
        }
        uv_rwlock_rdunlock(&ctx->datafiles.rwlock);

        if(!datafile)
            break;

        journalfile_v2_populate_retention_to_mrg(ctx, datafile->journalfile);
        datafile->populate_mrg.populated = true;
        spinlock_unlock(&datafile->populate_mrg.spinlock);

    } while(1);

    completion_mark_complete(completion);

    return data;
}

static void after_ctx_shutdown(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) {
    ;
}

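// Wait for all in-flight extent flushes and queries of this tier to finish,
// then signal the completion so the shutdown can proceed.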
static void *ctx_shutdown_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) {
    worker_is_busy(UV_EVENT_DBENGINE_SHUTDOWN);

    bool logged = false;
    while(__atomic_load_n(&ctx->atomic.extents_currently_being_flushed, __ATOMIC_RELAXED) ||
          __atomic_load_n(&ctx->atomic.inflight_queries, __ATOMIC_RELAXED)) {
        if(!logged) {
            logged = true;
            netdata_log_info("DBENGINE: waiting for %zu inflight queries to finish to shutdown tier %d...",
                             __atomic_load_n(&ctx->atomic.inflight_queries, __ATOMIC_RELAXED), ctx->config.tier);
        }
        sleep_usec(1 * USEC_PER_MS);
    }

    completion_mark_complete(completion);

    return data;
}

static void *cache_flush_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) {
    if (!main_cache)
        return data;

    worker_is_busy(UV_EVENT_DBENGINE_FLUSH_MAIN_CACHE);
    while (pgc_flush_pages(main_cache))
        yield_the_processor();

    return data;
}

static void *cache_evict_main_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *req __maybe_unused) {
    if (!main_cache)
        return data;

    worker_is_busy(UV_EVENT_DBENGINE_EVICT_MAIN_CACHE);
    while (pgc_evict_pages(main_cache, 0, 0))
        yield_the_processor();

    return data;
}

static void *cache_evict_open_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *req __maybe_unused) {
    if (!open_cache)
        return data;

    worker_is_busy(UV_EVENT_DBENGINE_EVICT_OPEN_CACHE);
    while (pgc_evict_pages(open_cache, 0, 0))
        yield_the_processor();

    return data;
}

static void *cache_evict_extent_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *req __maybe_unused) {
    if (!extent_cache)
        return data;

    worker_is_busy(UV_EVENT_DBENGINE_EVICT_EXTENT_CACHE);
    while (pgc_evict_pages(extent_cache, 0, 0))
        yield_the_processor();

    return data;
}

static void *query_prep_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *req __maybe_unused) {
    PDC *pdc = data;
    rrdeng_prep_query(pdc, true);
    return data;
}

uint64_t rrdeng_target_data_file_size(struct rrdengine_instance *ctx) {
    uint64_t target_size = ctx->config.max_disk_space ? ctx->config.max_disk_space / TARGET_DATAFILES : MAX_DATAFILE_SIZE;
    target_size = MIN(target_size, MAX_DATAFILE_SIZE);
    target_size = MAX(target_size, MIN_DATAFILE_SIZE);
    return target_size;
}

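// Return the newest timestamp covered by the oldest (first) datafile of this
// tier, falling back to its first timestamp when the journal has no end time.
// get_tier_retention() uses it to check the retention cap of the tier.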
time_t get_datafile_end_time(struct rrdengine_instance *ctx)
{
    time_t last_time_s = 0;

    uv_rwlock_rdlock(&ctx->datafiles.rwlock);
    struct rrdengine_datafile *datafile = ctx->datafiles.first;

    if (datafile) {
        last_time_s = datafile->journalfile->v2.last_time_s;
        if (!last_time_s)
            last_time_s = datafile->journalfile->v2.first_time_s;
    }

    uv_rwlock_rdunlock(&ctx->datafiles.rwlock);
    return last_time_s;
}

/* return 0 on success */
int init_rrd_files(struct rrdengine_instance *ctx)
{
    return init_data_files(ctx);
}

void finalize_rrd_files(struct rrdengine_instance *ctx)
{
    return finalize_data_files(ctx);
}

void async_cb(uv_async_t *handle)
{
    uv_stop(handle->loop);
    uv_update_time(handle->loop);
    netdata_log_debug(D_RRDENGINE, "%s called, active=%d.", __func__, uv_is_active((uv_handle_t *)handle));
}

#define TIMER_PERIOD_MS (1000)

static void *extent_read_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) {
    EPDL *epdl = data;
    epdl_find_extent_and_populate_pages(ctx, epdl, true);
    return data;
}

static void epdl_populate_pages_asynchronously(struct rrdengine_instance *ctx, EPDL *epdl, STORAGE_PRIORITY priority) {
    rrdeng_enq_cmd(ctx, RRDENG_OPCODE_EXTENT_READ, epdl, NULL, priority,
                   rrdeng_enqueue_epdl_cmd, rrdeng_dequeue_epdl_cmd);
}

void pdc_route_asynchronously(struct rrdengine_instance *ctx, struct page_details_control *pdc) {
    pdc_to_epdl_router(ctx, pdc, epdl_populate_pages_asynchronously, epdl_populate_pages_asynchronously);
}

void epdl_populate_pages_synchronously(struct rrdengine_instance *ctx, EPDL *epdl, enum storage_priority priority __maybe_unused) {
    epdl_find_extent_and_populate_pages(ctx, epdl, false);
}

void pdc_route_synchronously(struct rrdengine_instance *ctx, struct page_details_control *pdc) {
    pdc_to_epdl_router(ctx, pdc, epdl_populate_pages_synchronously, epdl_populate_pages_synchronously);
}

#define MAX_RETRIES_TO_START_INDEX (100)

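// Migrate journal files to the v2 (indexed) format. The worker first waits for
// any ongoing datafile deletion to finish, then walks the datafiles (skipping
// the ones still being written or flushed) and builds the v2 index from the
// open cache.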
static void *journal_v2_indexing_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) {
    unsigned count = 0;
    worker_is_busy(UV_EVENT_DBENGINE_JOURNAL_INDEX_WAIT);

    while (__atomic_load_n(&ctx->atomic.now_deleting_files, __ATOMIC_RELAXED) && count++ < MAX_RETRIES_TO_START_INDEX)
        sleep_usec(100 * USEC_PER_MS);

    if (count == MAX_RETRIES_TO_START_INDEX) {
        worker_is_idle();
        return data;
    }

    struct rrdengine_datafile *datafile = ctx->datafiles.first;
    worker_is_busy(UV_EVENT_DBENGINE_JOURNAL_INDEX);
    count = 0;
    while (datafile && datafile->fileno != ctx_last_fileno_get(ctx) && datafile->fileno != ctx_last_flush_fileno_get(ctx)) {
        if(journalfile_v2_data_available(datafile->journalfile)) {
            // journal file v2 is already there for this datafile
            datafile = datafile->next;
            continue;
        }

        spinlock_lock(&datafile->writers.spinlock);
        bool available = (datafile->writers.running || datafile->writers.flushed_to_open_running) ? false : true;
        spinlock_unlock(&datafile->writers.spinlock);

        if(!available) {
            nd_log(NDLS_DAEMON, NDLP_NOTICE,
                   "DBENGINE: journal file %u needs to be indexed, but it has writers working on it - "
                   "skipping it for now",
                   datafile->fileno);

            datafile = datafile->next;
            continue;
        }

        nd_log(NDLS_DAEMON, NDLP_DEBUG,
               "DBENGINE: journal file %u is ready to be indexed",
               datafile->fileno);

        pgc_open_cache_to_journal_v2(open_cache, (Word_t) ctx, (int) datafile->fileno, ctx->config.page_type,
                                     journalfile_migrate_to_v2_callback, (void *) datafile->journalfile);

        count++;

        datafile = datafile->next;

        if (unlikely(!ctx_is_available_for_queries(ctx)))
            break;
    }

    errno_clear();
    if(count)
        nd_log(NDLS_DAEMON, NDLP_DEBUG,
               "DBENGINE: journal indexing done; %u files processed",
               count);

    worker_is_idle();

    return data;
}

static void after_do_cache_flush(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) {
    rrdeng_main.flushes_running--;
}

static void after_do_main_cache_evict(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) {
    rrdeng_main.evict_main_running--;
}

static void after_do_open_cache_evict(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) {
    rrdeng_main.evict_open_running--;
}

static void after_do_extent_cache_evict(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) {
    rrdeng_main.evict_extent_running--;
}

static void after_journal_v2_indexing(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) {
    __atomic_store_n(&ctx->atomic.migration_to_v2_running, false, __ATOMIC_RELAXED);
    rrdeng_enq_cmd(ctx, RRDENG_OPCODE_DATABASE_ROTATE, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL);
}

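// Collect the memory used by the various dbengine buffers and ARAL pools,
// so it can be reported by the netdata memory statistics.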
struct rrdeng_buffer_sizes rrdeng_get_buffer_sizes(void) {
    return (struct rrdeng_buffer_sizes) {
        .pgc = pgc_aral_overhead() + pgc_aral_structures(),
        .pgd = pgd_aral_overhead() + pgd_aral_structures(),
        .mrg = mrg_aral_overhead() + mrg_aral_structures(),
        .opcodes = aral_overhead(rrdeng_main.cmd_queue.ar) + aral_structures(rrdeng_main.cmd_queue.ar),
        .handles = aral_overhead(rrdeng_main.handles.ar) + aral_structures(rrdeng_main.handles.ar),
        .descriptors = aral_overhead(rrdeng_main.descriptors.ar) + aral_structures(rrdeng_main.descriptors.ar),
        .wal = __atomic_load_n(&wal_globals.atomics.allocated, __ATOMIC_RELAXED) * (sizeof(WAL) + RRDENG_BLOCK_SIZE),
        .workers = aral_overhead(rrdeng_main.work_cmd.ar),
        .pdc = pdc_cache_size(),
        .xt_io = aral_overhead(rrdeng_main.xt_io_descr.ar) + aral_structures(rrdeng_main.xt_io_descr.ar),
        .xt_buf = extent_buffer_cache_size(),
        .epdl = epdl_cache_size(),
        .deol = deol_cache_size(),
        .pd = pd_cache_size(),

#ifdef PDC_USE_JULYL
        .julyl = julyl_cache_size(),
#endif
    };
}

static void after_cleanup(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) {
    rrdeng_main.cleanup_running--;
}

static void *cleanup_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) {
    worker_is_busy(UV_EVENT_DBENGINE_BUFFERS_CLEANUP);

    wal_cleanup1();
    extent_buffer_cleanup1();

    {
        static time_t last_run_s = 0;
        time_t now_s = now_monotonic_sec();
        if(now_s - last_run_s >= 10) {
            last_run_s = now_s;
            journalfile_v2_data_unmount_cleanup(now_s);
        }
    }

#ifdef PDC_USE_JULYL
    julyl_cleanup1();
#endif

    return data;
}

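// Estimate the disk space used by this tier for rotation decisions: the space
// already on disk, with the currently active datafile counted at its full
// target size, plus this tier's share (config.disk_percentage) of the space
// reported by get_total_database_space().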
uint64_t get_used_disk_space(struct rrdengine_instance *ctx)
{
    uint64_t active_space = 0;

    if (ctx->datafiles.first && ctx->datafiles.first->prev)
        active_space = ctx->datafiles.first->prev->pos;

    uint64_t estimated_disk_space = ctx_current_disk_space_get(ctx) + rrdeng_target_data_file_size(ctx) - active_space;

    uint64_t database_space = get_total_database_space();
    uint64_t adjusted_database_space = database_space * ctx->config.disk_percentage / 100;
    estimated_disk_space += adjusted_database_space;

    return estimated_disk_space;
}

static time_t get_tier_retention(struct rrdengine_instance *ctx)
{
    time_t retention = 0;
    if (localhost) {
        STORAGE_ENGINE *eng = localhost->db[ctx->config.tier].eng;
        if (eng) {
            time_t first_time_s = get_datafile_end_time(ctx);
            if (first_time_s)
                retention = now_realtime_sec() - first_time_s;
        }
    }
    return retention;
}

// Check if disk or retention time cap reached
bool rrdeng_ctx_tier_cap_exceeded(struct rrdengine_instance *ctx)
{
    if(!ctx->datafiles.first)
        // no datafiles available
        return false;

    if(!ctx->datafiles.first->next)
        // only 1 datafile available
        return false;

    uint64_t estimated_disk_space = get_used_disk_space(ctx);
    time_t retention = get_tier_retention(ctx);

    if (ctx->config.max_retention_s && retention > ctx->config.max_retention_s)
        return true;

    if (ctx->config.max_disk_space && estimated_disk_space > ctx->config.max_disk_space)
        return true;

    return false;
}

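// Timer callback (runs once per minute): checks every dbengine tier and queues
// a database rotation for the tiers that exceeded their disk or retention caps.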
static void retention_timer_cb(uv_timer_t *handle) {
    if (!localhost)
        return;

    worker_is_busy(RRDENG_TIMER_CB);
    uv_stop(handle->loop);
    uv_update_time(handle->loop);

    for (size_t tier = 0; tier < storage_tiers; tier++) {
        STORAGE_ENGINE *eng = localhost->db[tier].eng;
        if (!eng || eng->seb != STORAGE_ENGINE_BACKEND_DBENGINE)
            continue;
        bool cleanup = rrdeng_ctx_tier_cap_exceeded(multidb_ctx[tier]);
        if (cleanup)
            rrdeng_enq_cmd(multidb_ctx[tier], RRDENG_OPCODE_DATABASE_ROTATE, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL);
    }

    worker_is_idle();
}

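// Timer callback (runs once per second): publishes the event loop metrics and
// queues the periodic main cache flush and buffers cleanup opcodes.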
static void timer_per_sec_cb(uv_timer_t* handle) {
    worker_is_busy(RRDENG_TIMER_CB);
    uv_stop(handle->loop);
    uv_update_time(handle->loop);

    worker_set_metric(RRDENG_OPCODES_WAITING, (NETDATA_DOUBLE)rrdeng_main.cmd_queue.unsafe.waiting);
    worker_set_metric(RRDENG_WORKS_DISPATCHED, (NETDATA_DOUBLE)__atomic_load_n(&rrdeng_main.work_cmd.atomics.dispatched, __ATOMIC_RELAXED));
    worker_set_metric(RRDENG_WORKS_EXECUTING, (NETDATA_DOUBLE)__atomic_load_n(&rrdeng_main.work_cmd.atomics.executing, __ATOMIC_RELAXED));

    // rrdeng_enq_cmd(NULL, RRDENG_OPCODE_EVICT_MAIN, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL);
    // rrdeng_enq_cmd(NULL, RRDENG_OPCODE_EVICT_OPEN, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL);
    // rrdeng_enq_cmd(NULL, RRDENG_OPCODE_EVICT_EXTENT, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL);
    rrdeng_enq_cmd(NULL, RRDENG_OPCODE_FLUSH_MAIN, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL);
    rrdeng_enq_cmd(NULL, RRDENG_OPCODE_CLEANUP, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL);

    worker_is_idle();
}

static void dbengine_initialize_structures(void) {
    pgd_init_arals();
    pgc_and_mrg_initialize();

    pdc_init();
    page_details_init();
    epdl_init();
    deol_init();
    rrdeng_cmd_queue_init();
    work_request_init();
    rrdeng_query_handle_init();
    page_descriptors_init();
    extent_buffer_init();
    extent_io_descriptor_init();
}

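// Spawn the single, shared dbengine event loop thread. The loop, async handle
// and timers are initialized only once (guarded by a spinlock); subsequent
// calls from other tiers are no-ops.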
bool rrdeng_dbengine_spawn(struct rrdengine_instance *ctx __maybe_unused) {
    static bool spawned = false;
    static SPINLOCK spinlock = NETDATA_SPINLOCK_INITIALIZER;

    spinlock_lock(&spinlock);

    if(!spawned) {
        int ret;

        ret = uv_loop_init(&rrdeng_main.loop);
        if (ret) {
            netdata_log_error("DBENGINE: uv_loop_init(): %s", uv_strerror(ret));
            return false;
        }
        rrdeng_main.loop.data = &rrdeng_main;

        ret = uv_async_init(&rrdeng_main.loop, &rrdeng_main.async, async_cb);
        if (ret) {
            netdata_log_error("DBENGINE: uv_async_init(): %s", uv_strerror(ret));
            fatal_assert(0 == uv_loop_close(&rrdeng_main.loop));
            return false;
        }
        rrdeng_main.async.data = &rrdeng_main;

        ret = uv_timer_init(&rrdeng_main.loop, &rrdeng_main.timer);
        if (ret) {
            netdata_log_error("DBENGINE: uv_timer_init(): %s", uv_strerror(ret));
            uv_close((uv_handle_t *)&rrdeng_main.async, NULL);
            fatal_assert(0 == uv_loop_close(&rrdeng_main.loop));
            return false;
        }

        ret = uv_timer_init(&rrdeng_main.loop, &rrdeng_main.retention_timer);
        if (ret) {
            netdata_log_error("DBENGINE: uv_timer_init(): %s", uv_strerror(ret));
            uv_close((uv_handle_t *)&rrdeng_main.async, NULL);
            fatal_assert(0 == uv_loop_close(&rrdeng_main.loop));
            return false;
        }

        rrdeng_main.timer.data = &rrdeng_main;
        rrdeng_main.retention_timer.data = &rrdeng_main;

        dbengine_initialize_structures();

        fatal_assert(0 == uv_thread_create(&rrdeng_main.thread, dbengine_event_loop, &rrdeng_main));
        spawned = true;
    }

    spinlock_unlock(&spinlock);
    return true;
}

static inline void worker_dispatch_extent_read(struct rrdeng_cmd cmd, bool from_worker) {
    struct rrdengine_instance *ctx = cmd.ctx;
    EPDL *epdl = cmd.data;

    if(from_worker)
        epdl_find_extent_and_populate_pages(ctx, epdl, true);
    else
        work_dispatch(ctx, epdl, NULL, cmd.opcode, extent_read_tp_worker, NULL);
}

static inline void worker_dispatch_query_prep(struct rrdeng_cmd cmd, bool from_worker) {
    struct rrdengine_instance *ctx = cmd.ctx;
    PDC *pdc = cmd.data;

    if(from_worker)
        rrdeng_prep_query(pdc, true);
    else
        work_dispatch(ctx, pdc, NULL, cmd.opcode, query_prep_tp_worker, NULL);
}

uint64_t get_directory_free_bytes_space(struct rrdengine_instance *ctx)
{
    uint64_t free_bytes = 0;
    struct statvfs buff_statvfs;
    if (statvfs(ctx->config.dbfiles_path, &buff_statvfs) == 0)
        free_bytes = buff_statvfs.f_bavail * buff_statvfs.f_bsize;

    return (free_bytes - (free_bytes * 5 / 100));
}

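// Compute, for every dbengine tier, the percentage of the total disk space that
// belongs to it (its configured maximum, or the free space of its directory when
// no maximum is set). The result is stored in config.disk_percentage and used by
// get_used_disk_space() to apportion the metadata database space.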
void calculate_tier_disk_space_percentage(void)
{
    uint64_t tier_space[RRD_STORAGE_TIERS];

    if (!localhost)
        return;

    uint64_t total_diskspace = 0;
    for(size_t tier = 0; tier < storage_tiers; tier++) {
        STORAGE_ENGINE *eng = localhost->db[tier].eng;
        if (!eng || eng->seb != STORAGE_ENGINE_BACKEND_DBENGINE) {
            tier_space[tier] = 0;
            continue;
        }
        uint64_t tier_disk_space = multidb_ctx[tier]->config.max_disk_space ?
                                       multidb_ctx[tier]->config.max_disk_space :
                                       get_directory_free_bytes_space(multidb_ctx[tier]);
        total_diskspace += tier_disk_space;
        tier_space[tier] = tier_disk_space;
    }

    if (total_diskspace) {
        for (size_t tier = 0; tier < storage_tiers; tier++) {
            multidb_ctx[tier]->config.disk_percentage = (100 * tier_space[tier] / total_diskspace);
        }
    }
}

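// Publish the per-tier "dbengine retention" charts: for every dbengine tier,
// the percentage of the allowed disk space and of the allowed retention time
// currently used. The charts and dimensions are created on the first call.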
void dbengine_retention_statistics(void)
{
    static bool init = false;
    static DBENGINE_TIER_STATS stats[RRD_STORAGE_TIERS];

    if (!localhost)
        return;

    calculate_tier_disk_space_percentage();

    for (size_t tier = 0; tier < storage_tiers; tier++) {
        STORAGE_ENGINE *eng = localhost->db[tier].eng;
        if (!eng || eng->seb != STORAGE_ENGINE_BACKEND_DBENGINE)
            continue;

        if (init == false) {
            char id[200];
            snprintfz(id, sizeof(id) - 1, "dbengine_retention_tier%zu", tier);
            stats[tier].st = rrdset_create_localhost(
                "netdata",
                id,
                NULL,
                "dbengine retention",
                "netdata.dbengine_tier_retention",
                "dbengine space and time retention",
                "%",
                "netdata",
                "stats",
                134900, // before "dbengine memory" (dbengine2_statistics_charts)
                10,
                RRDSET_TYPE_LINE);

            stats[tier].rd_space = rrddim_add(stats[tier].st, "space", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
            stats[tier].rd_time = rrddim_add(stats[tier].st, "time", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);

            char tier_str[5];
            snprintfz(tier_str, 4, "%zu", tier);
            rrdlabels_add(stats[tier].st->rrdlabels, "tier", tier_str, RRDLABEL_SRC_AUTO);

            rrdset_flag_set(stats[tier].st, RRDSET_FLAG_METADATA_UPDATE);
            rrdhost_flag_set(stats[tier].st->rrdhost, RRDHOST_FLAG_METADATA_UPDATE);
            rrdset_metadata_updated(stats[tier].st);
        }

        time_t first_time_s = storage_engine_global_first_time_s(eng->seb, localhost->db[tier].si);
        time_t retention = first_time_s ? now_realtime_sec() - first_time_s : 0;

        // Note: storage_engine_disk_space_used() reports the exact disk space used
        // (as reported by api/v2/node_instances). get_used_disk_space() is used to
        // decide whether database cleanup (file rotation) should happen, and adds
        // the desired size of the active datafile to the disk space used.
        uint64_t disk_space = get_used_disk_space(multidb_ctx[tier]);
        //uint64_t disk_space = storage_engine_disk_space_used(eng->seb, localhost->db[tier].si);

        uint64_t config_disk_space = storage_engine_disk_space_max(eng->seb, localhost->db[tier].si);
        if (!config_disk_space) {
            config_disk_space = get_directory_free_bytes_space(multidb_ctx[tier]);
            config_disk_space += disk_space;
        }

        collected_number disk_percentage = (collected_number) (config_disk_space ? 100 * disk_space / config_disk_space : 0);

        collected_number retention_percentage = (collected_number) (multidb_ctx[tier]->config.max_retention_s ?
            100 * retention / multidb_ctx[tier]->config.max_retention_s : 0);

        if (retention_percentage > 100)
            retention_percentage = 100;

        rrddim_set_by_pointer(stats[tier].st, stats[tier].rd_space, (collected_number) disk_percentage);
        rrddim_set_by_pointer(stats[tier].st, stats[tier].rd_time, (collected_number) retention_percentage);

        rrdset_done(stats[tier].st);
    }
    init = true;
}

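// The main dbengine event loop thread. It registers all worker jobs, starts the
// per-second and retention timers, and then dispatches the queued opcodes until
// it receives RRDENG_OPCODE_SHUTDOWN_EVLOOP.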
void dbengine_event_loop(void* arg) {
    sanity_check();
    uv_thread_set_name_np("DBENGINE");
    service_register(SERVICE_THREAD_TYPE_EVENT_LOOP, NULL, NULL, NULL, true);

    worker_register("DBENGINE");

    // opcode jobs
    worker_register_job_name(RRDENG_OPCODE_NOOP, "noop");

    worker_register_job_name(RRDENG_OPCODE_QUERY, "query");
    worker_register_job_name(RRDENG_OPCODE_EXTENT_WRITE, "extent write");
    worker_register_job_name(RRDENG_OPCODE_EXTENT_READ, "extent read");
    worker_register_job_name(RRDENG_OPCODE_FLUSHED_TO_OPEN, "flushed to open");
    worker_register_job_name(RRDENG_OPCODE_DATABASE_ROTATE, "db rotate");
    worker_register_job_name(RRDENG_OPCODE_JOURNAL_INDEX, "journal index");
    worker_register_job_name(RRDENG_OPCODE_FLUSH_MAIN, "flush init");
    worker_register_job_name(RRDENG_OPCODE_EVICT_MAIN, "evict init");
    worker_register_job_name(RRDENG_OPCODE_CTX_SHUTDOWN, "ctx shutdown");
    worker_register_job_name(RRDENG_OPCODE_CTX_QUIESCE, "ctx quiesce");
    worker_register_job_name(RRDENG_OPCODE_SHUTDOWN_EVLOOP, "dbengine shutdown");

    worker_register_job_name(RRDENG_OPCODE_MAX, "get opcode");

    worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_QUERY, "query cb");
    worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_EXTENT_WRITE, "extent write cb");
    worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_EXTENT_READ, "extent read cb");
    worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_FLUSHED_TO_OPEN, "flushed to open cb");
    worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_DATABASE_ROTATE, "db rotate cb");
    worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_JOURNAL_INDEX, "journal index cb");
    worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_FLUSH_MAIN, "flush init cb");
    worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_EVICT_MAIN, "evict init cb");
    worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_CTX_SHUTDOWN, "ctx shutdown cb");
    worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_CTX_QUIESCE, "ctx quiesce cb");

    // special jobs
    worker_register_job_name(RRDENG_TIMER_CB, "timer");
    worker_register_job_name(RRDENG_FLUSH_TRANSACTION_BUFFER_CB, "transaction buffer flush cb");

    worker_register_job_custom_metric(RRDENG_OPCODES_WAITING, "opcodes waiting", "opcodes", WORKER_METRIC_ABSOLUTE);
    worker_register_job_custom_metric(RRDENG_WORKS_DISPATCHED, "works dispatched", "works", WORKER_METRIC_ABSOLUTE);
    worker_register_job_custom_metric(RRDENG_WORKS_EXECUTING, "works executing", "works", WORKER_METRIC_ABSOLUTE);

    struct rrdeng_main *main = arg;
    enum rrdeng_opcode opcode;
    struct rrdeng_cmd cmd;
    main->tid = gettid_cached();

    fatal_assert(0 == uv_timer_start(&main->timer, timer_per_sec_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS));
    fatal_assert(0 == uv_timer_start(&main->retention_timer, retention_timer_cb, TIMER_PERIOD_MS * 60, TIMER_PERIOD_MS * 60));

    bool shutdown = false;
    while (likely(!shutdown)) {
        worker_is_idle();
        uv_run(&main->loop, UV_RUN_DEFAULT);

        /* wait for commands */
        do {
            worker_is_busy(RRDENG_OPCODE_MAX);
            cmd = rrdeng_deq_cmd(RRDENG_OPCODE_NOOP);
            opcode = cmd.opcode;

            worker_is_busy(opcode);

            switch (opcode) {
                case RRDENG_OPCODE_EXTENT_READ:
                    worker_dispatch_extent_read(cmd, false);
                    break;

                case RRDENG_OPCODE_QUERY:
                    worker_dispatch_query_prep(cmd, false);
                    break;

                case RRDENG_OPCODE_EXTENT_WRITE: {
                    struct rrdengine_instance *ctx = cmd.ctx;
                    struct page_descr_with_data *base = cmd.data;
                    struct completion *completion = cmd.completion; // optional
                    work_dispatch(ctx, base, completion, opcode, extent_write_tp_worker, after_extent_write);
                    break;
                }

                case RRDENG_OPCODE_FLUSHED_TO_OPEN: {
                    struct rrdengine_instance *ctx = cmd.ctx;
                    uv_fs_t *uv_fs_request = cmd.data;
                    struct extent_io_descriptor *xt_io_descr = uv_fs_request->data;
                    struct completion *completion = xt_io_descr->completion;
                    work_dispatch(ctx, uv_fs_request, completion, opcode, extent_flushed_to_open_tp_worker, after_extent_flushed_to_open);
                    break;
                }

                case RRDENG_OPCODE_FLUSH_MAIN: {
                    if(rrdeng_main.flushes_running < pgc_max_flushers()) {
                        rrdeng_main.flushes_running++;
                        work_dispatch(NULL, NULL, NULL, opcode, cache_flush_tp_worker, after_do_cache_flush);
                    }
                    break;
                }

                case RRDENG_OPCODE_EVICT_MAIN: {
                    if(rrdeng_main.evict_main_running < pgc_max_evictors()) {
                        rrdeng_main.evict_main_running++;
                        work_dispatch(NULL, NULL, NULL, opcode, cache_evict_main_tp_worker, after_do_main_cache_evict);
                    }
                    break;
                }

                case RRDENG_OPCODE_EVICT_OPEN: {
                    if(rrdeng_main.evict_open_running < pgc_max_evictors()) {
                        rrdeng_main.evict_open_running++;
                        work_dispatch(NULL, NULL, NULL, opcode, cache_evict_open_tp_worker, after_do_open_cache_evict);
                    }
                    break;
                }

                case RRDENG_OPCODE_EVICT_EXTENT: {
                    if(rrdeng_main.evict_extent_running < pgc_max_evictors()) {
                        rrdeng_main.evict_extent_running++;
                        work_dispatch(NULL, NULL, NULL, opcode, cache_evict_extent_tp_worker, after_do_extent_cache_evict);
                    }
                    break;
                }

                case RRDENG_OPCODE_CLEANUP: {
                    if(!rrdeng_main.cleanup_running) {
                        rrdeng_main.cleanup_running++;
                        work_dispatch(NULL, NULL, NULL, opcode, cleanup_tp_worker, after_cleanup);
                    }
                    break;
                }

                case RRDENG_OPCODE_JOURNAL_INDEX: {
                    struct rrdengine_instance *ctx = cmd.ctx;
                    struct rrdengine_datafile *datafile = cmd.data;
                    if(!__atomic_load_n(&ctx->atomic.migration_to_v2_running, __ATOMIC_RELAXED)) {

                        __atomic_store_n(&ctx->atomic.migration_to_v2_running, true, __ATOMIC_RELAXED);
                        work_dispatch(ctx, datafile, NULL, opcode, journal_v2_indexing_tp_worker, after_journal_v2_indexing);
                    }
                    break;
                }

                case RRDENG_OPCODE_DATABASE_ROTATE: {
                    struct rrdengine_instance *ctx = cmd.ctx;
                    if (!__atomic_load_n(&ctx->atomic.now_deleting_files, __ATOMIC_RELAXED) &&
                        ctx->datafiles.first->next != NULL &&
                        ctx->datafiles.first->next->next != NULL &&
                        rrdeng_ctx_tier_cap_exceeded(ctx)) {

                        __atomic_store_n(&ctx->atomic.now_deleting_files, true, __ATOMIC_RELAXED);
                        work_dispatch(ctx, NULL, NULL, opcode, database_rotate_tp_worker, after_database_rotate);
                    }
                    break;
                }

                case RRDENG_OPCODE_CTX_POPULATE_MRG: {
                    struct rrdengine_instance *ctx = cmd.ctx;
                    struct completion *completion = cmd.completion;
                    work_dispatch(ctx, NULL, completion, opcode, populate_mrg_tp_worker, after_populate_mrg);
                    break;
                }

                case RRDENG_OPCODE_CTX_QUIESCE: {
                    // a ctx will shutdown shortly
                    struct rrdengine_instance *ctx = cmd.ctx;
                    __atomic_store_n(&ctx->quiesce.enabled, true, __ATOMIC_RELEASE);
                    work_dispatch(ctx, NULL, NULL, opcode,
                                  flush_all_hot_and_dirty_pages_of_section_tp_worker,
                                  after_flush_all_hot_and_dirty_pages_of_section);
                    break;
                }

                case RRDENG_OPCODE_CTX_SHUTDOWN: {
                    // a ctx is shutting down
                    struct rrdengine_instance *ctx = cmd.ctx;
                    struct completion *completion = cmd.completion;
                    work_dispatch(ctx, NULL, completion, opcode, ctx_shutdown_tp_worker, after_ctx_shutdown);
                    break;
                }

                case RRDENG_OPCODE_SHUTDOWN_EVLOOP: {
                    uv_close((uv_handle_t *)&main->async, NULL);

                    (void) uv_timer_stop(&main->timer);
                    uv_close((uv_handle_t *)&main->timer, NULL);

                    (void) uv_timer_stop(&main->retention_timer);
                    uv_close((uv_handle_t *)&main->retention_timer, NULL);
                    shutdown = true;
                    break;
                }

                case RRDENG_OPCODE_NOOP: {
                    /* the command queue was empty, do nothing */
                    break;
                }

                // not opcodes
                case RRDENG_OPCODE_MAX:
                default: {
                    internal_fatal(true, "DBENGINE: unknown opcode");
                    break;
                }
            }

        } while (opcode != RRDENG_OPCODE_NOOP);
    }

    nd_log(NDLS_DAEMON, NDLP_DEBUG, "Shutting down dbengine thread");
    (void) uv_loop_close(&main->loop);
    worker_unregister();
}