0
0
Fork 0
mirror of https://github.com/netdata/netdata.git synced 2025-04-14 09:38:34 +00:00

Handle file descriptors running out ()

* Handle file descriptors running out

* Added alarm for dbengine FS and I/O errors

* more verbose alarm message

* * Added File-Descriptor budget to Database Engine instances.
* Changed FD budget of the web server from 50% to 25%.
* Allocated 25% of FDs to dbengine.
* Created a new dbengine global FD utilization chart.
This commit is contained in:
Markos Fountoulakis 2019-06-21 12:52:58 +03:00 committed by GitHub
parent 61e45b55a2
commit 6312080b69
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 578 additions and 158 deletions

View file

@ -381,6 +381,7 @@ declare -A configs_signatures=(
['7deb236ec68a512b9bdd18e6a51d76f7']='python.d/mysql.conf'
['7e5fc1644aa7a54f9dbb1bd102521b09']='health.d/memcached.conf'
['7f13631183fbdf79c21c8e5a171e9b34']='health.d/zfs.conf'
['ce285c90747428ee5da4efb547418dda']='health.d/dbengine.conf'
['7fb8184d56a27040e73261ed9c6fc76f']='health_alarm_notify.conf'
['80266bddd3df374923c750a6de91d120']='health.d/apache.conf'
['803a7f9dcb942eeac0fd764b9e3e38ca']='fping.conf'

View file

@ -538,7 +538,7 @@ void global_statistics_charts(void) {
unsigned long long stats_array[RRDENG_NR_STATS];
/* get localhost's DB engine's statistics */
rrdeng_get_28_statistics(localhost->rrdeng_ctx, stats_array);
rrdeng_get_33_statistics(localhost->rrdeng_ctx, stats_array);
// ----------------------------------------------------------------
@ -749,6 +749,75 @@ void global_statistics_charts(void) {
rrddim_set_by_pointer(st_io_stats, rd_writes, (collected_number)stats_array[16]);
rrdset_done(st_io_stats);
}
// ----------------------------------------------------------------
{
static RRDSET *st_errors = NULL;
static RRDDIM *rd_fs_errors = NULL;
static RRDDIM *rd_io_errors = NULL;
if (unlikely(!st_errors)) {
st_errors = rrdset_create_localhost(
"netdata"
, "dbengine_global_errors"
, NULL
, "dbengine"
, NULL
, "NetData DB engine errors"
, "errors/s"
, "netdata"
, "stats"
, 130507
, localhost->rrd_update_every
, RRDSET_TYPE_LINE
);
rd_io_errors = rrddim_add(st_errors, "I/O errors", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
rd_fs_errors = rrddim_add(st_errors, "FS errors", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
}
else
rrdset_next(st_errors);
rrddim_set_by_pointer(st_errors, rd_io_errors, (collected_number)stats_array[30]);
rrddim_set_by_pointer(st_errors, rd_fs_errors, (collected_number)stats_array[31]);
rrdset_done(st_errors);
}
// ----------------------------------------------------------------
{
static RRDSET *st_fd = NULL;
static RRDDIM *rd_fd_current = NULL;
static RRDDIM *rd_fd_max = NULL;
if (unlikely(!st_fd)) {
st_fd = rrdset_create_localhost(
"netdata"
, "dbengine_global_file_descriptors"
, NULL
, "dbengine"
, NULL
, "NetData DB engine File Descriptors"
, "descriptors"
, "netdata"
, "stats"
, 130508
, localhost->rrd_update_every
, RRDSET_TYPE_LINE
);
rd_fd_current = rrddim_add(st_fd, "current", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
rd_fd_max = rrddim_add(st_fd, "max", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
}
else
rrdset_next(st_fd);
rrddim_set_by_pointer(st_fd, rd_fd_current, (collected_number)stats_array[32]);
/* Careful here, modify this accordingly if the File-Descriptor budget ever changes */
rrddim_set_by_pointer(st_fd, rd_fd_max, (collected_number)rlimit_nofile.rlim_cur / 4);
rrdset_done(st_fd);
}
}
#endif

View file

@ -49,44 +49,69 @@ static void datafile_init(struct rrdengine_datafile *datafile, struct rrdengine_
datafile->ctx = ctx;
}
static void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
{
(void) snprintf(str, maxlen, "%s/" DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION,
datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno);
}
int close_data_file(struct rrdengine_datafile *datafile)
{
struct rrdengine_instance *ctx = datafile->ctx;
uv_fs_t req;
int ret;
char path[RRDENG_PATH_MAX];
generate_datafilepath(datafile, path, sizeof(path));
ret = uv_fs_close(NULL, &req, datafile->file, NULL);
if (ret < 0) {
error("uv_fs_close(%s): %s", path, uv_strerror(ret));
++ctx->stats.fs_errors;
rrd_stat_atomic_add(&global_fs_errors, 1);
}
uv_fs_req_cleanup(&req);
return ret;
}
int destroy_data_file(struct rrdengine_datafile *datafile)
{
struct rrdengine_instance *ctx = datafile->ctx;
uv_fs_t req;
int ret, fd;
char path[1024];
int ret;
char path[RRDENG_PATH_MAX];
generate_datafilepath(datafile, path, sizeof(path));
ret = uv_fs_ftruncate(NULL, &req, datafile->file, 0, NULL);
if (ret < 0) {
fatal("uv_fs_ftruncate: %s", uv_strerror(ret));
error("uv_fs_ftruncate(%s): %s", path, uv_strerror(ret));
++ctx->stats.fs_errors;
rrd_stat_atomic_add(&global_fs_errors, 1);
}
assert(0 == req.result);
uv_fs_req_cleanup(&req);
ret = uv_fs_close(NULL, &req, datafile->file, NULL);
if (ret < 0) {
fatal("uv_fs_close: %s", uv_strerror(ret));
error("uv_fs_close(%s): %s", path, uv_strerror(ret));
++ctx->stats.fs_errors;
rrd_stat_atomic_add(&global_fs_errors, 1);
}
assert(0 == req.result);
uv_fs_req_cleanup(&req);
generate_datafilepath(datafile, path, sizeof(path));
fd = uv_fs_unlink(NULL, &req, path, NULL);
if (fd < 0) {
fatal("uv_fs_fsunlink: %s", uv_strerror(fd));
ret = uv_fs_unlink(NULL, &req, path, NULL);
if (ret < 0) {
error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
++ctx->stats.fs_errors;
rrd_stat_atomic_add(&global_fs_errors, 1);
}
assert(0 == req.result);
uv_fs_req_cleanup(&req);
++ctx->stats.datafile_deletions;
return 0;
return ret;
}
int create_data_file(struct rrdengine_datafile *datafile)
@ -97,13 +122,17 @@ int create_data_file(struct rrdengine_datafile *datafile)
int ret, fd;
struct rrdeng_df_sb *superblock;
uv_buf_t iov;
char path[1024];
char path[RRDENG_PATH_MAX];
generate_datafilepath(datafile, path, sizeof(path));
fd = open_file_direct_io(path, O_CREAT | O_RDWR | O_TRUNC, &file);
if (fd < 0) {
fatal("uv_fs_fsopen: %s", uv_strerror(fd));
++ctx->stats.fs_errors;
rrd_stat_atomic_add(&global_fs_errors, 1);
return fd;
}
datafile->file = file;
++ctx->stats.datafile_creations;
ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
if (unlikely(ret)) {
@ -117,19 +146,21 @@ int create_data_file(struct rrdengine_datafile *datafile)
ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL);
if (ret < 0) {
fatal("uv_fs_write: %s", uv_strerror(ret));
}
if (req.result < 0) {
fatal("uv_fs_write: %s", uv_strerror((int)req.result));
assert(req.result < 0);
error("uv_fs_write: %s", uv_strerror(ret));
++ctx->stats.io_errors;
rrd_stat_atomic_add(&global_io_errors, 1);
}
uv_fs_req_cleanup(&req);
free(superblock);
if (ret < 0) {
destroy_data_file(datafile);
return ret;
}
datafile->file = file;
datafile->pos = sizeof(*superblock);
ctx->stats.io_write_bytes += sizeof(*superblock);
++ctx->stats.io_write_requests;
++ctx->stats.datafile_creations;
return 0;
}
@ -174,15 +205,15 @@ static int load_data_file(struct rrdengine_datafile *datafile)
struct rrdengine_instance *ctx = datafile->ctx;
uv_fs_t req;
uv_file file;
int ret, fd;
int ret, fd, error;
uint64_t file_size;
char path[1024];
char path[RRDENG_PATH_MAX];
generate_datafilepath(datafile, path, sizeof(path));
fd = open_file_direct_io(path, O_RDWR, &file);
if (fd < 0) {
/* if (UV_ENOENT != fd) */
error("uv_fs_fsopen: %s", uv_strerror(fd));
++ctx->stats.fs_errors;
rrd_stat_atomic_add(&global_fs_errors, 1);
return fd;
}
info("Initializing data file \"%s\".", path);
@ -205,15 +236,21 @@ static int load_data_file(struct rrdengine_datafile *datafile)
return 0;
error:
(void) uv_fs_close(NULL, &req, file, NULL);
error = ret;
ret = uv_fs_close(NULL, &req, file, NULL);
if (ret < 0) {
error("uv_fs_close(%s): %s", path, uv_strerror(ret));
++ctx->stats.fs_errors;
rrd_stat_atomic_add(&global_fs_errors, 1);
}
uv_fs_req_cleanup(&req);
return ret;
return error;
}
static int scan_data_files_cmp(const void *a, const void *b)
{
struct rrdengine_datafile *file1, *file2;
char path1[1024], path2[1024];
char path1[RRDENG_PATH_MAX], path2[RRDENG_PATH_MAX];
file1 = *(struct rrdengine_datafile **)a;
file2 = *(struct rrdengine_datafile **)b;
@ -222,7 +259,7 @@ static int scan_data_files_cmp(const void *a, const void *b)
return strcmp(path1, path2);
}
/* Returns number of datafiles that were loaded */
/* Returns number of datafiles that were loaded or < 0 on error */
static int scan_data_files(struct rrdengine_instance *ctx)
{
int ret;
@ -233,16 +270,22 @@ static int scan_data_files(struct rrdengine_instance *ctx)
struct rrdengine_journalfile *journalfile;
ret = uv_fs_scandir(NULL, &req, ctx->dbfiles_path, 0, NULL);
assert(ret >= 0);
assert(req.result >= 0);
if (ret < 0) {
assert(req.result < 0);
uv_fs_req_cleanup(&req);
error("uv_fs_scandir(%s): %s", ctx->dbfiles_path, uv_strerror(ret));
++ctx->stats.fs_errors;
rrd_stat_atomic_add(&global_fs_errors, 1);
return ret;
}
info("Found %d files in path %s", ret, ctx->dbfiles_path);
datafiles = callocz(MIN(ret, MAX_DATAFILES), sizeof(*datafiles));
for (matched_files = 0 ; UV_EOF != uv_fs_scandir_next(&req, &dent) && matched_files < MAX_DATAFILES ; ) {
info("Scanning file \"%s\"", dent.name);
info("Scanning file \"%s/%s\"", ctx->dbfiles_path, dent.name);
ret = sscanf(dent.name, DATAFILE_PREFIX RRDENG_FILE_NUMBER_SCAN_TMPL DATAFILE_EXTENSION, &tier, &no);
if (2 == ret) {
info("Matched file \"%s\"", dent.name);
info("Matched file \"%s/%s\"", ctx->dbfiles_path, dent.name);
datafile = mallocz(sizeof(*datafile));
datafile_init(datafile, ctx, tier, no);
datafiles[matched_files++] = datafile;
@ -250,70 +293,133 @@ static int scan_data_files(struct rrdengine_instance *ctx)
}
uv_fs_req_cleanup(&req);
if (0 == matched_files) {
freez(datafiles);
return 0;
}
if (matched_files == MAX_DATAFILES) {
error("Warning: hit maximum database engine file limit of %d files", MAX_DATAFILES);
}
qsort(datafiles, matched_files, sizeof(*datafiles), scan_data_files_cmp);
/* TODO: change this when tiering is implemented */
ctx->last_fileno = datafiles[matched_files - 1]->fileno;
for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) {
datafile = datafiles[i];
ret = load_data_file(datafile);
if (0 != ret) {
free(datafile);
freez(datafile);
++failed_to_load;
continue;
break;
}
journalfile = mallocz(sizeof(*journalfile));
datafile->journalfile = journalfile;
journalfile_init(journalfile, datafile);
ret = load_journal_file(ctx, journalfile, datafile);
if (0 != ret) {
free(datafile);
free(journalfile);
close_data_file(datafile);
freez(datafile);
freez(journalfile);
++failed_to_load;
continue;
break;
}
datafile_list_insert(ctx, datafile);
ctx->disk_space += datafile->pos + journalfile->pos;
}
freez(datafiles);
if (failed_to_load) {
error("%u files failed to load.", failed_to_load);
error("%u datafiles failed to load.", failed_to_load);
finalize_data_files(ctx);
return UV_EIO;
}
free(datafiles);
return matched_files - failed_to_load;
return matched_files;
}
/* Creates a datafile and a journalfile pair */
void create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno)
int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno)
{
struct rrdengine_datafile *datafile;
struct rrdengine_journalfile *journalfile;
int ret;
char path[RRDENG_PATH_MAX];
info("Creating new data and journal files.");
info("Creating new data and journal files in path %s", ctx->dbfiles_path);
datafile = mallocz(sizeof(*datafile));
datafile_init(datafile, ctx, tier, fileno);
ret = create_data_file(datafile);
assert(!ret);
if (!ret) {
generate_datafilepath(datafile, path, sizeof(path));
info("Created data file \"%s\".", path);
} else {
goto error_after_datafile;
}
journalfile = mallocz(sizeof(*journalfile));
datafile->journalfile = journalfile;
journalfile_init(journalfile, datafile);
ret = create_journal_file(journalfile, datafile);
assert(!ret);
if (!ret) {
generate_journalfilepath(datafile, path, sizeof(path));
info("Created journal file \"%s\".", path);
} else {
goto error_after_journalfile;
}
datafile_list_insert(ctx, datafile);
ctx->disk_space += datafile->pos + journalfile->pos;
return 0;
error_after_journalfile:
destroy_data_file(datafile);
freez(journalfile);
error_after_datafile:
freez(datafile);
return ret;
}
/* Page cache must already be initialized. */
/* Page cache must already be initialized.
* Return 0 on success.
*/
int init_data_files(struct rrdengine_instance *ctx)
{
int ret;
ret = scan_data_files(ctx);
if (0 == ret) {
info("Data files not found, creating.");
create_new_datafile_pair(ctx, 1, 1);
if (ret < 0) {
error("Failed to scan path \"%s\".", ctx->dbfiles_path);
return ret;
} else if (0 == ret) {
info("Data files not found, creating in path \"%s\".", ctx->dbfiles_path);
ret = create_new_datafile_pair(ctx, 1, 1);
if (ret) {
error("Failed to create data and journal files in path \"%s\".", ctx->dbfiles_path);
return ret;
}
ctx->last_fileno = 1;
}
return 0;
}
void finalize_data_files(struct rrdengine_instance *ctx)
{
struct rrdengine_datafile *datafile, *next_datafile;
struct rrdengine_journalfile *journalfile;
struct extent_info *extent, *next_extent;
for (datafile = ctx->datafiles.first ; datafile != NULL ; datafile = next_datafile) {
journalfile = datafile->journalfile;
next_datafile = datafile->next;
for (extent = datafile->extents.first ; extent != NULL ; extent = next_extent) {
next_extent = extent->next;
freez(extent);
}
close_journal_file(journalfile, datafile);
close_data_file(datafile);
freez(journalfile);
freez(datafile);
}
}

View file

@ -55,9 +55,12 @@ struct rrdengine_datafile_list {
extern void df_extent_insert(struct extent_info *extent);
extern void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile);
extern void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile);
extern void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen);
extern int close_data_file(struct rrdengine_datafile *datafile);
extern int destroy_data_file(struct rrdengine_datafile *datafile);
extern int create_data_file(struct rrdengine_datafile *datafile);
extern void create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno);
extern int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno);
extern int init_data_files(struct rrdengine_instance *ctx);
extern void finalize_data_files(struct rrdengine_instance *ctx);
#endif /* NETDATA_DATAFILE_H */

