diff --git a/Makefile.am b/Makefile.am index 38fb9cdb9a..405fd862a3 100644 --- a/Makefile.am +++ b/Makefile.am @@ -156,6 +156,8 @@ LIBNETDATA_FILES = \ libnetdata/eval/eval.h \ libnetdata/facets/facets.c \ libnetdata/facets/facets.h \ + libnetdata/functions_evloop/functions_evloop.c \ + libnetdata/functions_evloop/functions_evloop.h \ libnetdata/gorilla/gorilla.h \ libnetdata/gorilla/gorilla.cc \ libnetdata/inlined.h \ diff --git a/collectors/apps.plugin/apps_plugin.c b/collectors/apps.plugin/apps_plugin.c index ade68286aa..54db4661b9 100644 --- a/collectors/apps.plugin/apps_plugin.c +++ b/collectors/apps.plugin/apps_plugin.c @@ -4677,7 +4677,7 @@ static void apps_plugin_function_processes_help(const char *transaction) { buffer_json_add_array_item_double(wb, _tmp); \ } while(0) -static void function_processes(const char *transaction, char *function __maybe_unused, char *line_buffer __maybe_unused, int line_max __maybe_unused, int timeout __maybe_unused) { +static void function_processes(const char *transaction, char *function __maybe_unused, int timeout __maybe_unused, bool *cancelled __maybe_unused) { struct pid_stat *p; char *words[PLUGINSD_MAX_WORDS] = { NULL }; @@ -5531,62 +5531,6 @@ static void function_processes(const char *transaction, char *function __maybe_u static bool apps_plugin_exit = false; -static void *reader_main(void *arg __maybe_unused) { - char buffer[PLUGINSD_LINE_MAX + 1]; - - char *s = NULL; - while(!apps_plugin_exit && (s = fgets(buffer, PLUGINSD_LINE_MAX, stdin))) { - - char *words[PLUGINSD_MAX_WORDS] = { NULL }; - size_t num_words = quoted_strings_splitter_pluginsd(buffer, words, PLUGINSD_MAX_WORDS); - - const char *keyword = get_word(words, num_words, 0); - - if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0) { - char *transaction = get_word(words, num_words, 1); - char *timeout_s = get_word(words, num_words, 2); - char *function = get_word(words, num_words, 3); - - if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) { - netdata_log_error("Received incomplete %s (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.", - keyword, - transaction?transaction:"(unset)", - timeout_s?timeout_s:"(unset)", - function?function:"(unset)"); - } - else { - int timeout = str2i(timeout_s); - if(timeout <= 0) timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT; - -// internal_error(true, "Received function '%s', transaction '%s', timeout %d", function, transaction, timeout); - - netdata_mutex_lock(&apps_and_stdout_mutex); - - if(strncmp(function, "processes", strlen("processes")) == 0) - function_processes(transaction, function, buffer, PLUGINSD_LINE_MAX + 1, timeout); - else { - pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_NOT_FOUND, - "No function with this name found in apps.plugin."); - } - - netdata_mutex_unlock(&apps_and_stdout_mutex); - -// internal_error(true, "Done with function '%s', transaction '%s', timeout %d", function, transaction, timeout); - } - } - else - netdata_log_error("Received unknown command: %s", keyword?keyword:"(unset)"); - } - - if(!s || feof(stdin) || ferror(stdin)) { - apps_plugin_exit = true; - netdata_log_error("Received error on stdin."); - } - - exit(1); - return NULL; -} - int main(int argc, char **argv) { // debug_flags = D_PROCFILE; stderror = stderr; @@ -5690,10 +5634,17 @@ int main(int argc, char **argv) { all_pids = callocz(sizeof(struct pid_stat *), (size_t) pid_max + 1); - netdata_thread_t reader_thread; - netdata_thread_create(&reader_thread, "APPS_READER", NETDATA_THREAD_OPTION_DONT_LOG, reader_main, NULL); - netdata_mutex_lock(&apps_and_stdout_mutex); + // ------------------------------------------------------------------------ + // the event loop for functions + struct functions_evloop_globals *wg = + functions_evloop_init(1, "APPS", &apps_and_stdout_mutex, &apps_plugin_exit); + + functions_evloop_add_function(wg, "processes", function_processes, PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT); + + // ------------------------------------------------------------------------ + + netdata_mutex_lock(&apps_and_stdout_mutex); APPS_PLUGIN_GLOBAL_FUNCTIONS(); usec_t step = update_every * USEC_PER_SEC; @@ -5717,12 +5668,10 @@ int main(int argc, char **argv) { struct pollfd pollfd = { .fd = fileno(stdout), .events = POLLERR }; if (unlikely(poll(&pollfd, 1, 0) < 0)) { netdata_mutex_unlock(&apps_and_stdout_mutex); - netdata_thread_cancel(reader_thread); fatal("Cannot check if a pipe is available"); } if (unlikely(pollfd.revents & POLLERR)) { netdata_mutex_unlock(&apps_and_stdout_mutex); - netdata_thread_cancel(reader_thread); fatal("Received error on read pipe."); } @@ -5733,7 +5682,6 @@ int main(int argc, char **argv) { netdata_log_error("Cannot collect /proc data for running processes. Disabling apps.plugin..."); printf("DISABLE\n"); netdata_mutex_unlock(&apps_and_stdout_mutex); - netdata_thread_cancel(reader_thread); exit(1); } diff --git a/collectors/plugins.d/plugins_d.h b/collectors/plugins.d/plugins_d.h index 594fa70170..b22b037ba6 100644 --- a/collectors/plugins.d/plugins_d.h +++ b/collectors/plugins.d/plugins_d.h @@ -10,48 +10,6 @@ #define PLUGINSD_CMD_MAX (FILENAME_MAX*2) #define PLUGINSD_STOCK_PLUGINS_DIRECTORY_PATH 0 -#define PLUGINSD_KEYWORD_CHART "CHART" -#define PLUGINSD_KEYWORD_CHART_DEFINITION_END "CHART_DEFINITION_END" -#define PLUGINSD_KEYWORD_DIMENSION "DIMENSION" -#define PLUGINSD_KEYWORD_BEGIN "BEGIN" -#define PLUGINSD_KEYWORD_SET "SET" -#define PLUGINSD_KEYWORD_END "END" -#define PLUGINSD_KEYWORD_FLUSH "FLUSH" -#define PLUGINSD_KEYWORD_DISABLE "DISABLE" -#define PLUGINSD_KEYWORD_VARIABLE "VARIABLE" -#define PLUGINSD_KEYWORD_LABEL "LABEL" -#define PLUGINSD_KEYWORD_OVERWRITE "OVERWRITE" -#define PLUGINSD_KEYWORD_CLABEL "CLABEL" -#define PLUGINSD_KEYWORD_CLABEL_COMMIT "CLABEL_COMMIT" -#define PLUGINSD_KEYWORD_FUNCTION "FUNCTION" -#define PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN "FUNCTION_RESULT_BEGIN" -#define PLUGINSD_KEYWORD_FUNCTION_RESULT_END "FUNCTION_RESULT_END" - -#define PLUGINSD_KEYWORD_REPLAY_CHART "REPLAY_CHART" -#define PLUGINSD_KEYWORD_REPLAY_BEGIN "RBEGIN" -#define PLUGINSD_KEYWORD_REPLAY_SET "RSET" -#define PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE "RDSTATE" -#define PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE "RSSTATE" -#define PLUGINSD_KEYWORD_REPLAY_END "REND" - -#define PLUGINSD_KEYWORD_BEGIN_V2 "BEGIN2" -#define PLUGINSD_KEYWORD_SET_V2 "SET2" -#define PLUGINSD_KEYWORD_END_V2 "END2" - -#define PLUGINSD_KEYWORD_HOST_DEFINE "HOST_DEFINE" -#define PLUGINSD_KEYWORD_HOST_DEFINE_END "HOST_DEFINE_END" -#define PLUGINSD_KEYWORD_HOST_LABEL "HOST_LABEL" -#define PLUGINSD_KEYWORD_HOST "HOST" - -#define PLUGINSD_KEYWORD_DYNCFG_ENABLE "DYNCFG_ENABLE" -#define PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE "DYNCFG_REGISTER_MODULE" - -#define PLUGINSD_KEYWORD_REPORT_JOB_STATUS "REPORT_JOB_STATUS" - -#define PLUGINSD_KEYWORD_EXIT "EXIT" - -#define PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT 10 // seconds - #define PLUGINSD_LINE_MAX_SSL_READ 512 #define PLUGINSD_MAX_WORDS 20 @@ -99,45 +57,4 @@ void pluginsd_process_thread_cleanup(void *ptr); size_t pluginsd_initialize_plugin_directories(); -#define pluginsd_function_result_begin_to_buffer(wb, transaction, code, content_type, expires) \ - buffer_sprintf(wb \ - , PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " \"%s\" %d \"%s\" %ld\n" \ - , (transaction) ? (transaction) : "" \ - , (int)(code) \ - , (content_type) ? (content_type) : "" \ - , (long int)(expires) \ - ) - -#define pluginsd_function_result_end_to_buffer(wb) \ - buffer_strcat(wb, "\n" PLUGINSD_KEYWORD_FUNCTION_RESULT_END "\n") - -#define pluginsd_function_result_begin_to_stdout(transaction, code, content_type, expires) \ - fprintf(stdout \ - , PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " \"%s\" %d \"%s\" %ld\n" \ - , (transaction) ? (transaction) : "" \ - , (int)(code) \ - , (content_type) ? (content_type) : "" \ - , (long int)(expires) \ - ) - -#define pluginsd_function_result_end_to_stdout() \ - fprintf(stdout, "\n" PLUGINSD_KEYWORD_FUNCTION_RESULT_END "\n") - -static inline void pluginsd_function_json_error_to_stdout(const char *transaction, int code, const char *msg) { - char buffer[PLUGINSD_LINE_MAX + 1]; - json_escape_string(buffer, msg, PLUGINSD_LINE_MAX); - - pluginsd_function_result_begin_to_stdout(transaction, code, "application/json", now_realtime_sec()); - fprintf(stdout, "{\"status\":%d,\"error_message\":\"%s\"}", code, buffer); - pluginsd_function_result_end_to_stdout(); - fflush(stdout); -} - -static inline void pluginsd_function_result_to_stdout(const char *transaction, int code, const char *content_type, time_t expires, BUFFER *result) { - pluginsd_function_result_begin_to_stdout(transaction, code, content_type, expires); - fwrite(buffer_tostring(result), buffer_strlen(result), 1, stdout); - pluginsd_function_result_end_to_stdout(); - fflush(stdout); -} - #endif /* NETDATA_PLUGINS_D_H */ diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c index 0a41308974..bff73f0b37 100644 --- a/collectors/plugins.d/pluginsd_parser.c +++ b/collectors/plugins.d/pluginsd_parser.c @@ -733,14 +733,15 @@ static inline PARSER_RC pluginsd_dimension(char **words, size_t num_words, PARSE struct inflight_function { int code; int timeout; - BUFFER *destination_wb; STRING *function; - void (*callback)(BUFFER *wb, int code, void *callback_data); - void *callback_data; + BUFFER *result_body_wb; + rrd_function_result_callback_t result_cb; + void *result_cb_data; usec_t timeout_ut; usec_t started_ut; usec_t sent_ut; const char *payload; + PARSER *parser; }; static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void *func, void *parser_ptr) { @@ -751,10 +752,12 @@ static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void // leave this code as default, so that when the dictionary is destroyed this will be sent back to the caller pf->code = HTTP_RESP_GATEWAY_TIMEOUT; + const char *transaction = dictionary_acquired_item_name(item); + char buffer[2048 + 1]; snprintfz(buffer, 2048, "%s %s %d \"%s\"\n", pf->payload ? "FUNCTION_PAYLOAD" : "FUNCTION", - dictionary_acquired_item_name(item), + transaction, pf->timeout, string2str(pf->function)); @@ -765,7 +768,7 @@ static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void if(ret < 0) { netdata_log_error("FUNCTION '%s': failed to send it to the plugin, error %zd", string2str(pf->function), ret); - rrd_call_function_error(pf->destination_wb, "Failed to communicate with collector", HTTP_RESP_SERVICE_UNAVAILABLE); + rrd_call_function_error(pf->result_body_wb, "Failed to communicate with collector", HTTP_RESP_SERVICE_UNAVAILABLE); } else { internal_error(LOG_FUNCTIONS, @@ -782,7 +785,7 @@ static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void if(ret < 0) { netdata_log_error("FUNCTION_PAYLOAD '%s': failed to send function to plugin, error %zd", string2str(pf->function), ret); - rrd_call_function_error(pf->destination_wb, "Failed to communicate with collector", HTTP_RESP_SERVICE_UNAVAILABLE); + rrd_call_function_error(pf->result_body_wb, "Failed to communicate with collector", HTTP_RESP_SERVICE_UNAVAILABLE); } else { internal_error(LOG_FUNCTIONS, @@ -798,8 +801,8 @@ static bool inflight_functions_conflict_callback(const DICTIONARY_ITEM *item __m struct inflight_function *pf = new_func; netdata_log_error("PLUGINSD_PARSER: duplicate UUID on pending function '%s' detected. Ignoring the second one.", string2str(pf->function)); - pf->code = rrd_call_function_error(pf->destination_wb, "This request is already in progress", HTTP_RESP_BAD_REQUEST); - pf->callback(pf->destination_wb, pf->code, pf->callback_data); + pf->code = rrd_call_function_error(pf->result_body_wb, "This request is already in progress", HTTP_RESP_BAD_REQUEST); + pf->result_cb(pf->result_body_wb, pf->code, pf->result_cb_data); string_freez(pf->function); return false; @@ -811,9 +814,9 @@ static void inflight_functions_delete_callback(const DICTIONARY_ITEM *item __may internal_error(LOG_FUNCTIONS, "FUNCTION '%s' result of transaction '%s' received from collector (%zu bytes, request %llu usec, response %llu usec)", string2str(pf->function), dictionary_acquired_item_name(item), - buffer_strlen(pf->destination_wb), pf->sent_ut - pf->started_ut, now_realtime_usec() - pf->sent_ut); + buffer_strlen(pf->result_body_wb), pf->sent_ut - pf->started_ut, now_realtime_usec() - pf->sent_ut); - pf->callback(pf->destination_wb, pf->code, pf->callback_data); + pf->result_cb(pf->result_body_wb, pf->code, pf->result_cb_data); string_freez(pf->function); } @@ -833,8 +836,8 @@ static void inflight_functions_garbage_collect(PARSER *parser, usec_t now) { "FUNCTION '%s' removing expired transaction '%s', after %llu usec.", string2str(pf->function), pf_dfe.name, now - pf->started_ut); - if(!buffer_strlen(pf->destination_wb) || pf->code == HTTP_RESP_OK) - pf->code = rrd_call_function_error(pf->destination_wb, + if(!buffer_strlen(pf->result_body_wb) || pf->code == HTTP_RESP_OK) + pf->code = rrd_call_function_error(pf->result_body_wb, "Timeout waiting for collector response.", HTTP_RESP_GATEWAY_TIMEOUT); @@ -847,35 +850,73 @@ static void inflight_functions_garbage_collect(PARSER *parser, usec_t now) { dfe_done(pf); } +void pluginsd_function_cancel(void *data) { + struct inflight_function *look_for = data, *t; + + bool sent = false; + dfe_start_read(look_for->parser->inflight.functions, t) { + if(look_for == t) { + const char *transaction = t_dfe.name; + + internal_error(true, "PLUGINSD: sending function cancellation to plugin for transaction '%s'", transaction); + + char buffer[2048 + 1]; + snprintfz(buffer, 2048, "%s %s\n", + PLUGINSD_KEYWORD_FUNCTION_CANCEL, + transaction); + + // send the command to the plugin + ssize_t ret = send_to_plugin(buffer, t->parser); + if(ret < 0) + sent = true; + + break; + } + } + dfe_done(t); + + if(sent <= 0) + netdata_log_error("PLUGINSD: FUNCTION_CANCEL request didn't match any pending function requests in pluginsd.d."); +} + // this is the function that is called from // rrd_call_function_and_wait() and rrd_call_function_async() -static int pluginsd_execute_function_callback(BUFFER *destination_wb, int timeout, const char *function, void *collector_data, void (*callback)(BUFFER *wb, int code, void *callback_data), void *callback_data) { - PARSER *parser = collector_data; +static int pluginsd_function_execute_cb(BUFFER *result_body_wb, int timeout, const char *function, + void *execute_cb_data, + rrd_function_result_callback_t result_cb, void *result_cb_data, + rrd_function_is_cancelled_cb_t is_cancelled_cb __maybe_unused, + void *is_cancelled_cb_data __maybe_unused, + rrd_function_register_canceller_cb_t register_canceller_cb, + void *register_canceller_db_data) { + PARSER *parser = execute_cb_data; usec_t now = now_realtime_usec(); struct inflight_function tmp = { .started_ut = now, .timeout_ut = now + timeout * USEC_PER_SEC, - .destination_wb = destination_wb, + .result_body_wb = result_body_wb, .timeout = timeout, .function = string_strdupz(function), - .callback = callback, - .callback_data = callback_data, - .payload = NULL + .result_cb = result_cb, + .result_cb_data = result_cb_data, + .payload = NULL, + .parser = parser, }; uuid_t uuid; - uuid_generate_time(uuid); + uuid_generate_random(uuid); - char key[UUID_STR_LEN]; - uuid_unparse_lower(uuid, key); + char transaction[UUID_STR_LEN]; + uuid_unparse_lower(uuid, transaction); dictionary_write_lock(parser->inflight.functions); // if there is any error, our dictionary callbacks will call the caller callback to notify // the caller about the error - no need for error handling here. - dictionary_set(parser->inflight.functions, key, &tmp, sizeof(struct inflight_function)); + void *t = dictionary_set(parser->inflight.functions, transaction, &tmp, sizeof(struct inflight_function)); + if(register_canceller_cb) + register_canceller_cb(register_canceller_db_data, pluginsd_function_cancel, t); if(!parser->inflight.smaller_timeout || tmp.timeout_ut < parser->inflight.smaller_timeout) parser->inflight.smaller_timeout = tmp.timeout_ut; @@ -890,6 +931,8 @@ static int pluginsd_execute_function_callback(BUFFER *destination_wb, int timeou } static inline PARSER_RC pluginsd_function(char **words, size_t num_words, PARSER *parser) { + // a plugin or a child is registering a function + bool global = false; size_t i = 1; if(num_words >= 2 && strcmp(get_word(words, num_words, 1), "GLOBAL") == 0) { @@ -926,7 +969,7 @@ static inline PARSER_RC pluginsd_function(char **words, size_t num_words, PARSER timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT; } - rrd_collector_add_function(host, st, name, timeout, help, false, pluginsd_execute_function_callback, parser); + rrd_function_add(host, st, name, timeout, help, false, pluginsd_function_execute_cb, parser); parser->user.data_collections_count++; @@ -973,18 +1016,18 @@ static inline PARSER_RC pluginsd_function_result_begin(char **words, size_t num_ } else { if(format && *format) - pf->destination_wb->content_type = functions_format_to_content_type(format); + pf->result_body_wb->content_type = functions_format_to_content_type(format); pf->code = code; - pf->destination_wb->expires = expiration; + pf->result_body_wb->expires = expiration; if(expiration <= now_realtime_sec()) - buffer_no_cacheable(pf->destination_wb); + buffer_no_cacheable(pf->result_body_wb); else - buffer_cacheable(pf->destination_wb); + buffer_cacheable(pf->result_body_wb); } - parser->defer.response = (pf) ? pf->destination_wb : NULL; + parser->defer.response = (pf) ? pf->result_body_wb : NULL; parser->defer.end_keyword = PLUGINSD_KEYWORD_FUNCTION_RESULT_END; parser->defer.action = pluginsd_function_result_end; parser->defer.action_data = string_strdupz(key); // it is ok is key is NULL @@ -1916,11 +1959,11 @@ dyncfg_config_t call_virtual_function_blocking(PARSER *parser, const char *name, struct inflight_function tmp = { .started_ut = now, .timeout_ut = now + VIRT_FNC_TIMEOUT + USEC_PER_SEC, - .destination_wb = wb, + .result_body_wb = wb, .timeout = VIRT_FNC_TIMEOUT, .function = string_strdupz(name), - .callback = virt_fnc_got_data_cb, - .callback_data = &cond, + .result_cb = virt_fnc_got_data_cb, + .result_cb_data = &cond, .payload = payload, }; diff --git a/collectors/systemd-journal.plugin/systemd-journal.c b/collectors/systemd-journal.plugin/systemd-journal.c index 5a247a0a72..6efbb57550 100644 --- a/collectors/systemd-journal.plugin/systemd-journal.c +++ b/collectors/systemd-journal.plugin/systemd-journal.c @@ -5,8 +5,6 @@ * GPL v3+ */ -// TODO - 1) MARKDOC 2) HELP TEXT - #include "collectors/all.h" #include "libnetdata/libnetdata.h" #include "libnetdata/required_dummies.h" @@ -23,6 +21,7 @@ #define SYSTEMD_JOURNAL_DEFAULT_QUERY_DURATION (3 * 3600) #define SYSTEMD_JOURNAL_DEFAULT_ITEMS_PER_QUERY 200 #define SYSTEMD_JOURNAL_EXCESS_ROWS_ALLOWED 50 +#define SYSTEMD_JOURNAL_WORKER_THREADS 2 #define JOURNAL_PARAMETER_HELP "help" #define JOURNAL_PARAMETER_AFTER "after" @@ -58,9 +57,6 @@ static netdata_mutex_t stdout_mutex = NETDATA_MUTEX_INITIALIZER; static bool plugin_should_exit = false; -DICTIONARY *uids = NULL; -DICTIONARY *gids = NULL; - // ---------------------------------------------------------------------------- static inline sd_journal *netdata_open_systemd_journal(void) { @@ -95,6 +91,7 @@ typedef enum { ND_SD_JOURNAL_TIMED_OUT, ND_SD_JOURNAL_OK, ND_SD_JOURNAL_NOT_MODIFIED, + ND_SD_JOURNAL_CANCELLED, } ND_SD_JOURNAL_STATUS; static inline bool netdata_systemd_journal_seek_to(sd_journal *j, usec_t timestamp) { @@ -130,21 +127,42 @@ static inline void netdata_systemd_journal_process_row(sd_journal *j, FACETS *fa } } -ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_full(sd_journal *j, BUFFER *wb, FACETS *facets, usec_t after_ut, usec_t before_ut, usec_t if_modified_since, usec_t stop_monotonic_ut, usec_t *last_modified) { +static inline ND_SD_JOURNAL_STATUS check_stop(size_t row_counter, const bool *cancelled, usec_t stop_monotonic_ut) { + if((row_counter % 1000) == 0) { + if(cancelled && __atomic_load_n(cancelled, __ATOMIC_RELAXED)) { + internal_error(true, "Function has been cancelled"); + return ND_SD_JOURNAL_CANCELLED; + } + + if(now_monotonic_usec() > stop_monotonic_ut) { + internal_error(true, "Function timed out"); + return ND_SD_JOURNAL_TIMED_OUT; + } + } + + return ND_SD_JOURNAL_OK; +} + +ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_full( + sd_journal *j, BUFFER *wb __maybe_unused, FACETS *facets, + usec_t after_ut, usec_t before_ut, + usec_t if_modified_since, usec_t stop_monotonic_ut, usec_t *last_modified, + bool *cancelled) { if(!netdata_systemd_journal_seek_to(j, before_ut)) return ND_SD_JOURNAL_FAILED_TO_SEEK; size_t errors_no_timestamp = 0; usec_t first_msg_ut = 0; - bool timed_out = false; size_t row_counter = 0; // the entries are not guaranteed to be sorted, so we process up to 100 entries beyond // the end of the query to find possibly useful logs for our time-frame size_t excess_rows_allowed = SYSTEMD_JOURNAL_EXCESS_ROWS_ALLOWED; + ND_SD_JOURNAL_STATUS status = ND_SD_JOURNAL_OK; + facets_rows_begin(facets); - while (sd_journal_previous(j) > 0) { + while (status == ND_SD_JOURNAL_OK && sd_journal_previous(j) > 0) { row_counter++; usec_t msg_ut; @@ -174,10 +192,7 @@ ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_full(sd_journal *j, BUFFER *w netdata_systemd_journal_process_row(j, facets); facets_row_finished(facets, msg_ut); - if((row_counter % 100) == 0 && now_monotonic_usec() > stop_monotonic_ut) { - timed_out = true; - break; - } + status = check_stop(row_counter, cancelled, stop_monotonic_ut); } if(errors_no_timestamp) @@ -185,18 +200,19 @@ ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_full(sd_journal *j, BUFFER *w *last_modified = first_msg_ut; - if(timed_out) - return ND_SD_JOURNAL_TIMED_OUT; - - return ND_SD_JOURNAL_OK; + return status; } -ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_data_forward(sd_journal *j, BUFFER *wb, FACETS *facets, usec_t after_ut, usec_t before_ut, usec_t anchor, size_t entries, usec_t stop_monotonic_ut) { +ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_data_forward( + sd_journal *j, BUFFER *wb __maybe_unused, FACETS *facets, + usec_t after_ut, usec_t before_ut, + usec_t anchor, size_t entries, usec_t stop_monotonic_ut, + bool *cancelled) { + if(!netdata_systemd_journal_seek_to(j, anchor)) return ND_SD_JOURNAL_FAILED_TO_SEEK; size_t errors_no_timestamp = 0; - bool timed_out = false; size_t row_counter = 0; size_t rows_added = 0; @@ -204,8 +220,10 @@ ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_data_forward(sd_journal *j, B // the end of the query to find possibly useful logs for our time-frame size_t excess_rows_allowed = SYSTEMD_JOURNAL_EXCESS_ROWS_ALLOWED; + ND_SD_JOURNAL_STATUS status = ND_SD_JOURNAL_OK; + facets_rows_begin(facets); - while (sd_journal_next(j) > 0) { + while (status == ND_SD_JOURNAL_OK && sd_journal_next(j) > 0) { row_counter++; usec_t msg_ut; @@ -231,27 +249,25 @@ ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_data_forward(sd_journal *j, B facets_row_finished(facets, msg_ut); rows_added++; - if((row_counter % 100) == 0 && now_monotonic_usec() > stop_monotonic_ut) { - timed_out = true; - break; - } + status = check_stop(row_counter, cancelled, stop_monotonic_ut); } if(errors_no_timestamp) netdata_log_error("SYSTEMD-JOURNAL: %zu lines did not have timestamps", errors_no_timestamp); - if(timed_out) - return ND_SD_JOURNAL_TIMED_OUT; - - return ND_SD_JOURNAL_OK; + return status; } -ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_data_backward(sd_journal *j, BUFFER *wb, FACETS *facets, usec_t after_ut, usec_t before_ut, usec_t anchor, size_t entries, usec_t stop_monotonic_ut) { +ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_data_backward( + sd_journal *j, BUFFER *wb __maybe_unused, FACETS *facets, + usec_t after_ut, usec_t before_ut, + usec_t anchor, size_t entries, usec_t stop_monotonic_ut, + bool *cancelled) { + if(!netdata_systemd_journal_seek_to(j, anchor)) return ND_SD_JOURNAL_FAILED_TO_SEEK; size_t errors_no_timestamp = 0; - bool timed_out = false; size_t row_counter = 0; size_t rows_added = 0; @@ -259,8 +275,10 @@ ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_data_backward(sd_journal *j, // the end of the query to find possibly useful logs for our time-frame size_t excess_rows_allowed = SYSTEMD_JOURNAL_EXCESS_ROWS_ALLOWED; + ND_SD_JOURNAL_STATUS status = ND_SD_JOURNAL_OK; + facets_rows_begin(facets); - while (sd_journal_previous(j) > 0) { + while (status == ND_SD_JOURNAL_OK && sd_journal_previous(j) > 0) { row_counter++; usec_t msg_ut; @@ -286,19 +304,13 @@ ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_data_backward(sd_journal *j, facets_row_finished(facets, msg_ut); rows_added++; - if((row_counter % 100) == 0 && now_monotonic_usec() > stop_monotonic_ut) { - timed_out = true; - break; - } + status = check_stop(row_counter, cancelled, stop_monotonic_ut); } if(errors_no_timestamp) netdata_log_error("SYSTEMD-JOURNAL: %zu lines did not have timestamps", errors_no_timestamp); - if(timed_out) - return ND_SD_JOURNAL_TIMED_OUT; - - return ND_SD_JOURNAL_OK; + return status; } bool netdata_systemd_journal_check_if_modified_since(sd_journal *j, usec_t seek_to, usec_t last_modified) { @@ -327,7 +339,8 @@ static int netdata_systemd_journal_query(BUFFER *wb, FACETS *facets, usec_t after_ut, usec_t before_ut, usec_t anchor, FACETS_ANCHOR_DIRECTION direction, size_t entries, usec_t if_modified_since, bool data_only, - usec_t stop_monotonic_ut) { + usec_t stop_monotonic_ut, + bool *cancelled) { sd_journal *j = netdata_open_systemd_journal(); if(!j) return HTTP_RESP_INTERNAL_SERVER_ERROR; @@ -341,23 +354,36 @@ static int netdata_systemd_journal_query(BUFFER *wb, FACETS *facets, // we can do a data-only query if(direction == FACETS_ANCHOR_DIRECTION_FORWARD) - status = netdata_systemd_journal_query_data_forward(j, wb, facets, after_ut, before_ut, anchor, entries, stop_monotonic_ut); + status = netdata_systemd_journal_query_data_forward(j, wb, facets, after_ut, before_ut, anchor, entries, stop_monotonic_ut, cancelled); else - status = netdata_systemd_journal_query_data_backward(j, wb, facets, after_ut, before_ut, anchor, entries, stop_monotonic_ut); + status = netdata_systemd_journal_query_data_backward(j, wb, facets, after_ut, before_ut, anchor, entries, stop_monotonic_ut, cancelled); } else { // we have to do a full query status = netdata_systemd_journal_query_full(j, wb, facets, after_ut, before_ut, if_modified_since, - stop_monotonic_ut, &last_modified); + stop_monotonic_ut, &last_modified, cancelled); } sd_journal_close(j); - if(status == ND_SD_JOURNAL_NOT_MODIFIED) - return HTTP_RESP_NOT_MODIFIED; + if(status != ND_SD_JOURNAL_OK && status != ND_SD_JOURNAL_TIMED_OUT) { + buffer_flush(wb); - buffer_json_member_add_uint64(wb, "status", status == ND_SD_JOURNAL_FAILED_TO_SEEK ? HTTP_RESP_INTERNAL_SERVER_ERROR : HTTP_RESP_OK); + switch (status) { + case ND_SD_JOURNAL_CANCELLED: + return HTTP_RESP_CLIENT_CLOSED_REQUEST; + + case ND_SD_JOURNAL_NOT_MODIFIED: + return HTTP_RESP_NOT_MODIFIED; + + default: + case ND_SD_JOURNAL_FAILED_TO_SEEK: + return HTTP_RESP_INTERNAL_SERVER_ERROR; + } + } + + buffer_json_member_add_uint64(wb, "status", HTTP_RESP_OK); buffer_json_member_add_boolean(wb, "partial", status != ND_SD_JOURNAL_OK); buffer_json_member_add_string(wb, "type", "table"); @@ -372,7 +398,7 @@ static int netdata_systemd_journal_query(BUFFER *wb, FACETS *facets, buffer_json_member_add_time_t(wb, "expires", now_realtime_sec() + (data_only ? 3600 : 0)); buffer_json_finalize(wb); - return status == ND_SD_JOURNAL_FAILED_TO_SEEK ? HTTP_RESP_INTERNAL_SERVER_ERROR : HTTP_RESP_OK; + return HTTP_RESP_OK; } static void netdata_systemd_journal_function_help(const char *transaction) { @@ -484,8 +510,8 @@ static FACET_ROW_SEVERITY syslog_priority_to_facet_severity(int priority) { } static char *uid_to_username(uid_t uid, char *buffer, size_t buffer_size) { + static __thread char tmp[1024 + 1]; struct passwd pw, *result; - char tmp[1024 + 1]; if (getpwuid_r(uid, &pw, tmp, 1024, &result) != 0 || result == NULL) return NULL; @@ -495,8 +521,8 @@ static char *uid_to_username(uid_t uid, char *buffer, size_t buffer_size) { } static char *gid_to_groupname(gid_t gid, char* buffer, size_t buffer_size) { + static __thread char tmp[1024 + 1]; struct group grp, *result; - char tmp[1024 + 1]; if (getgrgid_r(gid, &grp, tmp, 1024, &result) != 0 || result == NULL) return NULL; @@ -531,46 +557,106 @@ static void netdata_systemd_journal_transform_priority(FACETS *facets __maybe_un } } -static void netdata_systemd_journal_transform_uid(FACETS *facets __maybe_unused, BUFFER *wb, void *data) { - DICTIONARY *cache = data; +// ---------------------------------------------------------------------------- +// UID and GID transformation + +#define UID_GID_HASHTABLE_SIZE 1000 + +struct word_t2str_hashtable_entry { + struct word_t2str_hashtable_entry *next; + Word_t hash; + size_t len; + char str[]; +}; + +struct word_t2str_hashtable { + SPINLOCK spinlock; + size_t size; + struct word_t2str_hashtable_entry *hashtable[UID_GID_HASHTABLE_SIZE]; +}; + +struct word_t2str_hashtable uid_hashtable = { + .size = UID_GID_HASHTABLE_SIZE, +}; + +struct word_t2str_hashtable gid_hashtable = { + .size = UID_GID_HASHTABLE_SIZE, +}; + +struct word_t2str_hashtable_entry **word_t2str_hashtable_slot(struct word_t2str_hashtable *ht, Word_t hash) { + size_t slot = hash % ht->size; + struct word_t2str_hashtable_entry **e = &ht->hashtable[slot]; + + while(*e && (*e)->hash != hash) + e = &((*e)->next); + + return e; +} + +const char *uid_to_username_cached(uid_t uid, size_t *length) { + spinlock_lock(&uid_hashtable.spinlock); + + struct word_t2str_hashtable_entry **e = word_t2str_hashtable_slot(&uid_hashtable, uid); + if(!(*e)) { + static __thread char buf[1024 + 1]; + const char *name = uid_to_username(uid, buf, 1024); + size_t size = strlen(name) + 1; + + *e = callocz(1, sizeof(struct word_t2str_hashtable_entry) + size); + (*e)->len = size - 1; + (*e)->hash = uid; + memcpy((*e)->str, name, size); + } + + spinlock_unlock(&uid_hashtable.spinlock); + + *length = (*e)->len; + return (*e)->str; +} + +const char *gid_to_groupname_cached(gid_t gid, size_t *length) { + spinlock_lock(&gid_hashtable.spinlock); + + struct word_t2str_hashtable_entry **e = word_t2str_hashtable_slot(&gid_hashtable, gid); + if(!(*e)) { + static __thread char buf[1024 + 1]; + const char *name = gid_to_groupname(gid, buf, 1024); + size_t size = strlen(name) + 1; + + *e = callocz(1, sizeof(struct word_t2str_hashtable_entry) + size); + (*e)->len = size - 1; + (*e)->hash = gid; + memcpy((*e)->str, name, size); + } + + spinlock_unlock(&gid_hashtable.spinlock); + + *length = (*e)->len; + return (*e)->str; +} + +static void netdata_systemd_journal_transform_uid(FACETS *facets __maybe_unused, BUFFER *wb, void *data __maybe_unused) { const char *v = buffer_tostring(wb); if(*v && isdigit(*v)) { - const char *sv = dictionary_get(cache, v); - if(!sv) { - char buf[1024 + 1]; - int uid = str2i(buffer_tostring(wb)); - const char *name = uid_to_username(uid, buf, 1024); - if (!name) - name = v; - - sv = dictionary_set(cache, v, (void *)name, strlen(name) + 1); - } - - buffer_flush(wb); - buffer_strcat(wb, sv); + uid_t uid = str2i(buffer_tostring(wb)); + size_t len; + const char *name = uid_to_username_cached(uid, &len); + buffer_contents_replace(wb, name, len); } } -static void netdata_systemd_journal_transform_gid(FACETS *facets __maybe_unused, BUFFER *wb, void *data) { - DICTIONARY *cache = data; +static void netdata_systemd_journal_transform_gid(FACETS *facets __maybe_unused, BUFFER *wb, void *data __maybe_unused) { const char *v = buffer_tostring(wb); if(*v && isdigit(*v)) { - const char *sv = dictionary_get(cache, v); - if(!sv) { - char buf[1024 + 1]; - int gid = str2i(buffer_tostring(wb)); - const char *name = gid_to_groupname(gid, buf, 1024); - if (!name) - name = v; - - sv = dictionary_set(cache, v, (void *)name, strlen(name) + 1); - } - - buffer_flush(wb); - buffer_strcat(wb, sv); + gid_t gid = str2i(buffer_tostring(wb)); + size_t len; + const char *name = gid_to_groupname_cached(gid, &len); + buffer_contents_replace(wb, name, len); } } +// ---------------------------------------------------------------------------- + static void netdata_systemd_journal_dynamic_row_id(FACETS *facets __maybe_unused, BUFFER *json_array, FACET_ROW_KEY_VALUE *rkv, FACET_ROW *row, void *data __maybe_unused) { FACET_ROW_KEY_VALUE *pid_rkv = dictionary_get(row->dict, "_PID"); const char *pid = pid_rkv ? buffer_tostring(pid_rkv->wb) : FACET_VALUE_UNSET; @@ -593,47 +679,13 @@ static void netdata_systemd_journal_dynamic_row_id(FACETS *facets __maybe_unused buffer_json_add_array_item_string(json_array, buffer_tostring(rkv->wb)); } -static void netdata_systemd_journal_rich_message(FACETS *facets __maybe_unused, BUFFER *json_array, FACET_ROW_KEY_VALUE *rkv, FACET_ROW *row, void *data __maybe_unused) { +static void netdata_systemd_journal_rich_message(FACETS *facets __maybe_unused, BUFFER *json_array, FACET_ROW_KEY_VALUE *rkv, FACET_ROW *row __maybe_unused, void *data __maybe_unused) { buffer_json_add_array_item_object(json_array); buffer_json_member_add_string(json_array, "value", buffer_tostring(rkv->wb)); buffer_json_object_close(json_array); } -static void function_systemd_journal(const char *transaction, char *function, char *line_buffer __maybe_unused, int line_max __maybe_unused, int timeout __maybe_unused) { - static struct { - BUFFER *tmp; - BUFFER *wb; - BUFFER *function; - int response; - time_t expires; - } cache = { - .tmp = NULL, - .wb = NULL, - .function = NULL, - .response = 0, - .expires = 0, - }; - - if(unlikely(!cache.wb)) { - cache.tmp = buffer_create(0, NULL); - cache.wb = buffer_create(0, NULL); - cache.function = buffer_create(0, NULL); - } - - if(buffer_strlen(cache.function) && buffer_strlen(cache.wb) && strcmp(buffer_tostring(cache.function), function) == 0) { - // repeated the same request - netdata_mutex_lock(&stdout_mutex); - if(cache.response == HTTP_RESP_OK) - pluginsd_function_result_to_stdout(transaction, cache.response, "application/json", cache.expires, cache.wb); - else - pluginsd_function_json_error_to_stdout(transaction, cache.response, "failed"); - netdata_mutex_unlock(&stdout_mutex); - return; - } - - buffer_flush(cache.tmp); - buffer_strcat(cache.tmp, function); - +static void function_systemd_journal(const char *transaction, char *function, int timeout, bool *cancelled) { BUFFER *wb = buffer_create(0, NULL); buffer_flush(wb); buffer_json_initialize(wb, "\"", "\"", 0, true, BUFFER_JSON_OPTIONS_NEWLINE_ON_ARRAY_ITEMS); @@ -682,10 +734,10 @@ static void function_systemd_journal(const char *transaction, char *function, ch facets_register_key_name(facets, "USER_UNIT", FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS); facets_register_key_name_transformation(facets, "_UID", FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS, - netdata_systemd_journal_transform_uid, uids); + netdata_systemd_journal_transform_uid, NULL); facets_register_key_name_transformation(facets, "_GID", FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS, - netdata_systemd_journal_transform_gid, gids); + netdata_systemd_journal_transform_gid, NULL); bool info = false; bool data_only = false; @@ -858,12 +910,19 @@ static void function_systemd_journal(const char *transaction, char *function, ch facets_set_items(facets, last); facets_set_anchor(facets, anchor, direction); facets_set_query(facets, query); - facets_set_histogram(facets, chart ? chart : "PRIORITY", after_s * USEC_PER_SEC, before_s * USEC_PER_SEC); + + if(chart && *chart) + facets_set_histogram_by_id(facets, chart, + after_s * USEC_PER_SEC, before_s * USEC_PER_SEC); + else + facets_set_histogram_by_name(facets, "PRIORITY", + after_s * USEC_PER_SEC, before_s * USEC_PER_SEC); response = netdata_systemd_journal_query(wb, facets, after_s * USEC_PER_SEC, before_s * USEC_PER_SEC, anchor, direction, last, if_modified_since, data_only, - now_monotonic_usec() + (timeout - 1) * USEC_PER_SEC); + now_monotonic_usec() + (timeout - 1) * USEC_PER_SEC, + cancelled); if(response != HTTP_RESP_OK) { netdata_mutex_lock(&stdout_mutex); @@ -872,14 +931,6 @@ static void function_systemd_journal(const char *transaction, char *function, ch goto cleanup; } - // keep this response in the cache - cache.response = response; - cache.expires = expires; - buffer_flush(cache.wb); - buffer_fast_strcat(cache.wb, buffer_tostring(wb), buffer_strlen(wb)); - buffer_flush(cache.function); - buffer_fast_strcat(cache.function, buffer_tostring(cache.tmp), buffer_strlen(cache.tmp)); - output: netdata_mutex_lock(&stdout_mutex); pluginsd_function_result_to_stdout(transaction, response, "application/json", expires, wb); @@ -890,54 +941,7 @@ cleanup: buffer_free(wb); } -static void *reader_main(void *arg __maybe_unused) { - char buffer[PLUGINSD_LINE_MAX + 1]; - - char *s = NULL; - while(!plugin_should_exit && (s = fgets(buffer, PLUGINSD_LINE_MAX, stdin))) { - - char *words[PLUGINSD_MAX_WORDS] = { NULL }; - size_t num_words = quoted_strings_splitter_pluginsd(buffer, words, PLUGINSD_MAX_WORDS); - - const char *keyword = get_word(words, num_words, 0); - - if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0) { - char *transaction = get_word(words, num_words, 1); - char *timeout_s = get_word(words, num_words, 2); - char *function = get_word(words, num_words, 3); - - if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) { - netdata_log_error("Received incomplete %s (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.", - keyword, - transaction?transaction:"(unset)", - timeout_s?timeout_s:"(unset)", - function?function:"(unset)"); - } - else { - int timeout = str2i(timeout_s); - if(timeout <= 0) timeout = SYSTEMD_JOURNAL_DEFAULT_TIMEOUT; - - if(strncmp(function, SYSTEMD_JOURNAL_FUNCTION_NAME, strlen(SYSTEMD_JOURNAL_FUNCTION_NAME)) == 0) - function_systemd_journal(transaction, function, buffer, PLUGINSD_LINE_MAX + 1, timeout); - else { - netdata_mutex_lock(&stdout_mutex); - pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_NOT_FOUND, - "No function with this name found in systemd-journal.plugin."); - netdata_mutex_unlock(&stdout_mutex); - } - } - } - else - netdata_log_error("Received unknown command: %s", keyword?keyword:"(unset)"); - } - - if(!s || feof(stdin) || ferror(stdin)) { - plugin_should_exit = true; - netdata_log_error("Received error on stdin."); - } - - exit(1); -} +// ---------------------------------------------------------------------------- int main(int argc __maybe_unused, char **argv __maybe_unused) { stderror = stderr; @@ -952,9 +956,6 @@ int main(int argc __maybe_unused, char **argv __maybe_unused) { error_log_errors_per_period = 100; error_log_throttle_period = 3600; - uids = dictionary_create(0); - gids = dictionary_create(0); - netdata_configured_host_prefix = getenv("NETDATA_HOST_PREFIX"); if(verify_netdata_host_prefix() == -1) exit(1); @@ -962,22 +963,28 @@ int main(int argc __maybe_unused, char **argv __maybe_unused) { // debug if(argc == 2 && strcmp(argv[1], "debug") == 0) { - char buf[] = "systemd-journal after:-864000 before:0 last:500"; + bool cancelled = false; + char buf[] = "systemd-journal after:-2592000 before:0 last:500"; // char buf[] = "systemd-journal after:1694511062 before:1694514662 anchor:1694514122024403"; - function_systemd_journal("123", buf, "", 0, 30); + function_systemd_journal("123", buf, 30, &cancelled); exit(1); } // ------------------------------------------------------------------------ + // the event loop for functions + + struct functions_evloop_globals *wg = + functions_evloop_init(SYSTEMD_JOURNAL_WORKER_THREADS, "SDJ", &stdout_mutex, &plugin_should_exit); + + functions_evloop_add_function(wg, SYSTEMD_JOURNAL_FUNCTION_NAME, function_systemd_journal, + SYSTEMD_JOURNAL_DEFAULT_TIMEOUT); - netdata_thread_t reader_thread; - netdata_thread_create(&reader_thread, "SDJ_READER", NETDATA_THREAD_OPTION_DONT_LOG, reader_main, NULL); // ------------------------------------------------------------------------ time_t started_t = now_monotonic_sec(); - size_t iteration; + size_t iteration = 0; usec_t step = 1000 * USEC_PER_MS; bool tty = isatty(fileno(stderr)) == 1; @@ -987,7 +994,9 @@ int main(int argc __maybe_unused, char **argv __maybe_unused) { heartbeat_t hb; heartbeat_init(&hb); - for(iteration = 0; 1 ; iteration++) { + while(!plugin_should_exit) { + iteration++; + netdata_mutex_unlock(&stdout_mutex); heartbeat_next(&hb, step); netdata_mutex_lock(&stdout_mutex); @@ -1002,8 +1011,5 @@ int main(int argc __maybe_unused, char **argv __maybe_unused) { break; } - dictionary_destroy(uids); - dictionary_destroy(gids); - exit(0); } diff --git a/configure.ac b/configure.ac index 3692abec44..f75dfa2ac9 100644 --- a/configure.ac +++ b/configure.ac @@ -2023,6 +2023,7 @@ AC_CONFIG_FILES([ libnetdata/ebpf/Makefile libnetdata/eval/Makefile libnetdata/facets/Makefile + libnetdata/functions_evloop/Makefile libnetdata/july/Makefile libnetdata/locks/Makefile libnetdata/log/Makefile diff --git a/daemon/main.c b/daemon/main.c index e12a8b4b00..cb89ff4711 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -1904,6 +1904,8 @@ int main(int argc, char **argv) { replication_initialize(); + rrd_functions_inflight_init(); + // -------------------------------------------------------------------- // get the certificate and start security diff --git a/database/rrdfunctions.c b/database/rrdfunctions.c index 81a911c489..11a0c46b63 100644 --- a/database/rrdfunctions.c +++ b/database/rrdfunctions.c @@ -1,3 +1,4 @@ +// SPDX-License-Identifier: GPL-3.0-or-later #define NETDATA_RRD_INTERNALS #include "rrd.h" @@ -277,16 +278,15 @@ typedef enum __attribute__((packed)) { // this is 8-bit } RRD_FUNCTION_OPTIONS; -struct rrd_collector_function { +struct rrd_host_function { bool sync; // when true, the function is called synchronously RRD_FUNCTION_OPTIONS options; // RRD_FUNCTION_OPTIONS STRING *help; int timeout; // the default timeout of the function - int (*function)(BUFFER *wb, int timeout, const char *function, void *collector_data, - function_data_ready_callback callback, void *callback_data); + rrd_function_execute_cb_t execute_cb; - void *collector_data; + void *execute_cb_data; struct rrd_collector *collector; }; @@ -299,6 +299,7 @@ struct rrd_collector_function { struct rrd_collector { int32_t refcount; + int32_t refcount_canceller; pid_t tid; bool running; }; @@ -330,7 +331,7 @@ void rrd_collector_started(void) { thread_rrd_collector = callocz(1, sizeof(struct rrd_collector)); thread_rrd_collector->tid = gettid(); - thread_rrd_collector->running = true; + __atomic_store_n(&thread_rrd_collector->running, true, __ATOMIC_RELAXED); } // called once per collector @@ -338,17 +339,26 @@ void rrd_collector_finished(void) { if(!thread_rrd_collector) return; - thread_rrd_collector->running = false; + __atomic_store_n(&thread_rrd_collector->running, false, __ATOMIC_RELAXED); + + int32_t expected = 0; + while(!__atomic_compare_exchange_n(&thread_rrd_collector->refcount_canceller, &expected, -1, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) { + expected = 0; + sleep_usec(1 * USEC_PER_MS); + } + rrd_collector_free(thread_rrd_collector); thread_rrd_collector = NULL; } +#define rrd_collector_running(c) __atomic_load_n(&(c)->running, __ATOMIC_RELAXED) + static struct rrd_collector *rrd_collector_acquire(void) { rrd_collector_started(); int32_t expected = __atomic_load_n(&thread_rrd_collector->refcount, __ATOMIC_RELAXED), wanted = 0; do { - if(expected < 0 || !thread_rrd_collector->running) { + if(expected < 0 || !rrd_collector_running(thread_rrd_collector)) { internal_fatal(true, "FUNCTIONS: Trying to acquire a collector that is exiting."); return thread_rrd_collector; } @@ -385,7 +395,7 @@ static void rrd_collector_release(struct rrd_collector *rdc) { static void rrd_functions_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func, void *rrdhost) { RRDHOST *host = rrdhost; (void)host; - struct rrd_collector_function *rdcf = func; + struct rrd_host_function *rdcf = func; rrd_collector_started(); rdcf->collector = rrd_collector_acquire(); @@ -397,33 +407,42 @@ static void rrd_functions_insert_callback(const DICTIONARY_ITEM *item __maybe_un static void rrd_functions_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func, void *rrdhost __maybe_unused) { - struct rrd_collector_function *rdcf = func; + struct rrd_host_function *rdcf = func; rrd_collector_release(rdcf->collector); } static bool rrd_functions_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func, void *new_func, void *rrdhost) { RRDHOST *host = rrdhost; (void)host; - struct rrd_collector_function *rdcf = func; - struct rrd_collector_function *new_rdcf = new_func; + struct rrd_host_function *rdcf = func; + struct rrd_host_function *new_rdcf = new_func; rrd_collector_started(); bool changed = false; if(rdcf->collector != thread_rrd_collector) { + netdata_log_info("FUNCTIONS: function '%s' of host '%s' changed collector from %d to %d", + dictionary_acquired_item_name(item), rrdhost_hostname(host), rdcf->collector->tid, thread_rrd_collector->tid); + struct rrd_collector *old_rdc = rdcf->collector; rdcf->collector = rrd_collector_acquire(); rrd_collector_release(old_rdc); changed = true; } - if(rdcf->function != new_rdcf->function) { - rdcf->function = new_rdcf->function; + if(rdcf->execute_cb != new_rdcf->execute_cb) { + netdata_log_info("FUNCTIONS: function '%s' of host '%s' changed execute callback", + dictionary_acquired_item_name(item), rrdhost_hostname(host)); + + rdcf->execute_cb = new_rdcf->execute_cb; changed = true; } if(rdcf->help != new_rdcf->help) { + netdata_log_info("FUNCTIONS: function '%s' of host '%s' changed help text", + dictionary_acquired_item_name(item), rrdhost_hostname(host)); + STRING *old = rdcf->help; rdcf->help = new_rdcf->help; string_freez(old); @@ -433,17 +452,26 @@ static bool rrd_functions_conflict_callback(const DICTIONARY_ITEM *item __maybe_ string_freez(new_rdcf->help); if(rdcf->timeout != new_rdcf->timeout) { + netdata_log_info("FUNCTIONS: function '%s' of host '%s' changed timeout", + dictionary_acquired_item_name(item), rrdhost_hostname(host)); + rdcf->timeout = new_rdcf->timeout; changed = true; } if(rdcf->sync != new_rdcf->sync) { + netdata_log_info("FUNCTIONS: function '%s' of host '%s' changed sync/async mode", + dictionary_acquired_item_name(item), rrdhost_hostname(host)); + rdcf->sync = new_rdcf->sync; changed = true; } - if(rdcf->collector_data != new_rdcf->collector_data) { - rdcf->collector_data = new_rdcf->collector_data; + if(rdcf->execute_cb_data != new_rdcf->execute_cb_data) { + netdata_log_info("FUNCTIONS: function '%s' of host '%s' changed execute callback data", + dictionary_acquired_item_name(item), rrdhost_hostname(host)); + + rdcf->execute_cb_data = new_rdcf->execute_cb_data; changed = true; } @@ -454,24 +482,23 @@ static bool rrd_functions_conflict_callback(const DICTIONARY_ITEM *item __maybe_ return changed; } - -void rrdfunctions_init(RRDHOST *host) { +void rrdfunctions_host_init(RRDHOST *host) { if(host->functions) return; host->functions = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE, - &dictionary_stats_category_functions, sizeof(struct rrd_collector_function)); + &dictionary_stats_category_functions, sizeof(struct rrd_host_function)); dictionary_register_insert_callback(host->functions, rrd_functions_insert_callback, host); dictionary_register_delete_callback(host->functions, rrd_functions_delete_callback, host); dictionary_register_conflict_callback(host->functions, rrd_functions_conflict_callback, host); } -void rrdfunctions_destroy(RRDHOST *host) { +void rrdfunctions_host_destroy(RRDHOST *host) { dictionary_destroy(host->functions); } -void rrd_collector_add_function(RRDHOST *host, RRDSET *st, const char *name, int timeout, const char *help, - bool sync, function_execute_at_collector function, void *collector_data) { +void rrd_function_add(RRDHOST *host, RRDSET *st, const char *name, int timeout, const char *help, + bool sync, rrd_function_execute_cb_t execute_cb, void *execute_cb_data) { // RRDSET *st may be NULL in this function // to create a GLOBAL function @@ -482,12 +509,12 @@ void rrd_collector_add_function(RRDHOST *host, RRDSET *st, const char *name, int char key[PLUGINSD_LINE_MAX + 1]; sanitize_function_text(key, name, PLUGINSD_LINE_MAX); - struct rrd_collector_function tmp = { + struct rrd_host_function tmp = { .sync = sync, .timeout = timeout, .options = (st)?RRD_FUNCTION_LOCAL:RRD_FUNCTION_GLOBAL, - .function = function, - .collector_data = collector_data, + .execute_cb = execute_cb, + .execute_cb_data = execute_cb_data, .help = string_strdupz(help), }; const DICTIONARY_ITEM *item = dictionary_set_and_acquire_item(host->functions, key, &tmp, sizeof(tmp)); @@ -504,7 +531,7 @@ void rrd_functions_expose_rrdpush(RRDSET *st, BUFFER *wb) { if(!st->functions_view) return; - struct rrd_collector_function *tmp; + struct rrd_host_function *tmp; dfe_start_read(st->functions_view, tmp) { buffer_sprintf(wb , PLUGINSD_KEYWORD_FUNCTION " \"%s\" %d \"%s\"\n" @@ -519,7 +546,7 @@ void rrd_functions_expose_rrdpush(RRDSET *st, BUFFER *wb) { void rrd_functions_expose_global_rrdpush(RRDHOST *host, BUFFER *wb) { rrdhost_flag_clear(host, RRDHOST_FLAG_GLOBAL_FUNCTIONS_UPDATED); - struct rrd_collector_function *tmp; + struct rrd_host_function *tmp; dfe_start_read(host->functions, tmp) { if(!(tmp->options & RRD_FUNCTION_GLOBAL)) continue; @@ -534,20 +561,6 @@ void rrd_functions_expose_global_rrdpush(RRDHOST *host, BUFFER *wb) { dfe_done(tmp); } -struct rrd_function_call_wait { - bool free_with_signal; - bool data_are_ready; - netdata_mutex_t mutex; - pthread_cond_t cond; - int code; -}; - -static void rrd_function_call_wait_free(struct rrd_function_call_wait *tmp) { - pthread_cond_destroy(&tmp->cond); - netdata_mutex_destroy(&tmp->mutex); - freez(tmp); -} - struct { const char *format; HTTP_CONTENT_TYPE content_type; @@ -596,17 +609,28 @@ int rrd_call_function_error(BUFFER *wb, const char *msg, int code) { return code; } -static int rrd_call_function_find(RRDHOST *host, BUFFER *wb, const char *name, size_t key_length, struct rrd_collector_function **rdcf) { +static int rrd_call_function_find(RRDHOST *host, BUFFER *wb, const char *name, size_t key_length, const DICTIONARY_ITEM **item) { char buffer[MAX_FUNCTION_LENGTH + 1]; strncpyz(buffer, name, MAX_FUNCTION_LENGTH); char *s = NULL; - *rdcf = NULL; + bool found = false; + *item = NULL; if(host->functions) { - while (!(*rdcf) && buffer[0]) { - *rdcf = dictionary_get(host->functions, buffer); - if (*rdcf) break; + while (buffer[0]) { + if((*item = dictionary_get_and_acquire_item(host->functions, buffer))) { + found = true; + + struct rrd_host_function *rdcf = dictionary_acquired_item_value(*item); + if(rrd_collector_running(rdcf->collector)) { + break; + } + else { + dictionary_acquired_item_release(host->functions, *item); + *item = NULL; + } + } // if s == NULL, set it to the end of the buffer // this should happen only the first time @@ -623,16 +647,131 @@ static int rrd_call_function_find(RRDHOST *host, BUFFER *wb, const char *name, s buffer_flush(wb); - if(!(*rdcf)) - return rrd_call_function_error(wb, "No collector is supplying this function on this host at this time.", HTTP_RESP_NOT_FOUND); - - if(!(*rdcf)->collector->running) - return rrd_call_function_error(wb, "The collector that registered this function, is not currently running.", HTTP_RESP_SERVICE_UNAVAILABLE); + if(!(*item)) { + if(found) + return rrd_call_function_error(wb, + "The collector that registered this function, is not currently running.", + HTTP_RESP_SERVICE_UNAVAILABLE); + else + return rrd_call_function_error(wb, + "No collector is supplying this function on this host at this time.", + HTTP_RESP_NOT_FOUND); + } return HTTP_RESP_OK; } -static void rrd_call_function_signal_when_ready(BUFFER *temp_wb __maybe_unused, int code, void *callback_data) { +// ---------------------------------------------------------------------------- + +struct rrd_function_inflight { + bool used; + + RRDHOST *host; + const char *transaction; + const char *cmd; + const char *sanitized_cmd; + size_t sanitized_cmd_length; + int timeout; + bool cancelled; + + const DICTIONARY_ITEM *host_function_acquired; + + // the collector + // we acquire this structure at the beginning, + // and we release it at the end + struct rrd_host_function *rdcf; + + struct { + BUFFER *wb; + + // in async mode, + // the function to call to send the result back + rrd_function_result_callback_t cb; + void *data; + } result; + + struct { + // to be called in sync mode + // while the function is running + // to check if the function has been cancelled + rrd_function_is_cancelled_cb_t cb; + void *data; + } is_cancelled; + + struct { + // to be registered by the function itself + // used to signal the function to cancel + rrd_function_canceller_cb_t cb; + void *data; + } canceller; +}; + +static DICTIONARY *rrd_functions_inflight_requests = NULL; + +static void rrd_functions_inflight_delete_cb(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *data __maybe_unused) { + struct rrd_function_inflight *r = value; + + // internal_error(true, "FUNCTIONS: transaction '%s' finished", r->transaction); + + freez((void *)r->transaction); + freez((void *)r->cmd); + freez((void *)r->sanitized_cmd); + dictionary_acquired_item_release(r->host->functions, r->host_function_acquired); +} + +void rrd_functions_inflight_init(void) { + if(rrd_functions_inflight_requests) + return; + + rrd_functions_inflight_requests = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE, NULL, sizeof(struct rrd_function_inflight)); + + dictionary_register_delete_callback(rrd_functions_inflight_requests, rrd_functions_inflight_delete_cb, NULL); +} + +void rrd_functions_inflight_destroy(void) { + if(!rrd_functions_inflight_requests) + return; + + dictionary_destroy(rrd_functions_inflight_requests); + rrd_functions_inflight_requests = NULL; +} + +static void rrd_inflight_async_function_register_canceller_cb(void *register_canceller_cb_data, rrd_function_canceller_cb_t canceller_cb, void *canceller_cb_data) { + struct rrd_function_inflight *r = register_canceller_cb_data; + r->canceller.cb = canceller_cb; + r->canceller.data = canceller_cb_data; +} + +// ---------------------------------------------------------------------------- +// waiting for async function completion + +struct rrd_function_call_wait { + RRDHOST *host; + const DICTIONARY_ITEM *host_function_acquired; + char *transaction; + + bool free_with_signal; + bool data_are_ready; + netdata_mutex_t mutex; + pthread_cond_t cond; + int code; +}; + +static void rrd_inflight_function_cleanup(RRDHOST *host, const DICTIONARY_ITEM *host_function_acquired, const char *transaction) { + dictionary_del(rrd_functions_inflight_requests, transaction); + dictionary_garbage_collect(rrd_functions_inflight_requests); +} + +static void rrd_function_call_wait_free(struct rrd_function_call_wait *tmp) { + rrd_inflight_function_cleanup(tmp->host, tmp->host_function_acquired, tmp->transaction); + freez(tmp->transaction); + + pthread_cond_destroy(&tmp->cond); + netdata_mutex_destroy(&tmp->mutex); + freez(tmp); +} + +static void rrd_async_function_signal_when_ready(BUFFER *temp_wb __maybe_unused, int code, void *callback_data) { struct rrd_function_call_wait *tmp = callback_data; bool we_should_free = false; @@ -658,115 +797,300 @@ static void rrd_call_function_signal_when_ready(BUFFER *temp_wb __maybe_unused, } } -int rrd_call_function_and_wait(RRDHOST *host, BUFFER *wb, int timeout, const char *name) { - int code; +static void rrd_inflight_async_function_nowait_finished(BUFFER *wb, int code, void *data) { + struct rrd_function_inflight *r = data; - struct rrd_collector_function *rdcf = NULL; + if(r->result.cb) + r->result.cb(wb, code, r->result.data); - char key[PLUGINSD_LINE_MAX + 1]; - size_t key_length = sanitize_function_text(key, name, PLUGINSD_LINE_MAX); - code = rrd_call_function_find(host, wb, key, key_length, &rdcf); - if(code != HTTP_RESP_OK) - return code; - - if(timeout <= 0) - timeout = rdcf->timeout; - - struct timespec tp; - clock_gettime(CLOCK_REALTIME, &tp); - tp.tv_sec += (time_t)timeout; - - if(rdcf->sync) { - code = rdcf->function(wb, timeout, key, rdcf->collector_data, NULL, NULL); - } - else { - struct rrd_function_call_wait *tmp = mallocz(sizeof(struct rrd_function_call_wait)); - tmp->free_with_signal = false; - tmp->data_are_ready = false; - netdata_mutex_init(&tmp->mutex); - pthread_cond_init(&tmp->cond, NULL); - - bool we_should_free = true; - BUFFER *temp_wb = buffer_create(PLUGINSD_LINE_MAX + 1, &netdata_buffers_statistics.buffers_functions); // we need it because we may give up on it - temp_wb->content_type = wb->content_type; - code = rdcf->function(temp_wb, timeout, key, rdcf->collector_data, rrd_call_function_signal_when_ready, tmp); - if (code == HTTP_RESP_OK) { - netdata_mutex_lock(&tmp->mutex); - - int rc = 0; - while (rc == 0 && !tmp->data_are_ready) { - // the mutex is unlocked within pthread_cond_timedwait() - rc = pthread_cond_timedwait(&tmp->cond, &tmp->mutex, &tp); - // the mutex is again ours - } - - if (tmp->data_are_ready) { - // we have a response - buffer_fast_strcat(wb, buffer_tostring(temp_wb), buffer_strlen(temp_wb)); - wb->content_type = temp_wb->content_type; - wb->expires = temp_wb->expires; - - if(wb->expires) - buffer_cacheable(wb); - else - buffer_no_cacheable(wb); - - code = tmp->code; - } - else if (rc == ETIMEDOUT) { - // timeout - // we will go away and let the callback free the structure - tmp->free_with_signal = true; - we_should_free = false; - code = rrd_call_function_error(wb, "Timeout while waiting for a response from the collector.", HTTP_RESP_GATEWAY_TIMEOUT); - } - else - code = rrd_call_function_error(wb, "Failed to get the response from the collector.", HTTP_RESP_INTERNAL_SERVER_ERROR); - - netdata_mutex_unlock(&tmp->mutex); - } - else { - if(!buffer_strlen(wb)) - rrd_call_function_error(wb, "Failed to send request to the collector.", code); - } - - if (we_should_free) { - rrd_function_call_wait_free(tmp); - buffer_free(temp_wb); - } - } - - return code; + rrd_inflight_function_cleanup(r->host, r->host_function_acquired, r->transaction); } -int rrd_call_function_async(RRDHOST *host, BUFFER *wb, int timeout, const char *name, - rrd_call_function_async_callback callback, void *callback_data) { - int code; +static bool rrd_inflight_async_function_is_cancelled(void *data) { + struct rrd_function_inflight *r = data; + return __atomic_load_n(&r->cancelled, __ATOMIC_RELAXED); +} - struct rrd_collector_function *rdcf = NULL; - char key[PLUGINSD_LINE_MAX + 1]; - size_t key_length = sanitize_function_text(key, name, PLUGINSD_LINE_MAX); - code = rrd_call_function_find(host, wb, key, key_length, &rdcf); - if(code != HTTP_RESP_OK) - return code; - - if(timeout <= 0) - timeout = rdcf->timeout; - - code = rdcf->function(wb, timeout, key, rdcf->collector_data, callback, callback_data); +static inline int rrd_call_function_async_and_dont_wait(struct rrd_function_inflight *r) { + int code = r->rdcf->execute_cb(r->result.wb, r->timeout, r->sanitized_cmd, r->rdcf->execute_cb_data, + rrd_inflight_async_function_nowait_finished, r, + rrd_inflight_async_function_is_cancelled, r, + rrd_inflight_async_function_register_canceller_cb, r); if(code != HTTP_RESP_OK) { - if (!buffer_strlen(wb)) - rrd_call_function_error(wb, "Failed to send request to the collector.", code); + if (!buffer_strlen(r->result.wb)) + rrd_call_function_error(r->result.wb, "Failed to send request to the collector.", code); + + rrd_inflight_function_cleanup(r->host, r->host_function_acquired, r->transaction); } return code; } +static int rrd_call_function_async_and_wait(struct rrd_function_inflight *r) { + struct timespec tp; + clock_gettime(CLOCK_REALTIME, &tp); + usec_t now_ut = tp.tv_sec * USEC_PER_SEC + tp.tv_nsec / NSEC_PER_USEC; + usec_t end_ut = now_ut + r->timeout * USEC_PER_SEC; + + struct rrd_function_call_wait *tmp = mallocz(sizeof(struct rrd_function_call_wait)); + tmp->free_with_signal = false; + tmp->data_are_ready = false; + tmp->host = r->host; + tmp->host_function_acquired = r->host_function_acquired; + tmp->transaction = strdupz(r->transaction); + netdata_mutex_init(&tmp->mutex); + pthread_cond_init(&tmp->cond, NULL); + + // we need a temporary BUFFER, because we may time out and the caller supplied one may vanish + // so, we create a new one we guarantee will survive until the collector finishes... + + bool we_should_free = true; + BUFFER *temp_wb = buffer_create(PLUGINSD_LINE_MAX + 1, &netdata_buffers_statistics.buffers_functions); // we need it because we may give up on it + temp_wb->content_type = r->result.wb->content_type; + + int code = r->rdcf->execute_cb(temp_wb, r->timeout, r->sanitized_cmd, r->rdcf->execute_cb_data, + // we overwrite the result callbacks, + // so that we can clean up the allocations made + rrd_async_function_signal_when_ready, tmp, + rrd_inflight_async_function_is_cancelled, r, + rrd_inflight_async_function_register_canceller_cb, r); + + if (code == HTTP_RESP_OK) { + netdata_mutex_lock(&tmp->mutex); + + bool cancelled = false; + int rc = 0; + while (rc == 0 && !cancelled && !tmp->data_are_ready) { + clock_gettime(CLOCK_REALTIME, &tp); + now_ut = tp.tv_sec * USEC_PER_SEC + tp.tv_nsec / NSEC_PER_USEC; + + if(now_ut >= end_ut) { + rc = ETIMEDOUT; + break; + } + + tp.tv_nsec += 10 * NSEC_PER_MSEC; + if(tp.tv_nsec > (long)(1 * NSEC_PER_SEC)) { + tp.tv_sec++; + tp.tv_nsec -= 1 * NSEC_PER_SEC; + } + + // the mutex is unlocked within pthread_cond_timedwait() + rc = pthread_cond_timedwait(&tmp->cond, &tmp->mutex, &tp); + // the mutex is again ours + + if(rc == ETIMEDOUT) { + rc = 0; + if (!tmp->data_are_ready && r->is_cancelled.cb && + r->is_cancelled.cb(r->is_cancelled.data)) { +// internal_error(true, "FUNCTIONS: transaction '%s' is cancelled while waiting for response", +// r->transaction); + rc = 0; + cancelled = true; + rrd_function_cancel(r->transaction); + break; + } + } + } + + if (tmp->data_are_ready) { + // we have a response + buffer_fast_strcat(r->result.wb, buffer_tostring(temp_wb), buffer_strlen(temp_wb)); + r->result.wb->content_type = temp_wb->content_type; + r->result.wb->expires = temp_wb->expires; + + if(r->result.wb->expires) + buffer_cacheable(r->result.wb); + else + buffer_no_cacheable(r->result.wb); + + code = tmp->code; + } + else if (rc == ETIMEDOUT || cancelled) { + // timeout + // we will go away and let the callback free the structure + tmp->free_with_signal = true; + we_should_free = false; + + if(cancelled) + code = rrd_call_function_error(r->result.wb, + "Request cancelled", + HTTP_RESP_CLIENT_CLOSED_REQUEST); + else + code = rrd_call_function_error(r->result.wb, + "Timeout while waiting for a response from the collector.", + HTTP_RESP_GATEWAY_TIMEOUT); + } + else + code = rrd_call_function_error(r->result.wb, + "Internal error while communicating with the collector", + HTTP_RESP_INTERNAL_SERVER_ERROR); + + netdata_mutex_unlock(&tmp->mutex); + } + else { + if(!buffer_strlen(r->result.wb)) + rrd_call_function_error(r->result.wb, "The collector returned an error.", code); + } + + if (we_should_free) { + rrd_function_call_wait_free(tmp); + buffer_free(temp_wb); + } + + return code; +} + +static inline int rrd_call_function_async(struct rrd_function_inflight *r, bool wait) { + if(wait) + return rrd_call_function_async_and_wait(r); + else + return rrd_call_function_async_and_dont_wait(r); +} + +// ---------------------------------------------------------------------------- + +int rrd_function_run(RRDHOST *host, BUFFER *result_wb, int timeout, const char *cmd, + bool wait, const char *transaction, + rrd_function_result_callback_t result_cb, void *result_cb_data, + rrd_function_is_cancelled_cb_t is_cancelled_cb, void *is_cancelled_cb_data) { + + int code; + char sanitized_cmd[PLUGINSD_LINE_MAX + 1]; + const DICTIONARY_ITEM *host_function_acquired = NULL; + + // ------------------------------------------------------------------------ + // find the function + + size_t sanitized_cmd_length = sanitize_function_text(sanitized_cmd, cmd, PLUGINSD_LINE_MAX); + code = rrd_call_function_find(host, result_wb, sanitized_cmd, sanitized_cmd_length, &host_function_acquired); + if(code != HTTP_RESP_OK) + return code; + + struct rrd_host_function *rdcf = dictionary_acquired_item_value(host_function_acquired); + + if(timeout <= 0) + timeout = rdcf->timeout; + + + // ------------------------------------------------------------------------ + // the function can only be executed in sync mode + + if(rdcf->sync) { + // the caller has to wait + + code = rdcf->execute_cb(result_wb, timeout, sanitized_cmd, rdcf->execute_cb_data, + NULL, NULL, // no callback needed, it is synchronous + is_cancelled_cb, is_cancelled_cb_data, // it is ok to pass these, we block the caller + NULL, NULL); // no need to pass, we will wait + + if (code != HTTP_RESP_OK && !buffer_strlen(result_wb)) + rrd_call_function_error(result_wb, "Collector reported error.", code); + + dictionary_acquired_item_release(host->functions, host_function_acquired); + return code; + } + + + // ------------------------------------------------------------------------ + // the function can only be executed in async mode + // put the function into the inflight requests + + char uuid_str[UUID_STR_LEN]; + if(!transaction) { + uuid_t uuid; + uuid_generate_random(uuid); + uuid_unparse_lower(uuid, uuid_str); + transaction = uuid_str; + } + + // put the request into the inflight requests + struct rrd_function_inflight t = { + .used = false, + .host = host, + .cmd = strdupz(cmd), + .sanitized_cmd = strdupz(sanitized_cmd), + .sanitized_cmd_length = sanitized_cmd_length, + .transaction = strdupz(transaction), + .timeout = timeout, + .cancelled = false, + .host_function_acquired = host_function_acquired, + .rdcf = rdcf, + .result = { + .wb = result_wb, + .cb = result_cb, + .data = result_cb_data, + }, + .is_cancelled = { + .cb = is_cancelled_cb, + .data = is_cancelled_cb_data, + } + }; + struct rrd_function_inflight *r = dictionary_set(rrd_functions_inflight_requests, transaction, &t, sizeof(t)); + if(r->used) { + netdata_log_info("FUNCTIONS: duplicate transaction '%s', function: '%s'", t.transaction, t.cmd); + code = rrd_call_function_error(result_wb, "duplicate transaction", HTTP_RESP_BAD_REQUEST); + freez((void *)t.transaction); + freez((void *)t.cmd); + freez((void *)t.sanitized_cmd); + dictionary_acquired_item_release(r->host->functions, t.host_function_acquired); + return code; + } + r->used = true; + // internal_error(true, "FUNCTIONS: transaction '%s' started", r->transaction); + + return rrd_call_function_async(r, wait); +} + +void rrd_function_cancel(const char *transaction) { + // internal_error(true, "FUNCTIONS: request to cancel transaction '%s'", transaction); + + const DICTIONARY_ITEM *item = dictionary_get_and_acquire_item(rrd_functions_inflight_requests, transaction); + if(!item) { + netdata_log_info("FUNCTIONS: received a cancel request for transaction '%s', but the transaction is not running.", + transaction); + return; + } + + struct rrd_function_inflight *r = dictionary_acquired_item_value(item); + + bool cancelled = __atomic_load_n(&r->cancelled, __ATOMIC_RELAXED); + if(cancelled) { + netdata_log_info("FUNCTIONS: received a cancel request for transaction '%s', but it is already cancelled.", + transaction); + goto cleanup; + } + + __atomic_store_n(&r->cancelled, true, __ATOMIC_RELAXED); + + int32_t expected = __atomic_load_n(&r->rdcf->collector->refcount_canceller, __ATOMIC_RELAXED); + int32_t wanted; + do { + if(expected < 0) { + netdata_log_info("FUNCTIONS: received a cancel request for transaction '%s', but the collector is not running.", + transaction); + goto cleanup; + } + + wanted = expected + 1; + } while(!__atomic_compare_exchange_n(&r->rdcf->collector->refcount_canceller, &expected, wanted, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)); + + if(r->canceller.cb) + r->canceller.cb(r->canceller.data); + + __atomic_sub_fetch(&r->rdcf->collector->refcount_canceller, 1, __ATOMIC_RELAXED); + +cleanup: + dictionary_acquired_item_release(rrd_functions_inflight_requests, item); +} + +// ---------------------------------------------------------------------------- + static void functions2json(DICTIONARY *functions, BUFFER *wb, const char *ident, const char *kq, const char *sq) { - struct rrd_collector_function *t; + struct rrd_host_function *t; dfe_start_read(functions, t) { - if(!t->collector->running) continue; + if(!rrd_collector_running(t->collector)) continue; if(t_dfe.counter) buffer_strcat(wb, ",\n"); @@ -799,9 +1123,9 @@ void host_functions2json(RRDHOST *host, BUFFER *wb) { buffer_json_member_add_object(wb, "functions"); - struct rrd_collector_function *t; + struct rrd_host_function *t; dfe_start_read(host->functions, t) { - if(!t->collector->running) continue; + if(!rrd_collector_running(t->collector)) continue; buffer_json_member_add_object(wb, t_dfe.name); buffer_json_member_add_string(wb, "help", string2str(t->help)); @@ -822,9 +1146,9 @@ void host_functions2json(RRDHOST *host, BUFFER *wb) { void chart_functions_to_dict(DICTIONARY *rrdset_functions_view, DICTIONARY *dst, void *value, size_t value_size) { if(!rrdset_functions_view || !dst) return; - struct rrd_collector_function *t; + struct rrd_host_function *t; dfe_start_read(rrdset_functions_view, t) { - if(!t->collector->running) continue; + if(!rrd_collector_running(t->collector)) continue; dictionary_set(dst, t_dfe.name, value, value_size); } @@ -834,9 +1158,9 @@ void chart_functions_to_dict(DICTIONARY *rrdset_functions_view, DICTIONARY *dst, void host_functions_to_dict(RRDHOST *host, DICTIONARY *dst, void *value, size_t value_size, STRING **help) { if(!host || !host->functions || !dictionary_entries(host->functions) || !dst) return; - struct rrd_collector_function *t; + struct rrd_host_function *t; dfe_start_read(host->functions, t) { - if(!t->collector->running) continue; + if(!rrd_collector_running(t->collector)) continue; if(help) *help = t->help; @@ -846,10 +1170,14 @@ void host_functions_to_dict(RRDHOST *host, DICTIONARY *dst, void *value, size_t dfe_done(t); } +// ---------------------------------------------------------------------------- int rrdhost_function_streaming(BUFFER *wb, int timeout __maybe_unused, const char *function __maybe_unused, void *collector_data __maybe_unused, - function_data_ready_callback callback __maybe_unused, void *callback_data __maybe_unused) { + rrd_function_result_callback_t result_cb, void *result_cb_data, + rrd_function_is_cancelled_cb_t is_cancelled_cb, void *is_cancelled_cb_data, + rrd_function_register_canceller_cb_t register_cancel_cb, void *register_cancel_db_data) { + time_t now = now_realtime_sec(); buffer_flush(wb); @@ -1468,8 +1796,14 @@ int rrdhost_function_streaming(BUFFER *wb, int timeout __maybe_unused, const cha buffer_json_member_add_time_t(wb, "expires", now_realtime_sec() + 1); buffer_json_finalize(wb); - if(callback) - callback(wb, HTTP_RESP_OK, callback_data); + int response = HTTP_RESP_OK; + if(is_cancelled_cb && is_cancelled_cb(is_cancelled_cb_data)) { + buffer_flush(wb); + response = HTTP_RESP_CLIENT_CLOSED_REQUEST; + } - return HTTP_RESP_OK; + if(result_cb) + result_cb(wb, response, result_cb_data); + + return response; } diff --git a/database/rrdfunctions.h b/database/rrdfunctions.h index 71ad96507f..d43b47017b 100644 --- a/database/rrdfunctions.h +++ b/database/rrdfunctions.h @@ -1,26 +1,39 @@ +// SPDX-License-Identifier: GPL-3.0-or-later #ifndef NETDATA_RRDFUNCTIONS_H #define NETDATA_RRDFUNCTIONS_H 1 +// ---------------------------------------------------------------------------- + #include "rrd.h" -void rrdfunctions_init(RRDHOST *host); -void rrdfunctions_destroy(RRDHOST *host); +typedef void (*rrd_function_result_callback_t)(BUFFER *wb, int code, void *result_cb_data); +typedef bool (*rrd_function_is_cancelled_cb_t)(void *is_cancelled_cb_data); +typedef void (*rrd_function_canceller_cb_t)(void *data); +typedef void (*rrd_function_register_canceller_cb_t)(void *register_cancel_cb_data, rrd_function_canceller_cb_t cancel_cb, void *cancel_cb_data); +typedef int (*rrd_function_execute_cb_t)(BUFFER *wb, int timeout, const char *function, void *collector_data, + rrd_function_result_callback_t result_cb, void *result_cb_data, + rrd_function_is_cancelled_cb_t is_cancelled_cb, void *is_cancelled_cb_data, + rrd_function_register_canceller_cb_t register_cancel_cb, void *register_cancel_db_data); + +void rrd_functions_inflight_init(void); +void rrdfunctions_host_init(RRDHOST *host); +void rrdfunctions_host_destroy(RRDHOST *host); void rrd_collector_started(void); void rrd_collector_finished(void); -typedef void (*function_data_ready_callback)(BUFFER *wb, int code, void *callback_data); +// add a function, to be run from the collector +void rrd_function_add(RRDHOST *host, RRDSET *st, const char *name, int timeout, const char *help, + bool sync, rrd_function_execute_cb_t execute_cb, void *execute_cb_data); -typedef int (*function_execute_at_collector)(BUFFER *wb, int timeout, const char *function, void *collector_data, - function_data_ready_callback callback, void *callback_data); +// call a function, to be run from anywhere +int rrd_function_run(RRDHOST *host, BUFFER *result_wb, int timeout, const char *cmd, + bool wait, const char *transaction, + rrd_function_result_callback_t result_cb, void *result_cb_data, + rrd_function_is_cancelled_cb_t is_cancelled_cb, void *is_cancelled_cb_data); -void rrd_collector_add_function(RRDHOST *host, RRDSET *st, const char *name, int timeout, const char *help, - bool sync, function_execute_at_collector function, void *collector_data); - -int rrd_call_function_and_wait(RRDHOST *host, BUFFER *wb, int timeout, const char *name); - -typedef void (*rrd_call_function_async_callback)(BUFFER *wb, int code, void *callback_data); -int rrd_call_function_async(RRDHOST *host, BUFFER *wb, int timeout, const char *name, rrd_call_function_async_callback, void *callback_data); +// cancel a running function, to be run from anywhere +void rrd_function_cancel(const char *transaction); void rrd_functions_expose_rrdpush(RRDSET *st, BUFFER *wb); void rrd_functions_expose_global_rrdpush(RRDHOST *host, BUFFER *wb); @@ -35,7 +48,9 @@ const char *functions_content_type_to_format(HTTP_CONTENT_TYPE content_type); int rrd_call_function_error(BUFFER *wb, const char *msg, int code); int rrdhost_function_streaming(BUFFER *wb, int timeout, const char *function, void *collector_data, - function_data_ready_callback callback, void *callback_data); + rrd_function_result_callback_t result_cb, void *result_cb_data, + rrd_function_is_cancelled_cb_t is_cancelled_cb, void *is_cancelled_cb_data, + rrd_function_register_canceller_cb_t register_cancel_cb, void *register_cancel_db_data); #define RRDFUNCTIONS_STREAMING_HELP "Streaming status for parents and children." diff --git a/database/rrdhost.c b/database/rrdhost.c index 2a5778bf15..5d54b90045 100644 --- a/database/rrdhost.c +++ b/database/rrdhost.c @@ -335,7 +335,7 @@ int is_legacy = 1; netdata_mutex_init(&host->receiver_lock); if (likely(!archived)) { - rrdfunctions_init(host); + rrdfunctions_host_init(host); host->rrdlabels = rrdlabels_create(); rrdhost_initialize_rrdpush_sender( host, rrdpush_enabled, rrdpush_destination, rrdpush_api_key, rrdpush_send_charts_matching); @@ -665,7 +665,7 @@ static void rrdhost_update(RRDHOST *host if (rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED)) { rrdhost_flag_clear(host, RRDHOST_FLAG_ARCHIVED); - rrdfunctions_init(host); + rrdfunctions_host_init(host); if(!host->rrdlabels) host->rrdlabels = rrdlabels_create(); @@ -1070,9 +1070,9 @@ int rrd_init(char *hostname, struct rrdhost_system_info *system_info, bool unitt // we register this only on localhost // for the other nodes, the origin server should register it rrd_collector_started(); // this creates a collector that runs for as long as netdata runs - rrd_collector_add_function(localhost, NULL, "streaming", 10, - RRDFUNCTIONS_STREAMING_HELP, true, - rrdhost_function_streaming, NULL); + rrd_function_add(localhost, NULL, "streaming", 10, + RRDFUNCTIONS_STREAMING_HELP, true, + rrdhost_function_streaming, NULL); #endif if (likely(system_info)) { @@ -1160,9 +1160,11 @@ static void rrdhost_streaming_sender_structures_free(RRDHOST *host) rrdpush_sender_thread_stop(host, STREAM_HANDSHAKE_DISCONNECT_HOST_CLEANUP, true); // stop a possibly running thread cbuffer_free(host->sender->buffer); + #ifdef ENABLE_RRDPUSH_COMPRESSION rrdpush_compressor_destroy(&host->sender->compressor); #endif + replication_cleanup_sender(host->sender); __atomic_sub_fetch(&netdata_buffers_statistics.rrdhost_senders, sizeof(*host->sender), __ATOMIC_RELAXED); @@ -1266,7 +1268,7 @@ void rrdhost_free___while_having_rrd_wrlock(RRDHOST *host, bool force) { freez(host->node_id); rrdfamily_index_destroy(host); - rrdfunctions_destroy(host); + rrdfunctions_host_destroy(host); rrdvariables_destroy(host->rrdvars); if (host == localhost) rrdvariables_destroy(health_rrdvars); diff --git a/libnetdata/Makefile.am b/libnetdata/Makefile.am index e85f4abe1b..f01a1d34f0 100644 --- a/libnetdata/Makefile.am +++ b/libnetdata/Makefile.am @@ -15,6 +15,7 @@ SUBDIRS = \ ebpf \ eval \ facets \ + functions_evloop \ json \ july \ health \ diff --git a/libnetdata/buffer/buffer.h b/libnetdata/buffer/buffer.h index 0ea6cb39cf..0e3de92473 100644 --- a/libnetdata/buffer/buffer.h +++ b/libnetdata/buffer/buffer.h @@ -249,6 +249,17 @@ static inline void buffer_strcat(BUFFER *wb, const char *txt) { buffer_overflow_check(wb); } +static inline void buffer_contents_replace(BUFFER *wb, const char *txt, size_t len) { + wb->len = 0; + buffer_need_bytes(wb, len + 1); + + memcpy(wb->buffer, txt, len); + wb->len = len; + wb->buffer[wb->len] = '\0'; + + buffer_overflow_check(wb); +} + static inline void buffer_strncat(BUFFER *wb, const char *txt, size_t len) { if(unlikely(!txt || !*txt)) return; diff --git a/libnetdata/facets/facets.c b/libnetdata/facets/facets.c index 8259e46e91..7db7779e3b 100644 --- a/libnetdata/facets/facets.c +++ b/libnetdata/facets/facets.c @@ -1,14 +1,43 @@ +// SPDX-License-Identifier: GPL-3.0-or-later #include "facets.h" #define HISTOGRAM_COLUMNS 100 - -static void facets_row_free(FACETS *facets __maybe_unused, FACET_ROW *row); +#define FACETS_KEYS_HASHTABLE_ENTRIES 1000 +#define FACETS_VALUES_HASHTABLE_ENTRIES 20 // ---------------------------------------------------------------------------- -static inline void uint64_to_char(uint64_t num, char *out) { - static const char id_encoding_characters[64 + 1] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ.abcdefghijklmnopqrstuvwxyz_0123456789"; +static const char id_encoding_characters[64 + 1] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ.abcdefghijklmnopqrstuvwxyz_0123456789"; +static const uint8_t id_encoding_characters_reverse[256] = { + ['A'] = 0, ['B'] = 1, ['C'] = 2, ['D'] = 3, + ['E'] = 4, ['F'] = 5, ['G'] = 6, ['H'] = 7, + ['I'] = 8, ['J'] = 9, ['K'] = 10, ['L'] = 11, + ['M'] = 12, ['N'] = 13, ['O'] = 14, ['P'] = 15, + ['Q'] = 16, ['R'] = 17, ['S'] = 18, ['T'] = 19, + ['U'] = 20, ['V'] = 21, ['W'] = 22, ['X'] = 23, + ['Y'] = 24, ['Z'] = 25, ['.'] = 26, ['a'] = 27, + ['b'] = 28, ['c'] = 29, ['d'] = 30, ['e'] = 31, + ['f'] = 32, ['g'] = 33, ['h'] = 34, ['i'] = 35, + ['j'] = 36, ['k'] = 37, ['l'] = 38, ['m'] = 39, + ['n'] = 40, ['o'] = 41, ['p'] = 42, ['q'] = 43, + ['r'] = 44, ['s'] = 45, ['t'] = 46, ['u'] = 47, + ['v'] = 48, ['w'] = 49, ['x'] = 50, ['y'] = 51, + ['z'] = 52, ['_'] = 53, ['0'] = 54, ['1'] = 55, + ['2'] = 56, ['3'] = 57, ['4'] = 58, ['5'] = 59, + ['6'] = 60, ['7'] = 61, ['8'] = 62, ['9'] = 63 +}; +__attribute__((constructor)) void initialize_facets_id_encoding_characters_reverse(void) { + +} + +#define FACET_STRING_HASH_SIZE 12 +#define FACETS_HASH XXH64_hash_t +#define FACETS_HASH_FUNCTION(src, len) XXH3_64bits(src, len) +#define FACETS_HASH_ZERO (FACETS_HASH)0 + +static inline void facets_hash_to_str(FACETS_HASH num, char *out) { + out[11] = '\0'; out[10] = id_encoding_characters[num & 63]; num >>= 6; out[9] = id_encoding_characters[num & 63]; num >>= 6; out[8] = id_encoding_characters[num & 63]; num >>= 6; @@ -22,25 +51,54 @@ static inline void uint64_to_char(uint64_t num, char *out) { out[0] = id_encoding_characters[num & 63]; } -inline void facets_string_hash(const char *src, size_t len, char *out) { - XXH128_hash_t hash = XXH3_128bits(src, len); +static inline FACETS_HASH str_to_facets_hash(const char *str) { + FACETS_HASH num = 0; + int shifts = 6 * (FACET_STRING_HASH_SIZE - 2); - uint64_to_char(hash.high64, out); - uint64_to_char(hash.low64, &out[11]); // Starts right after the first 64-bit encoded string + num |= ((FACETS_HASH)(id_encoding_characters_reverse[(uint8_t)(str[0])])) << shifts; shifts -= 6; + num |= ((FACETS_HASH)(id_encoding_characters_reverse[(uint8_t)(str[1])])) << shifts; shifts -= 6; + num |= ((FACETS_HASH)(id_encoding_characters_reverse[(uint8_t)(str[2])])) << shifts; shifts -= 6; + num |= ((FACETS_HASH)(id_encoding_characters_reverse[(uint8_t)(str[3])])) << shifts; shifts -= 6; + num |= ((FACETS_HASH)(id_encoding_characters_reverse[(uint8_t)(str[4])])) << shifts; shifts -= 6; + num |= ((FACETS_HASH)(id_encoding_characters_reverse[(uint8_t)(str[5])])) << shifts; shifts -= 6; + num |= ((FACETS_HASH)(id_encoding_characters_reverse[(uint8_t)(str[6])])) << shifts; shifts -= 6; + num |= ((FACETS_HASH)(id_encoding_characters_reverse[(uint8_t)(str[7])])) << shifts; shifts -= 6; + num |= ((FACETS_HASH)(id_encoding_characters_reverse[(uint8_t)(str[8])])) << shifts; shifts -= 6; + num |= ((FACETS_HASH)(id_encoding_characters_reverse[(uint8_t)(str[9])])) << shifts; shifts -= 6; + num |= ((FACETS_HASH)(id_encoding_characters_reverse[(uint8_t)(str[10])])) << shifts; - out[FACET_STRING_HASH_SIZE - 1] = '\0'; + return num; } -static inline void facets_zero_hash(char *out) { - for(int i = 0; i < FACET_STRING_HASH_SIZE ;i++) - out[i] = '0'; +static const char *hash_to_static_string(FACETS_HASH hash) { + static __thread char hash_str[FACET_STRING_HASH_SIZE]; + facets_hash_to_str(hash, hash_str); + return hash_str; +} - out[FACET_STRING_HASH_SIZE - 1] = '\0'; +static inline bool is_valid_string_hash(const char *s) { + if(strlen(s) != FACET_STRING_HASH_SIZE - 1) { + netdata_log_error("The user supplied key '%s' does not have the right length for a facets hash.", s); + return false; + } + + uint8_t *t = (uint8_t *)s; + while(*t) { + if(id_encoding_characters_reverse[*t] == 0 && *t != id_encoding_characters[0]) { + netdata_log_error("The user supplied key '%s' contains invalid characters for a facets hash.", s); + return false; + } + + t++; + } + + return true; } // ---------------------------------------------------------------------------- typedef struct facet_value { + FACETS_HASH hash; const char *name; bool selected; @@ -51,15 +109,21 @@ typedef struct facet_value { uint32_t *histogram; uint32_t min, max, sum; + + struct { + struct facet_value *next; + } parent_hashtable; + + struct facet_value *prev, *next; } FACET_VALUE; + struct facet_key { FACETS *facets; + FACETS_HASH hash; const char *name; - DICTIONARY *values; - FACET_KEY_OPTIONS options; bool default_selected_for_values; // the default "selected" for all values in the dictionary @@ -69,14 +133,21 @@ struct facet_key { uint32_t key_values_selected_in_row; struct { - char hash[FACET_STRING_HASH_SIZE]; + bool enabled; + FACET_VALUE *hashtable[FACETS_VALUES_HASHTABLE_ENTRIES]; + FACET_VALUE *ll; + } values; + + struct { + FACETS_HASH hash; bool updated; bool empty; BUFFER *b; + FACET_VALUE *v; } current_value; struct { - FACET_VALUE *empty_value; + FACET_VALUE *v; } empty_value; uint32_t order; @@ -91,6 +162,10 @@ struct facet_key { void *data; } transform; + struct { + struct facet_key *next; + } parent_hashtable; + struct facet_key *prev, *next; }; @@ -111,8 +186,11 @@ struct facets { DICTIONARY *accepted_params; - FACET_KEY *keys_ll; - DICTIONARY *keys; + struct { + FACET_KEY *hashtable[FACETS_KEYS_HASHTABLE_ENTRIES]; + FACET_KEY *ll; + } keys; + FACET_ROW *base; // double linked list of the selected facets rows uint32_t items_to_return; @@ -125,6 +203,7 @@ struct facets { } current_row; struct { + FACETS_HASH hash; char *chart; bool enabled; uint32_t slots; @@ -175,6 +254,253 @@ struct facets { // ---------------------------------------------------------------------------- +static void facets_row_free(FACETS *facets __maybe_unused, FACET_ROW *row); +static inline void facet_value_is_used(FACET_KEY *k, FACET_VALUE *v); +static inline bool facets_key_is_facet(FACETS *facets, FACET_KEY *k); + +// ---------------------------------------------------------------------------- +// The FACET_VALUE index within each FACET_KEY + +#define foreach_value_in_key(k, v) \ + for((v) = (k)->values.ll; (v) ;(v) = (v)->next) + +#define foreach_value_in_key_done(v) do { ; } while(0) + +static inline void FACETS_VALUES_INDEX_CREATE(FACET_KEY *k) { + k->values.ll = NULL; +} + +static inline void FACETS_VALUES_INDEX_DESTROY(FACET_KEY *k) { + FACET_VALUE *v = k->values.ll; + while(v) { + FACET_VALUE *next = v->next; + freez(v->histogram); + freez((void *)v->name); + freez(v); + v = next; + } + k->values.ll = NULL; + memset(k->values.hashtable, 0, sizeof(k->values.hashtable)); + k->values.enabled = false; +} + +static inline void FACET_VALUE_ADD_CONFLICT(FACET_KEY *k, FACET_VALUE *v, const FACET_VALUE * const nv) { + if(!v->name && nv->name) + // an actual value, not a filter + v->name = strdupz(nv->name); + + if(v->name) + facet_value_is_used(k, v); + + internal_fatal(v->name && nv->name && strcmp(v->name, nv->name) != 0, + "value hash conflict: '%s' and '%s' have the same hash '%s'", + v->name, nv->name, hash_to_static_string(v->hash)); + + k->facets->operations.values.conflicts++; +} + +static inline FACET_VALUE **facets_values_hashtable_slot(FACET_KEY *k, FACETS_HASH hash) { + size_t slot = hash % FACETS_VALUES_HASHTABLE_ENTRIES; + FACET_VALUE **v = &k->values.hashtable[slot]; + + while(*v && (*v)->hash != hash) + v = &((*v)->parent_hashtable.next); + + return v; +} + +static inline FACET_VALUE *FACET_VALUE_ADD_TO_INDEX(FACET_KEY *k, const FACET_VALUE * const tv) { + FACET_VALUE **v_ptr = facets_values_hashtable_slot(k, tv->hash); + + if(*v_ptr) { + // already exists + + FACET_VALUE *v = *v_ptr; + FACET_VALUE_ADD_CONFLICT(k, v, tv); + return v; + } + + // we have to add it + + FACET_VALUE *v = mallocz(sizeof(*v)); + *v_ptr = v; + + memcpy(v, tv, sizeof(*v)); + + DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(k->values.ll, v, prev, next); + + if(!v->selected) + v->selected = k->default_selected_for_values; + + if(v->name) { + // an actual value, not a filter + v->name = strdupz(v->name); + facet_value_is_used(k, v); + } + + k->facets->operations.values.inserts++; + + return v; +} + +static inline void FACET_VALUE_ADD_EMPTY_VALUE_TO_INDEX(FACET_KEY *k) { + static const FACET_VALUE tv = { + .hash = FACETS_HASH_ZERO, + .name = FACET_VALUE_UNSET, + }; + + k->current_value.hash = FACETS_HASH_ZERO; + + if(k->empty_value.v) { + FACET_VALUE_ADD_CONFLICT(k, k->empty_value.v, &tv); + k->current_value.v = k->empty_value.v; + } + else { + FACET_VALUE *v = FACET_VALUE_ADD_TO_INDEX(k, &tv); + v->empty = true; + k->empty_value.v = v; + k->current_value.v = v; + } +} + +static inline void FACET_VALUE_ADD_CURRENT_VALUE_TO_INDEX(FACET_KEY *k) { + static __thread FACET_VALUE tv = { 0 }; + + tv.hash = FACETS_HASH_FUNCTION(buffer_tostring(k->current_value.b), buffer_strlen(k->current_value.b)); + tv.name = buffer_tostring(k->current_value.b); + + k->current_value.v = FACET_VALUE_ADD_TO_INDEX(k, &tv); + k->facets->operations.values.indexed++; +} + +static inline void FACET_VALUE_ADD_OR_UPDATE_SELECTED(FACET_KEY *k, FACETS_HASH hash) { + FACET_VALUE tv = { + .hash = hash, + .selected = true, + }; + FACET_VALUE_ADD_TO_INDEX(k, &tv); +} + +static inline FACET_VALUE *FACET_VALUE_GET_CURRENT_VALUE(FACET_KEY *k) { + if(unlikely(!k->current_value.v)) + FACET_VALUE_ADD_EMPTY_VALUE_TO_INDEX(k); + + return k->current_value.v; +} + +// ---------------------------------------------------------------------------- +// The FACET_KEY index within each FACET + +#define foreach_key_in_facets(facets, k) \ + for((k) = (facets)->keys.ll; (k) ;(k) = (k)->next) + +#define foreach_key_in_facets_done(k) do { ; } while(0) + +static inline void facet_key_late_init(FACETS *facets, FACET_KEY *k) { + if(k->values.enabled) + return; + + if(facets_key_is_facet(facets, k)) { + FACETS_VALUES_INDEX_CREATE(k); + k->values.enabled = true; + } +} + +static inline void FACETS_KEYS_INDEX_CREATE(FACETS *facets) { + facets->keys.ll = NULL; +} + +static inline void FACETS_KEYS_INDEX_DESTROY(FACETS *facets) { + FACET_KEY *k = facets->keys.ll; + while(k) { + FACET_KEY *next = k->next; + + FACETS_VALUES_INDEX_DESTROY(k); + buffer_free(k->current_value.b); + freez((void *)k->name); + freez(k); + + k = next; + } + memset(facets->keys.hashtable, 0, sizeof(facets->keys.hashtable)); + facets->keys.ll = NULL; +} + +static inline FACET_KEY **facets_keys_hashtable_slot(FACETS *facets, FACETS_HASH hash) { + size_t slot = hash % FACETS_KEYS_HASHTABLE_ENTRIES; + FACET_KEY **k = &facets->keys.hashtable[slot]; + + while(*k && (*k)->hash != hash) + k = &((*k)->parent_hashtable.next); + + return k; +} + +static inline FACET_KEY *FACETS_KEY_GET_FROM_INDEX(FACETS *facets, FACETS_HASH hash) { + FACET_KEY **k = facets_keys_hashtable_slot(facets, hash); + return *k; +} + +static inline FACET_KEY *FACETS_KEY_ADD_TO_INDEX(FACETS *facets, FACETS_HASH hash, const char *name, FACET_KEY_OPTIONS options) { + facets->operations.keys.registered++; + + FACET_KEY **k_ptr = facets_keys_hashtable_slot(facets, hash); + + if(likely(*k_ptr)) { + // already exists + + FACET_KEY *k = *k_ptr; + + if(!k->name && name) { + // an actual value, not a filter + k->name = strdupz(name); + facet_key_late_init(facets, k); + } + + internal_fatal(k->name && name && strcmp(k->name, name) != 0, + "key hash conflict: '%s' and '%s' have the same hash '%s'", + k->name, name, + hash_to_static_string(hash)); + + if(k->options & FACET_KEY_OPTION_REORDER) { + k->order = facets->order++; + k->options &= ~FACET_KEY_OPTION_REORDER; + } + + return k; + } + + // we have to add it + facets->operations.keys.unique++; + + FACET_KEY *k = callocz(1, sizeof(*k)); + *k_ptr = k; // add it to the index + + k->hash = hash; + k->facets = facets; + k->options = options; + k->current_value.b = buffer_create(0, NULL); + k->default_selected_for_values = true; + + if(!(k->options & FACET_KEY_OPTION_REORDER)) + k->order = facets->order++; + + if((k->options & FACET_KEY_OPTION_FTS) || (facets->options & FACETS_OPTION_ALL_KEYS_FTS)) + facets->keys_filtered_by_query++; + + if(name) { + // an actual value, not a filter + k->name = strdupz(name); + facet_key_late_init(facets, k); + } + + DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(facets->keys.ll, k, prev, next); + + return k; +} + +// ---------------------------------------------------------------------------- + static usec_t calculate_histogram_bar_width(usec_t after_ut, usec_t before_ut) { // Array of valid durations in seconds static time_t valid_durations_s[] = { @@ -204,7 +530,7 @@ static inline usec_t facets_histogram_slot_baseline_ut(FACETS *facets, usec_t ut return ut - delta_ut; } -void facets_set_histogram(FACETS *facets, const char *chart, usec_t after_ut, usec_t before_ut) { +void facets_set_histogram_by_id(FACETS *facets, const char *key_id, usec_t after_ut, usec_t before_ut) { if(after_ut > before_ut) { usec_t t = after_ut; after_ut = before_ut; @@ -212,7 +538,17 @@ void facets_set_histogram(FACETS *facets, const char *chart, usec_t after_ut, us } facets->histogram.enabled = true; - facets->histogram.chart = chart ? strdupz(chart) : NULL; + + if(key_id && *key_id && strlen(key_id) == FACET_STRING_HASH_SIZE - 1) { + facets->histogram.chart = strdupz(key_id); + facets->histogram.hash = str_to_facets_hash(facets->histogram.chart); + } + else { + freez(facets->histogram.chart); + facets->histogram.chart = NULL; + facets->histogram.hash = FACETS_HASH_ZERO; + } + facets->histogram.slot_width_ut = calculate_histogram_bar_width(after_ut, before_ut); facets->histogram.after_ut = facets_histogram_slot_baseline_ut(facets, after_ut); facets->histogram.before_ut = facets_histogram_slot_baseline_ut(facets, before_ut) + facets->histogram.slot_width_ut; @@ -224,6 +560,13 @@ void facets_set_histogram(FACETS *facets, const char *chart, usec_t after_ut, us } } +void facets_set_histogram_by_name(FACETS *facets, const char *key_name, usec_t after_ut, usec_t before_ut) { + char hash_str[FACET_STRING_HASH_SIZE]; + FACETS_HASH hash = FACETS_HASH_FUNCTION(key_name, strlen(key_name)); + facets_hash_to_str(hash, hash_str); + facets_set_histogram_by_id(facets, hash_str, after_ut, before_ut); +} + static inline void facets_histogram_update_value(FACETS *facets, FACET_KEY *k __maybe_unused, FACET_VALUE *v, usec_t usec) { if(!facets->histogram.enabled) return; @@ -253,15 +596,15 @@ static inline void facets_histogram_value_names(BUFFER *wb, FACETS *facets __may if(first_key) buffer_json_add_array_item_string(wb, first_key); - if(k && k->values) { + if(k && k->values.enabled) { FACET_VALUE *v; - dfe_start_read(k->values, v){ + foreach_value_in_key(k, v) { if (unlikely(!v->histogram)) continue; buffer_json_add_array_item_string(wb, v->name); } - dfe_done(v); + foreach_value_in_key_done(v); } } buffer_json_array_close(wb); // key @@ -270,15 +613,15 @@ static inline void facets_histogram_value_names(BUFFER *wb, FACETS *facets __may static inline void facets_histogram_value_units(BUFFER *wb, FACETS *facets __maybe_unused, FACET_KEY *k, const char *key) { buffer_json_member_add_array(wb, key); { - if(k && k->values) { + if(k && k->values.enabled) { FACET_VALUE *v; - dfe_start_read(k->values, v){ + foreach_value_in_key(k, v) { if (unlikely(!v->histogram)) continue; buffer_json_add_array_item_string(wb, "events"); } - dfe_done(v); + foreach_value_in_key_done(v); } } buffer_json_array_close(wb); // key @@ -287,15 +630,15 @@ static inline void facets_histogram_value_units(BUFFER *wb, FACETS *facets __may static inline void facets_histogram_value_min(BUFFER *wb, FACETS *facets __maybe_unused, FACET_KEY *k, const char *key) { buffer_json_member_add_array(wb, key); { - if(k && k->values) { + if(k && k->values.enabled) { FACET_VALUE *v; - dfe_start_read(k->values, v){ + foreach_value_in_key(k, v) { if (unlikely(!v->histogram)) continue; buffer_json_add_array_item_uint64(wb, v->min); } - dfe_done(v); + foreach_value_in_key_done(v); } } buffer_json_array_close(wb); // key @@ -304,15 +647,15 @@ static inline void facets_histogram_value_min(BUFFER *wb, FACETS *facets __maybe static inline void facets_histogram_value_max(BUFFER *wb, FACETS *facets __maybe_unused, FACET_KEY *k, const char *key) { buffer_json_member_add_array(wb, key); { - if(k && k->values) { + if(k && k->values.enabled) { FACET_VALUE *v; - dfe_start_read(k->values, v){ + foreach_value_in_key(k, v) { if (unlikely(!v->histogram)) continue; buffer_json_add_array_item_uint64(wb, v->max); } - dfe_done(v); + foreach_value_in_key_done(v); } } buffer_json_array_close(wb); // key @@ -321,15 +664,15 @@ static inline void facets_histogram_value_max(BUFFER *wb, FACETS *facets __maybe static inline void facets_histogram_value_avg(BUFFER *wb, FACETS *facets __maybe_unused, FACET_KEY *k, const char *key) { buffer_json_member_add_array(wb, key); { - if(k && k->values) { + if(k && k->values.enabled) { FACET_VALUE *v; - dfe_start_read(k->values, v){ + foreach_value_in_key(k, v) { if (unlikely(!v->histogram)) continue; buffer_json_add_array_item_double(wb, (double) v->sum / (double) facets->histogram.slots); } - dfe_done(v); + foreach_value_in_key_done(v); } } buffer_json_array_close(wb); // key @@ -338,15 +681,15 @@ static inline void facets_histogram_value_avg(BUFFER *wb, FACETS *facets __maybe static inline void facets_histogram_value_arp(BUFFER *wb, FACETS *facets __maybe_unused, FACET_KEY *k, const char *key) { buffer_json_member_add_array(wb, key); { - if(k && k->values) { + if(k && k->values.enabled) { FACET_VALUE *v; - dfe_start_read(k->values, v){ + foreach_value_in_key(k, v) { if (unlikely(!v->histogram)) continue; buffer_json_add_array_item_uint64(wb, 0); } - dfe_done(v); + foreach_value_in_key_done(v); } } buffer_json_array_close(wb); // key @@ -355,15 +698,15 @@ static inline void facets_histogram_value_arp(BUFFER *wb, FACETS *facets __maybe static inline void facets_histogram_value_con(BUFFER *wb, FACETS *facets __maybe_unused, FACET_KEY *k, const char *key, uint32_t sum) { buffer_json_member_add_array(wb, key); { - if(k && k->values) { + if(k && k->values.enabled) { FACET_VALUE *v; - dfe_start_read(k->values, v){ + foreach_value_in_key(k, v) { if (unlikely(!v->histogram)) continue; buffer_json_add_array_item_double(wb, (double) v->sum * 100.0 / (double) sum); } - dfe_done(v); + foreach_value_in_key_done(v); } } buffer_json_array_close(wb); // key @@ -373,9 +716,9 @@ static void facets_histogram_generate(FACETS *facets, FACET_KEY *k, BUFFER *wb) size_t dimensions = 0; uint32_t min = UINT32_MAX, max = 0, sum = 0, count = 0; - if(k && k->values) { + if(k && k->values.enabled) { FACET_VALUE *v; - dfe_start_read(k->values, v){ + foreach_value_in_key(k, v) { if (unlikely(!v->histogram)) continue; @@ -406,7 +749,7 @@ static void facets_histogram_generate(FACETS *facets, FACET_KEY *k, BUFFER *wb) v->sum += n; } } - dfe_done(v); + foreach_value_in_key_done(v); } buffer_json_member_add_object(wb, "summary"); @@ -526,10 +869,10 @@ static void facets_histogram_generate(FACETS *facets, FACET_KEY *k, BUFFER *wb) buffer_json_array_close(wb); // instances buffer_json_member_add_array(wb, "dimensions"); - if(dimensions && k && k->values) { + if(dimensions && k && k->values.enabled) { size_t pri = 0; FACET_VALUE *v; - dfe_start_read(k->values, v) { + foreach_value_in_key(k, v) { if(unlikely(!v->histogram)) continue; @@ -554,7 +897,7 @@ static void facets_histogram_generate(FACETS *facets, FACET_KEY *k, BUFFER *wb) } buffer_json_object_close(wb); // dimension } - dfe_done(v); + foreach_value_in_key_done(v); } buffer_json_array_close(wb); // dimensions @@ -612,7 +955,7 @@ static void facets_histogram_generate(FACETS *facets, FACET_KEY *k, BUFFER *wb) buffer_json_object_close(wb); // point buffer_json_member_add_array(wb, "data"); - if(k && k->values) { + if(k && k->values.enabled) { usec_t t = facets->histogram.after_ut; for(uint32_t i = 0; i < facets->histogram.slots ;i++) { buffer_json_add_array_item_array(wb); // row @@ -620,7 +963,7 @@ static void facets_histogram_generate(FACETS *facets, FACET_KEY *k, BUFFER *wb) buffer_json_add_array_item_time_ms(wb, t / USEC_PER_SEC); FACET_VALUE *v; - dfe_start_read(k->values, v){ + foreach_value_in_key(k, v) { if (unlikely(!v->histogram)) continue; @@ -632,7 +975,7 @@ static void facets_histogram_generate(FACETS *facets, FACET_KEY *k, BUFFER *wb) buffer_json_array_close(wb); // point } - dfe_done(v); + foreach_value_in_key_done(v); } buffer_json_array_close(wb); // row @@ -689,7 +1032,7 @@ static void facets_histogram_generate(FACETS *facets, FACET_KEY *k, BUFFER *wb) buffer_json_member_add_object(wb, "view"); { char title[1024 + 1] = "Events Distribution"; - FACET_KEY *kt = dictionary_get(facets->keys, facets->histogram.chart); + FACET_KEY *kt = FACETS_KEY_GET_FROM_INDEX(facets, facets->histogram.hash); if(kt && kt->name) snprintfz(title, 1024, "Events Distribution by %s", kt->name); @@ -790,138 +1133,12 @@ static inline bool facets_key_is_facet(FACETS *facets, FACET_KEY *k) { return false; } -// ---------------------------------------------------------------------------- -// FACET_VALUE dictionary hooks - -static void facet_value_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *data) { - FACET_VALUE *v = value; - FACET_KEY *k = data; - - if(!v->selected) - v->selected = k->default_selected_for_values; - - if(v->name) { - // an actual value, not a filter - v->name = strdupz(v->name); - facet_value_is_used(k, v); - } - - k->facets->operations.values.inserts++; -} - -static bool facet_value_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *old_value, void *new_value, void *data) { - FACET_VALUE *v = old_value; - FACET_VALUE *nv = new_value; - FACET_KEY *k = data; - - if(!v->name && nv->name) - // an actual value, not a filter - v->name = strdupz(nv->name); - - if(v->name) - facet_value_is_used(k, v); - - internal_fatal(v->name && nv->name && strcmp(v->name, nv->name) != 0, "value hash conflict: '%s' and '%s' have the same hash '%s'", v->name, nv->name, - dictionary_acquired_item_name(item)); - - k->facets->operations.values.conflicts++; - - return false; -} - -static void facet_value_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *data __maybe_unused) { - FACET_VALUE *v = value; - freez(v->histogram); - freez((char *)v->name); -} - -// ---------------------------------------------------------------------------- -// FACET_KEY dictionary hooks - -static inline void facet_key_late_init(FACETS *facets, FACET_KEY *k) { - if(k->values) - return; - - if(facets_key_is_facet(facets, k)) { - k->values = dictionary_create_advanced( - DICT_OPTION_SINGLE_THREADED | DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE, - NULL, sizeof(FACET_VALUE)); - dictionary_register_insert_callback(k->values, facet_value_insert_callback, k); - dictionary_register_conflict_callback(k->values, facet_value_conflict_callback, k); - dictionary_register_delete_callback(k->values, facet_value_delete_callback, k); - } -} - -static void facet_key_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *data) { - FACET_KEY *k = value; - FACETS *facets = data; - - k->facets = facets; - - if(!(k->options & FACET_KEY_OPTION_REORDER)) - k->order = facets->order++; - - if((k->options & FACET_KEY_OPTION_FTS) || (facets->options & FACETS_OPTION_ALL_KEYS_FTS)) - facets->keys_filtered_by_query++; - - if(k->name) { - // an actual value, not a filter - k->name = strdupz(k->name); - facet_key_late_init(facets, k); - } - - k->current_value.b = buffer_create(0, NULL); - - DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(facets->keys_ll, k, prev, next); - - facets->operations.keys.registered++; - facets->operations.keys.unique++; -} - -static bool facet_key_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *old_value, void *new_value, void *data) { - FACET_KEY *k = old_value; - FACET_KEY *nk = new_value; - FACETS *facets = data; - - if(!k->name && nk->name) { - // an actual value, not a filter - k->name = strdupz(nk->name); - facet_key_late_init(facets, k); - } - - internal_fatal(k->name && nk->name && strcmp(k->name, nk->name) != 0, "key hash conflict: '%s' and '%s' have the same hash '%s'", k->name, nk->name, - dictionary_acquired_item_name(item)); - - if(k->options & FACET_KEY_OPTION_REORDER) { - k->order = facets->order++; - k->options &= ~FACET_KEY_OPTION_REORDER; - } - - facets->operations.keys.registered++; - - return false; -} - -static void facet_key_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *data __maybe_unused) { - FACET_KEY *k = value; - FACETS *facets = data; - - DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(facets->keys_ll, k, prev, next); - - dictionary_destroy(k->values); - buffer_free(k->current_value.b); - freez((char *)k->name); -} - // ---------------------------------------------------------------------------- FACETS *facets_create(uint32_t items_to_return, FACETS_OPTIONS options, const char *visible_keys, const char *facet_keys, const char *non_facet_keys) { FACETS *facets = callocz(1, sizeof(FACETS)); facets->options = options; - facets->keys = dictionary_create_advanced(DICT_OPTION_SINGLE_THREADED|DICT_OPTION_DONT_OVERWRITE_VALUE|DICT_OPTION_FIXED_SIZE, NULL, sizeof(FACET_KEY)); - dictionary_register_insert_callback(facets->keys, facet_key_insert_callback, facets); - dictionary_register_conflict_callback(facets->keys, facet_key_conflict_callback, facets); - dictionary_register_delete_callback(facets->keys, facet_key_delete_callback, facets); + FACETS_KEYS_INDEX_CREATE(facets); if(facet_keys && *facet_keys) facets->included_keys = simple_pattern_create(facet_keys, "|", SIMPLE_PATTERN_EXACT, true); @@ -942,7 +1159,7 @@ FACETS *facets_create(uint32_t items_to_return, FACETS_OPTIONS options, const ch void facets_destroy(FACETS *facets) { dictionary_destroy(facets->accepted_params); - dictionary_destroy(facets->keys); + FACETS_KEYS_INDEX_DESTROY(facets); simple_pattern_free(facets->visible_keys); simple_pattern_free(facets->included_keys); simple_pattern_free(facets->excluded_keys); @@ -966,14 +1183,7 @@ void facets_accepted_param(FACETS *facets, const char *param) { } static inline FACET_KEY *facets_register_key_name_length(FACETS *facets, const char *key, size_t key_length, FACET_KEY_OPTIONS options) { - FACET_KEY tk = { - .name = key, - .options = options, - .default_selected_for_values = true, - }; - char hash[FACET_STRING_HASH_SIZE]; - facets_string_hash(tk.name, key_length, hash); - return dictionary_set(facets->keys, hash, &tk, sizeof(tk)); + return FACETS_KEY_ADD_TO_INDEX(facets, FACETS_HASH_FUNCTION(key, key_length), key, options); } inline FACET_KEY *facets_register_key_name(FACETS *facets, const char *key, FACET_KEY_OPTIONS options) { @@ -1011,12 +1221,15 @@ void facets_set_anchor(FACETS *facets, usec_t anchor, FACETS_ANCHOR_DIRECTION di } inline FACET_KEY *facets_register_facet_id(FACETS *facets, const char *key_id, FACET_KEY_OPTIONS options) { - FACET_KEY tk = { - .options = options, - .default_selected_for_values = true, - }; - FACET_KEY *k = dictionary_set(facets->keys, key_id, &tk, sizeof(tk)); + if(!is_valid_string_hash(key_id)) + return NULL; + FACETS_HASH hash = str_to_facets_hash(key_id); + + internal_error(strcmp(hash_to_static_string(hash), key_id) != 0, + "Regenerating the user supplied key, does not produce the same hash string"); + + FACET_KEY *k = FACETS_KEY_ADD_TO_INDEX(facets, hash, NULL, options); k->options |= FACET_KEY_OPTION_FACET; k->options &= ~FACET_KEY_OPTION_NO_FACET; facet_key_late_init(facets, k); @@ -1024,14 +1237,14 @@ inline FACET_KEY *facets_register_facet_id(FACETS *facets, const char *key_id, F return k; } -void facets_register_facet_id_filter(FACETS *facets, const char *key_id, char *value_ids, FACET_KEY_OPTIONS options) { +void facets_register_facet_id_filter(FACETS *facets, const char *key_id, char *value_id, FACET_KEY_OPTIONS options) { FACET_KEY *k = facets_register_facet_id(facets, key_id, options); - k->default_selected_for_values = false; - - FACET_VALUE tv = { - .selected = true, - }; - dictionary_set(k->values, value_ids, &tv, sizeof(tv)); + if(k) { + if(is_valid_string_hash(value_id)) { + k->default_selected_for_values = false; + FACET_VALUE_ADD_OR_UPDATE_SELECTED(k, str_to_facets_hash(value_id)); + } + } } void facets_set_current_row_severity(FACETS *facets, FACET_ROW_SEVERITY severity) { @@ -1044,26 +1257,34 @@ void facets_data_only_mode(FACETS *facets) { // ---------------------------------------------------------------------------- -static inline void facets_check_value(FACETS *facets __maybe_unused, FACET_KEY *k) { - facets->operations.values.registered++; +static inline void facets_key_set_empty_value(FACETS *facets, FACET_KEY *k) { + k->current_value.updated = true; + k->current_value.empty = true; - if(!k->current_value.updated) - buffer_flush(k->current_value.b); + facets->operations.values.registered++; + facets->operations.values.empty++; + + buffer_contents_replace(k->current_value.b, FACET_VALUE_UNSET, sizeof(FACET_VALUE_UNSET) - 1); + + if(k->values.enabled) + FACET_VALUE_ADD_EMPTY_VALUE_TO_INDEX(k); + else { + k->key_found_in_row++; + k->key_values_selected_in_row++; + } +} + +static inline void facets_key_check_value(FACETS *facets, FACET_KEY *k) { + k->current_value.updated = true; + k->current_value.empty = false; + + facets->operations.values.registered++; if(k->transform.cb) { facets->operations.values.transformed++; k->transform.cb(facets, k->current_value.b, k->transform.data); } - if(!k->current_value.updated) { - buffer_fast_strcat(k->current_value.b, FACET_VALUE_UNSET, sizeof(FACET_VALUE_UNSET) - 1); - k->current_value.updated = true; - k->current_value.empty = true; - facets->operations.values.empty++; - } - else - k->current_value.empty = false; - // bool found = false; // if(strstr(buffer_tostring(k->current_value), "fprintd") != NULL) // found = true; @@ -1074,32 +1295,8 @@ static inline void facets_check_value(FACETS *facets __maybe_unused, FACET_KEY * facets->current_row.keys_matched_by_query++; } - if(k->values) { - if(k->current_value.empty) { - facets_zero_hash(k->current_value.hash); - - FACET_VALUE tk = { - .name = FACET_VALUE_UNSET, - }; - - if(k->empty_value.empty_value) { - facet_value_conflict_callback(NULL, k->empty_value.empty_value, &tk, k); - } - else { - FACET_VALUE *tkv = dictionary_set(k->values, k->current_value.hash, &tk, sizeof(tk)); - tkv->empty = true; - k->empty_value.empty_value = tkv; - } - } - else { - FACET_VALUE tk = { - .name = buffer_tostring(k->current_value.b), - }; - facets_string_hash(tk.name, buffer_strlen(k->current_value.b), k->current_value.hash); - dictionary_set(k->values, k->current_value.hash, &tk, sizeof(tk)); - facets->operations.values.indexed++; - } - } + if(k->values.enabled) + FACET_VALUE_ADD_CURRENT_VALUE_TO_INDEX(k); else { k->key_found_in_row++; k->key_values_selected_in_row++; @@ -1110,18 +1307,15 @@ void facets_add_key_value(FACETS *facets, const char *key, const char *value) { FACET_KEY *k = facets_register_key_name(facets, key, 0); buffer_flush(k->current_value.b); buffer_strcat(k->current_value.b, value); - k->current_value.updated = true; - facets_check_value(facets, k); + facets_key_check_value(facets, k); } void facets_add_key_value_length(FACETS *facets, const char *key, size_t key_len, const char *value, size_t value_len) { FACET_KEY *k = facets_register_key_name_length(facets, key, key_len, 0); - buffer_flush(k->current_value.b); - buffer_strncat(k->current_value.b, value, value_len); - k->current_value.updated = true; + buffer_contents_replace(k->current_value.b, value, value_len); - facets_check_value(facets, k); + facets_key_check_value(facets, k); } // ---------------------------------------------------------------------------- @@ -1181,7 +1375,7 @@ static FACET_ROW *facets_row_create(FACETS *facets, usec_t usec, FACET_ROW *into row->usec = usec; FACET_KEY *k; - dfe_start_read(facets->keys, k) { + foreach_key_in_facets(facets, k) { FACET_ROW_KEY_VALUE t = { .tmp = (k->current_value.updated) ? buffer_tostring(k->current_value.b) : FACET_VALUE_UNSET, .wb = NULL, @@ -1189,7 +1383,7 @@ static FACET_ROW *facets_row_create(FACETS *facets, usec_t usec, FACET_ROW *into }; dictionary_set(row->dict, k->name, &t, sizeof(t)); } - dfe_done(k); + foreach_key_in_facets_done(k); return row; } @@ -1318,15 +1512,15 @@ static void facets_row_keep(FACETS *facets, usec_t usec) { void facets_rows_begin(FACETS *facets) { FACET_KEY *k; - // dfe_start_read(facets->keys, k) { - for(k = facets->keys_ll ; k ; k = k->next) { + foreach_key_in_facets(facets, k) { k->key_found_in_row = 0; k->key_values_selected_in_row = 0; k->current_value.updated = false; k->current_value.empty = false; - k->current_value.hash[0] = '\0'; + k->current_value.hash = FACETS_HASH_ZERO; + k->current_value.v = NULL; } - // dfe_done(k); + foreach_key_in_facets_done(k); facets->current_row.severity = FACET_ROW_SEVERITY_NORMAL; facets->current_row.keys_matched_by_query = 0; @@ -1342,11 +1536,10 @@ void facets_row_finished(FACETS *facets, usec_t usec) { uint32_t selected_by = 0; FACET_KEY *k; - // dfe_start_read(facets->keys, k) { - for(k = facets->keys_ll ; k ; k = k->next) { + foreach_key_in_facets(facets, k) { if(!k->key_found_in_row) { // put the FACET_VALUE_UNSET value into it - facets_check_value(facets, k); + facets_key_set_empty_value(facets, k); } internal_fatal(!k->key_found_in_row, "all keys should be found in the row at this point"); @@ -1355,27 +1548,23 @@ void facets_row_finished(FACETS *facets, usec_t usec) { k->key_found_in_row = 1; - total_keys += k->key_found_in_row; + total_keys++; selected_by += (k->key_values_selected_in_row) ? 1 : 0; } - // dfe_done(k); + foreach_key_in_facets_done(k); if(selected_by >= total_keys - 1) { uint32_t found = 0; - // dfe_start_read(facets->keys, k){ - for(k = facets->keys_ll ; k ; k = k->next) { + foreach_key_in_facets(facets, k) { uint32_t counted_by = selected_by; if (counted_by != total_keys && !k->key_values_selected_in_row) counted_by++; if(counted_by == total_keys) { - if(k->values) { - if(!k->current_value.hash[0]) - facets_string_hash(buffer_tostring(k->current_value.b), buffer_strlen(k->current_value.b), k->current_value.hash); - - FACET_VALUE *v = dictionary_get(k->values, k->current_value.hash); + if(k->values.enabled) { + FACET_VALUE *v = FACET_VALUE_GET_CURRENT_VALUE(k); v->final_facet_value_counter++; if(selected_by == total_keys) @@ -1385,7 +1574,7 @@ void facets_row_finished(FACETS *facets, usec_t usec) { found++; } } - // dfe_done(k); + foreach_key_in_facets_done(k); internal_fatal(!found, "We should find at least one facet to count this row"); (void)found; @@ -1427,20 +1616,20 @@ void facets_accepted_parameters_to_json_array(FACETS *facets, BUFFER *wb, bool w if(facets->accepted_params) { void *t; dfe_start_read(facets->accepted_params, t) { - buffer_json_add_array_item_string(wb, t_dfe.name); - } + buffer_json_add_array_item_string(wb, t_dfe.name); + } dfe_done(t); } if(with_keys) { FACET_KEY *k; - dfe_start_read(facets->keys, k){ - if (!k->values) - continue; + foreach_key_in_facets(facets, k){ + if (!k->values.enabled) + continue; - buffer_json_add_array_item_string(wb, k_dfe.name); - } - dfe_done(k); + buffer_json_add_array_item_string(wb, hash_to_static_string(k->hash)); + } + foreach_key_in_facets_done(k); } } buffer_json_array_close(wb); // accepted_params @@ -1467,13 +1656,13 @@ void facets_report(FACETS *facets, BUFFER *wb) { buffer_json_member_add_array(wb, "facets"); { FACET_KEY *k; - dfe_start_read(facets->keys, k) { - if(!k->values) + foreach_key_in_facets(facets, k) { + if(!k->values.enabled) continue; buffer_json_add_array_item_object(wb); // key { - buffer_json_member_add_string(wb, "id", k_dfe.name); + buffer_json_member_add_string(wb, "id", hash_to_static_string(k->hash)); buffer_json_member_add_string(wb, "name", k->name); if(!k->order) @@ -1483,22 +1672,22 @@ void facets_report(FACETS *facets, BUFFER *wb) { buffer_json_member_add_array(wb, "options"); { FACET_VALUE *v; - dfe_start_read(k->values, v) { + foreach_value_in_key(k, v) { buffer_json_add_array_item_object(wb); { - buffer_json_member_add_string(wb, "id", v_dfe.name); + buffer_json_member_add_string(wb, "id", hash_to_static_string(v->hash)); buffer_json_member_add_string(wb, "name", v->name); buffer_json_member_add_uint64(wb, "count", v->final_facet_value_counter); } buffer_json_object_close(wb); } - dfe_done(v); + foreach_value_in_key_done(v); } buffer_json_array_close(wb); // options } buffer_json_object_close(wb); // key } - dfe_done(k); + foreach_key_in_facets_done(k); } buffer_json_array_close(wb); // facets } @@ -1534,11 +1723,11 @@ void facets_report(FACETS *facets, BUFFER *wb) { NULL); FACET_KEY *k; - dfe_start_read(facets->keys, k){ + foreach_key_in_facets(facets, k) { RRDF_FIELD_OPTIONS options = RRDF_FIELD_OPTS_NONE; bool visible = k->options & (FACET_KEY_OPTION_VISIBLE | FACET_KEY_OPTION_STICKY); - if ((facets->options & FACETS_OPTION_ALL_FACETS_VISIBLE && k->values)) + if ((facets->options & FACETS_OPTION_ALL_FACETS_VISIBLE && k->values.enabled)) visible = true; if (!visible) @@ -1550,9 +1739,11 @@ void facets_report(FACETS *facets, BUFFER *wb) { if (k->options & FACET_KEY_OPTION_MAIN_TEXT) options |= RRDF_FIELD_OPTS_FULL_WIDTH | RRDF_FIELD_OPTS_WRAP; + const char *hash_str = hash_to_static_string(k->hash); + buffer_rrdf_table_add_field( wb, field_id++, - k_dfe.name, k->name ? k->name : k_dfe.name, + hash_str, k->name ? k->name : hash_str, RRDF_FIELD_TYPE_STRING, (k->options & FACET_KEY_OPTION_RICH_TEXT) ? RRDF_FIELD_VISUAL_RICH : RRDF_FIELD_VISUAL_VALUE, RRDF_FIELD_TRANSFORM_NONE, 0, NULL, NAN, @@ -1564,7 +1755,7 @@ void facets_report(FACETS *facets, BUFFER *wb) { options, FACET_VALUE_UNSET); } - dfe_done(k); + foreach_key_in_facets_done(k); } buffer_json_object_close(wb); // columns } @@ -1594,8 +1785,7 @@ void facets_report(FACETS *facets, BUFFER *wb) { buffer_json_object_close(wb); FACET_KEY *k; - dfe_start_read(facets->keys, k) - { + foreach_key_in_facets(facets, k) { FACET_ROW_KEY_VALUE *rkv = dictionary_get(row->dict, k->name); if(unlikely(k->dynamic.cb)) { @@ -1612,7 +1802,7 @@ void facets_report(FACETS *facets, BUFFER *wb) { buffer_json_add_array_item_string(wb, buffer_tostring(rkv->wb)); } } - dfe_done(k); + foreach_key_in_facets_done(k); buffer_json_array_close(wb); // each row } } @@ -1625,38 +1815,35 @@ void facets_report(FACETS *facets, BUFFER *wb) { } if(facets->histogram.enabled && !(facets->options & FACETS_OPTION_DISABLE_HISTOGRAM)) { - const char *first_histogram = NULL; + FACETS_HASH first_histogram_hash = 0; buffer_json_member_add_array(wb, "available_histograms"); { FACET_KEY *k; - dfe_start_read(facets->keys, k) { - if (!k->values) + foreach_key_in_facets(facets, k) { + if (!k->values.enabled) continue; - if(unlikely(!first_histogram)) - first_histogram = k_dfe.name; + if(unlikely(!first_histogram_hash)) + first_histogram_hash = k->hash; buffer_json_add_array_item_object(wb); - buffer_json_member_add_string(wb, "id", k_dfe.name); + buffer_json_member_add_string(wb, "id", hash_to_static_string(k->hash)); buffer_json_member_add_string(wb, "name", k->name); buffer_json_object_close(wb); } - dfe_done(k); + foreach_key_in_facets_done(k); } buffer_json_array_close(wb); { - const char *id = facets->histogram.chart; - FACET_KEY *k = dictionary_get(facets->keys, id); - if(!k || !k->values) { - id = first_histogram; - k = dictionary_get(facets->keys, id); - } + FACET_KEY *k = FACETS_KEY_GET_FROM_INDEX(facets, facets->histogram.hash); + if(!k || !k->values.enabled) + k = FACETS_KEY_GET_FROM_INDEX(facets, first_histogram_hash); buffer_json_member_add_object(wb, "histogram"); { - buffer_json_member_add_string(wb, "id", id); - buffer_json_member_add_string(wb, "name", k->name); + buffer_json_member_add_string(wb, "id", k ? hash_to_static_string(k->hash) : ""); + buffer_json_member_add_string(wb, "name", k ? k->name : ""); buffer_json_member_add_object(wb, "chart"); facets_histogram_generate(facets, k, wb); buffer_json_object_close(wb); diff --git a/libnetdata/facets/facets.h b/libnetdata/facets/facets.h index 05b50fb4a5..2ae0e62d6f 100644 --- a/libnetdata/facets/facets.h +++ b/libnetdata/facets/facets.h @@ -1,3 +1,5 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + #ifndef FACETS_H #define FACETS_H 1 @@ -46,9 +48,6 @@ typedef struct facet_row { typedef struct facets FACETS; typedef struct facet_key FACET_KEY; -#define FACET_STRING_HASH_SIZE 23 -void facets_string_hash(const char *src, size_t len, char *out); - typedef void (*facets_key_transformer_t)(FACETS *facets __maybe_unused, BUFFER *wb, void *data); typedef void (*facet_dynamic_row_t)(FACETS *facets, BUFFER *json_array, FACET_ROW_KEY_VALUE *rkv, FACET_ROW *row, void *data); FACET_KEY *facets_register_dynamic_key_name(FACETS *facets, const char *key, FACET_KEY_OPTIONS options, facet_dynamic_row_t cb, void *data); @@ -75,8 +74,9 @@ void facets_set_query(FACETS *facets, const char *query); void facets_set_items(FACETS *facets, uint32_t items); void facets_set_anchor(FACETS *facets, usec_t anchor, FACETS_ANCHOR_DIRECTION direction); FACET_KEY *facets_register_facet_id(FACETS *facets, const char *key_id, FACET_KEY_OPTIONS options); -void facets_register_facet_id_filter(FACETS *facets, const char *key_id, char *value_ids, FACET_KEY_OPTIONS options); -void facets_set_histogram(FACETS *facets, const char *chart, usec_t after_ut, usec_t before_ut); +void facets_register_facet_id_filter(FACETS *facets, const char *key_id, char *value_id, FACET_KEY_OPTIONS options); +void facets_set_histogram_by_id(FACETS *facets, const char *key_id, usec_t after_ut, usec_t before_ut); +void facets_set_histogram_by_name(FACETS *facets, const char *key_name, usec_t after_ut, usec_t before_ut); void facets_add_key_value(FACETS *facets, const char *key, const char *value); void facets_add_key_value_length(FACETS *facets, const char *key, size_t key_len, const char *value, size_t value_len); diff --git a/libnetdata/functions_evloop/Makefile.am b/libnetdata/functions_evloop/Makefile.am new file mode 100644 index 0000000000..161784b8f6 --- /dev/null +++ b/libnetdata/functions_evloop/Makefile.am @@ -0,0 +1,8 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +AUTOMAKE_OPTIONS = subdir-objects +MAINTAINERCLEANFILES = $(srcdir)/Makefile.in + +dist_noinst_DATA = \ + README.md \ + $(NULL) diff --git a/libnetdata/functions_evloop/README.md b/libnetdata/functions_evloop/README.md new file mode 100644 index 0000000000..e69de29bb2 diff --git a/libnetdata/functions_evloop/functions_evloop.c b/libnetdata/functions_evloop/functions_evloop.c new file mode 100644 index 0000000000..f3fdf3a757 --- /dev/null +++ b/libnetdata/functions_evloop/functions_evloop.c @@ -0,0 +1,207 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "functions_evloop.h" + +#define MAX_FUNCTION_PARAMETERS 1024 + +struct functions_evloop_worker_job { + bool used; + bool running; + bool cancelled; + char *cmd; + const char *transaction; + time_t timeout; + functions_evloop_worker_execute_t cb; +}; + +struct rrd_functions_expectation { + const char *function; + size_t function_length; + functions_evloop_worker_execute_t cb; + time_t default_timeout; + struct rrd_functions_expectation *prev, *next; +}; + +struct functions_evloop_globals { + const char *tag; + + DICTIONARY *worker_queue; + pthread_mutex_t worker_mutex; + pthread_cond_t worker_cond_var; + size_t workers; + + netdata_mutex_t *stdout_mutex; + bool *plugin_should_exit; + + netdata_thread_t reader_thread; + netdata_thread_t *worker_threads; + + struct rrd_functions_expectation *expectations; +}; + +static void *rrd_functions_worker_globals_worker_main(void *arg) { + struct functions_evloop_globals *wg = arg; + + while (true) { + pthread_mutex_lock(&wg->worker_mutex); + + while (dictionary_entries(wg->worker_queue) == 0) { + pthread_cond_wait(&wg->worker_cond_var, &wg->worker_mutex); + } + + const DICTIONARY_ITEM *acquired = NULL; + struct functions_evloop_worker_job *j; + dfe_start_write(wg->worker_queue, j) { + if(j->running || j->cancelled) + continue; + + acquired = dictionary_acquired_item_dup(wg->worker_queue, j_dfe.item); + j->running = true; + break; + } + dfe_done(j); + + pthread_mutex_unlock(&wg->worker_mutex); + + if(acquired) { + j = dictionary_acquired_item_value(acquired); + j->cb(j->transaction, j->cmd, j->timeout, &j->cancelled); + dictionary_del(wg->worker_queue, j->transaction); + dictionary_acquired_item_release(wg->worker_queue, acquired); + dictionary_garbage_collect(wg->worker_queue); + } + } + return NULL; +} + +static void *rrd_functions_worker_globals_reader_main(void *arg) { + struct functions_evloop_globals *wg = arg; + + char buffer[PLUGINSD_LINE_MAX + 1]; + + char *s = NULL; + while(!(*wg->plugin_should_exit) && (s = fgets(buffer, PLUGINSD_LINE_MAX, stdin))) { + + char *words[MAX_FUNCTION_PARAMETERS] = { NULL }; + size_t num_words = quoted_strings_splitter_pluginsd(buffer, words, MAX_FUNCTION_PARAMETERS); + + const char *keyword = get_word(words, num_words, 0); + + if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0) { + char *transaction = get_word(words, num_words, 1); + char *timeout_s = get_word(words, num_words, 2); + char *function = get_word(words, num_words, 3); + + if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) { + netdata_log_error("Received incomplete %s (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.", + keyword, + transaction?transaction:"(unset)", + timeout_s?timeout_s:"(unset)", + function?function:"(unset)"); + } + else { + int timeout = str2i(timeout_s); + + bool found = false; + struct rrd_functions_expectation *we; + for(we = wg->expectations; we ;we = we->next) { + if(strncmp(function, we->function, we->function_length) == 0) { + struct functions_evloop_worker_job t = { + .cmd = strdupz(function), + .transaction = strdupz(transaction), + .running = false, + .cancelled = false, + .timeout = timeout > 0 ? timeout : we->default_timeout, + .used = false, + .cb = we->cb, + }; + struct functions_evloop_worker_job *j = dictionary_set(wg->worker_queue, transaction, &t, sizeof(t)); + if(j->used) { + netdata_log_error("Received duplicate function transaction '%s'", transaction); + freez((void *)t.cmd); + freez((void *)t.transaction); + } + else { + found = true; + j->used = true; + pthread_cond_signal(&wg->worker_cond_var); + } + } + } + + if(!found) { + netdata_mutex_lock(wg->stdout_mutex); + pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_NOT_FOUND, + "No function with this name found."); + netdata_mutex_unlock(wg->stdout_mutex); + } + } + } + else if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_CANCEL) == 0) { + char *transaction = get_word(words, num_words, 1); + const DICTIONARY_ITEM *acquired = dictionary_get_and_acquire_item(wg->worker_queue, transaction); + if(acquired) { + struct functions_evloop_worker_job *j = dictionary_acquired_item_value(acquired); + __atomic_store_n(&j->cancelled, true, __ATOMIC_RELAXED); + dictionary_acquired_item_release(wg->worker_queue, acquired); + dictionary_del(wg->worker_queue, transaction); + dictionary_garbage_collect(wg->worker_queue); + } + else + netdata_log_error("Received CANCEL for transaction '%s', but it not available here", transaction); + } + else + netdata_log_error("Received unknown command: %s", keyword?keyword:"(unset)"); + } + + if(!s || feof(stdin) || ferror(stdin)) { + *wg->plugin_should_exit = true; + netdata_log_error("Received error on stdin."); + } + + exit(1); +} + +void worker_queue_delete_cb(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *data __maybe_unused) { + struct functions_evloop_worker_job *j = value; + freez((void *)j->cmd); + freez((void *)j->transaction); +} + +struct functions_evloop_globals *functions_evloop_init(size_t worker_threads, const char *tag, netdata_mutex_t *stdout_mutex, bool *plugin_should_exit) { + struct functions_evloop_globals *wg = callocz(1, sizeof(struct functions_evloop_globals)); + + wg->worker_queue = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE); + dictionary_register_delete_callback(wg->worker_queue, worker_queue_delete_cb, NULL); + + pthread_mutex_init(&wg->worker_mutex, NULL); + pthread_cond_init(&wg->worker_cond_var, NULL); + + wg->plugin_should_exit = plugin_should_exit; + wg->stdout_mutex = stdout_mutex; + wg->workers = worker_threads; + wg->worker_threads = callocz(wg->workers, sizeof(netdata_thread_t )); + wg->tag = tag; + + char tag_buffer[NETDATA_THREAD_TAG_MAX + 1]; + snprintfz(tag_buffer, NETDATA_THREAD_TAG_MAX, "%s_READER", wg->tag); + netdata_thread_create(&wg->reader_thread, tag_buffer, NETDATA_THREAD_OPTION_DONT_LOG, + rrd_functions_worker_globals_reader_main, wg); + + for(size_t i = 0; i < wg->workers ; i++) { + snprintfz(tag_buffer, NETDATA_THREAD_TAG_MAX, "%s_WORK[%zu]", wg->tag, i+1); + netdata_thread_create(&wg->worker_threads[i], tag_buffer, NETDATA_THREAD_OPTION_DONT_LOG, + rrd_functions_worker_globals_worker_main, wg); + } + + return wg; +} + +void functions_evloop_add_function(struct functions_evloop_globals *wg, const char *function, functions_evloop_worker_execute_t cb, time_t default_timeout) { + struct rrd_functions_expectation *we = callocz(1, sizeof(*we)); + we->function = function; + we->function_length = strlen(we->function); + we->cb = cb; + we->default_timeout = default_timeout; + DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(wg->expectations, we, prev, next); +} diff --git a/libnetdata/functions_evloop/functions_evloop.h b/libnetdata/functions_evloop/functions_evloop.h new file mode 100644 index 0000000000..ee0f72cb53 --- /dev/null +++ b/libnetdata/functions_evloop/functions_evloop.h @@ -0,0 +1,98 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_FUNCTIONS_EVLOOP_H +#define NETDATA_FUNCTIONS_EVLOOP_H + +#include "../libnetdata.h" + +#define PLUGINSD_KEYWORD_CHART "CHART" +#define PLUGINSD_KEYWORD_CHART_DEFINITION_END "CHART_DEFINITION_END" +#define PLUGINSD_KEYWORD_DIMENSION "DIMENSION" +#define PLUGINSD_KEYWORD_BEGIN "BEGIN" +#define PLUGINSD_KEYWORD_SET "SET" +#define PLUGINSD_KEYWORD_END "END" +#define PLUGINSD_KEYWORD_FLUSH "FLUSH" +#define PLUGINSD_KEYWORD_DISABLE "DISABLE" +#define PLUGINSD_KEYWORD_VARIABLE "VARIABLE" +#define PLUGINSD_KEYWORD_LABEL "LABEL" +#define PLUGINSD_KEYWORD_OVERWRITE "OVERWRITE" +#define PLUGINSD_KEYWORD_CLABEL "CLABEL" +#define PLUGINSD_KEYWORD_CLABEL_COMMIT "CLABEL_COMMIT" +#define PLUGINSD_KEYWORD_FUNCTION "FUNCTION" +#define PLUGINSD_KEYWORD_FUNCTION_CANCEL "FUNCTION_CANCEL" +#define PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN "FUNCTION_RESULT_BEGIN" +#define PLUGINSD_KEYWORD_FUNCTION_RESULT_END "FUNCTION_RESULT_END" + +#define PLUGINSD_KEYWORD_REPLAY_CHART "REPLAY_CHART" +#define PLUGINSD_KEYWORD_REPLAY_BEGIN "RBEGIN" +#define PLUGINSD_KEYWORD_REPLAY_SET "RSET" +#define PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE "RDSTATE" +#define PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE "RSSTATE" +#define PLUGINSD_KEYWORD_REPLAY_END "REND" + +#define PLUGINSD_KEYWORD_BEGIN_V2 "BEGIN2" +#define PLUGINSD_KEYWORD_SET_V2 "SET2" +#define PLUGINSD_KEYWORD_END_V2 "END2" + +#define PLUGINSD_KEYWORD_HOST_DEFINE "HOST_DEFINE" +#define PLUGINSD_KEYWORD_HOST_DEFINE_END "HOST_DEFINE_END" +#define PLUGINSD_KEYWORD_HOST_LABEL "HOST_LABEL" +#define PLUGINSD_KEYWORD_HOST "HOST" + +#define PLUGINSD_KEYWORD_DYNCFG_ENABLE "DYNCFG_ENABLE" +#define PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE "DYNCFG_REGISTER_MODULE" + +#define PLUGINSD_KEYWORD_REPORT_JOB_STATUS "REPORT_JOB_STATUS" + +#define PLUGINSD_KEYWORD_EXIT "EXIT" + +#define PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT 10 // seconds + +typedef void (*functions_evloop_worker_execute_t)(const char *transaction, char *function, int timeout, bool *cancelled); +struct functions_evloop_worker_job; +struct functions_evloop_globals *functions_evloop_init(size_t worker_threads, const char *tag, netdata_mutex_t *stdout_mutex, bool *plugin_should_exit); +void functions_evloop_add_function(struct functions_evloop_globals *wg, const char *function, functions_evloop_worker_execute_t cb, time_t default_timeout); + + +#define pluginsd_function_result_begin_to_buffer(wb, transaction, code, content_type, expires) \ + buffer_sprintf(wb \ + , PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " \"%s\" %d \"%s\" %ld\n" \ + , (transaction) ? (transaction) : "" \ + , (int)(code) \ + , (content_type) ? (content_type) : "" \ + , (long int)(expires) \ + ) + +#define pluginsd_function_result_end_to_buffer(wb) \ + buffer_strcat(wb, "\n" PLUGINSD_KEYWORD_FUNCTION_RESULT_END "\n") + +#define pluginsd_function_result_begin_to_stdout(transaction, code, content_type, expires) \ + fprintf(stdout \ + , PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " \"%s\" %d \"%s\" %ld\n" \ + , (transaction) ? (transaction) : "" \ + , (int)(code) \ + , (content_type) ? (content_type) : "" \ + , (long int)(expires) \ + ) + +#define pluginsd_function_result_end_to_stdout() \ + fprintf(stdout, "\n" PLUGINSD_KEYWORD_FUNCTION_RESULT_END "\n") + +static inline void pluginsd_function_json_error_to_stdout(const char *transaction, int code, const char *msg) { + char buffer[PLUGINSD_LINE_MAX + 1]; + json_escape_string(buffer, msg, PLUGINSD_LINE_MAX); + + pluginsd_function_result_begin_to_stdout(transaction, code, "application/json", now_realtime_sec()); + fprintf(stdout, "{\"status\":%d,\"error_message\":\"%s\"}", code, buffer); + pluginsd_function_result_end_to_stdout(); + fflush(stdout); +} + +static inline void pluginsd_function_result_to_stdout(const char *transaction, int code, const char *content_type, time_t expires, BUFFER *result) { + pluginsd_function_result_begin_to_stdout(transaction, code, content_type, expires); + fwrite(buffer_tostring(result), buffer_strlen(result), 1, stdout); + pluginsd_function_result_end_to_stdout(); + fflush(stdout); +} + +#endif //NETDATA_FUNCTIONS_EVLOOP_H diff --git a/libnetdata/inlined.h b/libnetdata/inlined.h index eb55f0fe70..9c07d97b6d 100644 --- a/libnetdata/inlined.h +++ b/libnetdata/inlined.h @@ -394,10 +394,10 @@ static inline NETDATA_DOUBLE str2ndd_encoded(const char *src, char **endptr) { return str2ndd(src, endptr) * sign; } -static inline char *strncpyz(char *dst, const char *src, size_t n) { +static inline char *strncpyz(char *dst, const char *src, size_t dst_size_minus_1) { char *p = dst; - while (*src && n--) + while (*src && dst_size_minus_1--) *dst++ = *src++; *dst = '\0'; diff --git a/libnetdata/libnetdata.h b/libnetdata/libnetdata.h index 0ae30c14d4..164817044e 100644 --- a/libnetdata/libnetdata.h +++ b/libnetdata/libnetdata.h @@ -842,6 +842,7 @@ extern char *netdata_configured_host_prefix; #include "gorilla/gorilla.h" #include "facets/facets.h" #include "dyn_conf/dyn_conf.h" +#include "functions_evloop/functions_evloop.h" // BEWARE: this exists in alarm-notify.sh #define DEFAULT_CLOUD_BASE_URL "https://app.netdata.cloud" diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h index 73bd438c9d..09df8e711f 100644 --- a/streaming/rrdpush.h +++ b/streaming/rrdpush.h @@ -6,6 +6,7 @@ #include "libnetdata/libnetdata.h" #include "daemon/common.h" #include "web/server/web_client.h" +#include "database/rrdfunctions.h" #include "database/rrd.h" #define CONNECTED_TO_SIZE 100 diff --git a/streaming/sender.c b/streaming/sender.c index d26181020c..591611e71b 100644 --- a/streaming/sender.c +++ b/streaming/sender.c @@ -940,6 +940,7 @@ void stream_execute_function_callback(BUFFER *func_wb, int code, void *data) { buffer_strlen(func_wb), now_realtime_usec() - tmp->received_ut); } + string_freez(tmp->transaction); buffer_free(func_wb); freez(tmp); @@ -989,7 +990,9 @@ void execute_commands(struct sender_state *s) { tmp->transaction = string_strdupz(transaction); BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX + 1, &netdata_buffers_statistics.buffers_functions); - int code = rrd_call_function_async(s->host, wb, timeout, function, stream_execute_function_callback, tmp); + int code = rrd_function_run(s->host, wb, timeout, function, false, transaction, + stream_execute_function_callback, tmp, NULL, NULL); + if(code != HTTP_RESP_OK) { if (!buffer_strlen(wb)) rrd_call_function_error(wb, "Failed to route request to collector", code); @@ -998,6 +1001,13 @@ void execute_commands(struct sender_state *s) { } } } + else if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_CANCEL) == 0) { + worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST); + + char *transaction = get_word(words, num_words, 1); + if(transaction && *transaction) + rrd_function_cancel(transaction); + } else if (keyword && strcmp(keyword, PLUGINSD_KEYWORD_REPLAY_CHART) == 0) { worker_is_busy(WORKER_SENDER_JOB_REPLAY_REQUEST); diff --git a/web/api/web_api_v1.c b/web/api/web_api_v1.c index 0045246f0d..a472d57872 100644 --- a/web/api/web_api_v1.c +++ b/web/api/web_api_v1.c @@ -1412,7 +1412,9 @@ int web_client_api_request_v1_function(RRDHOST *host, struct web_client *w, char wb->content_type = CT_APPLICATION_JSON; buffer_no_cacheable(wb); - return rrd_call_function_and_wait(host, wb, timeout, function); + return rrd_function_run(host, wb, timeout, function, true, NULL, + NULL, NULL, + web_client_interrupt_callback, w); } int web_client_api_request_v1_functions(RRDHOST *host, struct web_client *w, char *url __maybe_unused) { diff --git a/web/server/static/static-threaded.c b/web/server/static/static-threaded.c index ad6455ccdf..60d80446eb 100644 --- a/web/server/static/static-threaded.c +++ b/web/server/static/static-threaded.c @@ -370,7 +370,7 @@ static int web_server_snd_callback(POLLINFO *pi, short int *events) { netdata_log_debug(D_WEB_CLIENT, "%llu: sending data on fd %d.", w->id, fd); - int ret = web_client_send(w); + ssize_t ret = web_client_send(w); if(unlikely(ret < 0)) { retval = -1;