0
0
Fork 0
mirror of https://github.com/netdata/netdata.git synced 2025-04-28 06:32:30 +00:00

Migrate metadata log to SQLite ()

This commit is contained in:
Stelios Fragkakis 2020-11-24 20:00:02 +02:00 committed by GitHub
parent ab3b4c6ff8
commit e9d59e37d9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
40 changed files with 244115 additions and 2603 deletions

View file

@ -597,6 +597,10 @@ set(RRD_PLUGIN_FILES
database/rrdsetvar.h
database/rrdvar.c
database/rrdvar.h
database/sqlite/sqlite_functions.c
database/sqlite/sqlite_functions.h
database/sqlite/sqlite3.c
database/sqlite/sqlite3.h
database/engine/rrdengine.c
database/engine/rrdengine.h
database/engine/rrddiskprotocol.h
@ -612,7 +616,6 @@ set(RRD_PLUGIN_FILES
database/engine/pagecache.h
database/engine/rrdenglocking.c
database/engine/rrdenglocking.h
database/engine/metadata_log/metadatalog.c
database/engine/metadata_log/metadatalog.h
database/engine/metadata_log/metadatalogapi.c
database/engine/metadata_log/metadatalogapi.h
@ -623,8 +626,6 @@ set(RRD_PLUGIN_FILES
database/engine/metadata_log/metalogpluginsd.h
database/engine/metadata_log/compaction.c
database/engine/metadata_log/compaction.h
database/engine/global_uuid_map/global_uuid_map.c
database/engine/global_uuid_map/global_uuid_map.h
)
set(WEB_PLUGIN_FILES
@ -1293,6 +1294,7 @@ endif()
-Wl,--wrap=web_client_api_request_v1
-Wl,--wrap=rrdhost_find_by_guid
-Wl,--wrap=rrdset_find_byname
-Wl,--wrap=sql_create_host_by_uuid
-Wl,--wrap=rrdset_find
-Wl,--wrap=rrdpush_receiver_thread_spawn
-Wl,--wrap=debug_int
@ -1317,6 +1319,7 @@ endif()
-Wl,--wrap=web_client_api_request_v1
-Wl,--wrap=rrdhost_find_by_guid
-Wl,--wrap=rrdset_find_byname
-Wl,--wrap=sql_create_host_by_uuid
-Wl,--wrap=rrdset_find
-Wl,--wrap=rrdpush_receiver_thread_spawn
-Wl,--wrap=debug_int

View file

@ -377,6 +377,10 @@ RRD_PLUGIN_FILES = \
if ENABLE_DBENGINE
RRD_PLUGIN_FILES += \
database/sqlite/sqlite_functions.c \
database/sqlite/sqlite_functions.h \
database/sqlite/sqlite3.c \
database/sqlite/sqlite3.h \
database/engine/rrdengine.c \
database/engine/rrdengine.h \
database/engine/rrddiskprotocol.h \
@ -392,7 +396,6 @@ if ENABLE_DBENGINE
database/engine/pagecache.h \
database/engine/rrdenglocking.c \
database/engine/rrdenglocking.h \
database/engine/metadata_log/metadatalog.c \
database/engine/metadata_log/metadatalog.h \
database/engine/metadata_log/metadatalogapi.c \
database/engine/metadata_log/metadatalogapi.h \
@ -403,8 +406,6 @@ if ENABLE_DBENGINE
database/engine/metadata_log/metalogpluginsd.h \
database/engine/metadata_log/compaction.c \
database/engine/metadata_log/compaction.h \
database/engine/global_uuid_map/global_uuid_map.c \
database/engine/global_uuid_map/global_uuid_map.h \
$(NULL)
endif

View file

@ -274,7 +274,7 @@ PARSER_RC pluginsd_end(char **words, void *user, PLUGINSD_ACTION *plugins_actio
PARSER_RC pluginsd_chart(char **words, void *user, PLUGINSD_ACTION *plugins_action)
{
RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
if (unlikely(!host)) {
if (unlikely(!host && !((PARSER_USER_OBJECT *) user)->host_exists)) {
debug(D_PLUGINSD, "Ignoring chart belonging to missing or ignored host.");
return PARSER_RC_OK;
}
@ -303,7 +303,10 @@ PARSER_RC pluginsd_chart(char **words, void *user, PLUGINSD_ACTION *plugins_act
// make sure we have the required variables
if (unlikely((!type || !*type || !id || !*id))) {
error("requested a CHART, without a type.id, on host '%s'. Disabling it.", host->hostname);
if (likely(host))
error("requested a CHART, without a type.id, on host '%s'. Disabling it.", host->hostname);
else
error("requested a CHART, without a type.id. Disabling it.");
((PARSER_USER_OBJECT *) user)->enabled = 0;
return PARSER_RC_ERROR;
}
@ -375,7 +378,7 @@ PARSER_RC pluginsd_dimension(char **words, void *user, PLUGINSD_ACTION *plugins
RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
if (unlikely(!host)) {
if (unlikely(!host && !((PARSER_USER_OBJECT *) user)->host_exists)) {
debug(D_PLUGINSD, "Ignoring dimension belonging to missing or ignored host.");
return PARSER_RC_OK;
}
@ -387,7 +390,7 @@ PARSER_RC pluginsd_dimension(char **words, void *user, PLUGINSD_ACTION *plugins
goto disable;
}
if (unlikely(!st)) {
if (unlikely(!st && !((PARSER_USER_OBJECT *) user)->st_exists)) {
error("requested a DIMENSION, without a CHART, on host '%s'. Disabling it.", host->hostname);
goto disable;
}
@ -409,7 +412,7 @@ PARSER_RC pluginsd_dimension(char **words, void *user, PLUGINSD_ACTION *plugins
if (unlikely(!algorithm || !*algorithm))
algorithm = "absolute";
if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
if (unlikely(st && rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
debug(
D_PLUGINSD,
"creating dimension in chart %s, id='%s', name='%s', algorithm='%s', multiplier=%ld, divisor=%ld, hidden='%s'",

View file

@ -16,6 +16,8 @@ typedef struct parser_user_object {
struct label *new_labels;
size_t count;
int enabled;
uint8_t st_exists;
uint8_t host_exists;
void *private; // the user can set this for private use
} PARSER_USER_OBJECT;

View file

@ -1464,8 +1464,6 @@ static inline RRDSET *statsd_private_rrdset_create(
, chart_type // chart type
, memory_mode // memory mode
, history // history
, 0 // not archived
, NULL // no known UUID
);
rrdset_flag_set(st, RRDSET_FLAG_STORE_FIRST);
@ -2004,8 +2002,6 @@ static inline void statsd_update_app_chart(STATSD_APP *app, STATSD_APP_CHART *ch
, chart->chart_type // chart type
, app->rrd_memory_mode // memory mode
, app->rrd_history_entries // history
, 0 // not archived
, NULL // no known UUID
);
rrdset_flag_set(chart->st, RRDSET_FLAG_STORE_FIRST);

View file

@ -1549,7 +1549,6 @@ AC_CONFIG_FILES([
database/Makefile
database/engine/Makefile
database/engine/metadata_log/Makefile
database/engine/global_uuid_map/Makefile
diagrams/Makefile
exporting/Makefile
exporting/graphite/Makefile

View file

@ -73,10 +73,6 @@
// netdata agent spawn server
#include "spawn/spawn.h"
#ifdef ENABLE_DBENGINE
#include "database/engine/global_uuid_map/global_uuid_map.h"
#endif
// the netdata deamon
#include "daemon.h"
#include "main.h"

View file

@ -60,9 +60,6 @@ void netdata_cleanup_and_exit(int ret) {
#ifdef ENABLE_HTTPS
security_clean_openssl();
#endif
#ifdef ENABLE_DBENGINE
free_global_guid_map();
#endif
info("EXIT: all done - netdata is now exiting - bye bye...");
exit(ret);
@ -1451,9 +1448,6 @@ int main(int argc, char **argv) {
struct rrdhost_system_info *system_info = calloc(1, sizeof(struct rrdhost_system_info));
get_system_info(system_info);
#ifdef ENABLE_DBENGINE
init_global_guid_map();
#endif
if(rrd_init(netdata_configured_hostname, system_info))
fatal("Cannot initialize localhost instance with name '%s'.", netdata_configured_hostname);
@ -1471,10 +1465,6 @@ int main(int argc, char **argv) {
// Load host labels
reload_host_labels();
#ifdef ENABLE_DBENGINE
if (localhost->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE)
metalog_commit_update_host(localhost);
#endif
// ------------------------------------------------------------------------
// spawn the threads

View file

@ -5,7 +5,6 @@ MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
SUBDIRS = \
metadata_log \
global_uuid_map \
$(NULL)
dist_noinst_DATA = \

View file

@ -1,8 +0,0 @@
# SPDX-License-Identifier: GPL-3.0-or-later
AUTOMAKE_OPTIONS = subdir-objects
MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
dist_noinst_DATA = \
README.md \
$(NULL)

View file

@ -1,292 +0,0 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "global_uuid_map.h"
static Pvoid_t JGUID_map = (Pvoid_t) NULL;
static Pvoid_t JGUID_object_map = (Pvoid_t) NULL;
static uv_rwlock_t guid_lock;
static uv_rwlock_t object_lock;
static uv_rwlock_t global_lock;
void free_global_guid_map()
{
JudyHSFreeArray(&JGUID_map, PJE0);
JudyHSFreeArray(&JGUID_object_map, PJE0);
}
static void free_single_uuid(uuid_t *uuid)
{
Pvoid_t *PValue, *PValue1;
char *existing_object;
Word_t size;
PValue = JudyHSGet(JGUID_map, (void *) uuid, (Word_t) sizeof(uuid_t));
if (likely(PValue)) {
existing_object = *PValue;
GUID_TYPE object_type = existing_object[0];
size = (Word_t)object_type ? (object_type * 16) + 1 : strlen((char *)existing_object + 1) + 2;
PValue1 = JudyHSGet(JGUID_object_map, (void *)existing_object, (Word_t)size);
if (PValue1 && *PValue1) {
freez(*PValue1);
}
JudyHSDel(&JGUID_object_map, (void *)existing_object,
(Word_t)object_type ? (object_type * 16) + 1 : strlen((char *)existing_object + 1) + 2, PJE0);
JudyHSDel(&JGUID_map, (void *)uuid, (Word_t)sizeof(uuid_t), PJE0);
freez(existing_object);
}
}
void free_uuid(uuid_t *uuid)
{
GUID_TYPE ret;
char object[49];
ret = find_object_by_guid(uuid, object, sizeof(object));
if (GUID_TYPE_DIMENSION == ret)
free_single_uuid((uuid_t *)(object + 16 + 16));
if (GUID_TYPE_CHART == ret)
free_single_uuid((uuid_t *)(object + 16));
free_single_uuid(uuid);
return;
}
void dump_object(uuid_t *index, void *object)
{
char uuid_s[36 + 1];
uuid_unparse_lower(*index, uuid_s);
char local_object[3 * 36 + 2 + 1];
switch (*(char *) object) {
case GUID_TYPE_CHAR:
debug(D_GUIDLOG, "OBJECT GUID %s on [%s]", uuid_s, (char *)object + 1);
break;
case GUID_TYPE_CHART:
uuid_unparse_lower((const unsigned char *)object + 1, local_object);
uuid_unparse_lower((const unsigned char *)object + 17, local_object+37);
local_object[36] = ':';
local_object[74] = '\0';
debug(D_GUIDLOG, "CHART GUID %s on [%s]", uuid_s, local_object);
break;
case GUID_TYPE_DIMENSION:
uuid_unparse_lower((const unsigned char *)object + 1, local_object);
uuid_unparse_lower((const unsigned char *)object + 17, local_object + 37);
uuid_unparse_lower((const unsigned char *)object + 33, local_object + 74);
local_object[36] = ':';
local_object[73] = ':';
local_object[110] = '\0';
debug(D_GUIDLOG, "DIM GUID %s on [%s]", uuid_s, local_object);
break;
default:
debug(D_GUIDLOG, "Unknown object");
}
}
/* Returns 0 if it successfully stores the uuid-object mapping or if an identical mapping already exists */
static inline int guid_store_nolock(uuid_t *uuid, void *object, GUID_TYPE object_type)
{
char *existing_object;
GUID_TYPE existing_object_type;
if (unlikely(!object) || uuid == NULL)
return 0;
Pvoid_t *PValue;
PValue = JudyHSIns(&JGUID_map, (void *) uuid, (Word_t) sizeof(uuid_t), PJE0);
if (PPJERR == PValue)
fatal("JudyHSIns() fatal error.");
if (*PValue) {
existing_object = *PValue;
existing_object_type = existing_object[0];
if (existing_object_type != object_type)
return 1;
switch (existing_object_type) {
case GUID_TYPE_DIMENSION:
if (memcmp(existing_object, object, 1 + 16 + 16 + 16))
return 1;
break;
case GUID_TYPE_CHART:
if (memcmp(existing_object, object, 1 + 16 + 16))
return 1;
break;
case GUID_TYPE_CHAR:
if (strcmp(existing_object + 1, (char *)object))
return 1;
break;
default:
return 1;
}
freez(existing_object);
}
*PValue = (Pvoid_t *) object;
PValue = JudyHSIns(&JGUID_object_map, (void *)object, (Word_t) object_type?(object_type * 16)+1:strlen((char *) object+1)+2, PJE0);
if (PPJERR == PValue)
fatal("JudyHSIns() fatal error.");
if (*PValue == NULL) {
uuid_t *value = (uuid_t *) mallocz(sizeof(uuid_t));
uuid_copy(*value, *uuid);
*PValue = value;
}
#ifdef NETDATA_INTERNAL_CHECKS
static uint32_t count = 0;
count++;
char uuid_s[36 + 1];
uuid_unparse_lower(*uuid, uuid_s);
debug(D_GUIDLOG,"GUID added item %" PRIu32" [%s] as:", count, uuid_s);
dump_object(uuid, object);
#endif
return 0;
}
/*
* Given a GUID, find if an object is stored
* - Optionally return the object
*/
GUID_TYPE find_object_by_guid(uuid_t *uuid, char *object, size_t max_bytes)
{
Pvoid_t *PValue;
GUID_TYPE value_type;
uv_rwlock_rdlock(&global_lock);
PValue = JudyHSGet(JGUID_map, (void *) uuid, (Word_t) sizeof(uuid_t));
if (unlikely(!PValue)) {
uv_rwlock_rdunlock(&global_lock);
return GUID_TYPE_NOTFOUND;
}
value_type = *(char *) *PValue;
if (likely(object && max_bytes)) {
switch (value_type) {
case GUID_TYPE_CHAR:
if (unlikely(max_bytes - 1 < strlen((char *) *PValue+1))) {
uv_rwlock_rdunlock(&global_lock);
return GUID_TYPE_NOSPACE;
}
strncpyz(object, (char *) *PValue+1, max_bytes - 1);
break;
case GUID_TYPE_HOST:
case GUID_TYPE_CHART:
case GUID_TYPE_DIMENSION:
if (unlikely(max_bytes < (size_t) value_type * 16)) {
uv_rwlock_rdunlock(&global_lock);
return GUID_TYPE_NOSPACE;
}
memcpy(object, *PValue+1, value_type * 16);
break;
default:
uv_rwlock_rdunlock(&global_lock);
return GUID_TYPE_NOTFOUND;
}
}
#ifdef NETDATA_INTERNAL_CHECKS
dump_object(uuid, *PValue);
#endif
uv_rwlock_rdunlock(&global_lock);
return value_type;
}
/*
* Find a GUID of an object
* - Optionally return the GUID
*
*/
int find_guid_by_object(char *object, uuid_t *uuid, GUID_TYPE object_type)
{
Pvoid_t *PValue;
uv_rwlock_rdlock(&global_lock);
PValue = JudyHSGet(JGUID_object_map, (void *)object, (Word_t)object_type?object_type*16+1:strlen(object+1)+2);
if (unlikely(!PValue)) {
uv_rwlock_rdunlock(&global_lock);
return 1;
}
if (likely(uuid))
uuid_copy(*uuid, *PValue);
uv_rwlock_rdunlock(&global_lock);
return 0;
}
int find_or_generate_guid(void *object, uuid_t *uuid, GUID_TYPE object_type, int replace_instead_of_generate)
{
char *target_object;
uuid_t temp_uuid;
int rc;
switch (object_type) {
case GUID_TYPE_DIMENSION:
if (unlikely(find_or_generate_guid((void *) ((RRDDIM *)object)->id, &temp_uuid, GUID_TYPE_CHAR, 0)))
return 1;
target_object = mallocz(49);
target_object[0] = object_type;
memcpy(target_object + 1, ((RRDDIM *)object)->rrdset->rrdhost->host_uuid, 16);
memcpy(target_object + 17, ((RRDDIM *)object)->rrdset->chart_uuid, 16);
memcpy(target_object + 33, temp_uuid, 16);
break;
case GUID_TYPE_CHART:
if (unlikely(find_or_generate_guid((void *) ((RRDSET *)object)->id, &temp_uuid, GUID_TYPE_CHAR, 0)))
return 1;
target_object = mallocz(33);
target_object[0] = object_type;
memcpy(target_object + 1, (((RRDSET *)object))->rrdhost->host_uuid, 16);
memcpy(target_object + 17, temp_uuid, 16);
break;
case GUID_TYPE_HOST:
target_object = mallocz(17);
target_object[0] = object_type;
memcpy(target_object + 1, (((RRDHOST *)object))->host_uuid, 16);
break;
case GUID_TYPE_CHAR:
target_object = mallocz(strlen((char *) object)+2);
target_object[0] = object_type;
strcpy(target_object+1, (char *) object);
break;
default:
return 1;
}
rc = find_guid_by_object(target_object, uuid, object_type);
if (rc) {
if (!replace_instead_of_generate) /* else take *uuid as user input */
uuid_generate(*uuid);
uv_rwlock_wrlock(&global_lock);
rc = guid_store_nolock(uuid, target_object, object_type);
uv_rwlock_wrunlock(&global_lock);
if (rc)
freez(target_object);
return rc;
}
#ifdef NETDATA_INTERNAL_CHECKS
dump_object(uuid, target_object);
#endif
freez(target_object);
return 0;
}
void init_global_guid_map()
{
static int init = 0;
if (init)
return;
init = 1;
info("Configuring locking mechanism for global GUID map");
fatal_assert(0 == uv_rwlock_init(&guid_lock));
fatal_assert(0 == uv_rwlock_init(&object_lock));
fatal_assert(0 == uv_rwlock_init(&global_lock));
return;
}

View file

@ -1,25 +0,0 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef NETDATA_GLOBAL_UUID_MAP_H
#define NETDATA_GLOBAL_UUID_MAP_H
#include "libnetdata/libnetdata.h"
#include <Judy.h>
#include "../../rrd.h"
typedef enum guid_type {
GUID_TYPE_CHAR,
GUID_TYPE_HOST,
GUID_TYPE_CHART,
GUID_TYPE_DIMENSION,
GUID_TYPE_NOTFOUND,
GUID_TYPE_NOSPACE
} GUID_TYPE;
extern GUID_TYPE find_object_by_guid(uuid_t *uuid, char *object, size_t max_bytes);
extern int find_guid_by_object(char *object, uuid_t *uuid, GUID_TYPE);
extern void init_global_guid_map();
extern int find_or_generate_guid(void *object, uuid_t *uuid, GUID_TYPE object_type, int replace_instead_of_generate);
extern void free_uuid(uuid_t *uuid);
extern void free_global_guid_map();
#endif //NETDATA_GLOBAL_UUID_MAP_H

View file

@ -3,319 +3,6 @@
#include "metadatalog.h"
void after_compact_old_records(struct metalog_worker_config* wc)
{
struct metalog_instance *ctx = wc->ctx;
int error;
mlf_flush_records_buffer(wc, &ctx->compaction_state.records_log, &ctx->compaction_state.new_metadata_logfiles);
uv_run(wc->loop, UV_RUN_DEFAULT);
error = uv_thread_join(wc->now_compacting_files);
if (error) {
error("uv_thread_join(): %s", uv_strerror(error));
}
freez(wc->now_compacting_files);
/* unfreeze command processing */
wc->now_compacting_files = NULL;
wc->cleanup_thread_compacting_files = 0;
/* interrupt event loop */
uv_stop(wc->loop);
info("Finished metadata log compaction (id:%"PRIu32").", ctx->current_compaction_id);
}
static void metalog_flush_compaction_records(struct metalog_instance *ctx)
{
struct metalog_cmd cmd;
struct completion compaction_completion;
init_completion(&compaction_completion);
cmd.opcode = METALOG_COMPACTION_FLUSH;
cmd.record_io_descr.completion = &compaction_completion;
metalog_enq_cmd(&ctx->worker_config, &cmd);
wait_for_completion(&compaction_completion);
destroy_completion(&compaction_completion);
}
/* The caller must have called metalog_flush_compaction_records() before to synchronize and quiesce the event loop. */
static void compaction_test_quota(struct metalog_worker_config *wc)
{
struct metalog_instance *ctx = wc->ctx;
struct logfile_compaction_state *compaction_state;
struct metadata_logfile *oldmetalogfile, *newmetalogfile;
unsigned current_size;
int ret;
compaction_state = &ctx->compaction_state;
newmetalogfile = compaction_state->new_metadata_logfiles.last;
oldmetalogfile = ctx->metadata_logfiles.first;
current_size = newmetalogfile->pos;
if (unlikely(current_size >= MAX_METALOGFILE_SIZE && newmetalogfile->starting_fileno < oldmetalogfile->fileno)) {
/* It's safe to finalize the compacted metadata log file and create a new one since it has already replaced
* an older one. */
/* Finalize as the immediately previous file than the currently compacted one. */
ret = rename_metadata_logfile(newmetalogfile, 0, newmetalogfile->fileno - 1);
if (ret < 0)
return;
ret = add_new_metadata_logfile(ctx, &compaction_state->new_metadata_logfiles,
ctx->metadata_logfiles.first->fileno, ctx->metadata_logfiles.first->fileno);
if (likely(!ret)) {
compaction_state->fileno = ctx->metadata_logfiles.first->fileno;
}
}
}
static void compact_record_by_uuid(struct metalog_instance *ctx, uuid_t *uuid)
{
GUID_TYPE ret;
RRDSET *st;
RRDDIM *rd;
BUFFER *buffer;
RRDHOST *host = NULL;
ret = find_object_by_guid(uuid, NULL, 0);
switch (ret) {
case GUID_TYPE_CHAR:
error_with_guid(uuid, "Ignoring unexpected type GUID_TYPE_CHAR");
break;
case GUID_TYPE_CHART:
st = metalog_get_chart_from_uuid(ctx, uuid);
if (st) {
if (ctx->current_compaction_id > st->rrdhost->compaction_id) {
error("Forcing compaction of HOST %s from CHART %s", st->rrdhost->hostname, st->id);
compact_record_by_uuid(ctx, &st->rrdhost->host_uuid);
}
if (ctx->current_compaction_id > st->compaction_id) {
st->compaction_id = ctx->current_compaction_id;
buffer = metalog_update_chart_buffer(st, ctx->current_compaction_id);
metalog_commit_record(ctx, buffer, METALOG_COMMIT_CREATION_RECORD, uuid, 1);
} else {
debug(D_METADATALOG, "Chart has already been compacted, ignoring record.");
}
} else {
debug(D_METADATALOG, "Ignoring nonexistent chart metadata record.");
}
break;
case GUID_TYPE_DIMENSION:
rd = metalog_get_dimension_from_uuid(ctx, uuid);
if (rd) {
if (ctx->current_compaction_id > rd->rrdset->rrdhost->compaction_id) {
error("Forcing compaction of HOST %s", rd->rrdset->rrdhost->hostname);
compact_record_by_uuid(ctx, &rd->rrdset->rrdhost->host_uuid);
}
if (ctx->current_compaction_id > rd->rrdset->compaction_id) {
error("Forcing compaction of CHART %s", rd->rrdset->id);
compact_record_by_uuid(ctx, rd->rrdset->chart_uuid);
} else if (ctx->current_compaction_id > rd->state->compaction_id) {
rd->state->compaction_id = ctx->current_compaction_id;
buffer = metalog_update_dimension_buffer(rd);
metalog_commit_record(ctx, buffer, METALOG_COMMIT_CREATION_RECORD, uuid, 1);
} else {
debug(D_METADATALOG, "Dimension has already been compacted, ignoring record.");
}
} else {
debug(D_METADATALOG, "Ignoring nonexistent dimension metadata record.");
}
break;
case GUID_TYPE_HOST:
host = metalog_get_host_from_uuid(ctx, uuid);
if (unlikely(!host))
break;
if (ctx->current_compaction_id > host->compaction_id) {
host->compaction_id = ctx->current_compaction_id;
buffer = metalog_update_host_buffer(host);
metalog_commit_record(ctx, buffer, METALOG_COMMIT_CREATION_RECORD, uuid, 1);
} else {
debug(D_METADATALOG, "Host has already been compacted, ignoring record.");
}
break;
case GUID_TYPE_NOTFOUND:
debug(D_METADATALOG, "Ignoring nonexistent metadata record.");
break;
case GUID_TYPE_NOSPACE:
error_with_guid(uuid, "Not enough space for object retrieval");
break;
default:
error("Unknown return code %u from find_object_by_guid", ret);
break;
}
}
/* Returns 0 on success. */
static int compact_metadata_logfile_records(struct metalog_instance *ctx, struct metadata_logfile *metalogfile)
{
struct metalog_worker_config* wc = &ctx->worker_config;
struct logfile_compaction_state *compaction_state;
struct metalog_record *record;
struct metalog_record_block *record_block, *prev_record_block;
int ret;
unsigned iterated_records;
#define METADATA_LOG_RECORD_BATCH 128 /* Flush I/O and check sizes whenever this many records have been iterated */
info("Compacting metadata log file \"%s/"METALOG_PREFIX METALOG_FILE_NUMBER_PRINT_TMPL METALOG_EXTENSION"\".",
ctx->rrdeng_ctx->dbfiles_path, metalogfile->starting_fileno, metalogfile->fileno);
compaction_state = &ctx->compaction_state;
record_block = prev_record_block = NULL;
iterated_records = 0;
for (record = mlf_record_get_first(metalogfile) ; record != NULL ; record = mlf_record_get_next(metalogfile)) {
if ((record_block = metalogfile->records.iterator.current) != prev_record_block) {
if (prev_record_block) { /* Deallocate iterated record blocks */
rrd_atomic_fetch_add(&ctx->records_nr, -prev_record_block->records_nr);
freez(prev_record_block);
}
prev_record_block = record_block;
}
compact_record_by_uuid(ctx, &record->uuid);
if (0 == ++iterated_records % METADATA_LOG_RECORD_BATCH) {
metalog_flush_compaction_records(ctx);
if (compaction_state->throttle) {
(void)sleep_usec(10000); /* 10 msec throttle compaction */
}
compaction_test_quota(wc);
}
}
if (prev_record_block) { /* Deallocate iterated record blocks */
rrd_atomic_fetch_add(&ctx->records_nr, -prev_record_block->records_nr);
freez(prev_record_block);
}
info("Compacted metadata log file \"%s/"METALOG_PREFIX METALOG_FILE_NUMBER_PRINT_TMPL METALOG_EXTENSION"\".",
ctx->rrdeng_ctx->dbfiles_path, metalogfile->starting_fileno, metalogfile->fileno);
metadata_logfile_list_delete(&ctx->metadata_logfiles, metalogfile);
ret = destroy_metadata_logfile(metalogfile);
if (!ret) {
info("Deleted file \"%s/"METALOG_PREFIX METALOG_FILE_NUMBER_PRINT_TMPL METALOG_EXTENSION"\".",
ctx->rrdeng_ctx->dbfiles_path, metalogfile->starting_fileno, metalogfile->fileno);
rrd_atomic_fetch_add(&ctx->disk_space, -metalogfile->pos);
} else {
error("Failed to delete file \"%s/"METALOG_PREFIX METALOG_FILE_NUMBER_PRINT_TMPL METALOG_EXTENSION"\".",
ctx->rrdeng_ctx->dbfiles_path, metalogfile->starting_fileno, metalogfile->fileno);
}
freez(metalogfile);
return ret;
}
static void compact_old_records(void *arg)
{
struct metalog_instance *ctx = arg;
struct metalog_worker_config* wc = &ctx->worker_config;
struct logfile_compaction_state *compaction_state;
struct metadata_logfile *metalogfile, *nextmetalogfile, *newmetalogfile;
int ret;
compaction_state = &ctx->compaction_state;
nextmetalogfile = NULL;
for (metalogfile = ctx->metadata_logfiles.first ;
metalogfile != compaction_state->last_original_logfile ;
metalogfile = nextmetalogfile) {
nextmetalogfile = metalogfile->next;
newmetalogfile = compaction_state->new_metadata_logfiles.last;
ret = rename_metadata_logfile(newmetalogfile, newmetalogfile->starting_fileno, metalogfile->fileno);
if (ret < 0) {
error("Failed to rename file \"%s/"METALOG_PREFIX METALOG_FILE_NUMBER_PRINT_TMPL METALOG_EXTENSION"\".",
ctx->rrdeng_ctx->dbfiles_path, newmetalogfile->starting_fileno, newmetalogfile->fileno);
}
ret = compact_metadata_logfile_records(ctx, metalogfile);
if (ret) {
error("Metadata log compaction failed, cancelling.");
break;
}
}
fatal_assert(nextmetalogfile); /* There are always more than 1 metadata log files during compaction */
newmetalogfile = compaction_state->new_metadata_logfiles.last;
if (newmetalogfile->starting_fileno != 0) { /* Must rename the last compacted file */
ret = rename_metadata_logfile(newmetalogfile, 0, nextmetalogfile->fileno - 1);
if (ret < 0) {
error("Failed to rename file \"%s/"METALOG_PREFIX METALOG_FILE_NUMBER_PRINT_TMPL METALOG_EXTENSION"\".",
ctx->rrdeng_ctx->dbfiles_path, newmetalogfile->starting_fileno, newmetalogfile->fileno);
}
}
/* Connect the compacted files to the metadata log */
newmetalogfile->next = nextmetalogfile;
ctx->metadata_logfiles.first = compaction_state->new_metadata_logfiles.first;
wc->cleanup_thread_compacting_files = 1;
/* wake up event loop */
fatal_assert(0 == uv_async_send(&wc->async));
}
/* Returns 0 on success. */
static int init_compaction_state(struct metalog_instance *ctx)
{
struct metadata_logfile *newmetalogfile;
struct logfile_compaction_state *compaction_state;
int ret;
compaction_state = &ctx->compaction_state;
compaction_state->new_metadata_logfiles.first = NULL;
compaction_state->new_metadata_logfiles.last = NULL;
compaction_state->starting_fileno = ctx->metadata_logfiles.first->fileno;
compaction_state->fileno = ctx->metadata_logfiles.first->fileno;
compaction_state->last_original_logfile = ctx->metadata_logfiles.last;
compaction_state->throttle = 0;
ret = add_new_metadata_logfile(ctx, &compaction_state->new_metadata_logfiles, compaction_state->starting_fileno,
compaction_state->fileno);
if (unlikely(ret)) {
error("Cannot create new metadata log files, compaction aborted.");
return ret;
}
newmetalogfile = compaction_state->new_metadata_logfiles.first;
fatal_assert(newmetalogfile == compaction_state->new_metadata_logfiles.last);
init_metadata_record_log(&compaction_state->records_log);
return 0;
}
void metalog_do_compaction(struct metalog_worker_config *wc)
{
struct metalog_instance *ctx = wc->ctx;
int error;
if (wc->now_compacting_files) {
/* already compacting metadata log files */
return;
}
wc->now_compacting_files = mallocz(sizeof(*wc->now_compacting_files));
wc->cleanup_thread_compacting_files = 0;
metalog_try_link_new_metadata_logfile(wc);
error = init_compaction_state(ctx);
if (unlikely(error)) {
error("Cannot create new metadata log files, compaction aborted.");
return;
}
++ctx->current_compaction_id; /* Signify a new compaction */
info("Starting metadata log compaction (id:%"PRIu32").", ctx->current_compaction_id);
error = uv_thread_create(wc->now_compacting_files, compact_old_records, ctx);
if (error) {
error("uv_thread_create(): %s", uv_strerror(error));
freez(wc->now_compacting_files);
wc->now_compacting_files = NULL;
}
}
/* Return 0 on success. */
int compaction_failure_recovery(struct metalog_instance *ctx, struct metadata_logfile **metalogfiles,
unsigned *matched_files)

View file

@ -8,19 +8,7 @@
#endif
#include "../rrdengine.h"
struct logfile_compaction_state {
unsigned fileno; /* Starts at 1 */
unsigned starting_fileno; /* 0 for normal files, staring number during compaction */
struct metadata_record_commit_log records_log;
struct metadata_logfile_list new_metadata_logfiles;
struct metadata_logfile *last_original_logfile; /* Marks the end of compaction */
uint8_t throttle; /* set non-zero to throttle compaction */
};
extern int compaction_failure_recovery(struct metalog_instance *ctx, struct metadata_logfile **metalogfiles,
unsigned *matched_files);
extern void metalog_do_compaction(struct metalog_worker_config *wc);
extern void after_compact_old_records(struct metalog_worker_config* wc);
#endif /* NETDATA_COMPACTION_H */

View file

@ -1,184 +1,8 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include <database/sqlite/sqlite_functions.h>
#include "metadatalog.h"
#include "metalogpluginsd.h"
static void mlf_record_block_insert(struct metadata_logfile *metalogfile, struct metalog_record_block *record_block)
{
if (likely(NULL != metalogfile->records.last)) {
metalogfile->records.last->next = record_block;
}
if (unlikely(NULL == metalogfile->records.first)) {
metalogfile->records.first = record_block;
}
metalogfile->records.last = record_block;
}
void mlf_record_insert(struct metadata_logfile *metalogfile, struct metalog_record *record)
{
struct metalog_record_block *record_block;
struct metalog_instance *ctx = metalogfile->ctx;
record_block = metalogfile->records.last;
if (likely(NULL != record_block && record_block->records_nr < MAX_METALOG_RECORDS_PER_BLOCK)) {
record_block->record_array[record_block->records_nr++] = *record;
} else { /* Create new record block, the last one filled up */
record_block = mallocz(sizeof(*record_block));
record_block->records_nr = 1;
record_block->record_array[0] = *record;
record_block->next = NULL;
mlf_record_block_insert(metalogfile, record_block);
}
rrd_atomic_fetch_add(&ctx->records_nr, 1);
}
struct metalog_record *mlf_record_get_first(struct metadata_logfile *metalogfile)
{
struct metalog_records *records = &metalogfile->records;
struct metalog_record_block *record_block = metalogfile->records.first;
records->iterator.current = record_block;
records->iterator.record_i = 0;
if (unlikely(NULL == record_block || !record_block->records_nr)) {
error("Cannot iterate empty metadata log file %u-%u.", metalogfile->starting_fileno, metalogfile->fileno);
return NULL;
}
return &record_block->record_array[0];
}
/* Must have called mlf_record_get_first before calling this function. */
struct metalog_record *mlf_record_get_next(struct metadata_logfile *metalogfile)
{
struct metalog_records *records = &metalogfile->records;
struct metalog_record_block *record_block = records->iterator.current;
if (unlikely(NULL == record_block)) {
return NULL;
}
if (++records->iterator.record_i >= record_block->records_nr) {
record_block = record_block->next;
if (unlikely(NULL == record_block || !record_block->records_nr)) {
return NULL;
}
records->iterator.current = record_block;
records->iterator.record_i = 0;
return &record_block->record_array[0];
}
return &record_block->record_array[records->iterator.record_i];
}
static void flush_records_buffer_cb(uv_fs_t* req)
{
struct generic_io_descriptor *io_descr = req->data;
struct metalog_worker_config *wc = req->loop->data;
struct metalog_instance *ctx = wc->ctx;
debug(D_METADATALOG, "%s: Metadata log file block was written to disk.", __func__);
if (req->result < 0) {
++ctx->stats.io_errors;
rrd_stat_atomic_add(&global_io_errors, 1);
error("%s: uv_fs_write: %s", __func__, uv_strerror((int)req->result));
} else {
debug(D_METADATALOG, "%s: Metadata log file block was written to disk.", __func__);
}
uv_fs_req_cleanup(req);
free(io_descr->buf);
freez(io_descr);
}
/* Careful to always call this before creating a new metadata log file to finish writing the old one */
void mlf_flush_records_buffer(struct metalog_worker_config *wc, struct metadata_record_commit_log *records_log,
struct metadata_logfile_list *metadata_logfiles)
{
struct metalog_instance *ctx = wc->ctx;
int ret;
struct generic_io_descriptor *io_descr;
unsigned pos, size;
struct metadata_logfile *metalogfile;
if (unlikely(NULL == records_log->buf || 0 == records_log->buf_pos)) {
return;
}
/* care with outstanding records when switching metadata log files */
metalogfile = metadata_logfiles->last;
io_descr = mallocz(sizeof(*io_descr));
pos = records_log->buf_pos;
size = pos; /* no need to align the I/O when doing buffered writes */
io_descr->buf = records_log->buf;
io_descr->bytes = size;
io_descr->pos = metalogfile->pos;
io_descr->req.data = io_descr;
io_descr->completion = NULL;
io_descr->iov = uv_buf_init((void *)io_descr->buf, size);
ret = uv_fs_write(wc->loop, &io_descr->req, metalogfile->file, &io_descr->iov, 1,
metalogfile->pos, flush_records_buffer_cb);
fatal_assert(-1 != ret);
metalogfile->pos += size;
rrd_atomic_fetch_add(&ctx->disk_space, size);
records_log->buf = NULL;
ctx->stats.io_write_bytes += size;
++ctx->stats.io_write_requests;
}
void *mlf_get_records_buffer(struct metalog_worker_config *wc, struct metadata_record_commit_log *records_log,
struct metadata_logfile_list *metadata_logfiles, unsigned size)
{
int ret;
unsigned buf_pos = 0, buf_size;
fatal_assert(size);
if (records_log->buf) {
unsigned remaining;
buf_pos = records_log->buf_pos;
buf_size = records_log->buf_size;
remaining = buf_size - buf_pos;
if (size > remaining) {
/* we need a new buffer */
mlf_flush_records_buffer(wc, records_log, metadata_logfiles);
}
}
if (NULL == records_log->buf) {
buf_size = ALIGN_BYTES_CEILING(size);
ret = posix_memalign((void *)&records_log->buf, RRDFILE_ALIGNMENT, buf_size);
if (unlikely(ret)) {
fatal("posix_memalign:%s", strerror(ret));
}
buf_pos = records_log->buf_pos = 0;
records_log->buf_size = buf_size;
}
records_log->buf_pos += size;
return records_log->buf + buf_pos;
}
void metadata_logfile_list_insert(struct metadata_logfile_list *metadata_logfiles, struct metadata_logfile *metalogfile)
{
if (likely(NULL != metadata_logfiles->last)) {
metadata_logfiles->last->next = metalogfile;
}
if (unlikely(NULL == metadata_logfiles->first)) {
metadata_logfiles->first = metalogfile;
}
metadata_logfiles->last = metalogfile;
}
void metadata_logfile_list_delete(struct metadata_logfile_list *metadata_logfiles, struct metadata_logfile *metalogfile)
{
struct metadata_logfile *next;
next = metalogfile->next;
fatal_assert((NULL != next) && (metadata_logfiles->first == metalogfile) &&
(metadata_logfiles->last != metalogfile));
metadata_logfiles->first = next;
}
void generate_metadata_logfile_path(struct metadata_logfile *metalogfile, char *str, size_t maxlen)
{
@ -193,14 +17,13 @@ void metadata_logfile_init(struct metadata_logfile *metalogfile, struct metalog_
metalogfile->fileno = fileno;
metalogfile->file = (uv_file)0;
metalogfile->pos = 0;
metalogfile->records.first = metalogfile->records.last = NULL;
metalogfile->next = NULL;
metalogfile->ctx = ctx;
}
int rename_metadata_logfile(struct metadata_logfile *metalogfile, unsigned new_starting_fileno, unsigned new_fileno)
{
struct metalog_instance *ctx = metalogfile->ctx;
//struct metalog_instance *ctx = metalogfile->ctx;
uv_fs_t req;
int ret;
char oldpath[RRDENG_PATH_MAX], newpath[RRDENG_PATH_MAX];
@ -217,7 +40,7 @@ int rename_metadata_logfile(struct metadata_logfile *metalogfile, unsigned new_s
ret = uv_fs_rename(NULL, &req, oldpath, newpath, NULL);
if (ret < 0) {
error("uv_fs_rename(%s): %s", oldpath, uv_strerror(ret));
++ctx->stats.fs_errors; /* this is racy, may miss some errors */
//++ctx->stats.fs_errors; /* this is racy, may miss some errors */
rrd_stat_atomic_add(&global_fs_errors, 1);
/* restore previous values */
metalogfile->starting_fileno = backup_starting_fileno;
@ -228,49 +51,9 @@ int rename_metadata_logfile(struct metadata_logfile *metalogfile, unsigned new_s
return ret;
}
int close_metadata_logfile(struct metadata_logfile *metalogfile)
{
struct metalog_instance *ctx = metalogfile->ctx;
uv_fs_t req;
int ret;
char path[RRDENG_PATH_MAX];
generate_metadata_logfile_path(metalogfile, path, sizeof(path));
ret = uv_fs_close(NULL, &req, metalogfile->file, NULL);
if (ret < 0) {
error("uv_fs_close(%s): %s", path, uv_strerror(ret));
++ctx->stats.fs_errors;
rrd_stat_atomic_add(&global_fs_errors, 1);
}
uv_fs_req_cleanup(&req);
return ret;
}
int fsync_metadata_logfile(struct metadata_logfile *metalogfile)
{
struct metalog_instance *ctx = metalogfile->ctx;
uv_fs_t req;
int ret;
char path[RRDENG_PATH_MAX];
generate_metadata_logfile_path(metalogfile, path, sizeof(path));
ret = uv_fs_fsync(NULL, &req, metalogfile->file, NULL);
if (ret < 0) {
error("uv_fs_close(%s): %s", path, uv_strerror(ret));
++ctx->stats.fs_errors;
rrd_stat_atomic_add(&global_fs_errors, 1);
}
uv_fs_req_cleanup(&req);
return ret;
}
int unlink_metadata_logfile(struct metadata_logfile *metalogfile)
{
struct metalog_instance *ctx = metalogfile->ctx;
//struct metalog_instance *ctx = metalogfile->ctx;
uv_fs_t req;
int ret;
char path[RRDENG_PATH_MAX];
@ -280,7 +63,7 @@ int unlink_metadata_logfile(struct metadata_logfile *metalogfile)
ret = uv_fs_unlink(NULL, &req, path, NULL);
if (ret < 0) {
error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
++ctx->stats.fs_errors;
// ++ctx->stats.fs_errors;
rrd_stat_atomic_add(&global_fs_errors, 1);
}
uv_fs_req_cleanup(&req);
@ -288,104 +71,6 @@ int unlink_metadata_logfile(struct metadata_logfile *metalogfile)
return ret;
}
int destroy_metadata_logfile(struct metadata_logfile *metalogfile)
{
struct metalog_instance *ctx = metalogfile->ctx;
uv_fs_t req;
int ret;
char path[RRDENG_PATH_MAX];
generate_metadata_logfile_path(metalogfile, path, sizeof(path));
ret = uv_fs_ftruncate(NULL, &req, metalogfile->file, 0, NULL);
if (ret < 0) {
error("uv_fs_ftruncate(%s): %s", path, uv_strerror(ret));
++ctx->stats.fs_errors;
rrd_stat_atomic_add(&global_fs_errors, 1);
}
uv_fs_req_cleanup(&req);
ret = uv_fs_close(NULL, &req, metalogfile->file, NULL);
if (ret < 0) {
error("uv_fs_close(%s): %s", path, uv_strerror(ret));
++ctx->stats.fs_errors;
rrd_stat_atomic_add(&global_fs_errors, 1);
}
uv_fs_req_cleanup(&req);
ret = uv_fs_unlink(NULL, &req, path, NULL);
if (ret < 0) {
error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
++ctx->stats.fs_errors;
rrd_stat_atomic_add(&global_fs_errors, 1);
}
uv_fs_req_cleanup(&req);
// ++ctx->stats.metadata_logfile_deletions;
return ret;
}
int create_metadata_logfile(struct metadata_logfile *metalogfile)
{
struct metalog_instance *ctx = metalogfile->ctx;
uv_fs_t req;
uv_file file;
int ret, fd;
struct rrdeng_metalog_sb *superblock;
uv_buf_t iov;
char path[RRDENG_PATH_MAX];
generate_metadata_logfile_path(metalogfile, path, sizeof(path));
fd = open_file_buffered_io(path, O_CREAT | O_RDWR | O_TRUNC, &file);
if (fd < 0) {
++ctx->stats.fs_errors;
rrd_stat_atomic_add(&global_fs_errors, 1);
return fd;
}
metalogfile->file = file;
// ++ctx->stats.metadata_logfile_creations;
ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
if (unlikely(ret)) {
fatal("posix_memalign:%s", strerror(ret));
}
memset(superblock, 0, sizeof(*superblock));
(void) strncpy(superblock->magic_number, RRDENG_METALOG_MAGIC, RRDENG_MAGIC_SZ);
superblock->version = RRDENG_METALOG_VER;
iov = uv_buf_init((void *)superblock, sizeof(*superblock));
ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL);
if (ret < 0) {
fatal_assert(req.result < 0);
error("uv_fs_write: %s", uv_strerror(ret));
++ctx->stats.io_errors;
rrd_stat_atomic_add(&global_io_errors, 1);
}
uv_fs_req_cleanup(&req);
ret = uv_fs_fsync(NULL, &req, metalogfile->file, NULL);
if (ret < 0) {
error("uv_fs_close(%s): %s", path, uv_strerror(ret));
++ctx->stats.fs_errors;
rrd_stat_atomic_add(&global_fs_errors, 1);
}
uv_fs_req_cleanup(&req);
free(superblock);
if (ret < 0) {
destroy_metadata_logfile(metalogfile);
return ret;
}
metalogfile->pos = sizeof(*superblock);
ctx->stats.io_write_bytes += sizeof(*superblock);
++ctx->stats.io_write_requests;
return 0;
}
static int check_metadata_logfile_superblock(uv_file file)
{
int ret;
@ -447,13 +132,13 @@ void replay_record(struct metadata_logfile *metalogfile, struct rrdeng_metalog_r
/* This function only works with buffered I/O */
static inline int metalogfile_read(struct metadata_logfile *metalogfile, void *buf, size_t len, uint64_t offset)
{
struct metalog_instance *ctx;
// struct metalog_instance *ctx;
uv_file file;
uv_buf_t iov;
uv_fs_t req;
int ret;
ctx = metalogfile->ctx;
// ctx = metalogfile->ctx;
file = metalogfile->file;
iov = uv_buf_init(buf, len);
ret = uv_fs_read(NULL, &req, file, &iov, 1, offset, NULL);
@ -461,14 +146,14 @@ static inline int metalogfile_read(struct metadata_logfile *metalogfile, void *b
fatal("uv_fs_read: %s", uv_strerror(ret));
}
if (req.result < 0) {
++ctx->stats.io_errors;
// ++ctx->stats.io_errors;
rrd_stat_atomic_add(&global_io_errors, 1);
error("%s: uv_fs_read - %s - record at offset %"PRIu64"(%u) in metadata logfile %u-%u.", __func__,
uv_strerror((int)req.result), offset, (unsigned)len, metalogfile->starting_fileno, metalogfile->fileno);
}
uv_fs_req_cleanup(&req);
ctx->stats.io_read_bytes += len;
++ctx->stats.io_read_requests;
// ctx->stats.io_read_bytes += len;
// ++ctx->stats.io_read_requests;
return ret;
}
@ -561,6 +246,7 @@ static void iterate_records(struct metadata_logfile *metalogfile)
int load_metadata_logfile(struct metalog_instance *ctx, struct metadata_logfile *metalogfile)
{
UNUSED(ctx);
uv_fs_t req;
uv_file file;
int ret, fd, error;
@ -568,9 +254,12 @@ int load_metadata_logfile(struct metalog_instance *ctx, struct metadata_logfile
char path[RRDENG_PATH_MAX];
generate_metadata_logfile_path(metalogfile, path, sizeof(path));
if (file_is_migrated(path))
return 0;
fd = open_file_buffered_io(path, O_RDWR, &file);
if (fd < 0) {
++ctx->stats.fs_errors;
// ++ctx->stats.fs_errors;
rrd_stat_atomic_add(&global_fs_errors, 1);
return fd;
}
@ -583,15 +272,16 @@ int load_metadata_logfile(struct metalog_instance *ctx, struct metadata_logfile
ret = check_metadata_logfile_superblock(file);
if (ret)
goto error;
ctx->stats.io_read_bytes += sizeof(struct rrdeng_jf_sb);
++ctx->stats.io_read_requests;
// ctx->stats.io_read_bytes += sizeof(struct rrdeng_jf_sb);
// ++ctx->stats.io_read_requests;
metalogfile->file = file;
metalogfile->pos = file_size;
iterate_records(metalogfile);
info("Metadata log \"%s\" loaded (size:%"PRIu64").", path, file_size);
info("Metadata log \"%s\" migrated to the database (size:%"PRIu64").", path, file_size);
add_migrated_file(path, file_size);
return 0;
error:
@ -599,20 +289,13 @@ error:
ret = uv_fs_close(NULL, &req, file, NULL);
if (ret < 0) {
error("uv_fs_close(%s): %s", path, uv_strerror(ret));
++ctx->stats.fs_errors;
// ++ctx->stats.fs_errors;
rrd_stat_atomic_add(&global_fs_errors, 1);
}
uv_fs_req_cleanup(&req);
return error;
}
void init_metadata_record_log(struct metadata_record_commit_log *records_log)
{
records_log->buf = NULL;
records_log->buf_pos = 0;
records_log->record_id = 1;
}
static int scan_metalog_files_cmp(const void *a, const void *b)
{
struct metadata_logfile *file1, *file2;
@ -640,7 +323,7 @@ static int scan_metalog_files(struct metalog_instance *ctx)
fatal_assert(req.result < 0);
uv_fs_req_cleanup(&req);
error("uv_fs_scandir(%s): %s", dbfiles_path, uv_strerror(ret));
++ctx->stats.fs_errors;
// ++ctx->stats.fs_errors;
rrd_stat_atomic_add(&global_fs_errors, 1);
return ret;
}
@ -675,7 +358,7 @@ static int scan_metalog_files(struct metalog_instance *ctx)
freez(metalogfiles);
return UV_EINVAL;
}
ctx->last_fileno = metalogfiles[matched_files - 1]->fileno;
//ctx->last_fileno = metalogfiles[matched_files - 1]->fileno;
struct plugind cd = {
.enabled = 1,
@ -722,24 +405,27 @@ static int scan_metalog_files(struct metalog_instance *ctx)
for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) {
metalogfile = metalogfiles[i];
db_lock();
db_execute("BEGIN TRANSACTION;");
ret = load_metadata_logfile(ctx, metalogfile);
if (0 != ret) {
error("Deleting invalid metadata log file \"%s/"METALOG_PREFIX METALOG_FILE_NUMBER_PRINT_TMPL
METALOG_EXTENSION"\"", dbfiles_path, metalogfile->starting_fileno, metalogfile->fileno);
unlink_metadata_logfile(metalogfile);
freez(metalogfile);
++failed_to_load;
continue;
db_execute("ROLLBACK TRANSACTION;");
}
metadata_logfile_list_insert(&ctx->metadata_logfiles, metalogfile);
rrd_atomic_fetch_add(&ctx->disk_space, metalogfile->pos);
else
db_execute("COMMIT TRANSACTION;");
db_unlock();
freez(metalogfile);
}
matched_files -= failed_to_load;
debug(D_METADATALOG, "PARSER ended");
parser_destroy(parser);
size_t count = metalog_parser_object.count;
size_t count __maybe_unused = metalog_parser_object.count;
debug(D_METADATALOG, "Parsing count=%u", (unsigned)count);
after_failed_to_parse:
@ -749,31 +435,6 @@ after_failed_to_parse:
return matched_files;
}
/* Creates a metadata log file */
int add_new_metadata_logfile(struct metalog_instance *ctx, struct metadata_logfile_list *logfile_list,
unsigned starting_fileno, unsigned fileno)
{
struct metadata_logfile *metalogfile;
int ret;
char path[RRDENG_PATH_MAX];
info("Creating new metadata log file in path %s", ctx->rrdeng_ctx->dbfiles_path);
metalogfile = mallocz(sizeof(*metalogfile));
metadata_logfile_init(metalogfile, ctx, starting_fileno, fileno);
ret = create_metadata_logfile(metalogfile);
if (!ret) {
generate_metadata_logfile_path(metalogfile, path, sizeof(path));
info("Created metadata log file \"%s\".", path);
} else {
freez(metalogfile);
return ret;
}
metadata_logfile_list_insert(logfile_list, metalogfile);
rrd_atomic_fetch_add(&ctx->disk_space, metalogfile->pos);
return 0;
}
/* Return 0 on success. */
int init_metalog_files(struct metalog_instance *ctx)
{
@ -784,32 +445,9 @@ int init_metalog_files(struct metalog_instance *ctx)
if (ret < 0) {
error("Failed to scan path \"%s\".", dbfiles_path);
return ret;
} else if (0 == ret) {
info("Metadata log files not found, creating in path \"%s\".", dbfiles_path);
ret = add_new_metadata_logfile(ctx, &ctx->metadata_logfiles, 0, 1);
if (ret) {
error("Failed to create metadata log file in path \"%s\".", dbfiles_path);
return ret;
}
}/* else if (0 == ret) {
ctx->last_fileno = 1;
}
}*/
return 0;
}
void finalize_metalog_files(struct metalog_instance *ctx)
{
struct metadata_logfile *metalogfile, *next_metalogfile;
struct metalog_record_block *record_block, *next_record_block;
for (metalogfile = ctx->metadata_logfiles.first ; metalogfile != NULL ; metalogfile = next_metalogfile) {
next_metalogfile = metalogfile->next;
for (record_block = metalogfile->records.first ; record_block != NULL ; record_block = next_record_block) {
next_record_block = record_block->next;
freez(record_block);
}
close_metadata_logfile(metalogfile);
freez(metalogfile);
}
}

View file

@ -13,34 +13,6 @@ struct metalog_worker_config;
#define METALOG_PREFIX "metadatalog-"
#define METALOG_EXTENSION ".mlf"
#define MAX_METALOGFILE_SIZE (524288LU)
/* Deletions are ignored during compaction, so only creation UUIDs are stored */
struct metalog_record {
uuid_t uuid;
};
#define MAX_METALOG_RECORDS_PER_BLOCK (1024LU)
struct metalog_record_block {
uint64_t file_offset;
uint32_t io_size;
struct metalog_record record_array[MAX_METALOG_RECORDS_PER_BLOCK];
uint16_t records_nr;
struct metalog_record_block *next;
};
struct metalog_records {
/* the record block list is sorted based on disk offset */
struct metalog_record_block *first;
struct metalog_record_block *last;
struct {
struct metalog_record_block *current;
uint16_t record_i;
} iterator;
};
/* only one event loop is supported for now */
struct metadata_logfile {
unsigned fileno; /* Starts at 1 */
@ -48,7 +20,6 @@ struct metadata_logfile {
uv_file file;
uint64_t pos;
struct metalog_instance *ctx;
struct metalog_records records;
struct metadata_logfile *next;
};
@ -57,42 +28,12 @@ struct metadata_logfile_list {
struct metadata_logfile *last; /* newest */
};
struct metadata_record_commit_log {
uint64_t record_id;
/* outstanding record buffer */
void *buf;
unsigned buf_pos;
unsigned buf_size;
};
extern void mlf_record_insert(struct metadata_logfile *metalogfile, struct metalog_record *record);
extern struct metalog_record *mlf_record_get_first(struct metadata_logfile *metalogfile);
extern struct metalog_record *mlf_record_get_next(struct metadata_logfile *metalogfile);
extern void mlf_flush_records_buffer(struct metalog_worker_config *wc, struct metadata_record_commit_log *records_log,
struct metadata_logfile_list *metadata_logfiles);
extern void *mlf_get_records_buffer(struct metalog_worker_config *wc, struct metadata_record_commit_log *records_log,
struct metadata_logfile_list *metadata_logfiles, unsigned size);
extern void metadata_logfile_list_insert(struct metadata_logfile_list *metadata_logfiles,
struct metadata_logfile *metalogfile);
extern void metadata_logfile_list_delete(struct metadata_logfile_list *metadata_logfiles,
struct metadata_logfile *metalogfile);
extern void generate_metadata_logfile_path(struct metadata_logfile *metadatalog, char *str, size_t maxlen);
extern void metadata_logfile_init(struct metadata_logfile *metadatalog, struct metalog_instance *ctx,
unsigned tier, unsigned fileno);
extern int rename_metadata_logfile(struct metadata_logfile *metalogfile, unsigned new_starting_fileno,
unsigned new_fileno);
extern int close_metadata_logfile(struct metadata_logfile *metadatalog);
extern int fsync_metadata_logfile(struct metadata_logfile *metalogfile);
extern int unlink_metadata_logfile(struct metadata_logfile *metalogfile);
extern int destroy_metadata_logfile(struct metadata_logfile *metalogfile);
extern int create_metadata_logfile(struct metadata_logfile *metalogfile);
extern int load_metadata_logfile(struct metalog_instance *ctx, struct metadata_logfile *logfile);
extern void init_metadata_record_log(struct metadata_record_commit_log *records_log);
extern int add_new_metadata_logfile(struct metalog_instance *ctx, struct metadata_logfile_list *logfile_list,
unsigned tier, unsigned fileno);
extern int init_metalog_files(struct metalog_instance *ctx);
extern void finalize_metalog_files(struct metalog_instance *ctx);
#endif /* NETDATA_LOGFILE_H */

View file

@ -1,427 +0,0 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#define NETDATA_RRD_INTERNALS
#include "metadatalog.h"
static void sanity_check(void)
{
/* Magic numbers must fit in the super-blocks */
BUILD_BUG_ON(strlen(RRDENG_METALOG_MAGIC) > RRDENG_MAGIC_SZ);
/* Metadata log file super-block cannot be larger than RRDENG_BLOCK_SIZE */
BUILD_BUG_ON(RRDENG_METALOG_SB_PADDING_SZ < 0);
/* Object duplication factor cannot be less than 1, or too close to 1 */
BUILD_BUG_ON(MAX_DUPLICATION_PERCENTAGE < 110);
}
char *get_metalog_statistics(struct metalog_instance *ctx, char *str, size_t size)
{
snprintfz(str, size,
"io_write_bytes: %ld\n"
"io_write_requests: %ld\n"
"io_read_bytes: %ld\n"
"io_read_requests: %ld\n"
"io_write_record_bytes: %ld\n"
"io_write_records: %ld\n"
"io_read_record_bytes: %ld\n"
"io_read_records: %ld\n"
"metadata_logfile_creations: %ld\n"
"metadata_logfile_deletions: %ld\n"
"io_errors: %ld\n"
"fs_errors: %ld\n",
(long)ctx->stats.io_write_bytes,
(long)ctx->stats.io_write_requests,
(long)ctx->stats.io_read_bytes,
(long)ctx->stats.io_read_requests,
(long)ctx->stats.io_write_record_bytes,
(long)ctx->stats.io_write_records,
(long)ctx->stats.io_read_record_bytes,
(long)ctx->stats.io_read_records,
(long)ctx->stats.metadata_logfile_creations,
(long)ctx->stats.metadata_logfile_deletions,
(long)ctx->stats.io_errors,
(long)ctx->stats.fs_errors
);
return str;
}
/* The buffer must not be empty */
void metalog_commit_record(struct metalog_instance *ctx, BUFFER *buffer, enum metalog_opcode opcode, uuid_t *uuid,
int compacting)
{
struct metalog_cmd cmd;
fatal_assert(buffer_strlen(buffer));
fatal_assert(opcode == METALOG_COMMIT_CREATION_RECORD || opcode == METALOG_COMMIT_DELETION_RECORD);
cmd.opcode = opcode;
cmd.record_io_descr.buffer = buffer;
cmd.record_io_descr.compacting = compacting;
if (!uuid)
uuid_clear(cmd.record_io_descr.uuid);
else
uuid_copy(cmd.record_io_descr.uuid, *uuid);
metalog_enq_cmd(&ctx->worker_config, &cmd);
}
static void commit_record(struct metalog_worker_config* wc, struct metalog_record_io_descr *io_descr, uint8_t type)
{
struct metalog_instance *ctx = wc->ctx;
unsigned payload_length, size_bytes;
void *buf, *mlf_payload;
/* persistent structures */
struct rrdeng_metalog_record_header *mlf_header;
struct rrdeng_metalog_record_trailer *mlf_trailer;
uLong crc;
payload_length = buffer_strlen(io_descr->buffer);
size_bytes = sizeof(*mlf_header) + payload_length + sizeof(*mlf_trailer);
if (io_descr->compacting)
buf = mlf_get_records_buffer(wc, &ctx->compaction_state.records_log,
&ctx->compaction_state.new_metadata_logfiles, size_bytes);
else
buf = mlf_get_records_buffer(wc, &ctx->records_log, &ctx->metadata_logfiles, size_bytes);
mlf_header = buf;
mlf_header->type = type;
mlf_header->header_length = sizeof(*mlf_header);
mlf_header->payload_length = payload_length;
mlf_payload = buf + sizeof(*mlf_header);
memcpy(mlf_payload, buffer_tostring(io_descr->buffer), payload_length);
mlf_trailer = buf + sizeof(*mlf_header) + payload_length;
crc = crc32(0L, Z_NULL, 0);
crc = crc32(crc, buf, sizeof(*mlf_header) + payload_length);
crc32set(mlf_trailer->checksum, crc);
buffer_free(io_descr->buffer);
}
static void do_commit_record(struct metalog_worker_config* wc, uint8_t type, void *data)
{
struct metalog_record_io_descr *io_descr = (struct metalog_record_io_descr *)data;
switch (type) {
case METALOG_CREATE_OBJECT:
if (!uuid_is_null(io_descr->uuid)) { /* It's a valid object */
struct metalog_record record;
uuid_copy(record.uuid, io_descr->uuid);
if (io_descr->compacting)
mlf_record_insert(wc->ctx->compaction_state.new_metadata_logfiles.last, &record);
else
mlf_record_insert(wc->ctx->metadata_logfiles.last, &record);
} /* fall through */
case METALOG_DELETE_OBJECT:
commit_record(wc, (struct metalog_record_io_descr *)data, type);
break;
default:
fatal("Unknown metadata log file record type, possible memory corruption.");
break;
}
}
/* Only creates a new metadata file and links it to the metadata log if the last one is non empty. */
void metalog_try_link_new_metadata_logfile(struct metalog_worker_config *wc)
{
struct metalog_instance *ctx = wc->ctx;
struct metadata_logfile *metalogfile;
int ret;
metalogfile = ctx->metadata_logfiles.last;
if (metalogfile->records.first) { /* it has records */
/* Finalize metadata log file and create a new one */
mlf_flush_records_buffer(wc, &ctx->records_log, &ctx->metadata_logfiles);
fsync_metadata_logfile(ctx->metadata_logfiles.last);
ret = add_new_metadata_logfile(ctx, &ctx->metadata_logfiles, 0, ctx->last_fileno + 1);
if (likely(!ret)) {
++ctx->last_fileno;
}
}
}
void metalog_test_quota(struct metalog_worker_config *wc)
{
struct metalog_instance *ctx = wc->ctx;
struct metadata_logfile *metalogfile;
unsigned current_size;
uint8_t only_one_metalogfile;
metalogfile = ctx->metadata_logfiles.last;
current_size = metalogfile->pos;
if (unlikely(current_size >= MAX_METALOGFILE_SIZE)) {
metalog_try_link_new_metadata_logfile(wc);
}
metalogfile = ctx->metadata_logfiles.last;
only_one_metalogfile = (metalogfile == ctx->metadata_logfiles.first) ? 1 : 0;
debug(D_METADATALOG, "records=%lu objects=%lu", (long unsigned)ctx->records_nr,
(long unsigned)ctx->objects_nr);
if (unlikely(!only_one_metalogfile &&
ctx->records_nr > (ctx->objects_nr * (uint64_t)MAX_DUPLICATION_PERCENTAGE) / 100) &&
NO_QUIESCE == ctx->quiesce) {
metalog_do_compaction(wc);
}
}
static inline int metalog_threads_alive(struct metalog_worker_config* wc)
{
if (wc->cleanup_thread_compacting_files) {
return 1;
}
return 0;
}
static void metalog_cleanup_finished_threads(struct metalog_worker_config *wc)
{
struct metalog_instance *ctx = wc->ctx;
if (unlikely(wc->cleanup_thread_compacting_files)) {
after_compact_old_records(wc);
}
if (unlikely(SET_QUIESCE == ctx->quiesce && !metalog_threads_alive(wc))) {
ctx->quiesce = QUIESCED;
complete(&ctx->metalog_completion);
}
}
static void metalog_init_cmd_queue(struct metalog_worker_config *wc)
{
wc->cmd_queue.head = wc->cmd_queue.tail = 0;
wc->queue_size = 0;
fatal_assert(0 == uv_cond_init(&wc->cmd_cond));
fatal_assert(0 == uv_mutex_init(&wc->cmd_mutex));
}
void metalog_enq_cmd(struct metalog_worker_config *wc, struct metalog_cmd *cmd)
{
unsigned queue_size;
/* wait for free space in queue */
uv_mutex_lock(&wc->cmd_mutex);
while ((queue_size = wc->queue_size) == METALOG_CMD_Q_MAX_SIZE) {
uv_cond_wait(&wc->cmd_cond, &wc->cmd_mutex);
}
fatal_assert(queue_size < METALOG_CMD_Q_MAX_SIZE);
/* enqueue command */
wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd;
wc->cmd_queue.tail = wc->cmd_queue.tail != METALOG_CMD_Q_MAX_SIZE - 1 ?
wc->cmd_queue.tail + 1 : 0;
wc->queue_size = queue_size + 1;
uv_mutex_unlock(&wc->cmd_mutex);
/* wake up event loop */
fatal_assert(0 == uv_async_send(&wc->async));
}
struct metalog_cmd metalog_deq_cmd(struct metalog_worker_config *wc)
{
struct metalog_cmd ret;
unsigned queue_size;
uv_mutex_lock(&wc->cmd_mutex);
queue_size = wc->queue_size;
if (queue_size == 0) {
ret.opcode = METALOG_NOOP;
} else {
/* dequeue command */
ret = wc->cmd_queue.cmd_array[wc->cmd_queue.head];
if (queue_size == 1) {
wc->cmd_queue.head = wc->cmd_queue.tail = 0;
} else {
wc->cmd_queue.head = wc->cmd_queue.head != RRDENG_CMD_Q_MAX_SIZE - 1 ?
wc->cmd_queue.head + 1 : 0;
}
wc->queue_size = queue_size - 1;
/* wake up producers */
uv_cond_signal(&wc->cmd_cond);
}
uv_mutex_unlock(&wc->cmd_mutex);
return ret;
}
static void async_cb(uv_async_t *handle)
{
uv_stop(handle->loop);
uv_update_time(handle->loop);
debug(D_METADATALOG, "%s called, active=%d.", __func__, uv_is_active((uv_handle_t *)handle));
}
/* Flushes metadata log when timer expires */
#define TIMER_PERIOD_MS (5000)
static void timer_cb(uv_timer_t* handle)
{
struct metalog_worker_config* wc = handle->data;
struct metalog_instance *ctx = wc->ctx;
uv_stop(handle->loop);
uv_update_time(handle->loop);
metalog_test_quota(wc);
debug(D_METADATALOG, "%s: timeout reached.", __func__);
#ifdef NETDATA_INTERNAL_CHECKS
{
char buf[4096];
debug(D_METADATALOG, "%s", get_metalog_statistics(wc->ctx, buf, sizeof(buf)));
}
#endif
mlf_flush_records_buffer(wc, &ctx->records_log, &ctx->metadata_logfiles);
}
#define MAX_CMD_BATCH_SIZE (256)
void metalog_worker(void* arg)
{
struct metalog_worker_config *wc = arg;
struct metalog_instance *ctx = wc->ctx;
uv_loop_t* loop;
int shutdown, ret;
enum metalog_opcode opcode;
uv_timer_t timer_req;
struct metalog_cmd cmd;
unsigned cmd_batch_size;
sanity_check();
metalog_init_cmd_queue(wc);
loop = wc->loop = mallocz(sizeof(uv_loop_t));
ret = uv_loop_init(loop);
if (ret) {
error("uv_loop_init(): %s", uv_strerror(ret));
goto error_after_loop_init;
}
loop->data = wc;
ret = uv_async_init(wc->loop, &wc->async, async_cb);
if (ret) {
error("uv_async_init(): %s", uv_strerror(ret));
goto error_after_async_init;
}
wc->async.data = wc;
wc->now_compacting_files = NULL;
wc->cleanup_thread_compacting_files = 0;
/* quota check timer */
ret = uv_timer_init(loop, &timer_req);
if (ret) {
error("uv_timer_init(): %s", uv_strerror(ret));
goto error_after_timer_init;
}
timer_req.data = wc;
wc->error = 0;
/* wake up initialization thread */
complete(&ctx->metalog_completion);
fatal_assert(0 == uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS));
shutdown = 0;
while (likely(shutdown == 0 || metalog_threads_alive(wc))) {
uv_run(loop, UV_RUN_DEFAULT);
metalog_cleanup_finished_threads(wc);
/* wait for commands */
cmd_batch_size = 0;
do {
/*
* Avoid starving the loop when there are too many commands coming in.
* timer_cb will interrupt the loop again to allow serving more commands.
*/
if (unlikely(cmd_batch_size >= MAX_CMD_BATCH_SIZE))
break;
cmd = metalog_deq_cmd(wc);
opcode = cmd.opcode;
++cmd_batch_size;
switch (opcode) {
case METALOG_NOOP:
/* the command queue was empty, do nothing */
break;
case METALOG_SHUTDOWN:
shutdown = 1;
break;
case METALOG_QUIESCE:
ctx->quiesce = SET_QUIESCE;
fatal_assert(0 == uv_timer_stop(&timer_req));
uv_close((uv_handle_t *)&timer_req, NULL);
mlf_flush_records_buffer(wc, &ctx->records_log, &ctx->metadata_logfiles);
if (!metalog_threads_alive(wc)) {
ctx->quiesce = QUIESCED;
complete(&ctx->metalog_completion);
}
break;
case METALOG_COMMIT_CREATION_RECORD:
do_commit_record(wc, METALOG_CREATE_OBJECT, &cmd.record_io_descr);
break;
case METALOG_COMMIT_DELETION_RECORD:
do_commit_record(wc, METALOG_DELETE_OBJECT, &cmd.record_io_descr);
break;
case METALOG_COMPACTION_FLUSH:
mlf_flush_records_buffer(wc, &ctx->compaction_state.records_log,
&ctx->compaction_state.new_metadata_logfiles);
fsync_metadata_logfile(ctx->compaction_state.new_metadata_logfiles.last);
complete(cmd.record_io_descr.completion);
break;
default:
debug(D_METADATALOG, "%s: default.", __func__);
break;
}
} while (opcode != METALOG_NOOP);
}
/* cleanup operations of the event loop */
info("Shutting down RRD metadata log event loop.");
/*
* uv_async_send after uv_close does not seem to crash in linux at the moment,
* it is however undocumented behaviour and we need to be aware if this becomes
* an issue in the future.
*/
uv_close((uv_handle_t *)&wc->async, NULL);
mlf_flush_records_buffer(wc, &ctx->records_log, &ctx->metadata_logfiles);
uv_run(loop, UV_RUN_DEFAULT);
info("Shutting down RRD metadata log loop complete.");
/* TODO: don't let the API block by waiting to enqueue commands */
uv_cond_destroy(&wc->cmd_cond);
/* uv_mutex_destroy(&wc->cmd_mutex); */
fatal_assert(0 == uv_loop_close(loop));
freez(loop);
return;
error_after_timer_init:
uv_close((uv_handle_t *)&wc->async, NULL);
error_after_async_init:
fatal_assert(0 == uv_loop_close(loop));
error_after_loop_init:
freez(loop);
wc->error = UV_EAGAIN;
/* wake up initialization thread */
complete(&ctx->metalog_completion);
}
void error_with_guid(uuid_t *uuid, char *reason)
{
char uuid_str[37];
uuid_unparse_lower(*uuid, uuid_str);
errno = 0;
error("%s (GUID = %s)", reason, uuid_str);
}
void info_with_guid(uuid_t *uuid, char *reason)
{
char uuid_str[37];
uuid_unparse_lower(*uuid, uuid_str);
info("%s (GUID = %s)", reason, uuid_str);
}

View file

@ -16,124 +16,13 @@
struct metalog_instance;
struct parser_user_object;
#define MAX_PAGES_PER_EXTENT (64) /* TODO: can go higher only when journal supports bigger than 4KiB transactions */
#define METALOG_FILE_NUMBER_SCAN_TMPL "%5u-%5u"
#define METALOG_FILE_NUMBER_PRINT_TMPL "%5.5u-%5.5u"
#define MAX_DUPLICATION_PERCENTAGE 150 /* the maximum duplication factor of objects in metadata log records */
typedef enum {
METALOG_STATUS_UNINITIALIZED = 0,
METALOG_STATUS_INITIALIZING,
METALOG_STATUS_INITIALIZED
} metalog_state_t;
struct metalog_record_io_descr {
BUFFER *buffer;
struct completion *completion;
int compacting; /* When 0 append at the end of the metadata log file list.
When 1 append to the temporary compaction metadata log file list. */
uuid_t uuid;
};
enum metalog_opcode {
/* can be used to return empty status or flush the command queue */
METALOG_NOOP = 0,
METALOG_SHUTDOWN,
METALOG_COMMIT_CREATION_RECORD,
METALOG_COMMIT_DELETION_RECORD,
METALOG_COMPACTION_FLUSH,
METALOG_QUIESCE,
METALOG_MAX_OPCODE
};
struct metalog_cmd {
enum metalog_opcode opcode;
struct metalog_record_io_descr record_io_descr;
};
#define METALOG_CMD_Q_MAX_SIZE (2048)
struct metalog_cmdqueue {
unsigned head, tail;
struct metalog_cmd cmd_array[METALOG_CMD_Q_MAX_SIZE];
};
struct metalog_worker_config {
struct metalog_instance *ctx;
uv_thread_t thread;
uv_loop_t *loop;
uv_async_t async;
/* metadata log file comapaction thread */
uv_thread_t *now_compacting_files;
unsigned long cleanup_thread_compacting_files; /* set to 0 when now_compacting_files is still running */
/* FIFO command queue */
uv_mutex_t cmd_mutex;
uv_cond_t cmd_cond;
volatile unsigned queue_size;
struct metalog_cmdqueue cmd_queue;
int error;
};
/*
* Debug statistics not used by code logic.
* They only describe operations since DB engine instance load time.
*/
struct metalog_statistics {
rrdeng_stats_t io_write_bytes;
rrdeng_stats_t io_write_requests;
rrdeng_stats_t io_read_bytes;
rrdeng_stats_t io_read_requests;
rrdeng_stats_t io_write_record_bytes;
rrdeng_stats_t io_write_records;
rrdeng_stats_t io_read_record_bytes;
rrdeng_stats_t io_read_records;
rrdeng_stats_t metadata_logfile_creations;
rrdeng_stats_t metadata_logfile_deletions;
rrdeng_stats_t io_errors;
rrdeng_stats_t fs_errors;
};
struct metalog_instance {
struct rrdengine_instance *rrdeng_ctx;
struct metalog_worker_config worker_config;
struct completion metalog_completion;
struct metadata_record_commit_log records_log;
struct metadata_logfile_list metadata_logfiles;
struct parser_user_object *metalog_parser_object;
struct logfile_compaction_state compaction_state;
uint32_t current_compaction_id; /* Every compaction run increments this by 1 */
unsigned long disk_space;
unsigned long records_nr;
unsigned long objects_nr; /* total objects (hosts, charts, dimensions) monitored in this context */
uint8_t initialized; /* set to 1 to mark context initialized */
unsigned last_fileno; /* newest index of metadata log file */
uint8_t quiesce; /*
* 0 initial state when all operations function normally
* 1 set it before shutting down the instance, quiesce long running operations
* 2 is set after all threads have finished running
*/
struct metalog_statistics stats;
};
extern void metalog_commit_record(struct metalog_instance *ctx, BUFFER *buffer, enum metalog_opcode opcode,
uuid_t *uuid, int compacting);
extern int init_metadata_logfiles(struct metalog_instance *ctx);
extern void finalize_metadata_logfiles(struct metalog_instance *ctx);
extern void metalog_try_link_new_metadata_logfile(struct metalog_worker_config *wc);
extern void metalog_test_quota(struct metalog_worker_config *wc);
extern void metalog_worker(void* arg);
extern void metalog_enq_cmd(struct metalog_worker_config *wc, struct metalog_cmd *cmd);
extern struct metalog_cmd metalog_deq_cmd(struct metalog_worker_config *wc);
extern void error_with_guid(uuid_t *uuid, char *reason);
extern void info_with_guid(uuid_t *uuid, char *reason);
#endif /* NETDATA_METADATALOG_H */

View file

@ -3,461 +3,6 @@
#include "metadatalog.h"
static inline struct metalog_instance *get_metalog_ctx(RRDHOST *host)
{
if (host->rrdeng_ctx)
return host->rrdeng_ctx->metalog_ctx;
return NULL;
}
static inline int metalog_is_initialized(struct metalog_instance *ctx)
{
return ctx->rrdeng_ctx->metalog_ctx != NULL;
}
static inline void metalog_commit_creation_record(struct metalog_instance *ctx, BUFFER *buffer, uuid_t *uuid)
{
metalog_commit_record(ctx, buffer, METALOG_COMMIT_CREATION_RECORD, uuid, 0);
}
static inline void metalog_commit_deletion_record(struct metalog_instance *ctx, BUFFER *buffer)
{
metalog_commit_record(ctx, buffer, METALOG_COMMIT_DELETION_RECORD, NULL, 0);
}
void metalog_upd_objcount(RRDHOST *host, int count)
{
struct metalog_instance *ctx = get_metalog_ctx(host);
if (unlikely(!ctx))
return;
rrd_atomic_fetch_add(&ctx->objects_nr, count);
}
BUFFER *metalog_update_host_buffer(RRDHOST *host)
{
BUFFER *buffer;
buffer = buffer_create(4096); /* This will be freed after it has been committed to the metadata log buffer */
rrdhost_rdlock(host);
buffer_sprintf(buffer,
"HOST \"%s\" \"%s\" \"%s\" %d \"%s\" \"%s\" \"%s\"\n",
// "\"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\"" /* system */
// "\"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\"", /* info */
host->machine_guid,
host->hostname,
host->registry_hostname,
default_rrd_update_every,
host->os,
host->timezone,
(host->tags) ? host->tags : "");
netdata_rwlock_rdlock(&host->labels_rwlock);
struct label *labels = host->labels;
while (labels) {
buffer_sprintf(buffer
, "LABEL \"%s\" = %d %s\n"
, labels->key
, (int)labels->label_source
, labels->value);
labels = labels->next;
}
netdata_rwlock_unlock(&host->labels_rwlock);
buffer_strcat(buffer, "OVERWRITE labels\n");
rrdhost_unlock(host);
return buffer;
}
void metalog_commit_update_host(RRDHOST *host)
{
struct metalog_instance *ctx;
BUFFER *buffer;
/* Metadata are only available with dbengine */
ctx = get_metalog_ctx(host);
if (!ctx)
return;
if (!ctx->initialized) /* metadata log has not been initialized yet */
return;
buffer = metalog_update_host_buffer(host);
metalog_commit_creation_record(ctx, buffer, &host->host_uuid);
}
/* compaction_id 0 means it was not called by compaction logic */
BUFFER *metalog_update_chart_buffer(RRDSET *st, uint32_t compaction_id)
{
BUFFER *buffer;
RRDHOST *host = st->rrdhost;
buffer = buffer_create(1024); /* This will be freed after it has been committed to the metadata log buffer */
rrdset_rdlock(st);
buffer_sprintf(buffer, "CONTEXT %s\n", host->machine_guid);
char uuid_str[37];
uuid_unparse_lower(*st->chart_uuid, uuid_str);
buffer_sprintf(buffer, "GUID %s\n", uuid_str);
// properly set the name for the remote end to parse it
char *name = "";
if(likely(st->name)) {
if(unlikely(strcmp(st->id, st->name))) {
// they differ
name = strchr(st->name, '.');
if(name)
name++;
else
name = "";
}
}
// send the chart
buffer_sprintf(
buffer
, "CHART \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" %ld %d \"%s %s %s %s\" \"%s\" \"%s\"\n"
, st->id
, name
, st->title
, st->units
, st->family
, st->context
, rrdset_type_name(st->chart_type)
, st->priority
, st->update_every
, "" /* archived charts cannot be obsolete */
, rrdset_flag_check(st, RRDSET_FLAG_DETAIL)?"detail":""
, rrdset_flag_check(st, RRDSET_FLAG_STORE_FIRST)?"store_first":""
, rrdset_flag_check(st, RRDSET_FLAG_HIDDEN)?"hidden":""
, (st->plugin_name)?st->plugin_name:""
, (st->module_name)?st->module_name:""
);
// send the dimensions
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
char uuid_str[37];
uuid_unparse_lower(*rd->state->metric_uuid, uuid_str);
buffer_sprintf(buffer, "GUID %s\n", uuid_str);
buffer_sprintf(
buffer
, "DIMENSION \"%s\" \"%s\" \"%s\" " COLLECTED_NUMBER_FORMAT " " COLLECTED_NUMBER_FORMAT " \"%s %s %s\"\n"
, rd->id
, rd->name
, rrd_algorithm_name(rd->algorithm)
, rd->multiplier
, rd->divisor
, "" /* archived dimensions cannot be obsolete */
, rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)?"hidden":""
, rrddim_flag_check(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS)?"noreset":""
);
if (compaction_id && compaction_id > rd->state->compaction_id) {
/* No need to use this dimension again during this compaction cycle */
rd->state->compaction_id = compaction_id;
}
}
rrdset_unlock(st);
return buffer;
}
void metalog_commit_update_chart(RRDSET *st)
{
struct metalog_instance *ctx;
BUFFER *buffer;
/* Metadata are only available with dbengine */
if (RRD_MEMORY_MODE_DBENGINE != st->rrd_memory_mode)
return;
ctx = get_metalog_ctx(st->rrdhost);
if (!ctx)
return;
if (!ctx->initialized) /* metadata log has not been initialized yet */
return;
buffer = metalog_update_chart_buffer(st, 0);
metalog_commit_creation_record(ctx, buffer, st->chart_uuid);
}
void metalog_commit_delete_chart(RRDSET *st)
{
struct metalog_instance *ctx;
BUFFER *buffer;
char uuid_str[37];
/* Metadata are only available with dbengine */
if (RRD_MEMORY_MODE_DBENGINE != st->rrd_memory_mode)
return;
ctx = get_metalog_ctx(st->rrdhost);
if (!ctx)
return;
if (!ctx->initialized) /* metadata log has not been initialized yet */
return;
buffer = buffer_create(64); /* This will be freed after it has been committed to the metadata log buffer */
uuid_unparse_lower(*st->chart_uuid, uuid_str);
buffer_sprintf(buffer, "TOMBSTONE %s\n", uuid_str);
metalog_commit_deletion_record(ctx, buffer);
}
BUFFER *metalog_update_dimension_buffer(RRDDIM *rd)
{
BUFFER *buffer;
RRDSET *st = rd->rrdset;
char uuid_str[37];
buffer = buffer_create(128); /* This will be freed after it has been committed to the metadata log buffer */
uuid_unparse_lower(*st->chart_uuid, uuid_str);
buffer_sprintf(buffer, "CONTEXT %s\n", uuid_str);
// Activate random GUID
uuid_unparse_lower(*rd->state->metric_uuid, uuid_str);
buffer_sprintf(buffer, "GUID %s\n", uuid_str);
buffer_sprintf(
buffer
, "DIMENSION \"%s\" \"%s\" \"%s\" " COLLECTED_NUMBER_FORMAT " " COLLECTED_NUMBER_FORMAT " \"%s %s %s\"\n"
, rd->id
, rd->name
, rrd_algorithm_name(rd->algorithm)
, rd->multiplier
, rd->divisor
, "" /* archived dimensions cannot be obsolete */
, rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)?"hidden":""
, rrddim_flag_check(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS)?"noreset":""
);
return buffer;
}
void metalog_commit_update_dimension(RRDDIM *rd)
{
struct metalog_instance *ctx;
BUFFER *buffer;
RRDSET *st = rd->rrdset;
/* Metadata are only available with dbengine */
if (RRD_MEMORY_MODE_DBENGINE != st->rrd_memory_mode)
return;
ctx = get_metalog_ctx(st->rrdhost);
if (!ctx)
return;
if (!ctx->initialized) /* metadata log has not been initialized yet */
return;
buffer = metalog_update_dimension_buffer(rd);
metalog_commit_creation_record(ctx, buffer, rd->state->metric_uuid);
}
void metalog_commit_delete_dimension(RRDDIM *rd)
{
struct metalog_instance *ctx;
BUFFER *buffer;
RRDSET *st = rd->rrdset;
char uuid_str[37];
/* Metadata are only available with dbengine */
if (RRD_MEMORY_MODE_DBENGINE != st->rrd_memory_mode)
return;
ctx = get_metalog_ctx(st->rrdhost);
if (!ctx)
return;
if (!ctx->initialized) /* metadata log has not been initialized yet */
return;
buffer = buffer_create(64); /* This will be freed after it has been committed to the metadata log buffer */
uuid_unparse_lower(*rd->state->metric_uuid, uuid_str);
buffer_sprintf(buffer, "TOMBSTONE %s\n", uuid_str);
metalog_commit_deletion_record(ctx, buffer);
}
RRDHOST *metalog_get_host_from_uuid(struct metalog_instance *ctx, uuid_t *host_guid)
{
UNUSED(ctx);
GUID_TYPE ret;
char machine_guid[37];
uuid_unparse_lower(*host_guid, machine_guid);
RRDHOST *host = rrdhost_find_by_guid(machine_guid, 0);
ret = find_object_by_guid(host_guid, NULL, 0);
if (unlikely(GUID_TYPE_HOST != ret)) {
errno = 0;
if (unlikely(!host))
error("Host with GUID %s not found in the global map or in the list of hosts", machine_guid);
else
error("Host with GUID %s not found in the global map", machine_guid);
}
return host;
}
RRDSET *metalog_get_chart_from_uuid(struct metalog_instance *ctx, uuid_t *chart_uuid)
{
GUID_TYPE ret;
char chart_object[33], chart_fullid[RRD_ID_LENGTH_MAX + 1];
uuid_t *machine_guid, *chart_char_guid;
ret = find_object_by_guid(chart_uuid, chart_object, 33);
if (unlikely(GUID_TYPE_CHART != ret))
return NULL;
machine_guid = (uuid_t *)chart_object;
RRDHOST *host = metalog_get_host_from_uuid(ctx, machine_guid);
if (unlikely(!host))
return NULL;
if (unlikely(uuid_compare(host->host_uuid, *machine_guid))) {
errno = 0;
error("Metadata host machine GUID does not match the one assosiated with the chart");
return NULL;
}
chart_char_guid = (uuid_t *)(chart_object + 16);
ret = find_object_by_guid(chart_char_guid, chart_fullid, RRD_ID_LENGTH_MAX + 1);
if (unlikely(GUID_TYPE_CHAR != ret))
return NULL;
RRDSET *st = rrdset_find(host, chart_fullid);
return st;
}
RRDDIM *metalog_get_dimension_from_uuid(struct metalog_instance *ctx, uuid_t *metric_uuid)
{
UNUSED(ctx);
GUID_TYPE ret;
char dim_object[49], chart_object[33], id_str[PLUGINSD_LINE_MAX], chart_fullid[RRD_ID_LENGTH_MAX + 1];
uuid_t *machine_guid, *chart_guid, *chart_char_guid, *dim_char_guid;
ret = find_object_by_guid(metric_uuid, dim_object, sizeof(dim_object));
if (GUID_TYPE_DIMENSION != ret) /* not found */
return NULL;
machine_guid = (uuid_t *)dim_object;
RRDHOST *host = metalog_get_host_from_uuid(ctx, machine_guid);
if (unlikely(!host))
return NULL;
if (unlikely(uuid_compare(host->host_uuid, *machine_guid))) {
errno = 0;
error("Metadata host machine GUID does not match the one assosiated with the dimension");
return NULL;
}
chart_guid = (uuid_t *)(dim_object + 16);
dim_char_guid = (uuid_t *)(dim_object + 16 + 16);
ret = find_object_by_guid(dim_char_guid, id_str, sizeof(id_str));
if (unlikely(GUID_TYPE_CHAR != ret))
return NULL;
ret = find_object_by_guid(chart_guid, chart_object, sizeof(chart_object));
if (unlikely(GUID_TYPE_CHART != ret))
return NULL;
chart_char_guid = (uuid_t *)(chart_object + 16);
ret = find_object_by_guid(chart_char_guid, chart_fullid, RRD_ID_LENGTH_MAX + 1);
if (unlikely(GUID_TYPE_CHAR != ret))
return NULL;
RRDSET *st = rrdset_find(host, chart_fullid);
if (!st)
return NULL;
RRDDIM *rd = rrddim_find(st, id_str);
return rd;
}
/* This function is called by dbengine rotation logic when the metric has no writers */
void metalog_delete_dimension_by_uuid(struct metalog_instance *ctx, uuid_t *metric_uuid)
{
RRDDIM *rd;
RRDSET *st;
RRDHOST *host;
uint8_t empty_chart;
rd = metalog_get_dimension_from_uuid(ctx, metric_uuid);
if (!rd) { /* in the case of legacy UUID convert to multihost and try again */
uuid_t multihost_uuid;
rrdeng_convert_legacy_uuid_to_multihost(ctx->rrdeng_ctx->machine_guid, metric_uuid, &multihost_uuid);
rd = metalog_get_dimension_from_uuid(ctx, &multihost_uuid);
}
if(!rd) {
info("Rotated unknown archived metric.");
return;
}
st = rd->rrdset;
host = st->rrdhost;
/* In case there are active metrics in a different database engine do not delete the dimension object */
if (unlikely(host->rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE))
return;
/* Since the metric has no writer it will not be commited to the metadata log by rrddim_free_custom().
* It must be commited explicitly before calling rrddim_free_custom(). */
metalog_commit_delete_dimension(rd);
rrdset_wrlock(st);
rrddim_free_custom(st, rd, 1);
empty_chart = (NULL == st->dimensions);
rrdset_unlock(st);
if (empty_chart) {
rrdhost_wrlock(host);
rrdset_rdlock(st);
rrdset_delete_custom(st, 1);
rrdset_unlock(st);
rrdset_free(st);
rrdhost_unlock(host);
}
}
void metalog_print_dimension_by_uuid(struct metalog_instance *ctx, uuid_t *metric_uuid)
{
RRDDIM *rd;
RRDSET *st;
RRDHOST *host;
if (!ctx || !ctx->initialized)
return;
rd = metalog_get_dimension_from_uuid(ctx, metric_uuid);
if (!rd) { /* in the case of legacy UUID convert to multihost and try again */
uuid_t multihost_uuid;
rrdeng_convert_legacy_uuid_to_multihost(ctx->rrdeng_ctx->machine_guid, metric_uuid, &multihost_uuid);
rd = metalog_get_dimension_from_uuid(ctx, &multihost_uuid);
}
if(!rd) {
error_with_guid(metric_uuid, "GUID not found, unknown metric.");
return;
}
st = rd->rrdset;
host = st->rrdhost;
error_with_guid(metric_uuid, "Host - Chart - Dimension are the below:");
error("%s %s %s.", host->hostname, st->id, rd->id);
if (unlikely(host->rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE))
error_with_guid(metric_uuid, "UUID does not belong to RRD_MEMORY_MODE_DBENGINE.");
}
/*
* Returns 0 on success, negative on error
*/
@ -467,76 +12,28 @@ int metalog_init(struct rrdengine_instance *rrdeng_parent_ctx)
int error;
ctx = callocz(1, sizeof(*ctx));
ctx->records_nr = 0;
ctx->objects_nr = 0;
ctx->current_compaction_id = 0;
ctx->quiesce = NO_QUIESCE;
ctx->initialized = 0;
rrdeng_parent_ctx->metalog_ctx = ctx;
memset(&ctx->worker_config, 0, sizeof(ctx->worker_config));
ctx->rrdeng_ctx = rrdeng_parent_ctx;
ctx->worker_config.ctx = ctx;
init_metadata_record_log(&ctx->records_log);
error = init_metalog_files(ctx);
if (error) {
goto error_after_init_rrd_files;
}
init_completion(&ctx->metalog_completion);
fatal_assert(0 == uv_thread_create(&ctx->worker_config.thread, metalog_worker, &ctx->worker_config));
/* wait for worker thread to initialize */
wait_for_completion(&ctx->metalog_completion);
destroy_completion(&ctx->metalog_completion);
uv_thread_set_name_np(ctx->worker_config.thread, "METALOG");
if (ctx->worker_config.error) {
goto error_after_rrdeng_worker;
}
ctx->initialized = 1; /* notify dbengine that the metadata log has finished initializing */
return 0;
error_after_rrdeng_worker:
finalize_metalog_files(ctx);
error_after_init_rrd_files:
freez(ctx);
return UV_EIO;
}
/*
* Returns 0 on success, 1 on error
*/
int metalog_exit(struct metalog_instance *ctx)
/* This function is called by dbengine rotation logic when the metric has no writers */
void metalog_delete_dimension_by_uuid(struct metalog_instance *ctx, uuid_t *metric_uuid)
{
struct metalog_cmd cmd;
uuid_t multihost_uuid;
if (NULL == ctx) {
return 1;
}
cmd.opcode = METALOG_SHUTDOWN;
metalog_enq_cmd(&ctx->worker_config, &cmd);
fatal_assert(0 == uv_thread_join(&ctx->worker_config.thread));
finalize_metalog_files(ctx);
freez(ctx);
return 0;
}
void metalog_prepare_exit(struct metalog_instance *ctx)
{
struct metalog_cmd cmd;
if (NULL == ctx) {
return;
}
init_completion(&ctx->metalog_completion);
cmd.opcode = METALOG_QUIESCE;
metalog_enq_cmd(&ctx->worker_config, &cmd);
/* wait for metadata log to quiesce */
wait_for_completion(&ctx->metalog_completion);
destroy_completion(&ctx->metalog_completion);
delete_dimension_uuid(metric_uuid);
rrdeng_convert_legacy_uuid_to_multihost(ctx->rrdeng_ctx->machine_guid, metric_uuid, &multihost_uuid);
delete_dimension_uuid(&multihost_uuid);
}

View file

@ -3,27 +3,10 @@
#ifndef NETDATA_METADATALOGAPI_H
#define NETDATA_METADATALOGAPI_H
#include "metadatalog.h"
extern BUFFER *metalog_update_host_buffer(RRDHOST *host);
extern void metalog_commit_update_host(RRDHOST *host);
extern BUFFER *metalog_update_chart_buffer(RRDSET *st, uint32_t compaction_id);
extern void metalog_commit_update_chart(RRDSET *st);
extern void metalog_commit_delete_chart(RRDSET *st);
extern BUFFER *metalog_update_dimension_buffer(RRDDIM *rd);
extern void metalog_commit_update_dimension(RRDDIM *rd);
extern void metalog_commit_delete_dimension(RRDDIM *rd);
extern void metalog_upd_objcount(RRDHOST *host, int count);
extern RRDSET *metalog_get_chart_from_uuid(struct metalog_instance *ctx, uuid_t *chart_uuid);
extern RRDDIM *metalog_get_dimension_from_uuid(struct metalog_instance *ctx, uuid_t *metric_uuid);
extern RRDHOST *metalog_get_host_from_uuid(struct metalog_instance *ctx, uuid_t *uuid);
extern void metalog_delete_dimension_by_uuid(struct metalog_instance *ctx, uuid_t *metric_uuid);
extern void metalog_print_dimension_by_uuid(struct metalog_instance *ctx, uuid_t *metric_uuid);
/* must call once before using anything */
extern int metalog_init(struct rrdengine_instance *rrdeng_parent_ctx);
extern int metalog_exit(struct metalog_instance *ctx);
extern void metalog_prepare_exit(struct metalog_instance *ctx);
#endif /* NETDATA_METADATALOGAPI_H */

View file

@ -10,13 +10,6 @@ PARSER_RC metalog_pluginsd_host_action(
void *user, char *machine_guid, char *hostname, char *registry_hostname, int update_every, char *os, char *timezone,
char *tags)
{
int history = 5;
RRD_MEMORY_MODE mode = RRD_MEMORY_MODE_DBENGINE;
int rrdpush_enabled = default_rrdpush_enabled;
char *rrdpush_destination = default_rrdpush_destination;
char *rrdpush_api_key = default_rrdpush_api_key;
char *rrdpush_send_charts_matching = default_rrdpush_send_charts_matching;
struct metalog_pluginsd_state *state = ((PARSER_USER_OBJECT *)user)->private;
RRDHOST *host = rrdhost_find_by_guid(machine_guid, 0);
@ -26,72 +19,28 @@ PARSER_RC metalog_pluginsd_host_action(
host->hostname, rrd_memory_mode_name(host->rrd_memory_mode),
rrd_memory_mode_name(RRD_MEMORY_MODE_DBENGINE));
((PARSER_USER_OBJECT *) user)->host = NULL; /* Ignore objects if memory mode is not dbengine */
return PARSER_RC_OK;
}
goto write_replay;
}
if (strcmp(machine_guid, registry_get_this_machine_guid()) == 0) {
struct metalog_record record;
struct metadata_logfile *metalogfile = state->metalogfile;
uuid_parse(machine_guid, record.uuid);
mlf_record_insert(metalogfile, &record);
if (localhost->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE)
((PARSER_USER_OBJECT *) user)->host = localhost;
else
((PARSER_USER_OBJECT *) user)->host = NULL;
((PARSER_USER_OBJECT *) user)->host = host;
return PARSER_RC_OK;
}
// Fetch configuration options from streaming config
update_every = (int)appconfig_get_number(&stream_config, machine_guid, "update every", update_every);
if(update_every < 0) update_every = 1;
//rrdpush_enabled = appconfig_get_boolean(&stream_config, rpt->key, "default proxy enabled", rrdpush_enabled);
rrdpush_enabled = appconfig_get_boolean(&stream_config, machine_guid, "proxy enabled", rrdpush_enabled);
//rrdpush_destination = appconfig_get(&stream_config, rpt->key, "default proxy destination", rrdpush_destination);
rrdpush_destination = appconfig_get(&stream_config, machine_guid, "proxy destination", rrdpush_destination);
//rrdpush_api_key = appconfig_get(&stream_config, rpt->key, "default proxy api key", rrdpush_api_key);
rrdpush_api_key = appconfig_get(&stream_config, machine_guid, "proxy api key", rrdpush_api_key);
//rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->key, "default proxy send charts matching", rrdpush_send_charts_matching);
rrdpush_send_charts_matching = appconfig_get(&stream_config, machine_guid, "proxy send charts matching", rrdpush_send_charts_matching);
host = rrdhost_create(
hostname
, registry_hostname
, machine_guid
, os
, timezone
, tags
, NULL
, NULL
, update_every
, history // entries
, mode
, 0 // health enabled
, rrdpush_enabled // Push enabled
, rrdpush_destination //destination
, rrdpush_api_key // api key
, rrdpush_send_charts_matching // charts matching
, callocz(1, sizeof(struct rrdhost_system_info))
, 0 // localhost
, 1 // archived
);
write_replay:
if (host) { /* It's a valid object */
struct metalog_record record;
struct metadata_logfile *metalogfile = state->metalogfile;
uuid_copy(record.uuid, host->host_uuid);
mlf_record_insert(metalogfile, &record);
if (strcmp(machine_guid, registry_get_this_machine_guid()) == 0) {
((PARSER_USER_OBJECT *) user)->host = host;
return PARSER_RC_OK;
}
((PARSER_USER_OBJECT *) user)->host = host;
if (likely(!uuid_parse(machine_guid, state->host_uuid))) {
int rc = sql_store_host(&state->host_uuid, hostname, registry_hostname, update_every, os, timezone, tags);
if (unlikely(rc)) {
errno = 0;
error("Failed to store host %s with UUID %s in the database", hostname, machine_guid);
}
}
else {
errno = 0;
error("Host machine GUID %s is not valid", machine_guid);
}
return PARSER_RC_OK;
}
@ -99,51 +48,23 @@ PARSER_RC metalog_pluginsd_chart_action(void *user, char *type, char *id, char *
char *title, char *units, char *plugin, char *module, int priority,
int update_every, RRDSET_TYPE chart_type, char *options)
{
struct metalog_pluginsd_state *state = ((PARSER_USER_OBJECT *)user)->private;
RRDSET *st = NULL;
RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
uuid_t *chart_uuid;
UNUSED(options);
if (unlikely(!host)) {
struct metalog_pluginsd_state *state = ((PARSER_USER_OBJECT *)user)->private;
RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
if (unlikely(uuid_is_null(state->host_uuid))) {
debug(D_METADATALOG, "Ignoring chart belonging to missing or ignored host.");
return PARSER_RC_OK;
}
chart_uuid = uuid_is_null(state->uuid) ? NULL : &state->uuid;
st = rrdset_create_custom(
host, type, id, name, family, context, title, units,
uuid_copy(state->chart_uuid, state->uuid);
uuid_clear(state->uuid); /* Consume UUID */
(void) sql_store_chart(&state->chart_uuid, &state->host_uuid,
type, id, name, family, context, title, units,
plugin, module, priority, update_every,
chart_type, RRD_MEMORY_MODE_DBENGINE, (host)->rrd_history_entries, 1, chart_uuid);
chart_type, RRD_MEMORY_MODE_DBENGINE, host ? host->rrd_history_entries : 1);
((PARSER_USER_OBJECT *)user)->st_exists = 1;
rrdset_isnot_obsolete(st); /* archived charts cannot be obsolete */
if (options && *options) {
if (strstr(options, "detail"))
rrdset_flag_set(st, RRDSET_FLAG_DETAIL);
else
rrdset_flag_clear(st, RRDSET_FLAG_DETAIL);
if (strstr(options, "hidden"))
rrdset_flag_set(st, RRDSET_FLAG_HIDDEN);
else
rrdset_flag_clear(st, RRDSET_FLAG_HIDDEN);
if (strstr(options, "store_first"))
rrdset_flag_set(st, RRDSET_FLAG_STORE_FIRST);
else
rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST);
} else {
rrdset_flag_clear(st, RRDSET_FLAG_DETAIL);
rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST);
}
((PARSER_USER_OBJECT *)user)->st = st;
if (chart_uuid) { /* It's a valid object */
struct metalog_record record;
struct metadata_logfile *metalogfile = state->metalogfile;
uuid_copy(record.uuid, state->uuid);
mlf_record_insert(metalogfile, &record);
uuid_clear(state->uuid); /* Consume UUID */
}
return PARSER_RC_OK;
}
@ -152,36 +73,25 @@ PARSER_RC metalog_pluginsd_dimension_action(void *user, RRDSET *st, char *id, ch
{
struct metalog_pluginsd_state *state = ((PARSER_USER_OBJECT *)user)->private;
UNUSED(user);
UNUSED(options);
UNUSED(algorithm);
uuid_t *dim_uuid;
UNUSED(st);
if (unlikely(!st)) {
if (unlikely(uuid_is_null(state->chart_uuid))) {
debug(D_METADATALOG, "Ignoring dimension belonging to missing or ignored chart.");
info("Ignoring dimension belonging to missing or ignored chart.");
return PARSER_RC_OK;
}
dim_uuid = uuid_is_null(state->uuid) ? NULL : &state->uuid;
RRDDIM *rd = rrddim_add_custom(st, id, name, multiplier, divisor, algorithm_type, RRD_MEMORY_MODE_DBENGINE, 1,
dim_uuid);
rrddim_flag_clear(rd, RRDDIM_FLAG_HIDDEN);
rrddim_flag_clear(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS);
rrddim_isnot_obsolete(st, rd); /* archived dimensions cannot be obsolete */
if (options && *options) {
if (strstr(options, "hidden") != NULL)
rrddim_flag_set(rd, RRDDIM_FLAG_HIDDEN);
if (strstr(options, "noreset") != NULL)
rrddim_flag_set(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS);
if (strstr(options, "nooverflow") != NULL)
rrddim_flag_set(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS);
if (unlikely(uuid_is_null(state->uuid))) {
debug(D_METADATALOG, "Ignoring dimension without unknown UUID");
info("Ignoring dimension without unknown UUID");
return PARSER_RC_OK;
}
if (dim_uuid) { /* It's a valid object */
struct metalog_record record;
struct metadata_logfile *metalogfile = state->metalogfile;
uuid_copy(record.uuid, state->uuid);
mlf_record_insert(metalogfile, &record);
uuid_clear(state->uuid); /* Consume UUID */
}
(void) sql_store_dimension(&state->uuid, &state->chart_uuid, id, name, multiplier, divisor, algorithm_type);
uuid_clear(state->uuid); /* Consume UUID */
return PARSER_RC_OK;
}
@ -196,113 +106,27 @@ PARSER_RC metalog_pluginsd_guid_action(void *user, uuid_t *uuid)
PARSER_RC metalog_pluginsd_context_action(void *user, uuid_t *uuid)
{
GUID_TYPE ret;
//struct metalog_pluginsd_state *state = ((PARSER_USER_OBJECT *)user)->private;
//struct metalog_instance *ctx = state->ctx;
char object[49], chart_object[33], id_str[1024];
uuid_t *chart_guid, *chart_char_guid;
RRDHOST *host;
struct metalog_pluginsd_state *state = ((PARSER_USER_OBJECT *)user)->private;
ret = find_object_by_guid(uuid, object, 49);
switch (ret) {
case GUID_TYPE_NOTFOUND:
error_with_guid(uuid, "Failed to find valid context");
break;
case GUID_TYPE_CHAR:
error_with_guid(uuid, "Ignoring unexpected type GUID_TYPE_CHAR");
break;
case GUID_TYPE_CHART:
case GUID_TYPE_DIMENSION:
host = metalog_get_host_from_uuid(NULL, (uuid_t *) &object);
if (unlikely(!host))
break;
switch (ret) {
case GUID_TYPE_CHART:
chart_char_guid = (uuid_t *)(object + 16);
int rc = find_uuid_type(uuid);
ret = find_object_by_guid(chart_char_guid, id_str, RRD_ID_LENGTH_MAX + 1);
if (unlikely(GUID_TYPE_CHAR != ret))
error_with_guid(uuid, "Failed to find valid chart name");
else
((PARSER_USER_OBJECT *)user)->st = rrdset_find(host, id_str);
break;
case GUID_TYPE_DIMENSION:
chart_guid = (uuid_t *)(object + 16);
ret = find_object_by_guid(chart_guid, chart_object, 33);
if (unlikely(GUID_TYPE_CHART != ret)) {
error_with_guid(uuid, "Failed to find valid chart");
break;
}
chart_char_guid = (uuid_t *)(object + 16);
ret = find_object_by_guid(chart_char_guid, id_str, RRD_ID_LENGTH_MAX + 1);
if (unlikely(GUID_TYPE_CHAR != ret))
error_with_guid(uuid, "Failed to find valid chart name");
else
((PARSER_USER_OBJECT *)user)->st = rrdset_find(host, id_str);
break;
default:
break;
}
break;
case GUID_TYPE_HOST:
((PARSER_USER_OBJECT *)user)->host = metalog_get_host_from_uuid(NULL, (uuid_t *) &object);
break;
case GUID_TYPE_NOSPACE:
error_with_guid(uuid, "Not enough space for object retrieval");
break;
default:
error("Unknown return code %u from find_object_by_guid", ret);
break;
}
if (rc == 1) {
uuid_copy(state->host_uuid, *uuid);
((PARSER_USER_OBJECT *)user)->st_exists = 0;
((PARSER_USER_OBJECT *)user)->host_exists = 1;
} else if (rc == 2) {
uuid_copy(state->chart_uuid, *uuid);
((PARSER_USER_OBJECT *)user)->st_exists = 1;
} else
uuid_copy(state->uuid, *uuid);
return PARSER_RC_OK;
}
PARSER_RC metalog_pluginsd_tombstone_action(void *user, uuid_t *uuid)
{
GUID_TYPE ret;
struct metalog_pluginsd_state *state = ((PARSER_USER_OBJECT *)user)->private;
struct metalog_instance *ctx = state->ctx;
RRDHOST *host = NULL;
RRDSET *st;
RRDDIM *rd;
ret = find_object_by_guid(uuid, NULL, 0);
switch (ret) {
case GUID_TYPE_CHAR:
fatal_assert(0);
break;
case GUID_TYPE_CHART:
st = metalog_get_chart_from_uuid(ctx, uuid);
if (st) {
host = st->rrdhost;
rrdhost_wrlock(host);
rrdset_free(st);
rrdhost_unlock(host);
} else {
debug(D_METADATALOG, "Ignoring nonexistent chart metadata record.");
}
break;
case GUID_TYPE_DIMENSION:
rd = metalog_get_dimension_from_uuid(ctx, uuid);
if (rd) {
st = rd->rrdset;
rrdset_wrlock(st);
rrddim_free_custom(st, rd, 0);
rrdset_unlock(st);
}
else {
debug(D_METADATALOG, "Ignoring nonexistent dimension metadata record.");
}
break;
case GUID_TYPE_HOST:
/* Ignore for now */
break;
default:
break;
}
UNUSED(user);
UNUSED(uuid);
return PARSER_RC_OK;
}
@ -313,4 +137,4 @@ void metalog_pluginsd_state_init(struct metalog_pluginsd_state *state, struct me
state->skip_record = 0;
uuid_clear(state->uuid);
state->metalogfile = NULL;
}
}

View file

@ -10,6 +10,8 @@
struct metalog_pluginsd_state {
struct metalog_instance *ctx;
uuid_t uuid;
uuid_t host_uuid;
uuid_t chart_uuid;
uint8_t skip_record; /* skip this record due to errors in parsing */
struct metadata_logfile *metalogfile; /* current metadata log file being replayed */
};

View file

@ -54,9 +54,10 @@ void rrdeng_metric_init(RRDDIM *rd, uuid_t *dim_uuid)
struct page_cache *pg_cache;
struct rrdengine_instance *ctx;
uuid_t legacy_uuid;
uuid_t multihost_legacy_uuid;
Pvoid_t *PValue;
struct pg_cache_page_index *page_index = NULL;
int replace_instead_of_generate = 0, is_multihost_child = 0;
int is_multihost_child = 0;
RRDHOST *host = rd->rrdset->rrdhost;
ctx = get_rrdeng_ctx_from_host(rd->rrdset->rrdhost);
@ -67,7 +68,7 @@ void rrdeng_metric_init(RRDDIM *rd, uuid_t *dim_uuid)
pg_cache = &ctx->pg_cache;
rrdeng_generate_legacy_uuid(rd->id, rd->rrdset->id, &legacy_uuid);
rd->state->metric_uuid = callocz(1, sizeof(uuid_t));
rd->state->metric_uuid = dim_uuid;
if (host != localhost && host->rrdeng_ctx == &multidb_ctx)
is_multihost_child = 1;
@ -81,22 +82,8 @@ void rrdeng_metric_init(RRDDIM *rd, uuid_t *dim_uuid)
/* First time we see the legacy UUID or metric belongs to child host in multi-host DB.
* Drop legacy support, normal path */
if (NULL != dim_uuid) {
replace_instead_of_generate = 1;
uuid_copy(*rd->state->metric_uuid, *dim_uuid);
}
if (unlikely(find_or_generate_guid(rd, rd->state->metric_uuid, GUID_TYPE_DIMENSION,
replace_instead_of_generate))) {
errno = 0;
error("FAILED to reuse GUID for %s", rd->id);
if (unlikely(find_or_generate_guid(rd, rd->state->metric_uuid, GUID_TYPE_DIMENSION, 0))) {
errno = 0;
error("FAILED to generate GUID for %s", rd->id);
freez(rd->state->metric_uuid);
rd->state->metric_uuid = NULL;
fatal_assert(0);
}
}
if (unlikely(!rd->state->metric_uuid))
rd->state->metric_uuid = create_dimension_uuid(rd->rrdset, rd);
uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, rd->state->metric_uuid, sizeof(uuid_t));
@ -116,19 +103,20 @@ void rrdeng_metric_init(RRDDIM *rd, uuid_t *dim_uuid)
} else {
/* There are legacy UUIDs in the database, implement backward compatibility */
rrdeng_convert_legacy_uuid_to_multihost(rd->rrdset->rrdhost->machine_guid, &legacy_uuid,
rd->state->metric_uuid);
if (dim_uuid && uuid_compare(*rd->state->metric_uuid, *dim_uuid)) {
error("Mismatch of metadata log DIMENSION GUID with dbengine metric GUID.");
}
if (unlikely(find_or_generate_guid(rd, rd->state->metric_uuid, GUID_TYPE_DIMENSION, 1))) {
errno = 0;
error("FAILED to generate GUID for %s", rd->id);
freez(rd->state->metric_uuid);
rd->state->metric_uuid = NULL;
fatal_assert(0);
}
&multihost_legacy_uuid);
if (unlikely(!rd->state->metric_uuid))
rd->state->metric_uuid = mallocz(sizeof(uuid_t));
int need_to_store = (dim_uuid == NULL || uuid_compare(*rd->state->metric_uuid, multihost_legacy_uuid));
uuid_copy(*rd->state->metric_uuid, multihost_legacy_uuid);
if (unlikely(need_to_store))
(void)sql_store_dimension(rd->state->metric_uuid, rd->rrdset->chart_uuid, rd->id, rd->name, rd->multiplier, rd->divisor,
rd->algorithm);
}
rd->state->rrdeng_uuid = &page_index->id;
rd->state->page_index = page_index;
@ -981,7 +969,7 @@ int rrdeng_exit(struct rrdengine_instance *ctx)
fatal_assert(0 == uv_thread_join(&ctx->worker_config.thread));
finalize_rrd_files(ctx);
metalog_exit(ctx->metalog_ctx);
//metalog_exit(ctx->metalog_ctx);
free_page_cache(ctx);
if (ctx != &multidb_ctx) {
@ -1007,6 +995,6 @@ void rrdeng_prepare_exit(struct rrdengine_instance *ctx)
wait_for_completion(&ctx->rrdengine_completion);
destroy_completion(&ctx->rrdengine_completion);
metalog_prepare_exit(ctx->metalog_ctx);
//metalog_prepare_exit(ctx->metalog_ctx);
}

View file

@ -950,12 +950,10 @@ extern RRDSET *rrdset_create_custom(RRDHOST *host
, int update_every
, RRDSET_TYPE chart_type
, RRD_MEMORY_MODE memory_mode
, long history_entries
, int is_archived
, uuid_t *chart_uuid);
, long history_entries);
#define rrdset_create(host, type, id, name, family, context, title, units, plugin, module, priority, update_every, chart_type) \
rrdset_create_custom(host, type, id, name, family, context, title, units, plugin, module, priority, update_every, chart_type, (host)->rrd_memory_mode, (host)->rrd_history_entries, 0, NULL)
rrdset_create_custom(host, type, id, name, family, context, title, units, plugin, module, priority, update_every, chart_type, (host)->rrd_memory_mode, (host)->rrd_history_entries)
#define rrdset_create_localhost(type, id, name, family, context, title, units, plugin, module, priority, update_every, chart_type) \
rrdset_create(localhost, type, id, name, family, context, title, units, plugin, module, priority, update_every, chart_type)
@ -1161,10 +1159,10 @@ static inline time_t rrdset_slot2time(RRDSET *st, size_t slot) {
extern void rrdcalc_link_to_rrddim(RRDDIM *rd, RRDSET *st, RRDHOST *host);
extern RRDDIM *rrddim_add_custom(RRDSET *st, const char *id, const char *name, collected_number multiplier,
collected_number divisor, RRD_ALGORITHM algorithm, RRD_MEMORY_MODE memory_mode,
int is_archived, uuid_t *dim_uuid);
collected_number divisor, RRD_ALGORITHM algorithm, RRD_MEMORY_MODE memory_mode);//,
//int is_archived, uuid_t *dim_uuid);
#define rrddim_add(st, id, name, multiplier, divisor, algorithm) rrddim_add_custom(st, id, name, multiplier, divisor, \
algorithm, (st)->rrd_memory_mode, 0, NULL)
algorithm, (st)->rrd_memory_mode)//, 0, NULL)
extern int rrddim_set_name(RRDSET *st, RRDDIM *rd, const char *name);
extern int rrddim_set_algorithm(RRDSET *st, RRDDIM *rd, RRD_ALGORITHM algorithm);
@ -1237,15 +1235,21 @@ extern RRDHOST *rrdhost_create(
const char *tags, const char *program_name, const char *program_version, int update_every, long entries,
RRD_MEMORY_MODE memory_mode, unsigned int health_enabled, unsigned int rrdpush_enabled, char *rrdpush_destination,
char *rrdpush_api_key, char *rrdpush_send_charts_matching, struct rrdhost_system_info *system_info,
int is_localhost, int is_archived);
int is_localhost); //TODO: Remove , int is_archived);
#endif /* NETDATA_RRD_INTERNALS */
extern void set_host_properties(
RRDHOST *host, int update_every, RRD_MEMORY_MODE memory_mode, const char *hostname, const char *registry_hostname,
const char *guid, const char *os, const char *tags, const char *tzone, const char *program_name,
const char *program_version);
// ----------------------------------------------------------------------------
// RRD DB engine declarations
#ifdef ENABLE_DBENGINE
#include "database/engine/rrdengineapi.h"
#include "sqlite/sqlite_functions.h"
#endif
#endif /* NETDATA_RRD_H */

View file

@ -216,8 +216,8 @@ void rrdcalc_link_to_rrddim(RRDDIM *rd, RRDSET *st, RRDHOST *host) {
}
RRDDIM *rrddim_add_custom(RRDSET *st, const char *id, const char *name, collected_number multiplier,
collected_number divisor, RRD_ALGORITHM algorithm, RRD_MEMORY_MODE memory_mode,
int is_archived, uuid_t *dim_uuid) {
collected_number divisor, RRD_ALGORITHM algorithm, RRD_MEMORY_MODE memory_mode)
{
RRDHOST *host = st->rrdhost;
rrdset_wrlock(st);
@ -232,7 +232,10 @@ RRDDIM *rrddim_add_custom(RRDSET *st, const char *id, const char *name, collecte
rc += rrddim_set_algorithm(st, rd, algorithm);
rc += rrddim_set_multiplier(st, rd, multiplier);
rc += rrddim_set_divisor(st, rd, divisor);
if (!is_archived && rrddim_flag_check(rd, RRDDIM_FLAG_ARCHIVED)) {
if (rrddim_flag_check(rd, RRDDIM_FLAG_ARCHIVED)) {
#ifdef ENABLE_DBENGINE
store_active_dimension(rd->state->metric_uuid);
#endif
rd->state->collect_ops.init(rd);
rrddim_flag_clear(rd, RRDDIM_FLAG_ARCHIVED);
rrddimvar_create(rd, RRDVAR_TYPE_CALCULATED, NULL, NULL, &rd->last_stored_value, RRDVAR_OPTION_DEFAULT);
@ -242,9 +245,10 @@ RRDDIM *rrddim_add_custom(RRDSET *st, const char *id, const char *name, collecte
}
// DBENGINE available and activated?
#ifdef ENABLE_DBENGINE
if (likely(!is_archived && rd->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) && unlikely(rc)) {
if (likely(rd->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) && unlikely(rc)) {
debug(D_METADATALOG, "DIMENSION [%s] metadata updated", rd->id);
metalog_commit_update_dimension(rd);
(void)sql_store_dimension(rd->state->metric_uuid, rd->rrdset->chart_uuid, rd->id, rd->name, rd->multiplier, rd->divisor,
rd->algorithm);
}
#endif
rrdset_unlock(st);
@ -391,7 +395,9 @@ RRDDIM *rrddim_add_custom(RRDSET *st, const char *id, const char *name, collecte
rd->state = mallocz(sizeof(*rd->state));
if(memory_mode == RRD_MEMORY_MODE_DBENGINE) {
#ifdef ENABLE_DBENGINE
uuid_t *dim_uuid = find_dimension_uuid(st, rd);
rrdeng_metric_init(rd, dim_uuid);
store_active_dimension(rd->state->metric_uuid);
rd->state->collect_ops.init = rrdeng_store_metric_init;
rd->state->collect_ops.store_metric = rrdeng_store_metric_next;
rd->state->collect_ops.finalize = rrdeng_store_metric_finalize;
@ -413,10 +419,7 @@ RRDDIM *rrddim_add_custom(RRDSET *st, const char *id, const char *name, collecte
rd->state->query_ops.latest_time = rrddim_query_latest_time;
rd->state->query_ops.oldest_time = rrddim_query_oldest_time;
}
if (is_archived)
rrddim_flag_set(rd, RRDDIM_FLAG_ARCHIVED);
else
rd->state->collect_ops.init(rd); // only initialize if a collector created this dimension
rd->state->collect_ops.init(rd);
// append this dimension
if(!st->dimensions)
st->dimensions = rd;
@ -443,7 +446,7 @@ RRDDIM *rrddim_add_custom(RRDSET *st, const char *id, const char *name, collecte
td->next = rd;
}
if(host->health_enabled && !is_archived) {
if(host->health_enabled) {
rrddimvar_create(rd, RRDVAR_TYPE_CALCULATED, NULL, NULL, &rd->last_stored_value, RRDVAR_OPTION_DEFAULT);
rrddimvar_create(rd, RRDVAR_TYPE_COLLECTED, NULL, "_raw", &rd->last_collected_value, RRDVAR_OPTION_DEFAULT);
rrddimvar_create(rd, RRDVAR_TYPE_TIME_T, NULL, "_last_collected_t", &rd->last_collected_time.tv_sec, RRDVAR_OPTION_DEFAULT);
@ -452,19 +455,13 @@ RRDDIM *rrddim_add_custom(RRDSET *st, const char *id, const char *name, collecte
if(unlikely(rrddim_index_add(st, rd) != rd))
error("RRDDIM: INTERNAL ERROR: attempt to index duplicate dimension '%s' on chart '%s'", rd->id, st->id);
if (!is_archived)
calc_link_to_rrddim(rd);
calc_link_to_rrddim(rd);
rrdset_unlock(st);
#ifdef ENABLE_ACLK
if (netdata_cloud_setting)
aclk_update_chart(host, st->id, ACLK_CMD_CHART);
#endif
#ifdef ENABLE_DBENGINE
metalog_upd_objcount(st->rrdhost, 1);
metalog_commit_update_dimension(rd);
#endif
return(rd);
}
@ -483,7 +480,7 @@ void rrddim_free_custom(RRDSET *st, RRDDIM *rd, int db_rotated)
if (can_delete_metric && rd->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) {
#ifdef ENABLE_DBENGINE
/* This metric has no data and no references */
metalog_commit_delete_dimension(rd);
delete_dimension_uuid(rd->state->metric_uuid);
#endif
}
}
@ -529,7 +526,6 @@ void rrddim_free_custom(RRDSET *st, RRDDIM *rd, int db_rotated)
freez(rd->cache_filename);
#ifdef ENABLE_DBENGINE
if (rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) {
free_uuid(rd->state->metric_uuid);
freez(rd->state->metric_uuid);
}
#endif
@ -541,9 +537,6 @@ void rrddim_free_custom(RRDSET *st, RRDDIM *rd, int db_rotated)
if ((netdata_cloud_setting) && (db_rotated || RRD_MEMORY_MODE_DBENGINE != rrd_memory_mode))
aclk_update_chart(st->rrdhost, st->id, ACLK_CMD_CHART);
#endif
#ifdef ENABLE_DBENGINE
metalog_upd_objcount(st->rrdhost, -1);
#endif
}
@ -600,10 +593,6 @@ inline void rrddim_is_obsolete(RRDSET *st, RRDDIM *rd) {
if (netdata_cloud_setting)
aclk_update_chart(st->rrdhost, st->id, ACLK_CMD_CHART);
#endif
#ifdef ENABLE_DBENGINE
metalog_commit_update_dimension(rd);
#endif
}
inline void rrddim_isnot_obsolete(RRDSET *st __maybe_unused, RRDDIM *rd) {
@ -614,9 +603,6 @@ inline void rrddim_isnot_obsolete(RRDSET *st __maybe_unused, RRDDIM *rd) {
if (netdata_cloud_setting)
aclk_update_chart(st->rrdhost, st->id, ACLK_CMD_CHART);
#endif
#ifdef ENABLE_DBENGINE
metalog_commit_update_dimension(rd);
#endif
}
// ----------------------------------------------------------------------------

View file

@ -103,6 +103,28 @@ static inline void rrdhost_init_machine_guid(RRDHOST *host, const char *machine_
host->hash_machine_guid = simple_hash(host->machine_guid);
}
void set_host_properties(RRDHOST *host, int update_every, RRD_MEMORY_MODE memory_mode, const char *hostname,
const char *registry_hostname, const char *guid, const char *os, const char *tags,
const char *tzone, const char *program_name, const char *program_version)
{
host->rrd_update_every = update_every;
host->rrd_memory_mode = memory_mode;
rrdhost_init_hostname(host, hostname);
rrdhost_init_machine_guid(host, guid);
rrdhost_init_os(host, os);
rrdhost_init_timezone(host, tzone);
rrdhost_init_tags(host, tags);
host->program_name = strdupz((program_name && *program_name) ? program_name : "unknown");
host->program_version = strdupz((program_version && *program_version) ? program_version : "unknown");
host->registry_hostname = strdupz((registry_hostname && *registry_hostname) ? registry_hostname : host->hostname);
}
// ----------------------------------------------------------------------------
// RRDHOST - add a host
@ -123,13 +145,12 @@ RRDHOST *rrdhost_create(const char *hostname,
char *rrdpush_api_key,
char *rrdpush_send_charts_matching,
struct rrdhost_system_info *system_info,
int is_localhost,
int is_archived
int is_localhost
) {
debug(D_RRDHOST, "Host '%s': adding with guid '%s'", hostname, guid);
#ifdef ENABLE_DBENGINE
int is_legacy = is_archived ? 0 : (memory_mode == RRD_MEMORY_MODE_DBENGINE) && is_legacy_child(guid);
int is_legacy = (memory_mode == RRD_MEMORY_MODE_DBENGINE) && is_legacy_child(guid);
#else
int is_legacy = 1;
#endif
@ -138,10 +159,11 @@ RRDHOST *rrdhost_create(const char *hostname,
int is_in_multihost = (memory_mode == RRD_MEMORY_MODE_DBENGINE && !is_legacy);
RRDHOST *host = callocz(1, sizeof(RRDHOST));
host->rrd_update_every = (update_every > 0)?update_every:1;
set_host_properties(host, (update_every > 0)?update_every:1, memory_mode, hostname, registry_hostname, guid, os,
tags, timezone, program_name, program_version);
host->rrd_history_entries = align_entries_to_pagesize(memory_mode, entries);
host->rrd_memory_mode = memory_mode;
host->health_enabled = ((memory_mode == RRD_MEMORY_MODE_NONE) || is_archived) ? 0 : health_enabled;
host->health_enabled = ((memory_mode == RRD_MEMORY_MODE_NONE)) ? 0 : health_enabled;
host->sender = mallocz(sizeof(*host->sender));
sender_init(host->sender, host);
@ -169,17 +191,6 @@ RRDHOST *rrdhost_create(const char *hostname,
netdata_mutex_init(&host->claimed_id_lock);
rrdhost_init_hostname(host, hostname);
rrdhost_init_machine_guid(host, guid);
rrdhost_init_os(host, os);
rrdhost_init_timezone(host, timezone);
rrdhost_init_tags(host, tags);
host->program_name = strdupz((program_name && *program_name)?program_name:"unknown");
host->program_version = strdupz((program_version && *program_version)?program_version:"unknown");
host->registry_hostname = strdupz((registry_hostname && *registry_hostname)?registry_hostname:hostname);
host->system_info = system_info;
avl_init_lock(&(host->rrdset_root_index), rrdset_compare);
@ -187,10 +198,6 @@ RRDHOST *rrdhost_create(const char *hostname,
avl_init_lock(&(host->rrdfamily_root_index), rrdfamily_compare);
avl_init_lock(&(host->rrdvar_root_index), rrdvar_compare);
if (is_archived) {
rrdhost_flag_set(host, RRDHOST_FLAG_ARCHIVED);
info("Host %s is created in archived mode", hostname);
}
if(config_get_boolean(CONFIG_SECTION_GLOBAL, "delete obsolete charts files", 1))
rrdhost_flag_set(host, RRDHOST_FLAG_DELETE_OBSOLETE_CHARTS);
@ -248,7 +255,7 @@ RRDHOST *rrdhost_create(const char *hostname,
snprintfz(filename, FILENAME_MAX, "%s/%s", netdata_configured_varlib_dir, host->machine_guid);
host->varlib_dir = strdupz(filename);
if(!is_archived && host->health_enabled) {
if(host->health_enabled) {
int r = mkdir(host->varlib_dir, 0775);
if(r != 0 && errno != EEXIST)
error("Host '%s': cannot create directory '%s'", host->hostname, host->varlib_dir);
@ -256,7 +263,7 @@ RRDHOST *rrdhost_create(const char *hostname,
}
if(!is_archived && host->health_enabled) {
if(host->health_enabled) {
snprintfz(filename, FILENAME_MAX, "%s/health", host->varlib_dir);
int r = mkdir(filename, 0775);
if(r != 0 && errno != EEXIST)
@ -274,7 +281,7 @@ RRDHOST *rrdhost_create(const char *hostname,
// ------------------------------------------------------------------------
// load health configuration
if(!is_archived && host->health_enabled) {
if(host->health_enabled) {
rrdhost_wrlock(host);
health_readdir(host, health_user_config_dir(), health_stock_config_dir(), NULL);
rrdhost_unlock(host);
@ -293,13 +300,13 @@ RRDHOST *rrdhost_create(const char *hostname,
if (host->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) {
#ifdef ENABLE_DBENGINE
if (unlikely(-1 == uuid_parse(host->machine_guid, host->host_uuid))) {
error("Host machine GUID is not valid.");
if (likely(!uuid_parse(host->machine_guid, host->host_uuid))) {
int rc = sql_store_host(&host->host_uuid, hostname, registry_hostname, update_every, os, timezone, tags);
if (unlikely(rc))
error_report("Failed to store machine GUID to the database");
}
if (unlikely(find_or_generate_guid((void *) host, &host->host_uuid, GUID_TYPE_HOST, 1)))
error("Failed to store machine GUID to global map");
else
info("Added %s to global map for host %s", host->machine_guid, host->hostname);
error_report("Host machine GUID %s is not valid", host->machine_guid);
host->compaction_id = 0;
char dbenginepath[FILENAME_MAX + 1];
int ret;
@ -325,7 +332,6 @@ RRDHOST *rrdhost_create(const char *hostname,
return host;
}
metalog_upd_objcount(host, 1);
#else
fatal("RRD_MEMORY_MODE_DBENGINE is not supported in this platform.");
#endif
@ -385,13 +391,8 @@ RRDHOST *rrdhost_create(const char *hostname,
, host->health_default_recipient
);
if (!is_archived)
rrd_hosts_available++;
rrd_hosts_available++;
#ifdef ENABLE_DBENGINE
if (likely(!is_localhost && !is_archived && host && host->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE))
metalog_commit_update_host(host);
#endif
return host;
}
@ -493,10 +494,7 @@ void rrdhost_update(RRDHOST *host
rrd_hosts_available++;
info("Host %s is not in archived mode anymore", host->hostname);
}
#ifdef ENABLE_DBENGINE
if (likely(host->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE))
metalog_commit_update_host(host);
#endif
return;
}
@ -550,7 +548,6 @@ RRDHOST *rrdhost_find_or_create(
, rrdpush_send_charts_matching
, system_info
, 0
, 0
);
}
else {
@ -633,6 +630,12 @@ int rrd_init(char *hostname, struct rrdhost_system_info *system_info) {
if (gap_when_lost_iterations_above < 1)
gap_when_lost_iterations_above = 1;
#ifdef ENABLE_DBENGINE
if (unlikely(sql_init_database())) {
return 1;
}
#endif
health_init();
rrdpush_init();
@ -658,7 +661,6 @@ int rrd_init(char *hostname, struct rrdhost_system_info *system_info) {
, default_rrdpush_send_charts_matching
, system_info
, 1
, 0
);
if (unlikely(!localhost)) {
rrd_unlock();
@ -800,8 +802,10 @@ void rrdhost_free(RRDHOST *host) {
// release its children resources
#ifdef ENABLE_DBENGINE
if (host->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE && host->rrdeng_ctx != &multidb_ctx)
rrdeng_prepare_exit(host->rrdeng_ctx);
if (host->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) {
if (host->rrdeng_ctx != &multidb_ctx)
rrdeng_prepare_exit(host->rrdeng_ctx);
}
#endif
while(host->rrdset_root)
rrdset_free(host->rrdset_root);
@ -890,9 +894,6 @@ void rrdhost_free(RRDHOST *host) {
netdata_rwlock_destroy(&host->health_log.alarm_log_rwlock);
netdata_rwlock_destroy(&host->rrdhost_rwlock);
#ifdef ENABLE_DBENGINE
free_uuid(&host->host_uuid);
#endif
freez(host);
rrd_hosts_available--;
@ -1544,7 +1545,7 @@ restart_after_removal:
uint8_t can_delete_metric = rd->state->collect_ops.finalize(rd);
if (can_delete_metric) {
/* This metric has no data and no references */
metalog_commit_delete_dimension(rd);
delete_dimension_uuid(rd->state->metric_uuid);
rrddim_free(st, rd);
if (unlikely(!last)) {
rd = st->dimensions;
@ -1568,7 +1569,6 @@ restart_after_removal:
/* If the chart still has dimensions don't delete it from the metadata log */
continue;
}
metalog_commit_delete_chart(st);
}
#endif
rrdset_rdlock(st);

View file

@ -320,7 +320,6 @@ void rrdset_free(RRDSET *st) {
rrdhost_check_wrlock(host); // make sure we have a write lock on the host
rrdset_wrlock(st); // lock this RRDSET
// info("Removing chart '%s' ('%s')", st->id, st->name);
// ------------------------------------------------------------------------
@ -389,17 +388,12 @@ void rrdset_free(RRDSET *st) {
case RRD_MEMORY_MODE_NONE:
case RRD_MEMORY_MODE_DBENGINE:
#ifdef ENABLE_DBENGINE
if (st->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) {
free_uuid(st->chart_uuid);
if (st->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE)
freez(st->chart_uuid);
}
#endif
freez(st);
break;
}
#ifdef ENABLE_DBENGINE
metalog_upd_objcount(host, -1);
#endif
}
@ -503,8 +497,6 @@ RRDSET *rrdset_create_custom(
, RRDSET_TYPE chart_type
, RRD_MEMORY_MODE memory_mode
, long history_entries
, int is_archived
, uuid_t *chart_uuid
) {
if(!type || !type[0]) {
fatal("Cannot create rrd stats without a type: id '%s', name '%s', family '%s', context '%s', title '%s', units '%s', plugin '%s', module '%s'."
@ -546,7 +538,7 @@ RRDSET *rrdset_create_custom(
int mark_rebuild = 0;
rrdset_flag_set(st, RRDSET_FLAG_SYNC_CLOCK);
rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
if (!is_archived && rrdset_flag_check(st, RRDSET_FLAG_ARCHIVED)) {
if (rrdset_flag_check(st, RRDSET_FLAG_ARCHIVED)) {
rrdset_flag_clear(st, RRDSET_FLAG_ARCHIVED);
changed_from_archived_to_active = 1;
mark_rebuild |= META_CHART_ACTIVATED;
@ -661,10 +653,12 @@ RRDSET *rrdset_create_custom(
}
}
#ifdef ENABLE_DBENGINE
if (is_archived == 0 && st->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE &&
if (st->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE &&
(mark_rebuild & (META_CHART_UPDATED | META_PLUGIN_UPDATED | META_MODULE_UPDATED))) {
debug(D_METADATALOG, "CHART [%s] metadata updated", st->id);
metalog_commit_update_chart(st);
int rc = update_chart_metadata(st->chart_uuid, st, id, name);
if (unlikely(rc))
error_report("Failed to update chart metadata in the database");
}
#endif
/* Fall-through during switch from archived to active so that the host lock is taken and health is linked */
@ -689,9 +683,6 @@ RRDSET *rrdset_create_custom(
rrdhost_unlock(host);
rrdset_flag_set(st, RRDSET_FLAG_SYNC_CLOCK);
rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
if (!is_archived && rrdset_flag_check(st, RRDSET_FLAG_ARCHIVED)) {
rrdset_flag_clear(st, RRDSET_FLAG_ARCHIVED);
}
return st;
}
@ -821,8 +812,6 @@ RRDSET *rrdset_create_custom(
else
st->rrd_memory_mode = (memory_mode == RRD_MEMORY_MODE_NONE) ? RRD_MEMORY_MODE_NONE : RRD_MEMORY_MODE_ALLOC;
}
if (is_archived)
rrdset_flag_set(st, RRDSET_FLAG_ARCHIVED);
st->plugin_name = plugin?strdupz(plugin):NULL;
st->module_name = module?strdupz(module):NULL;
@ -915,7 +904,7 @@ RRDSET *rrdset_create_custom(
st->next = host->rrdset_root;
host->rrdset_root = st;
if(host->health_enabled && !is_archived) {
if(host->health_enabled) {
rrdsetvar_create(st, "last_collected_t", RRDVAR_TYPE_TIME_T, &st->last_collected_time.tv_sec, RRDVAR_OPTION_DEFAULT);
rrdsetvar_create(st, "collected_total_raw", RRDVAR_TYPE_TOTAL, &st->last_collected_total, RRDVAR_OPTION_DEFAULT);
rrdsetvar_create(st, "green", RRDVAR_TYPE_CALCULATED, &st->green, RRDVAR_OPTION_DEFAULT);
@ -926,33 +915,20 @@ RRDSET *rrdset_create_custom(
if(unlikely(rrdset_index_add(host, st) != st))
error("RRDSET: INTERNAL ERROR: attempt to index duplicate chart '%s'", st->id);
if (!is_archived) {
rrdsetcalc_link_matching(st);
rrdcalctemplate_link_matching(st);
}
rrdsetcalc_link_matching(st);
rrdcalctemplate_link_matching(st);
#ifdef ENABLE_DBENGINE
if (st->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) {
int replace_instead_of_generate = 0;
st->chart_uuid = find_chart_uuid(host, type, id, name);
if (unlikely(!st->chart_uuid))
st->chart_uuid = create_chart_uuid(st, id, name);
st->chart_uuid = callocz(1, sizeof(uuid_t));
if (NULL != chart_uuid) {
replace_instead_of_generate = 1;
uuid_copy(*st->chart_uuid, *chart_uuid);
}
if (unlikely(
find_or_generate_guid((void *) st, st->chart_uuid, GUID_TYPE_CHART, replace_instead_of_generate))) {
errno = 0;
error("FAILED to generate GUID for %s", st->id);
freez(st->chart_uuid);
st->chart_uuid = NULL;
fatal_assert(0);
}
store_active_chart(st->chart_uuid);
st->compaction_id = 0;
}
#endif
if (!is_archived)
rrdhost_cleanup_obsolete_charts(host);
rrdhost_cleanup_obsolete_charts(host);
rrdhost_unlock(host);
#ifdef ENABLE_ACLK
@ -961,11 +937,6 @@ RRDSET *rrdset_create_custom(
aclk_update_chart(host, st->id, ACLK_CMD_CHART);
}
#endif
#ifdef ENABLE_DBENGINE
metalog_upd_objcount(host, 1);
metalog_commit_update_chart(st);
#endif
return(st);
}
@ -1898,7 +1869,7 @@ after_second_database_work:
uint8_t can_delete_metric = rd->state->collect_ops.finalize(rd);
if (can_delete_metric) {
/* This metric has no data and no references */
metalog_commit_delete_dimension(rd);
delete_dimension_uuid(rd->state->metric_uuid);
} else {
/* Do not delete this dimension */
last = rd;

230536
database/sqlite/sqlite3.c Normal file

File diff suppressed because it is too large Load diff

12174
database/sqlite/sqlite3.h Normal file

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,62 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef NETDATA_SQLITE_FUNCTIONS_H
#define NETDATA_SQLITE_FUNCTIONS_H
#include "../../daemon/common.h"
#include "sqlite3.h"
#define SQLITE_INSERT_DELAY (50) // Insert delay in case of lock
#define SQL_STORE_HOST "insert or replace into host (host_id,hostname,registry_hostname,update_every,os,timezone,tags) values (?1,?2,?3,?4,?5,?6,?7);"
#define SQL_STORE_CHART "insert or replace into chart (chart_id, host_id, type, id, " \
"name, family, context, title, unit, plugin, module, priority, update_every , chart_type , memory_mode , " \
"history_entries) values (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16);"
#define SQL_FIND_CHART_UUID \
"select chart_id from chart where host_id = @host and type=@type and id=@id and (name is null or name=@name);"
#define SQL_STORE_ACTIVE_CHART \
"insert or replace into chart_active (chart_id, date_created) values (@id, strftime('%s'));"
#define SQL_STORE_DIMENSION \
"INSERT OR REPLACE into dimension (dim_id, chart_id, id, name, multiplier, divisor , algorithm) values (?0001,?0002,?0003,?0004,?0005,?0006,?0007);"
#define SQL_FIND_DIMENSION_UUID "select dim_id from dimension where chart_id=@chart and id=@id and name=@name;"
#define SQL_STORE_ACTIVE_DIMENSION \
"insert or replace into dimension_active (dim_id, date_created) values (@id, strftime('%s'));"
extern int sql_init_database(void);
extern void sql_close_database(void);
extern int sql_store_host(uuid_t *guid, const char *hostname, const char *registry_hostname, int update_every, const char *os, const char *timezone, const char *tags);
extern int sql_store_chart(
uuid_t *chart_uuid, uuid_t *host_uuid, const char *type, const char *id, const char *name, const char *family,
const char *context, const char *title, const char *units, const char *plugin, const char *module, long priority,
int update_every, int chart_type, int memory_mode, long history_entries);
extern int sql_store_dimension(uuid_t *dim_uuid, uuid_t *chart_uuid, const char *id, const char *name, collected_number multiplier,
collected_number divisor, int algorithm);
extern uuid_t *find_dimension_uuid(RRDSET *st, RRDDIM *rd);
extern uuid_t *create_dimension_uuid(RRDSET *st, RRDDIM *rd);
extern void store_active_dimension(uuid_t *dimension_uuid);
extern uuid_t *find_chart_uuid(RRDHOST *host, const char *type, const char *id, const char *name);
extern uuid_t *create_chart_uuid(RRDSET *st, const char *id, const char *name);
extern int update_chart_metadata(uuid_t *chart_uuid, RRDSET *st, const char *id, const char *name);
extern void store_active_chart(uuid_t *dimension_uuid);
extern int find_uuid_type(uuid_t *uuid);
extern void sql_rrdset2json(RRDHOST *host, BUFFER *wb);
extern RRDHOST *sql_create_host_by_uuid(char *guid);
extern void db_execute(char *cmd);
extern int file_is_migrated(char *path);
extern void add_migrated_file(char *path, uint64_t file_size);
extern void db_unlock(void);
extern void db_lock(void);
extern void delete_dimension_uuid(uuid_t *dimension_uuid);
#endif //NETDATA_SQLITE_FUNCTIONS_H

View file

@ -100,9 +100,7 @@ RRDSET *rrdset_create_custom(
int update_every,
RRDSET_TYPE chart_type,
RRD_MEMORY_MODE memory_mode,
long history_entries,
int is_archived,
uuid_t *chart_uuid)
long history_entries)
{
check_expected_ptr(host);
check_expected_ptr(type);
@ -119,8 +117,6 @@ RRDSET *rrdset_create_custom(
check_expected(chart_type);
UNUSED(memory_mode);
UNUSED(history_entries);
UNUSED(is_archived);
UNUSED(chart_uuid);
function_called();
@ -149,9 +145,7 @@ RRDDIM *rrddim_add_custom(
collected_number multiplier,
collected_number divisor,
RRD_ALGORITHM algorithm,
RRD_MEMORY_MODE memory_mode,
int is_archived,
uuid_t *dim_uuid)
RRD_MEMORY_MODE memory_mode)
{
check_expected_ptr(st);
UNUSED(id);
@ -160,8 +154,6 @@ RRDDIM *rrddim_add_custom(
check_expected(divisor);
check_expected(algorithm);
UNUSED(memory_mode);
UNUSED(is_archived);
UNUSED(dim_uuid);
function_called();

View file

@ -295,6 +295,7 @@ extern char *read_by_filename(char *filename, long *file_size);
/* misc. */
#define UNUSED(x) (void)(x)
#define error_report(x, args...) do { errno = 0; error(x, ##args); } while(0)
extern void netdata_cleanup_and_exit(int ret) NORETURN;
extern void send_statistics(const char *action, const char *action_result, const char *action_data);

View file

@ -4,7 +4,7 @@
// generate JSON for the /api/v1/charts API call
static inline const char* get_release_channel() {
const char* get_release_channel() {
static int use_stable = -1;
if (use_stable == -1) {

View file

@ -7,5 +7,6 @@
extern void charts2json(RRDHOST *host, BUFFER *wb, int skip_volatile, int show_archived);
extern void chartcollectors2json(RRDHOST *host, BUFFER *wb);
extern const char* get_release_channel();
#endif //NETDATA_API_FORMATTER_CHARTS2JSON_H

View file

@ -7,6 +7,11 @@
#include <setjmp.h>
#include <cmocka.h>
#include <stdbool.h>
RRDHOST *__wrap_sql_create_host_by_uuid(char *hostname)
{
(void) hostname;
return NULL;
}
void repr(char *result, int result_size, char const *buf, int size)
{

View file

@ -8,6 +8,12 @@
#include <cmocka.h>
#include <stdbool.h>
RRDHOST *__wrap_sql_create_host_by_uuid(char *hostname)
{
(void) hostname;
return NULL;
}
void repr(char *result, int result_size, char const *buf, int size)
{
int n;

View file

@ -354,12 +354,15 @@ inline int web_client_api_request_v1_charts(RRDHOST *host, struct web_client *w,
return HTTP_RESP_OK;
}
inline int web_client_api_request_v1_archivedcharts(RRDHOST *host, struct web_client *w, char *url) {
inline int web_client_api_request_v1_archivedcharts(RRDHOST *host __maybe_unused, struct web_client *w, char *url) {
(void)url;
buffer_flush(w->response.data);
w->response.data->contenttype = CT_APPLICATION_JSON;
charts2json(host, w->response.data, 0, 1);
#ifdef ENABLE_DBENGINE
if (host->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE)
sql_rrdset2json(host, w->response.data);
#endif
return HTTP_RESP_OK;
}

View file

@ -1376,7 +1376,32 @@ static inline int web_client_switch_host(RRDHOST *host, struct web_client *w, ch
host = rrdhost_find_by_hostname(tok, hash);
if(!host) host = rrdhost_find_by_guid(tok, hash);
if(host) return web_client_process_url(host, w, url);
#ifdef ENABLE_DBENGINE
int release_host = 0;
if (!host) {
host = sql_create_host_by_uuid(tok);
if (likely(host)) {
rrdhost_flag_set(host, RRDHOST_FLAG_ARCHIVED);
release_host = 1;
}
}
if(host) {
int rc = web_client_process_url(host, w, url);
if (release_host) {
freez(host->hostname);
freez((char *) host->os);
freez((char *) host->tags);
freez((char *) host->timezone);
freez(host->program_name);
freez(host->program_version);
freez(host->registry_hostname);
freez(host);
}
return rc;
}
#else
if (host) return web_client_process_url(host, w, url);
#endif
}
buffer_flush(w->response.data);