View file

@ -13,7 +13,7 @@ static void flush_transaction_buffer_cb(uv_fs_t* req)
uv_fs_req_cleanup(req);
free(io_descr->buf);
free(io_descr);
freez(io_descr);
}
/* Careful to always call this before creating a new journal file */
@ -87,7 +87,7 @@ void * wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned s
return ctx->commit_log.buf + buf_pos;
}
static void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
{
(void) snprintf(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION,
datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno);
@ -100,39 +100,62 @@ void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengin
journalfile->datafile = datafile;
}
int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
{
struct rrdengine_instance *ctx = datafile->ctx;
uv_fs_t req;
int ret;
char path[RRDENG_PATH_MAX];
generate_journalfilepath(datafile, path, sizeof(path));
ret = uv_fs_close(NULL, &req, journalfile->file, NULL);
if (ret < 0) {
error("uv_fs_close(%s): %s", path, uv_strerror(ret));
++ctx->stats.fs_errors;
rrd_stat_atomic_add(&global_fs_errors, 1);
}
uv_fs_req_cleanup(&req);
return ret;
}
int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
{
struct rrdengine_instance *ctx = datafile->ctx;
uv_fs_t req;
int ret, fd;
char path[1024];
int ret;
char path[RRDENG_PATH_MAX];
generate_journalfilepath(datafile, path, sizeof(path));
ret = uv_fs_ftruncate(NULL, &req, journalfile->file, 0, NULL);
if (ret < 0) {
fatal("uv_fs_ftruncate: %s", uv_strerror(ret));
error("uv_fs_ftruncate(%s): %s", path, uv_strerror(ret));
++ctx->stats.fs_errors;
rrd_stat_atomic_add(&global_fs_errors, 1);
}
assert(0 == req.result);
uv_fs_req_cleanup(&req);
ret = uv_fs_close(NULL, &req, journalfile->file, NULL);
if (ret < 0) {
fatal("uv_fs_close: %s", uv_strerror(ret));
exit(ret);
error("uv_fs_close(%s): %s", path, uv_strerror(ret));
++ctx->stats.fs_errors;
rrd_stat_atomic_add(&global_fs_errors, 1);
}
assert(0 == req.result);
uv_fs_req_cleanup(&req);
generate_journalfilepath(datafile, path, sizeof(path));
fd = uv_fs_unlink(NULL, &req, path, NULL);
if (fd < 0) {
fatal("uv_fs_fsunlink: %s", uv_strerror(fd));
ret = uv_fs_unlink(NULL, &req, path, NULL);
if (ret < 0) {
error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
++ctx->stats.fs_errors;
rrd_stat_atomic_add(&global_fs_errors, 1);
}
assert(0 == req.result);
uv_fs_req_cleanup(&req);
++ctx->stats.journalfile_deletions;
return 0;
return ret;
}
int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
@ -143,13 +166,17 @@ int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdeng
int ret, fd;
struct rrdeng_jf_sb *superblock;
uv_buf_t iov;
char path[1024];
char path[RRDENG_PATH_MAX];
generate_journalfilepath(datafile, path, sizeof(path));
fd = open_file_direct_io(path, O_CREAT | O_RDWR | O_TRUNC, &file);
if (fd < 0) {
fatal("uv_fs_fsopen: %s", uv_strerror(fd));
++ctx->stats.fs_errors;
rrd_stat_atomic_add(&global_fs_errors, 1);
return fd;
}
journalfile->file = file;
++ctx->stats.journalfile_creations;
ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
if (unlikely(ret)) {
@ -162,19 +189,21 @@ int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdeng
ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL);
if (ret < 0) {
fatal("uv_fs_write: %s", uv_strerror(ret));
}
if (req.result < 0) {
fatal("uv_fs_write: %s", uv_strerror((int)req.result));
assert(req.result < 0);
error("uv_fs_write: %s", uv_strerror(ret));
++ctx->stats.io_errors;
rrd_stat_atomic_add(&global_io_errors, 1);
}
uv_fs_req_cleanup(&req);
free(superblock);
if (ret < 0) {
destroy_journal_file(journalfile, datafile);
return ret;
}
journalfile->file = file;
journalfile->pos = sizeof(*superblock);
ctx->stats.io_write_bytes += sizeof(*superblock);
++ctx->stats.io_write_requests;
++ctx->stats.journalfile_creations;
return 0;
}
@ -263,6 +292,8 @@ static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrden
PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t), PJE0);
assert(NULL == *PValue); /* TODO: figure out concurrency model */
*PValue = page_index = create_page_index(temp_id);
page_index->prev = pg_cache->metrics_index.last_page_index;
pg_cache->metrics_index.last_page_index = page_index;
uv_rwlock_wrunlock(&pg_cache->metrics_index.lock);
}
@ -398,15 +429,15 @@ int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfi
{
uv_fs_t req;
uv_file file;
int ret, fd;
int ret, fd, error;
uint64_t file_size, max_id;
char path[1024];
char path[RRDENG_PATH_MAX];
generate_journalfilepath(datafile, path, sizeof(path));
fd = open_file_direct_io(path, O_RDWR, &file);
if (fd < 0) {
/* if (UV_ENOENT != fd) */
error("uv_fs_fsopen: %s", uv_strerror(fd));
++ctx->stats.fs_errors;
rrd_stat_atomic_add(&global_fs_errors, 1);
return fd;
}
info("Loading journal file \"%s\".", path);
@ -433,9 +464,15 @@ int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfi
return 0;
error:
(void) uv_fs_close(NULL, &req, file, NULL);
error = ret;
ret = uv_fs_close(NULL, &req, file, NULL);
if (ret < 0) {
error("uv_fs_close(%s): %s", path, uv_strerror(ret));
++ctx->stats.fs_errors;
rrd_stat_atomic_add(&global_fs_errors, 1);
}
uv_fs_req_cleanup(&req);
return ret;
return error;
}
void init_commit_log(struct rrdengine_instance *ctx)

