diff --git a/daemon/main.c b/daemon/main.c index 0826e09d4f..2ec5c33f9e 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -44,6 +44,9 @@ void netdata_cleanup_and_exit(int ret) { // stop everything info("EXIT: stopping static threads..."); +#ifdef ENABLE_NEW_CLOUD_PROTOCOL + aclk_sync_exit_all(); +#endif cancel_main_threads(); // free the database @@ -360,13 +363,16 @@ int help(int exitcode) { " -W stacksize=N Set the stacksize (in bytes).\n\n" " -W debug_flags=N Set runtime tracing to debug.log.\n\n" " -W unittest Run internal unittests and exit.\n\n" + " -W sqlite-check Check metadata database integrity and exit.\n\n" + " -W sqlite-fix Check metadata database integrity, fix if needed and exit.\n\n" + " -W sqlite-compact Reclaim metadata database unused space and exit.\n\n" #ifdef ENABLE_DBENGINE " -W createdataset=N Create a DB engine dataset of N seconds and exit.\n\n" " -W stresstest=A,B,C,D,E,F\n" " Run a DB engine stress test for A seconds,\n" " with B writers and C readers, with a ramp up\n" " time of D seconds for writers, a page cache\n" - " size of E MiB, an optional disk space limit" + " size of E MiB, an optional disk space limit\n" " of F MiB and exit.\n\n" #endif " -W set section option value\n" @@ -801,6 +807,20 @@ int main(int argc, char **argv) { char* createdataset_string = "createdataset="; char* stresstest_string = "stresstest="; #endif + if(strcmp(optarg, "sqlite-check") == 0) { + sql_init_database(DB_CHECK_INTEGRITY); + return 0; + } + + if(strcmp(optarg, "sqlite-fix") == 0) { + sql_init_database(DB_CHECK_FIX_DB); + return 0; + } + + if(strcmp(optarg, "sqlite-compact") == 0) { + sql_init_database(DB_CHECK_RECLAIM_SPACE); + return 0; + } if(strcmp(optarg, "unittest") == 0) { if(unit_test_buffer()) return 1; diff --git a/database/rrdhost.c b/database/rrdhost.c index 8883250dc8..d9608b740e 100644 --- a/database/rrdhost.c +++ b/database/rrdhost.c @@ -690,7 +690,7 @@ int rrd_init(char *hostname, struct rrdhost_system_info *system_info) { if (gap_when_lost_iterations_above < 1) gap_when_lost_iterations_above = 1; - if (unlikely(sql_init_database())) { + if (unlikely(sql_init_database(DB_CHECK_NONE))) { if (default_rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) fatal("Failed to initialize SQLite"); info("Skipping SQLITE metadata initialization since memory mode is not db engine"); diff --git a/database/sqlite/sqlite_aclk.c b/database/sqlite/sqlite_aclk.c index 819b69db4d..6803092f27 100644 --- a/database/sqlite/sqlite_aclk.c +++ b/database/sqlite/sqlite_aclk.c @@ -97,7 +97,7 @@ int aclk_database_enq_cmd_noblock(struct aclk_database_worker_config *wc, struct /* wait for free space in queue */ uv_mutex_lock(&wc->cmd_mutex); - if ((queue_size = wc->queue_size) == ACLK_DATABASE_CMD_Q_MAX_SIZE) { + if ((queue_size = wc->queue_size) == ACLK_DATABASE_CMD_Q_MAX_SIZE || wc->is_shutting_down) { uv_mutex_unlock(&wc->cmd_mutex); return 1; } @@ -118,6 +118,10 @@ void aclk_database_enq_cmd(struct aclk_database_worker_config *wc, struct aclk_d /* wait for free space in queue */ uv_mutex_lock(&wc->cmd_mutex); + if (wc->is_shutting_down) { + uv_mutex_unlock(&wc->cmd_mutex); + return; + } while ((queue_size = wc->queue_size) == ACLK_DATABASE_CMD_Q_MAX_SIZE) { uv_cond_wait(&wc->cmd_cond, &wc->cmd_mutex); } @@ -142,10 +146,12 @@ struct aclk_database_cmd aclk_database_deq_cmd(struct aclk_database_worker_confi uv_mutex_lock(&wc->cmd_mutex); queue_size = wc->queue_size; - if (queue_size == 0) { + if (queue_size == 0 || wc->is_shutting_down) { memset(&ret, 0, sizeof(ret)); ret.opcode = ACLK_DATABASE_NOOP; ret.completion = NULL; + if (wc->is_shutting_down) + uv_cond_signal(&wc->cmd_cond); } else { /* dequeue command */ ret = wc->cmd_queue.cmd_array[wc->cmd_queue.head]; @@ -156,7 +162,6 @@ struct aclk_database_cmd aclk_database_deq_cmd(struct aclk_database_worker_confi wc->cmd_queue.head + 1 : 0; } wc->queue_size = queue_size - 1; - /* wake up producers */ uv_cond_signal(&wc->cmd_cond); } @@ -184,6 +189,30 @@ int aclk_worker_enq_cmd(char *node_id, struct aclk_database_cmd *cmd) return (wc == NULL); } +void aclk_sync_exit_all() +{ + rrd_wrlock(); + RRDHOST *host = localhost; + while(host) { + struct aclk_database_worker_config *wc = host->dbsync_worker; + if (wc) { + wc->is_shutting_down = 1; + (void) aclk_database_deq_cmd(wc); + uv_cond_signal(&wc->cmd_cond); + } + host = host->next; + } + rrd_unlock(); + + uv_mutex_lock(&aclk_async_lock); + struct aclk_database_worker_config *wc = aclk_thread_head; + while (wc) { + wc->is_shutting_down = 1; + wc = wc->next; + } + uv_mutex_unlock(&aclk_async_lock); +} + int aclk_start_sync_thread(void *data, int argc, char **argv, char **column) { char uuid_str[GUID_LEN + 1]; @@ -313,7 +342,7 @@ void aclk_database_worker(void *arg) struct aclk_database_cmd cmd; unsigned cmd_batch_size; - aclk_database_init_cmd_queue(wc); + //aclk_database_init_cmd_queue(wc); char threadname[NETDATA_THREAD_NAME_MAX+1]; if (wc->host) @@ -347,9 +376,9 @@ void aclk_database_worker(void *arg) timer_req.data = wc; fatal_assert(0 == uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS)); - wc->retry_count = 0; +// wc->retry_count = 0; wc->node_info_send = (wc->host && !localhost); - aclk_add_worker_thread(wc); +// aclk_add_worker_thread(wc); info("Starting ACLK sync thread for host %s -- scratch area %lu bytes", wc->host_guid, sizeof(*wc)); memset(&cmd, 0, sizeof(cmd)); @@ -359,11 +388,10 @@ void aclk_database_worker(void *arg) if (!wc->chart_payload_count) info("%s: No pending charts and dimensions detected during startup", wc->host_guid); #endif - wc->chart_updates = 0; + wc->startup_time = now_realtime_sec(); wc->cleanup_after = wc->startup_time + ACLK_DATABASE_CLEANUP_FIRST; wc->rotation_after = wc->startup_time + ACLK_DATABASE_ROTATION_DELAY; - wc->alert_updates = 0; debug(D_ACLK_SYNC,"Node %s reports pending message count = %u", wc->node_id, wc->chart_payload_count); while (likely(!netdata_exit)) { @@ -498,7 +526,7 @@ void aclk_database_worker(void *arg) uv_close((uv_handle_t *)&timer_req, NULL); /* cleanup operations of the event loop */ - info("Shutting down ACLK sync event loop."); + //info("Shutting down ACLK sync event loop for %s", wc->host_guid); /* * uv_async_send after uv_close does not seem to crash in linux at the moment, @@ -508,7 +536,7 @@ void aclk_database_worker(void *arg) uv_close((uv_handle_t *)&wc->async, NULL); uv_run(loop, UV_RUN_DEFAULT); - info("Shutting down ACLK sync event loop complete."); + info("Shutting down ACLK sync event loop complete for host %s", wc->host_guid); /* TODO: don't let the API block by waiting to enqueue commands */ uv_cond_destroy(&wc->cmd_cond); /* uv_mutex_destroy(&wc->cmd_mutex); */ @@ -597,6 +625,11 @@ void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id) strcpy(wc->host_guid, host_guid); if (node_id && !uuid_is_null(*node_id)) uuid_unparse_lower(*node_id, wc->node_id); + wc->chart_updates = 0; + wc->alert_updates = 0; + wc->retry_count = 0; + aclk_database_init_cmd_queue(wc); + aclk_add_worker_thread(wc); fatal_assert(0 == uv_thread_create(&(wc->thread), aclk_database_worker, wc)); #else UNUSED(host); diff --git a/database/sqlite/sqlite_aclk.h b/database/sqlite/sqlite_aclk.h index d0499903b2..d554e1069f 100644 --- a/database/sqlite/sqlite_aclk.h +++ b/database/sqlite/sqlite_aclk.h @@ -193,6 +193,7 @@ struct aclk_database_worker_config { int node_info_send; int chart_pending; int chart_reset_count; + volatile unsigned is_shutting_down; struct aclk_database_worker_config *next; }; @@ -227,4 +228,5 @@ void sql_check_aclk_table_list(struct aclk_database_worker_config *wc); void sql_delete_aclk_table_list(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd); void sql_maint_aclk_sync_database(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd); int claimed(); +void aclk_sync_exit_all(); #endif //NETDATA_SQLITE_ACLK_H diff --git a/database/sqlite/sqlite_functions.c b/database/sqlite/sqlite_functions.c index 4bee8d382e..a0b8ac0196 100644 --- a/database/sqlite/sqlite_functions.c +++ b/database/sqlite/sqlite_functions.c @@ -12,6 +12,10 @@ const char *database_config[] = { "chart_type int, memory_mode int, history_entries);", "CREATE TABLE IF NOT EXISTS dimension(dim_id blob PRIMARY KEY, chart_id blob, id text, name text, " "multiplier int, divisor int , algorithm int, options text);", + + "DROP TABLE IF EXISTS chart_active;", + "DROP TABLE IF EXISTS dimension_active;", + "CREATE TABLE IF NOT EXISTS chart_active(chart_id blob PRIMARY KEY, date_created int);", "CREATE TABLE IF NOT EXISTS dimension_active(dim_id blob primary key, date_created int);", "CREATE TABLE IF NOT EXISTS metadata_migration(filename text, file_size, date_created int);", @@ -45,8 +49,10 @@ const char *database_config[] = { "INSERT INTO chart_hash_map (chart_id, hash_id) values (new.chart_id, new.hash_id) " "on conflict (chart_id, hash_id) do nothing; END; ", - "delete from chart_active;", - "delete from dimension_active;", + NULL +}; + +const char *database_cleanup[] = { "delete from chart where chart_id not in (select chart_id from dimension);", "delete from host where host_id not in (select host_id from chart);", "delete from chart_label where chart_id not in (select chart_id from chart);", @@ -180,18 +186,153 @@ void store_active_dimension(uuid_t *dimension_uuid) return; } +static int check_table_integrity_cb(void *data, int argc, char **argv, char **column) +{ + int *status = data; + UNUSED(argc); + UNUSED(column); + info("---> %s", argv[0]); + *status = (strcmp(argv[0], "ok") != 0); + return 0; +} + + +static int check_table_integrity(char *table) +{ + int status = 0; + char *err_msg = NULL; + char wstr[255]; + + if (table) { + info("Checking table %s", table); + snprintfz(wstr, 254, "PRAGMA integrity_check(%s);", table); + } + else { + info("Checking entire database"); + strcpy(wstr,"PRAGMA integrity_check;"); + } + + int rc = sqlite3_exec(db_meta, wstr, check_table_integrity_cb, (void *) &status, &err_msg); + if (rc != SQLITE_OK) { + error_report("SQLite error during database integrity check for %s, rc = %d (%s)", + table ? table : "the entire database", rc, err_msg); + sqlite3_free(err_msg); + } + + return status; +} + +const char *rebuild_chart_commands[] = { + "BEGIN TRANSACTION; ", + "DROP INDEX IF EXISTS ind_c1;" , + "DROP TABLE IF EXISTS chart_backup; " , + "CREATE TABLE chart_backup AS SELECT * FROM chart; " , + "DROP TABLE chart; ", + "CREATE TABLE IF NOT EXISTS chart(chart_id blob PRIMARY KEY, host_id blob, type text, id text, " + "name text, family text, context text, title text, unit text, plugin text, " + "module text, priority int, update_every int, chart_type int, memory_mode int, history_entries); ", + "INSERT INTO chart SELECT DISTINCT * FROM chart_backup; ", + "DROP TABLE chart_backup; " , + "CREATE INDEX IF NOT EXISTS ind_c1 on chart (host_id, id, type, name);", + "COMMIT TRANSACTION;", + NULL +}; + +static void rebuild_chart() +{ + int rc; + char *err_msg = NULL; + info("Rebuilding chart table"); + for (int i = 0; rebuild_chart_commands[i]; i++) { + info("Executing %s", rebuild_chart_commands[i]); + rc = sqlite3_exec(db_meta, rebuild_chart_commands[i], 0, 0, &err_msg); + if (rc != SQLITE_OK) { + error_report("SQLite error during database setup, rc = %d (%s)", rc, err_msg); + error_report("SQLite failed statement %s", rebuild_chart_commands[i]); + sqlite3_free(err_msg); + } + } + return; +} + +const char *rebuild_dimension_commands[] = { + "BEGIN TRANSACTION; ", + "DROP INDEX IF EXISTS ind_d1;" , + "DROP TABLE IF EXISTS dimension_backup; " , + "CREATE TABLE dimension_backup AS SELECT * FROM dimension; " , + "DROP TABLE dimension; " , + "CREATE TABLE IF NOT EXISTS dimension(dim_id blob PRIMARY KEY, chart_id blob, id text, name text, " + "multiplier int, divisor int , algorithm int, options text);" , + "INSERT INTO dimension SELECT distinct * FROM dimension_backup; " , + "DROP TABLE dimension_backup; " , + "CREATE INDEX IF NOT EXISTS ind_d1 on dimension (chart_id, id, name);", + "COMMIT TRANSACTION;", + NULL +}; + +void rebuild_dimension() +{ + int rc; + char *err_msg = NULL; + + info("Rebuilding dimension table"); + for (int i = 0; rebuild_dimension_commands[i]; i++) { + info("Executing %s", rebuild_dimension_commands[i]); + rc = sqlite3_exec(db_meta, rebuild_dimension_commands[i], 0, 0, &err_msg); + if (rc != SQLITE_OK) { + error_report("SQLite error during database setup, rc = %d (%s)", rc, err_msg); + error_report("SQLite failed statement %s", rebuild_dimension_commands[i]); + sqlite3_free(err_msg); + } + } + return; +} + +static int attempt_database_fix() +{ + info("Closing database and attempting to fix it"); + int rc = sqlite3_close(db_meta); + if (rc != SQLITE_OK) + error_report("Failed to close database, rc = %d", rc); + info("Attempting to fix database"); + db_meta = NULL; + return sql_init_database(DB_CHECK_FIX_DB | DB_CHECK_CONT); +} + +static int init_database_batch(int rebuild, int init_type, const char *batch[]) +{ + int rc; + char *err_msg = NULL; + for (int i = 0; batch[i]; i++) { + debug(D_METADATALOG, "Executing %s", batch[i]); + rc = sqlite3_exec(db_meta, batch[i], 0, 0, &err_msg); + if (rc != SQLITE_OK) { + error_report("SQLite error during database %s, rc = %d (%s)", init_type ? "cleanup" : "setup", rc, err_msg); + error_report("SQLite failed statement %s", batch[i]); + sqlite3_free(err_msg); + if (SQLITE_CORRUPT == rc) { + if (!rebuild) + return attempt_database_fix(); + rc = check_table_integrity(NULL); + if (rc) + error_report("Databse integrity errors reported"); + } + return 1; + } + } + return 0; +} + /* * Initialize the SQLite database * Return 0 on success */ -int sql_init_database(void) +int sql_init_database(db_check_action_type_t rebuild) { char *err_msg = NULL; char sqlite_database[FILENAME_MAX + 1]; int rc; - fatal_assert(0 == uv_mutex_init(&sqlite_transaction_lock)); - snprintfz(sqlite_database, FILENAME_MAX, "%s/netdata-meta.db", netdata_configured_cache_dir); rc = sqlite3_open(sqlite_database, &db_meta); if (rc != SQLITE_OK) { @@ -201,18 +342,55 @@ int sql_init_database(void) return 1; } - info("SQLite database %s initialization", sqlite_database); + if (rebuild & (DB_CHECK_INTEGRITY | DB_CHECK_FIX_DB)) { + int errors_detected = 0; + if (!(rebuild & DB_CHECK_CONT)) + info("Running database check on %s", sqlite_database); - for (int i = 0; database_config[i]; i++) { - debug(D_METADATALOG, "Executing %s", database_config[i]); - rc = sqlite3_exec(db_meta, database_config[i], 0, 0, &err_msg); - if (rc != SQLITE_OK) { - error_report("SQLite error during database setup, rc = %d (%s)", rc, err_msg); - error_report("SQLite failed statement %s", database_config[i]); - sqlite3_free(err_msg); - return 1; + if (check_table_integrity("chart")) { + errors_detected++; + if (rebuild & DB_CHECK_FIX_DB) + rebuild_chart(); + else + error_report("Errors reported -- run with -W sqlite-fix"); + } + + if (check_table_integrity("dimension")) { + errors_detected++; + if (rebuild & DB_CHECK_FIX_DB) + rebuild_dimension(); + else + error_report("Errors reported -- run with -W sqlite-fix"); + } + + if (!errors_detected) { + if (check_table_integrity(NULL)) + error_report("Errors reported"); } } + + if (rebuild & DB_CHECK_RECLAIM_SPACE) { + if (!(rebuild & DB_CHECK_CONT)) + info("Reclaiming space of %s", sqlite_database); + rc = sqlite3_exec(db_meta, "VACUUM;", 0, 0, &err_msg); + if (rc != SQLITE_OK) { + error_report("Failed to execute VACUUM rc = %d (%s)", rc, err_msg); + sqlite3_free(err_msg); + } + } + + if (rebuild && !(rebuild & DB_CHECK_CONT)) + return 1; + + info("SQLite database %s initialization", sqlite_database); + + if (init_database_batch(rebuild, 0, &database_config[0])) + return 1; + + if (init_database_batch(rebuild, 0, &database_cleanup[0])) + return 1; + + fatal_assert(0 == uv_mutex_init(&sqlite_transaction_lock)); info("SQLite database initialization completed"); return 0; } diff --git a/database/sqlite/sqlite_functions.h b/database/sqlite/sqlite_functions.h index 1428806c98..3e41f6aaa2 100644 --- a/database/sqlite/sqlite_functions.h +++ b/database/sqlite/sqlite_functions.h @@ -16,6 +16,13 @@ struct node_instance_list { int hops; }; +typedef enum db_check_action_type { + DB_CHECK_NONE = 0x0000, + DB_CHECK_INTEGRITY = 0x0001, + DB_CHECK_FIX_DB = 0x0002, + DB_CHECK_RECLAIM_SPACE = 0x0004, + DB_CHECK_CONT = 0x00008 +} db_check_action_type_t; #define SQLITE_INSERT_DELAY (50) // Insert delay in case of lock @@ -49,7 +56,7 @@ struct node_instance_list { return 1; \ } -extern int sql_init_database(void); +extern int sql_init_database(db_check_action_type_t rebuild); extern void sql_close_database(void); extern int sql_store_host(uuid_t *guid, const char *hostname, const char *registry_hostname, int update_every, const char *os, const char *timezone, const char *tags);