View file

@ -33,9 +33,11 @@ struct transaction_commit_log {
unsigned buf_size;
};
extern void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen);
extern void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile);
extern void *wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned size);
extern void wal_flush_transaction_buffer(struct rrdengine_worker_config* wc);
extern int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile);
extern int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile);
extern int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile);
extern int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile,

View file

@ -287,7 +287,7 @@ static void pg_cache_evict_unsafe(struct rrdengine_instance *ctx, struct rrdeng_
{
struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
free(pg_cache_descr->page);
freez(pg_cache_descr->page);
pg_cache_descr->page = NULL;
pg_cache_descr->flags &= ~RRD_PAGE_POPULATED;
pg_cache_release_pages_unsafe(ctx, 1);
@ -330,7 +330,7 @@ static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx)
return 1;
}
rrdeng_page_descr_mutex_unlock(ctx, descr);
};
}
uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
/* failed to evict */
@ -594,7 +594,7 @@ struct pg_cache_page_index *
}
rrdeng_page_descr_mutex_unlock(ctx, descr);
};
}
uv_rwlock_rdunlock(&page_index->lock);
failed_to_reserve = 0;
@ -767,6 +767,7 @@ struct pg_cache_page_index *create_page_index(uuid_t *id)
assert(0 == uv_rwlock_init(&page_index->lock));
page_index->oldest_time = INVALID_TIME;
page_index->latest_time = INVALID_TIME;
page_index->prev = NULL;
return page_index;
}
@ -776,6 +777,7 @@ static void init_metrics_index(struct rrdengine_instance *ctx)
struct page_cache *pg_cache = &ctx->pg_cache;
pg_cache->metrics_index.JudyHS_array = (Pvoid_t) NULL;
pg_cache->metrics_index.last_page_index = NULL;
assert(0 == uv_rwlock_init(&pg_cache->metrics_index.lock));
}
@ -809,4 +811,65 @@ void init_page_cache(struct rrdengine_instance *ctx)
init_metrics_index(ctx);
init_replaceQ(ctx);
init_commited_page_index(ctx);
}
void free_page_cache(struct rrdengine_instance *ctx)
{
struct page_cache *pg_cache = &ctx->pg_cache;
Word_t ret_Judy, bytes_freed = 0;
Pvoid_t *PValue;
struct pg_cache_page_index *page_index, *prev_page_index;
Word_t Index;
struct rrdeng_page_descr *descr;
struct page_cache_descr *pg_cache_descr;
/* Free commited page index */
ret_Judy = JudyLFreeArray(&pg_cache->commited_page_index.JudyL_array, PJE0);
assert(NULL == pg_cache->commited_page_index.JudyL_array);
bytes_freed += ret_Judy;
for (page_index = pg_cache->metrics_index.last_page_index ;
page_index != NULL ;
page_index = prev_page_index) {
prev_page_index = page_index->prev;
/* Find first page in range */
Index = (Word_t) 0;
PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0);
if (likely(NULL != PValue)) {
descr = *PValue;
}
while (descr != NULL) {
/* Iterate all page descriptors of this metric */
if (descr->pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED) {
/* Check rrdenglocking.c */
pg_cache_descr = descr->pg_cache_descr;
if (pg_cache_descr->flags & RRD_PAGE_POPULATED) {
freez(pg_cache_descr->page);
bytes_freed += RRDENG_BLOCK_SIZE;
}
rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr);
bytes_freed += sizeof(*pg_cache_descr);
}
freez(descr);
bytes_freed += sizeof(*descr);
PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0);
descr = unlikely(NULL == PValue) ? NULL : *PValue;
}
/* Free page index */
ret_Judy = JudyLFreeArray(&page_index->JudyL_array, PJE0);
assert(NULL == page_index->JudyL_array);
bytes_freed += ret_Judy;
freez(page_index);
bytes_freed += sizeof(*page_index);
}
/* Free metrics index */
ret_Judy = JudyHSFreeArray(&pg_cache->metrics_index.JudyHS_array, PJE0);
assert(NULL == pg_cache->metrics_index.JudyHS_array);
bytes_freed += ret_Judy;
info("Freed %lu bytes of memory from page cache.", bytes_freed);
}

View file

@ -84,12 +84,15 @@ struct pg_cache_page_index {
* It's also written by the data deletion workqueue when data collection is disabled for this metric.
*/
usec_t latest_time;
struct pg_cache_page_index *prev;
};
/* maps UUIDs to page indices */
struct pg_cache_metrics_index {
uv_rwlock_t lock;
Pvoid_t JudyHS_array;
struct pg_cache_page_index *last_page_index;
};
/* gathers dirty pages to be written on disk */
@ -153,6 +156,7 @@ extern struct rrdeng_page_descr *
usec_t point_in_time);
extern struct pg_cache_page_index *create_page_index(uuid_t *id);
extern void init_page_cache(struct rrdengine_instance *ctx);
extern void free_page_cache(struct rrdengine_instance *ctx);
extern void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_descr *descr);
extern void pg_cache_update_metric_times(struct pg_cache_page_index *page_index);

View file

@ -3,6 +3,10 @@
#include "rrdengine.h"
rrdeng_stats_t global_io_errors = 0;
rrdeng_stats_t global_fs_errors = 0;
rrdeng_stats_t rrdeng_reserved_file_descriptors = 0;
void sanity_check(void)
{
/* Magic numbers must fit in the super-blocks */
@ -33,7 +37,6 @@ void read_extent_cb(uv_fs_t* req)
unsigned i, j, count;
void *page, *uncompressed_buf = NULL;
uint32_t payload_length, payload_offset, page_offset, uncompressed_payload_length;
struct rrdengine_datafile *datafile;
/* persistent structures */
struct rrdeng_df_extent_header *header;
struct rrdeng_df_extent_trailer *trailer;
@ -55,9 +58,13 @@ void read_extent_cb(uv_fs_t* req)
crc = crc32(0L, Z_NULL, 0);
crc = crc32(crc, xt_io_descr->buf, xt_io_descr->bytes - sizeof(*trailer));
ret = crc32cmp(trailer->checksum, crc);
datafile = xt_io_descr->descr_array[0]->extent->datafile;
debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was read from datafile %u-%u. CRC32 check: %s", __func__,
xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno, ret ? "FAILED" : "SUCCEEDED");
#ifdef NETDATA_INTERNAL_CHECKS
{
struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile;
debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was read from datafile %u-%u. CRC32 check: %s", __func__,
xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno, ret ? "FAILED" : "SUCCEEDED");
}
#endif
if (unlikely(ret)) {
/* TODO: handle errors */
exit(UV_EIO);
@ -112,14 +119,14 @@ void read_extent_cb(uv_fs_t* req)
rrdeng_page_descr_mutex_unlock(ctx, descr);
}
if (RRD_NO_COMPRESSION != header->compression_algorithm) {
free(uncompressed_buf);
freez(uncompressed_buf);
}
if (xt_io_descr->completion)
complete(xt_io_descr->completion);
cleanup:
uv_fs_req_cleanup(req);
free(xt_io_descr->buf);
free(xt_io_descr);
freez(xt_io_descr);
}
@ -144,7 +151,7 @@ static void do_read_extent(struct rrdengine_worker_config* wc,
ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes));
if (unlikely(ret)) {
fatal("posix_memalign:%s", strerror(ret));
/* free(xt_io_descr);
/* freez(xt_io_descr);
return;*/
}
for (i = 0 ; i < count; ++i) {
@ -233,7 +240,6 @@ void flush_pages_cb(uv_fs_t* req)
struct extent_io_descriptor *xt_io_descr;
struct rrdeng_page_descr *descr;
struct page_cache_descr *pg_cache_descr;
struct rrdengine_datafile *datafile;
int ret;
unsigned i, count;
Word_t commit_id;
@ -243,10 +249,13 @@ void flush_pages_cb(uv_fs_t* req)
error("%s: uv_fs_write: %s", __func__, uv_strerror((int)req->result));
goto cleanup;
}
datafile = xt_io_descr->descr_array[0]->extent->datafile;
debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was written to datafile %u-%u. Waking up waiters.",
__func__, xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno);
#ifdef NETDATA_INTERNAL_CHECKS
{
struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile;
debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was written to datafile %u-%u. Waking up waiters.",
__func__, xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno);
}
#endif
count = xt_io_descr->descr_count;
for (i = 0 ; i < count ; ++i) {
/* care, we don't hold the descriptor mutex */
@ -273,7 +282,7 @@ void flush_pages_cb(uv_fs_t* req)
cleanup:
uv_fs_req_cleanup(req);
free(xt_io_descr->buf);
free(xt_io_descr);
freez(xt_io_descr);
}
/*
@ -353,7 +362,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes));
if (unlikely(ret)) {
fatal("posix_memalign:%s", strerror(ret));
/* free(xt_io_descr);*/
/* freez(xt_io_descr);*/
}
(void) memcpy(xt_io_descr->descr_array, eligible_pages, sizeof(struct rrdeng_page_descr *) * count);
xt_io_descr->descr_count = count;
@ -405,7 +414,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
ctx->stats.after_compress_bytes += compressed_size;
debug(D_RRDENGINE, "LZ4 compressed %"PRIu32" bytes to %d bytes.", uncompressed_payload_length, compressed_size);
(void) memcpy(xt_io_descr->buf + payload_offset, compressed_buf, compressed_size);
free(compressed_buf);
freez(compressed_buf);
size_bytes = payload_offset + compressed_size + sizeof(*trailer);
header->payload_length = compressed_size;
break;
@ -443,23 +452,36 @@ static void after_delete_old_data(uv_work_t *req, int status)
struct rrdengine_worker_config* wc = &ctx->worker_config;
struct rrdengine_datafile *datafile;
struct rrdengine_journalfile *journalfile;
unsigned bytes;
unsigned deleted_bytes, journalfile_bytes, datafile_bytes;
int ret;
char path[RRDENG_PATH_MAX];
(void)status;
datafile = ctx->datafiles.first;
journalfile = datafile->journalfile;
bytes = datafile->pos + journalfile->pos;
datafile_bytes = datafile->pos;
journalfile_bytes = journalfile->pos;
deleted_bytes = 0;
info("Deleting data and journal file pair.");
datafile_list_delete(ctx, datafile);
destroy_journal_file(journalfile, datafile);
destroy_data_file(datafile);
info("Deleted data file \""DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\".",
datafile->tier, datafile->fileno);
free(journalfile);
free(datafile);
ret = destroy_journal_file(journalfile, datafile);
if (!ret) {
generate_journalfilepath(datafile, path, sizeof(path));
info("Deleted journal file \"%s\".", path);
deleted_bytes += journalfile_bytes;
}
ret = destroy_data_file(datafile);
if (!ret) {
generate_datafilepath(datafile, path, sizeof(path));
info("Deleted data file \"%s\".", path);
deleted_bytes += datafile_bytes;
}
freez(journalfile);
freez(datafile);
ctx->disk_space -= bytes;
info("Reclaimed %u bytes of disk space.", bytes);
ctx->disk_space -= deleted_bytes;
info("Reclaimed %u bytes of disk space.", deleted_bytes);
/* unfreeze command processing */
wc->now_deleting.data = NULL;
@ -485,7 +507,7 @@ static void delete_old_data(uv_work_t *req)
pg_cache_punch_hole(ctx, descr, 0);
}
next = extent->next;
free(extent);
freez(extent);
}
}
@ -495,6 +517,7 @@ void rrdeng_test_quota(struct rrdengine_worker_config* wc)
struct rrdengine_datafile *datafile;
unsigned current_size, target_size;
uint8_t out_of_space, only_one_datafile;
int ret;
out_of_space = 0;
if (unlikely(ctx->disk_space > ctx->max_disk_space)) {
@ -509,7 +532,10 @@ void rrdeng_test_quota(struct rrdengine_worker_config* wc)
if (unlikely(current_size >= target_size || (out_of_space && only_one_datafile))) {
/* Finalize data and journal file and create a new pair */
wal_flush_transaction_buffer(wc);
create_new_datafile_pair(ctx, 1, datafile->fileno + 1);
ret = create_new_datafile_pair(ctx, 1, ctx->last_fileno + 1);
if (likely(!ret)) {
++ctx->last_fileno;
}
}
if (unlikely(out_of_space)) {
/* delete old data */
@ -517,18 +543,30 @@ void rrdeng_test_quota(struct rrdengine_worker_config* wc)
/* already deleting data */
return;
}
info("Deleting data file \""DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\".",
ctx->datafiles.first->tier, ctx->datafiles.first->fileno);
if (NULL == ctx->datafiles.first->next) {
error("Cannot delete data file \"%s/"DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\""
" to reclaim space, there are no other file pairs left.",
ctx->dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno);
return;
}
info("Deleting data file \"%s/"DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\".",
ctx->dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno);
wc->now_deleting.data = ctx;
uv_queue_work(wc->loop, &wc->now_deleting, delete_old_data, after_delete_old_data);
assert(0 == uv_queue_work(wc->loop, &wc->now_deleting, delete_old_data, after_delete_old_data));
}
}
/* return 0 on success */
int init_rrd_files(struct rrdengine_instance *ctx)
{
return init_data_files(ctx);
}
void finalize_rrd_files(struct rrdengine_instance *ctx)
{
return finalize_data_files(ctx);
}
void rrdeng_init_cmd_queue(struct rrdengine_worker_config* wc)
{
wc->cmd_queue.head = wc->cmd_queue.tail = 0;
@ -596,7 +634,6 @@ void async_cb(uv_async_t *handle)
void timer_cb(uv_timer_t* handle)
{
struct rrdengine_worker_config* wc = handle->data;
struct rrdengine_instance *ctx = wc->ctx;
uv_stop(handle->loop);
uv_update_time(handle->loop);
@ -616,7 +653,7 @@ void timer_cb(uv_timer_t* handle)
#ifdef NETDATA_INTERNAL_CHECKS
{
char buf[4096];
debug(D_RRDENGINE, "%s", get_rrdeng_statistics(ctx, buf, sizeof(buf)));
debug(D_RRDENGINE, "%s", get_rrdeng_statistics(wc->ctx, buf, sizeof(buf)));
}
#endif
}
@ -631,7 +668,7 @@ void rrdeng_worker(void* arg)
struct rrdengine_worker_config* wc = arg;
struct rrdengine_instance *ctx = wc->ctx;
uv_loop_t* loop;
int shutdown;
int shutdown, ret;
enum rrdeng_opcode opcode;
uv_timer_t timer_req;
struct rrdeng_cmd cmd;
@ -639,22 +676,35 @@ void rrdeng_worker(void* arg)
rrdeng_init_cmd_queue(wc);
loop = wc->loop = mallocz(sizeof(uv_loop_t));
uv_loop_init(loop);
ret = uv_loop_init(loop);
if (ret) {
error("uv_loop_init(): %s", uv_strerror(ret));
goto error_after_loop_init;
}
loop->data = wc;
uv_async_init(wc->loop, &wc->async, async_cb);
ret = uv_async_init(wc->loop, &wc->async, async_cb);
if (ret) {
error("uv_async_init(): %s", uv_strerror(ret));
goto error_after_async_init;
}
wc->async.data = wc;
wc->now_deleting.data = NULL;
/* dirty page flushing timer */
uv_timer_init(loop, &timer_req);
ret = uv_timer_init(loop, &timer_req);
if (ret) {
error("uv_timer_init(): %s", uv_strerror(ret));
goto error_after_timer_init;
}
timer_req.data = wc;
wc->error = 0;
/* wake up initialization thread */
complete(&ctx->rrdengine_completion);
uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS);
assert(0 == uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS));
shutdown = 0;
while (shutdown == 0 || uv_loop_alive(loop)) {
uv_run(loop, UV_RUN_DEFAULT);
@ -669,12 +719,6 @@ void rrdeng_worker(void* arg)
break;
case RRDENG_SHUTDOWN:
shutdown = 1;
if (unlikely(wc->now_deleting.data)) {
/* postpone shutdown until after deletion */
info("Postponing shutting RRD engine event loop down until after datafile deletion is finished.");
rrdeng_enq_cmd(wc, &cmd);
break;
}
/*
* uv_async_send after uv_close does not seem to crash in linux at the moment,
* it is however undocumented behaviour and we need to be aware if this becomes
@ -683,10 +727,6 @@ void rrdeng_worker(void* arg)
uv_close((uv_handle_t *)&wc->async, NULL);
assert(0 == uv_timer_stop(&timer_req));
uv_close((uv_handle_t *)&timer_req, NULL);
info("Shutting down RRD engine event loop.");
while (do_flush_pages(wc, 1, NULL)) {
; /* Force flushing of all commited pages. */
}
break;
case RRDENG_READ_PAGE:
do_read_extent(wc, &cmd.read_page.page_cache_descr, 1, 0);
@ -716,6 +756,13 @@ void rrdeng_worker(void* arg)
} while (opcode != RRDENG_NOOP);
}
/* cleanup operations of the event loop */
if (unlikely(wc->now_deleting.data)) {
info("Postponing shutting RRD engine event loop down until after datafile deletion is finished.");
}
info("Shutting down RRD engine event loop.");
while (do_flush_pages(wc, 1, NULL)) {
; /* Force flushing of all commited pages. */
}
wal_flush_transaction_buffer(wc);
uv_run(loop, UV_RUN_DEFAULT);
@ -724,7 +771,20 @@ void rrdeng_worker(void* arg)
uv_cond_destroy(&wc->cmd_cond);
/* uv_mutex_destroy(&wc->cmd_mutex); */
assert(0 == uv_loop_close(loop));
free(loop);
freez(loop);
return;
error_after_timer_init:
uv_close((uv_handle_t *)&wc->async, NULL);
error_after_async_init:
assert(0 == uv_loop_close(loop));
error_after_loop_init:
freez(loop);
wc->error = UV_EAGAIN;
/* wake up initialization thread */
complete(&ctx->rrdengine_completion);
}

View file

@ -112,6 +112,8 @@ struct rrdengine_worker_config {
uv_cond_t cmd_cond;
volatile unsigned queue_size;
struct rrdeng_cmdqueue cmd_queue;
int error;
};
/*
@ -144,10 +146,18 @@ struct rrdengine_statistics {
rrdeng_stats_t journalfile_creations;
rrdeng_stats_t journalfile_deletions;
rrdeng_stats_t page_cache_descriptors;
rrdeng_stats_t io_errors;
rrdeng_stats_t fs_errors;
};
/* I/O errors global counter */
extern rrdeng_stats_t global_io_errors;
/* File-System errors global counter */
extern rrdeng_stats_t global_fs_errors;
/* number of File-Descriptors that have been reserved by dbengine */
extern rrdeng_stats_t rrdeng_reserved_file_descriptors;
struct rrdengine_instance {
rrdengine_state_t rrdengine_state;
struct rrdengine_worker_config worker_config;
struct completion rrdengine_completion;
struct page_cache pg_cache;
@ -157,6 +167,7 @@ struct rrdengine_instance {
char dbfiles_path[FILENAME_MAX+1];
uint64_t disk_space;
uint64_t max_disk_space;
unsigned last_fileno; /* newest index of datafile and journalfile */
unsigned long max_cache_pages;
unsigned long cache_pages_low_watermark;
@ -165,6 +176,7 @@ struct rrdengine_instance {
extern void sanity_check(void);
extern int init_rrd_files(struct rrdengine_instance *ctx);
extern void finalize_rrd_files(struct rrdengine_instance *ctx);
extern void rrdeng_test_quota(struct rrdengine_worker_config* wc);
extern void rrdeng_worker(void* arg);
extern void rrdeng_enq_cmd(struct rrdengine_worker_config* wc, struct rrdeng_cmd *cmd);

View file

@ -55,6 +55,8 @@ void rrdeng_store_metric_init(RRDDIM *rd)
PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, &temp_id, sizeof(uuid_t), PJE0);
assert(NULL == *PValue); /* TODO: figure out concurrency model */
*PValue = page_index = create_page_index(&temp_id);
page_index->prev = pg_cache->metrics_index.last_page_index;
pg_cache->metrics_index.last_page_index = page_index;
uv_rwlock_wrunlock(&pg_cache->metrics_index.lock);
}
rd->state->rrdeng_uuid = &page_index->id;
@ -119,9 +121,9 @@ void rrdeng_store_metric_flush_current_page(RRDDIM *rd)
handle->prev_descr = descr;
}
} else {
free(descr->pg_cache_descr->page);
freez(descr->pg_cache_descr->page);
rrdeng_destroy_pg_cache_descr(ctx, descr->pg_cache_descr);
free(descr);
freez(descr);
}
handle->descr = NULL;
}
@ -434,7 +436,13 @@ void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_i
return pg_cache_descr->page;
}
void rrdeng_get_28_statistics(struct rrdengine_instance *ctx, unsigned long long *array)
/*
* Gathers Database Engine statistics.
* Careful when modifying this function.
* You must not change the indices of the statistics or user code will break.
* You must not exceed RRDENG_NR_STATS or it will crash.
*/
void rrdeng_get_33_statistics(struct rrdengine_instance *ctx, unsigned long long *array)
{
struct page_cache *pg_cache = &ctx->pg_cache;
@ -466,7 +474,12 @@ void rrdeng_get_28_statistics(struct rrdengine_instance *ctx, unsigned long long
array[25] = (uint64_t)ctx->stats.journalfile_creations;
array[26] = (uint64_t)ctx->stats.journalfile_deletions;
array[27] = (uint64_t)ctx->stats.page_cache_descriptors;
assert(RRDENG_NR_STATS == 28);
array[28] = (uint64_t)ctx->stats.io_errors;
array[29] = (uint64_t)ctx->stats.fs_errors;
array[30] = (uint64_t)global_io_errors;
array[31] = (uint64_t)global_fs_errors;
array[32] = (uint64_t)rrdeng_reserved_file_descriptors;
assert(RRDENG_NR_STATS == 33);
}
/* Releases reference to page */
@ -477,14 +490,29 @@ void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle)
}
/*
* Returns 0 on success, 1 on error
* Returns 0 on success, negative on error
*/
int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb, unsigned disk_space_mb)
{
struct rrdengine_instance *ctx;
int error;
uint32_t max_open_files;
sanity_check();
max_open_files = rlimit_nofile.rlim_cur / 4;
/* reserve RRDENG_FD_BUDGET_PER_INSTANCE file descriptors for this instance */
rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, RRDENG_FD_BUDGET_PER_INSTANCE);
if (rrdeng_reserved_file_descriptors > max_open_files) {
error("Exceeded the budget of available file descriptors (%u/%u), cannot create new dbengine instance.",
(unsigned)rrdeng_reserved_file_descriptors, (unsigned)max_open_files);
rrd_stat_atomic_add(&global_fs_errors, 1);
rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE);
return UV_EMFILE;
}
if (NULL == ctxp) {
/* for testing */
ctx = &default_global_ctx;
@ -492,10 +520,6 @@ int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned p
} else {
*ctxp = ctx = callocz(1, sizeof(*ctx));
}
if (ctx->rrdengine_state != RRDENGINE_STATUS_UNINITIALIZED) {
return 1;
}
ctx->rrdengine_state = RRDENGINE_STATUS_INITIALIZING;
ctx->global_compress_alg = RRD_LZ4;
if (page_cache_mb < RRDENG_MIN_PAGE_CACHE_SIZE_MB)
page_cache_mb = RRDENG_MIN_PAGE_CACHE_SIZE_MB;
@ -514,11 +538,7 @@ int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned p
init_commit_log(ctx);
error = init_rrd_files(ctx);
if (error) {
ctx->rrdengine_state = RRDENGINE_STATUS_UNINITIALIZED;
if (ctx != &default_global_ctx) {
freez(ctx);
}
return 1;
goto error_after_init_rrd_files;
}
init_completion(&ctx->rrdengine_completion);
@ -526,9 +546,21 @@ int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned p
/* wait for worker thread to initialize */
wait_for_completion(&ctx->rrdengine_completion);
destroy_completion(&ctx->rrdengine_completion);
ctx->rrdengine_state = RRDENGINE_STATUS_INITIALIZED;
if (ctx->worker_config.error) {
goto error_after_rrdeng_worker;
}
return 0;
error_after_rrdeng_worker:
finalize_rrd_files(ctx);
error_after_init_rrd_files:
free_page_cache(ctx);
if (ctx != &default_global_ctx) {
freez(ctx);
*ctxp = NULL;
}
rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE);
return UV_EIO;
}
/*
@ -539,10 +571,6 @@ int rrdeng_exit(struct rrdengine_instance *ctx)
struct rrdeng_cmd cmd;
if (NULL == ctx) {
/* TODO: move to per host basis */
ctx = &default_global_ctx;
}
if (ctx->rrdengine_state != RRDENGINE_STATUS_INITIALIZED) {
return 1;
}
@ -552,8 +580,12 @@ int rrdeng_exit(struct rrdengine_instance *ctx)
assert(0 == uv_thread_join(&ctx->worker_config.thread));
finalize_rrd_files(ctx);
free_page_cache(ctx);
if (ctx != &default_global_ctx) {
freez(ctx);
}
rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE);
return 0;
}

View file

@ -8,7 +8,9 @@
#define RRDENG_MIN_PAGE_CACHE_SIZE_MB (32)
#define RRDENG_MIN_DISK_SPACE_MB (256)
#define RRDENG_NR_STATS (28)
#define RRDENG_NR_STATS (33)
#define RRDENG_FD_BUDGET_PER_INSTANCE (50)
extern int default_rrdeng_page_cache_mb;
extern int default_rrdeng_disk_quota_mb;
@ -30,7 +32,7 @@ extern int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_han
extern void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle);
extern time_t rrdeng_metric_latest_time(RRDDIM *rd);
extern time_t rrdeng_metric_oldest_time(RRDDIM *rd);
extern void rrdeng_get_28_statistics(struct rrdengine_instance *ctx, unsigned long long *array);
extern void rrdeng_get_33_statistics(struct rrdengine_instance *ctx, unsigned long long *array);
/* must call once before using anything */
extern int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb,

View file

@ -103,7 +103,7 @@ int open_file_direct_io(char *path, int flags, uv_file *file)
error("File \"%s\" does not support direct I/O, falling back to buffered I/O.", path);
} else {
error("Failed to open file \"%s\".", path);
return fd;
--direct; /* break the loop */
}
} else {
assert(req.result >= 0);

View file

@ -31,6 +31,8 @@ typedef uintptr_t rrdeng_stats_t;
#define rrd_stat_atomic_add(p, n) do {(void) __sync_fetch_and_add(p, n);} while(0)
#endif
#define RRDENG_PATH_MAX (4096)
/* returns old *ptr value */
static inline unsigned long ulong_compare_and_swap(volatile unsigned long *ptr,
unsigned long oldval, unsigned long newval)

View file

@ -22,7 +22,7 @@ void rrdeng_destroy_pg_cache_descr(struct rrdengine_instance *ctx, struct page_c
{
uv_cond_destroy(&pg_cache_descr->cond);
uv_mutex_destroy(&pg_cache_descr->mutex);
free(pg_cache_descr);
freez(pg_cache_descr);
rrd_stat_atomic_add(&ctx->stats.page_cache_descriptors, -1);
}
@ -102,7 +102,6 @@ void rrdeng_page_descr_mutex_unlock(struct rrdengine_instance *ctx, struct rrden
we_locked = 0;
while (1) { /* spin */
old_state = descr->pg_cache_descr_state;
assert(old_state & PG_CACHE_DESCR_ALLOCATED);
old_users = old_state >> PG_CACHE_DESCR_SHIFT;
if (unlikely(we_locked)) {
@ -119,6 +118,7 @@ void rrdeng_page_descr_mutex_unlock(struct rrdengine_instance *ctx, struct rrden
assert(0 == old_users);
continue; /* spin */
}
assert(old_state & PG_CACHE_DESCR_ALLOCATED);
pg_cache_descr = descr->pg_cache_descr;
/* caller is the only page cache descriptor user and there are no pending references on the page */
if ((old_state & PG_CACHE_DESCR_DESTROY) && (1 == old_users) &&

View file

@ -86,4 +86,5 @@ dist_healthconfig_DATA = \
health.d/wmi.conf \
health.d/x509check.conf \
health.d/zfs.conf \
health.d/dbengine.conf \
$(NULL)

View file

@ -0,0 +1,26 @@
# you can disable an alarm notification by setting the 'to' line to: silent
alarm: 10min_dbengine_global_fs_errors
on: netdata.dbengine_global_errors
os: linux freebsd macos
hosts: *
lookup: sum -10m unaligned of FS errors
units: errors
every: 10s
crit: $this > 0
delay: down 15m multiplier 1.5 max 1h
info: number of File-System errors dbengine came across the last 10 minutes (too many open files, wrong permissions etc)
to: sysadmin
alarm: 10min_dbengine_global_io_errors
on: netdata.dbengine_global_errors
os: linux freebsd macos
hosts: *
lookup: sum -10m unaligned of I/O errors
units: errors
every: 10s
crit: $this > 0
delay: down 1h multiplier 1.5 max 3h
info: number of IO errors dbengine came across the last 10 minutes (out of space, bad disk etc)
to: sysadmin

View file

@ -474,7 +474,7 @@ void *socket_listen_main_static_threaded(void *ptr) {
if(static_threaded_workers_count < 1) static_threaded_workers_count = 1;
size_t max_sockets = (size_t)config_get_number(CONFIG_SECTION_WEB, "web server max sockets", (long long int)(rlimit_nofile.rlim_cur / 2));
size_t max_sockets = (size_t)config_get_number(CONFIG_SECTION_WEB, "web server max sockets", (long long int)(rlimit_nofile.rlim_cur / 4));
static_workers_private_data = callocz((size_t)static_threaded_workers_count, sizeof(struct web_server_static_threaded_worker));