0
0
Fork 0
mirror of https://github.com/netdata/netdata.git synced 2025-04-14 01:29:11 +00:00

functions cancelling ()

This commit is contained in:
Costa Tsaousis 2023-09-18 19:21:12 +03:00 committed by GitHub
parent 717ba3e9b2
commit ed3ba44514
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
24 changed files with 1645 additions and 849 deletions

View file

@ -156,6 +156,8 @@ LIBNETDATA_FILES = \
libnetdata/eval/eval.h \
libnetdata/facets/facets.c \
libnetdata/facets/facets.h \
libnetdata/functions_evloop/functions_evloop.c \
libnetdata/functions_evloop/functions_evloop.h \
libnetdata/gorilla/gorilla.h \
libnetdata/gorilla/gorilla.cc \
libnetdata/inlined.h \

View file

@ -4677,7 +4677,7 @@ static void apps_plugin_function_processes_help(const char *transaction) {
buffer_json_add_array_item_double(wb, _tmp); \
} while(0)
static void function_processes(const char *transaction, char *function __maybe_unused, char *line_buffer __maybe_unused, int line_max __maybe_unused, int timeout __maybe_unused) {
static void function_processes(const char *transaction, char *function __maybe_unused, int timeout __maybe_unused, bool *cancelled __maybe_unused) {
struct pid_stat *p;
char *words[PLUGINSD_MAX_WORDS] = { NULL };
@ -5531,62 +5531,6 @@ static void function_processes(const char *transaction, char *function __maybe_u
static bool apps_plugin_exit = false;
static void *reader_main(void *arg __maybe_unused) {
char buffer[PLUGINSD_LINE_MAX + 1];
char *s = NULL;
while(!apps_plugin_exit && (s = fgets(buffer, PLUGINSD_LINE_MAX, stdin))) {
char *words[PLUGINSD_MAX_WORDS] = { NULL };
size_t num_words = quoted_strings_splitter_pluginsd(buffer, words, PLUGINSD_MAX_WORDS);
const char *keyword = get_word(words, num_words, 0);
if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0) {
char *transaction = get_word(words, num_words, 1);
char *timeout_s = get_word(words, num_words, 2);
char *function = get_word(words, num_words, 3);
if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
netdata_log_error("Received incomplete %s (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
keyword,
transaction?transaction:"(unset)",
timeout_s?timeout_s:"(unset)",
function?function:"(unset)");
}
else {
int timeout = str2i(timeout_s);
if(timeout <= 0) timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT;
// internal_error(true, "Received function '%s', transaction '%s', timeout %d", function, transaction, timeout);
netdata_mutex_lock(&apps_and_stdout_mutex);
if(strncmp(function, "processes", strlen("processes")) == 0)
function_processes(transaction, function, buffer, PLUGINSD_LINE_MAX + 1, timeout);
else {
pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_NOT_FOUND,
"No function with this name found in apps.plugin.");
}
netdata_mutex_unlock(&apps_and_stdout_mutex);
// internal_error(true, "Done with function '%s', transaction '%s', timeout %d", function, transaction, timeout);
}
}
else
netdata_log_error("Received unknown command: %s", keyword?keyword:"(unset)");
}
if(!s || feof(stdin) || ferror(stdin)) {
apps_plugin_exit = true;
netdata_log_error("Received error on stdin.");
}
exit(1);
return NULL;
}
int main(int argc, char **argv) {
// debug_flags = D_PROCFILE;
stderror = stderr;
@ -5690,10 +5634,17 @@ int main(int argc, char **argv) {
all_pids = callocz(sizeof(struct pid_stat *), (size_t) pid_max + 1);
netdata_thread_t reader_thread;
netdata_thread_create(&reader_thread, "APPS_READER", NETDATA_THREAD_OPTION_DONT_LOG, reader_main, NULL);
netdata_mutex_lock(&apps_and_stdout_mutex);
// ------------------------------------------------------------------------
// the event loop for functions
struct functions_evloop_globals *wg =
functions_evloop_init(1, "APPS", &apps_and_stdout_mutex, &apps_plugin_exit);
functions_evloop_add_function(wg, "processes", function_processes, PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT);
// ------------------------------------------------------------------------
netdata_mutex_lock(&apps_and_stdout_mutex);
APPS_PLUGIN_GLOBAL_FUNCTIONS();
usec_t step = update_every * USEC_PER_SEC;
@ -5717,12 +5668,10 @@ int main(int argc, char **argv) {
struct pollfd pollfd = { .fd = fileno(stdout), .events = POLLERR };
if (unlikely(poll(&pollfd, 1, 0) < 0)) {
netdata_mutex_unlock(&apps_and_stdout_mutex);
netdata_thread_cancel(reader_thread);
fatal("Cannot check if a pipe is available");
}
if (unlikely(pollfd.revents & POLLERR)) {
netdata_mutex_unlock(&apps_and_stdout_mutex);
netdata_thread_cancel(reader_thread);
fatal("Received error on read pipe.");
}
@ -5733,7 +5682,6 @@ int main(int argc, char **argv) {
netdata_log_error("Cannot collect /proc data for running processes. Disabling apps.plugin...");
printf("DISABLE\n");
netdata_mutex_unlock(&apps_and_stdout_mutex);
netdata_thread_cancel(reader_thread);
exit(1);
}

View file

@ -10,48 +10,6 @@
#define PLUGINSD_CMD_MAX (FILENAME_MAX*2)
#define PLUGINSD_STOCK_PLUGINS_DIRECTORY_PATH 0
#define PLUGINSD_KEYWORD_CHART "CHART"
#define PLUGINSD_KEYWORD_CHART_DEFINITION_END "CHART_DEFINITION_END"
#define PLUGINSD_KEYWORD_DIMENSION "DIMENSION"
#define PLUGINSD_KEYWORD_BEGIN "BEGIN"
#define PLUGINSD_KEYWORD_SET "SET"
#define PLUGINSD_KEYWORD_END "END"
#define PLUGINSD_KEYWORD_FLUSH "FLUSH"
#define PLUGINSD_KEYWORD_DISABLE "DISABLE"
#define PLUGINSD_KEYWORD_VARIABLE "VARIABLE"
#define PLUGINSD_KEYWORD_LABEL "LABEL"
#define PLUGINSD_KEYWORD_OVERWRITE "OVERWRITE"
#define PLUGINSD_KEYWORD_CLABEL "CLABEL"
#define PLUGINSD_KEYWORD_CLABEL_COMMIT "CLABEL_COMMIT"
#define PLUGINSD_KEYWORD_FUNCTION "FUNCTION"
#define PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN "FUNCTION_RESULT_BEGIN"
#define PLUGINSD_KEYWORD_FUNCTION_RESULT_END "FUNCTION_RESULT_END"
#define PLUGINSD_KEYWORD_REPLAY_CHART "REPLAY_CHART"
#define PLUGINSD_KEYWORD_REPLAY_BEGIN "RBEGIN"
#define PLUGINSD_KEYWORD_REPLAY_SET "RSET"
#define PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE "RDSTATE"
#define PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE "RSSTATE"
#define PLUGINSD_KEYWORD_REPLAY_END "REND"
#define PLUGINSD_KEYWORD_BEGIN_V2 "BEGIN2"
#define PLUGINSD_KEYWORD_SET_V2 "SET2"
#define PLUGINSD_KEYWORD_END_V2 "END2"
#define PLUGINSD_KEYWORD_HOST_DEFINE "HOST_DEFINE"
#define PLUGINSD_KEYWORD_HOST_DEFINE_END "HOST_DEFINE_END"
#define PLUGINSD_KEYWORD_HOST_LABEL "HOST_LABEL"
#define PLUGINSD_KEYWORD_HOST "HOST"
#define PLUGINSD_KEYWORD_DYNCFG_ENABLE "DYNCFG_ENABLE"
#define PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE "DYNCFG_REGISTER_MODULE"
#define PLUGINSD_KEYWORD_REPORT_JOB_STATUS "REPORT_JOB_STATUS"
#define PLUGINSD_KEYWORD_EXIT "EXIT"
#define PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT 10 // seconds
#define PLUGINSD_LINE_MAX_SSL_READ 512
#define PLUGINSD_MAX_WORDS 20
@ -99,45 +57,4 @@ void pluginsd_process_thread_cleanup(void *ptr);
size_t pluginsd_initialize_plugin_directories();
#define pluginsd_function_result_begin_to_buffer(wb, transaction, code, content_type, expires) \
buffer_sprintf(wb \
, PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " \"%s\" %d \"%s\" %ld\n" \
, (transaction) ? (transaction) : "" \
, (int)(code) \
, (content_type) ? (content_type) : "" \
, (long int)(expires) \
)
#define pluginsd_function_result_end_to_buffer(wb) \
buffer_strcat(wb, "\n" PLUGINSD_KEYWORD_FUNCTION_RESULT_END "\n")
#define pluginsd_function_result_begin_to_stdout(transaction, code, content_type, expires) \
fprintf(stdout \
, PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " \"%s\" %d \"%s\" %ld\n" \
, (transaction) ? (transaction) : "" \
, (int)(code) \
, (content_type) ? (content_type) : "" \
, (long int)(expires) \
)
#define pluginsd_function_result_end_to_stdout() \
fprintf(stdout, "\n" PLUGINSD_KEYWORD_FUNCTION_RESULT_END "\n")
static inline void pluginsd_function_json_error_to_stdout(const char *transaction, int code, const char *msg) {
char buffer[PLUGINSD_LINE_MAX + 1];
json_escape_string(buffer, msg, PLUGINSD_LINE_MAX);
pluginsd_function_result_begin_to_stdout(transaction, code, "application/json", now_realtime_sec());
fprintf(stdout, "{\"status\":%d,\"error_message\":\"%s\"}", code, buffer);
pluginsd_function_result_end_to_stdout();
fflush(stdout);
}
static inline void pluginsd_function_result_to_stdout(const char *transaction, int code, const char *content_type, time_t expires, BUFFER *result) {
pluginsd_function_result_begin_to_stdout(transaction, code, content_type, expires);
fwrite(buffer_tostring(result), buffer_strlen(result), 1, stdout);
pluginsd_function_result_end_to_stdout();
fflush(stdout);
}
#endif /* NETDATA_PLUGINS_D_H */

View file

@ -733,14 +733,15 @@ static inline PARSER_RC pluginsd_dimension(char **words, size_t num_words, PARSE
struct inflight_function {
int code;
int timeout;
BUFFER *destination_wb;
STRING *function;
void (*callback)(BUFFER *wb, int code, void *callback_data);
void *callback_data;
BUFFER *result_body_wb;
rrd_function_result_callback_t result_cb;
void *result_cb_data;
usec_t timeout_ut;
usec_t started_ut;
usec_t sent_ut;
const char *payload;
PARSER *parser;
};
static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void *func, void *parser_ptr) {
@ -751,10 +752,12 @@ static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void
// leave this code as default, so that when the dictionary is destroyed this will be sent back to the caller
pf->code = HTTP_RESP_GATEWAY_TIMEOUT;
const char *transaction = dictionary_acquired_item_name(item);
char buffer[2048 + 1];
snprintfz(buffer, 2048, "%s %s %d \"%s\"\n",
pf->payload ? "FUNCTION_PAYLOAD" : "FUNCTION",
dictionary_acquired_item_name(item),
transaction,
pf->timeout,
string2str(pf->function));
@ -765,7 +768,7 @@ static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void
if(ret < 0) {
netdata_log_error("FUNCTION '%s': failed to send it to the plugin, error %zd", string2str(pf->function), ret);
rrd_call_function_error(pf->destination_wb, "Failed to communicate with collector", HTTP_RESP_SERVICE_UNAVAILABLE);
rrd_call_function_error(pf->result_body_wb, "Failed to communicate with collector", HTTP_RESP_SERVICE_UNAVAILABLE);
}
else {
internal_error(LOG_FUNCTIONS,
@ -782,7 +785,7 @@ static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void
if(ret < 0) {
netdata_log_error("FUNCTION_PAYLOAD '%s': failed to send function to plugin, error %zd", string2str(pf->function), ret);
rrd_call_function_error(pf->destination_wb, "Failed to communicate with collector", HTTP_RESP_SERVICE_UNAVAILABLE);
rrd_call_function_error(pf->result_body_wb, "Failed to communicate with collector", HTTP_RESP_SERVICE_UNAVAILABLE);
}
else {
internal_error(LOG_FUNCTIONS,
@ -798,8 +801,8 @@ static bool inflight_functions_conflict_callback(const DICTIONARY_ITEM *item __m
struct inflight_function *pf = new_func;
netdata_log_error("PLUGINSD_PARSER: duplicate UUID on pending function '%s' detected. Ignoring the second one.", string2str(pf->function));
pf->code = rrd_call_function_error(pf->destination_wb, "This request is already in progress", HTTP_RESP_BAD_REQUEST);
pf->callback(pf->destination_wb, pf->code, pf->callback_data);
pf->code = rrd_call_function_error(pf->result_body_wb, "This request is already in progress", HTTP_RESP_BAD_REQUEST);
pf->result_cb(pf->result_body_wb, pf->code, pf->result_cb_data);
string_freez(pf->function);
return false;
@ -811,9 +814,9 @@ static void inflight_functions_delete_callback(const DICTIONARY_ITEM *item __may
internal_error(LOG_FUNCTIONS,
"FUNCTION '%s' result of transaction '%s' received from collector (%zu bytes, request %llu usec, response %llu usec)",
string2str(pf->function), dictionary_acquired_item_name(item),
buffer_strlen(pf->destination_wb), pf->sent_ut - pf->started_ut, now_realtime_usec() - pf->sent_ut);
buffer_strlen(pf->result_body_wb), pf->sent_ut - pf->started_ut, now_realtime_usec() - pf->sent_ut);
pf->callback(pf->destination_wb, pf->code, pf->callback_data);
pf->result_cb(pf->result_body_wb, pf->code, pf->result_cb_data);
string_freez(pf->function);
}
@ -833,8 +836,8 @@ static void inflight_functions_garbage_collect(PARSER *parser, usec_t now) {
"FUNCTION '%s' removing expired transaction '%s', after %llu usec.",
string2str(pf->function), pf_dfe.name, now - pf->started_ut);
if(!buffer_strlen(pf->destination_wb) || pf->code == HTTP_RESP_OK)
pf->code = rrd_call_function_error(pf->destination_wb,
if(!buffer_strlen(pf->result_body_wb) || pf->code == HTTP_RESP_OK)
pf->code = rrd_call_function_error(pf->result_body_wb,
"Timeout waiting for collector response.",
HTTP_RESP_GATEWAY_TIMEOUT);
@ -847,35 +850,73 @@ static void inflight_functions_garbage_collect(PARSER *parser, usec_t now) {
dfe_done(pf);
}
void pluginsd_function_cancel(void *data) {
struct inflight_function *look_for = data, *t;
bool sent = false;
dfe_start_read(look_for->parser->inflight.functions, t) {
if(look_for == t) {
const char *transaction = t_dfe.name;
internal_error(true, "PLUGINSD: sending function cancellation to plugin for transaction '%s'", transaction);
char buffer[2048 + 1];
snprintfz(buffer, 2048, "%s %s\n",
PLUGINSD_KEYWORD_FUNCTION_CANCEL,
transaction);
// send the command to the plugin
ssize_t ret = send_to_plugin(buffer, t->parser);
if(ret < 0)
sent = true;
break;
}
}
dfe_done(t);
if(sent <= 0)
netdata_log_error("PLUGINSD: FUNCTION_CANCEL request didn't match any pending function requests in pluginsd.d.");
}
// this is the function that is called from
// rrd_call_function_and_wait() and rrd_call_function_async()
static int pluginsd_execute_function_callback(BUFFER *destination_wb, int timeout, const char *function, void *collector_data, void (*callback)(BUFFER *wb, int code, void *callback_data), void *callback_data) {
PARSER *parser = collector_data;
static int pluginsd_function_execute_cb(BUFFER *result_body_wb, int timeout, const char *function,
void *execute_cb_data,
rrd_function_result_callback_t result_cb, void *result_cb_data,
rrd_function_is_cancelled_cb_t is_cancelled_cb __maybe_unused,
void *is_cancelled_cb_data __maybe_unused,
rrd_function_register_canceller_cb_t register_canceller_cb,
void *register_canceller_db_data) {
PARSER *parser = execute_cb_data;
usec_t now = now_realtime_usec();
struct inflight_function tmp = {
.started_ut = now,
.timeout_ut = now + timeout * USEC_PER_SEC,
.destination_wb = destination_wb,
.result_body_wb = result_body_wb,
.timeout = timeout,
.function = string_strdupz(function),
.callback = callback,
.callback_data = callback_data,
.payload = NULL
.result_cb = result_cb,
.result_cb_data = result_cb_data,
.payload = NULL,
.parser = parser,
};
uuid_t uuid;
uuid_generate_time(uuid);
uuid_generate_random(uuid);
char key[UUID_STR_LEN];
uuid_unparse_lower(uuid, key);
char transaction[UUID_STR_LEN];
uuid_unparse_lower(uuid, transaction);
dictionary_write_lock(parser->inflight.functions);
// if there is any error, our dictionary callbacks will call the caller callback to notify
// the caller about the error - no need for error handling here.
dictionary_set(parser->inflight.functions, key, &tmp, sizeof(struct inflight_function));
void *t = dictionary_set(parser->inflight.functions, transaction, &tmp, sizeof(struct inflight_function));
if(register_canceller_cb)
register_canceller_cb(register_canceller_db_data, pluginsd_function_cancel, t);
if(!parser->inflight.smaller_timeout || tmp.timeout_ut < parser->inflight.smaller_timeout)
parser->inflight.smaller_timeout = tmp.timeout_ut;
@ -890,6 +931,8 @@ static int pluginsd_execute_function_callback(BUFFER *destination_wb, int timeou
}
static inline PARSER_RC pluginsd_function(char **words, size_t num_words, PARSER *parser) {
// a plugin or a child is registering a function
bool global = false;
size_t i = 1;
if(num_words >= 2 && strcmp(get_word(words, num_words, 1), "GLOBAL") == 0) {
@ -926,7 +969,7 @@ static inline PARSER_RC pluginsd_function(char **words, size_t num_words, PARSER
timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT;
}
rrd_collector_add_function(host, st, name, timeout, help, false, pluginsd_execute_function_callback, parser);
rrd_function_add(host, st, name, timeout, help, false, pluginsd_function_execute_cb, parser);
parser->user.data_collections_count++;
@ -973,18 +1016,18 @@ static inline PARSER_RC pluginsd_function_result_begin(char **words, size_t num_
}
else {
if(format && *format)
pf->destination_wb->content_type = functions_format_to_content_type(format);
pf->result_body_wb->content_type = functions_format_to_content_type(format);
pf->code = code;
pf->destination_wb->expires = expiration;
pf->result_body_wb->expires = expiration;
if(expiration <= now_realtime_sec())
buffer_no_cacheable(pf->destination_wb);
buffer_no_cacheable(pf->result_body_wb);
else
buffer_cacheable(pf->destination_wb);
buffer_cacheable(pf->result_body_wb);
}
parser->defer.response = (pf) ? pf->destination_wb : NULL;
parser->defer.response = (pf) ? pf->result_body_wb : NULL;
parser->defer.end_keyword = PLUGINSD_KEYWORD_FUNCTION_RESULT_END;
parser->defer.action = pluginsd_function_result_end;
parser->defer.action_data = string_strdupz(key); // it is ok is key is NULL
@ -1916,11 +1959,11 @@ dyncfg_config_t call_virtual_function_blocking(PARSER *parser, const char *name,
struct inflight_function tmp = {
.started_ut = now,
.timeout_ut = now + VIRT_FNC_TIMEOUT + USEC_PER_SEC,
.destination_wb = wb,
.result_body_wb = wb,
.timeout = VIRT_FNC_TIMEOUT,
.function = string_strdupz(name),
.callback = virt_fnc_got_data_cb,
.callback_data = &cond,
.result_cb = virt_fnc_got_data_cb,
.result_cb_data = &cond,
.payload = payload,
};

View file

@ -5,8 +5,6 @@
* GPL v3+
*/
// TODO - 1) MARKDOC 2) HELP TEXT
#include "collectors/all.h"
#include "libnetdata/libnetdata.h"
#include "libnetdata/required_dummies.h"
@ -23,6 +21,7 @@
#define SYSTEMD_JOURNAL_DEFAULT_QUERY_DURATION (3 * 3600)
#define SYSTEMD_JOURNAL_DEFAULT_ITEMS_PER_QUERY 200
#define SYSTEMD_JOURNAL_EXCESS_ROWS_ALLOWED 50
#define SYSTEMD_JOURNAL_WORKER_THREADS 2
#define JOURNAL_PARAMETER_HELP "help"
#define JOURNAL_PARAMETER_AFTER "after"
@ -58,9 +57,6 @@
static netdata_mutex_t stdout_mutex = NETDATA_MUTEX_INITIALIZER;
static bool plugin_should_exit = false;
DICTIONARY *uids = NULL;
DICTIONARY *gids = NULL;
// ----------------------------------------------------------------------------
static inline sd_journal *netdata_open_systemd_journal(void) {
@ -95,6 +91,7 @@ typedef enum {
ND_SD_JOURNAL_TIMED_OUT,
ND_SD_JOURNAL_OK,
ND_SD_JOURNAL_NOT_MODIFIED,
ND_SD_JOURNAL_CANCELLED,
} ND_SD_JOURNAL_STATUS;
static inline bool netdata_systemd_journal_seek_to(sd_journal *j, usec_t timestamp) {
@ -130,21 +127,42 @@ static inline void netdata_systemd_journal_process_row(sd_journal *j, FACETS *fa
}
}
ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_full(sd_journal *j, BUFFER *wb, FACETS *facets, usec_t after_ut, usec_t before_ut, usec_t if_modified_since, usec_t stop_monotonic_ut, usec_t *last_modified) {
static inline ND_SD_JOURNAL_STATUS check_stop(size_t row_counter, const bool *cancelled, usec_t stop_monotonic_ut) {
if((row_counter % 1000) == 0) {
if(cancelled && __atomic_load_n(cancelled, __ATOMIC_RELAXED)) {
internal_error(true, "Function has been cancelled");
return ND_SD_JOURNAL_CANCELLED;
}
if(now_monotonic_usec() > stop_monotonic_ut) {
internal_error(true, "Function timed out");
return ND_SD_JOURNAL_TIMED_OUT;
}
}
return ND_SD_JOURNAL_OK;
}
ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_full(
sd_journal *j, BUFFER *wb __maybe_unused, FACETS *facets,
usec_t after_ut, usec_t before_ut,
usec_t if_modified_since, usec_t stop_monotonic_ut, usec_t *last_modified,
bool *cancelled) {
if(!netdata_systemd_journal_seek_to(j, before_ut))
return ND_SD_JOURNAL_FAILED_TO_SEEK;
size_t errors_no_timestamp = 0;
usec_t first_msg_ut = 0;
bool timed_out = false;
size_t row_counter = 0;
// the entries are not guaranteed to be sorted, so we process up to 100 entries beyond
// the end of the query to find possibly useful logs for our time-frame
size_t excess_rows_allowed = SYSTEMD_JOURNAL_EXCESS_ROWS_ALLOWED;
ND_SD_JOURNAL_STATUS status = ND_SD_JOURNAL_OK;
facets_rows_begin(facets);
while (sd_journal_previous(j) > 0) {
while (status == ND_SD_JOURNAL_OK && sd_journal_previous(j) > 0) {
row_counter++;
usec_t msg_ut;
@ -174,10 +192,7 @@ ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_full(sd_journal *j, BUFFER *w
netdata_systemd_journal_process_row(j, facets);
facets_row_finished(facets, msg_ut);
if((row_counter % 100) == 0 && now_monotonic_usec() > stop_monotonic_ut) {
timed_out = true;
break;
}
status = check_stop(row_counter, cancelled, stop_monotonic_ut);
}
if(errors_no_timestamp)
@ -185,18 +200,19 @@ ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_full(sd_journal *j, BUFFER *w
*last_modified = first_msg_ut;
if(timed_out)
return ND_SD_JOURNAL_TIMED_OUT;
return ND_SD_JOURNAL_OK;
return status;
}
ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_data_forward(sd_journal *j, BUFFER *wb, FACETS *facets, usec_t after_ut, usec_t before_ut, usec_t anchor, size_t entries, usec_t stop_monotonic_ut) {
ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_data_forward(
sd_journal *j, BUFFER *wb __maybe_unused, FACETS *facets,
usec_t after_ut, usec_t before_ut,
usec_t anchor, size_t entries, usec_t stop_monotonic_ut,
bool *cancelled) {
if(!netdata_systemd_journal_seek_to(j, anchor))
return ND_SD_JOURNAL_FAILED_TO_SEEK;
size_t errors_no_timestamp = 0;
bool timed_out = false;
size_t row_counter = 0;
size_t rows_added = 0;
@ -204,8 +220,10 @@ ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_data_forward(sd_journal *j, B
// the end of the query to find possibly useful logs for our time-frame
size_t excess_rows_allowed = SYSTEMD_JOURNAL_EXCESS_ROWS_ALLOWED;
ND_SD_JOURNAL_STATUS status = ND_SD_JOURNAL_OK;
facets_rows_begin(facets);
while (sd_journal_next(j) > 0) {
while (status == ND_SD_JOURNAL_OK && sd_journal_next(j) > 0) {
row_counter++;
usec_t msg_ut;
@ -231,27 +249,25 @@ ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_data_forward(sd_journal *j, B
facets_row_finished(facets, msg_ut);
rows_added++;
if((row_counter % 100) == 0 && now_monotonic_usec() > stop_monotonic_ut) {
timed_out = true;
break;
}
status = check_stop(row_counter, cancelled, stop_monotonic_ut);
}
if(errors_no_timestamp)
netdata_log_error("SYSTEMD-JOURNAL: %zu lines did not have timestamps", errors_no_timestamp);
if(timed_out)
return ND_SD_JOURNAL_TIMED_OUT;
return ND_SD_JOURNAL_OK;
return status;
}
ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_data_backward(sd_journal *j, BUFFER *wb, FACETS *facets, usec_t after_ut, usec_t before_ut, usec_t anchor, size_t entries, usec_t stop_monotonic_ut) {
ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_data_backward(
sd_journal *j, BUFFER *wb __maybe_unused, FACETS *facets,
usec_t after_ut, usec_t before_ut,
usec_t anchor, size_t entries, usec_t stop_monotonic_ut,
bool *cancelled) {
if(!netdata_systemd_journal_seek_to(j, anchor))
return ND_SD_JOURNAL_FAILED_TO_SEEK;
size_t errors_no_timestamp = 0;
bool timed_out = false;
size_t row_counter = 0;
size_t rows_added = 0;
@ -259,8 +275,10 @@ ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_data_backward(sd_journal *j,
// the end of the query to find possibly useful logs for our time-frame
size_t excess_rows_allowed = SYSTEMD_JOURNAL_EXCESS_ROWS_ALLOWED;
ND_SD_JOURNAL_STATUS status = ND_SD_JOURNAL_OK;
facets_rows_begin(facets);
while (sd_journal_previous(j) > 0) {
while (status == ND_SD_JOURNAL_OK && sd_journal_previous(j) > 0) {
row_counter++;
usec_t msg_ut;
@ -286,19 +304,13 @@ ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_data_backward(sd_journal *j,
facets_row_finished(facets, msg_ut);
rows_added++;
if((row_counter % 100) == 0 && now_monotonic_usec() > stop_monotonic_ut) {
timed_out = true;
break;
}
status = check_stop(row_counter, cancelled, stop_monotonic_ut);
}
if(errors_no_timestamp)
netdata_log_error("SYSTEMD-JOURNAL: %zu lines did not have timestamps", errors_no_timestamp);
if(timed_out)
return ND_SD_JOURNAL_TIMED_OUT;
return ND_SD_JOURNAL_OK;
return status;
}
bool netdata_systemd_journal_check_if_modified_since(sd_journal *j, usec_t seek_to, usec_t last_modified) {
@ -327,7 +339,8 @@ static int netdata_systemd_journal_query(BUFFER *wb, FACETS *facets,
usec_t after_ut, usec_t before_ut,
usec_t anchor, FACETS_ANCHOR_DIRECTION direction, size_t entries,
usec_t if_modified_since, bool data_only,
usec_t stop_monotonic_ut) {
usec_t stop_monotonic_ut,
bool *cancelled) {
sd_journal *j = netdata_open_systemd_journal();
if(!j)
return HTTP_RESP_INTERNAL_SERVER_ERROR;
@ -341,23 +354,36 @@ static int netdata_systemd_journal_query(BUFFER *wb, FACETS *facets,
// we can do a data-only query
if(direction == FACETS_ANCHOR_DIRECTION_FORWARD)
status = netdata_systemd_journal_query_data_forward(j, wb, facets, after_ut, before_ut, anchor, entries, stop_monotonic_ut);
status = netdata_systemd_journal_query_data_forward(j, wb, facets, after_ut, before_ut, anchor, entries, stop_monotonic_ut, cancelled);
else
status = netdata_systemd_journal_query_data_backward(j, wb, facets, after_ut, before_ut, anchor, entries, stop_monotonic_ut);
status = netdata_systemd_journal_query_data_backward(j, wb, facets, after_ut, before_ut, anchor, entries, stop_monotonic_ut, cancelled);
}
else {
// we have to do a full query
status = netdata_systemd_journal_query_full(j, wb, facets,
after_ut, before_ut, if_modified_since,
stop_monotonic_ut, &last_modified);
stop_monotonic_ut, &last_modified, cancelled);
}
sd_journal_close(j);
if(status == ND_SD_JOURNAL_NOT_MODIFIED)
return HTTP_RESP_NOT_MODIFIED;
if(status != ND_SD_JOURNAL_OK && status != ND_SD_JOURNAL_TIMED_OUT) {
buffer_flush(wb);
buffer_json_member_add_uint64(wb, "status", status == ND_SD_JOURNAL_FAILED_TO_SEEK ? HTTP_RESP_INTERNAL_SERVER_ERROR : HTTP_RESP_OK);
switch (status) {
case ND_SD_JOURNAL_CANCELLED:
return HTTP_RESP_CLIENT_CLOSED_REQUEST;
case ND_SD_JOURNAL_NOT_MODIFIED:
return HTTP_RESP_NOT_MODIFIED;
default:
case ND_SD_JOURNAL_FAILED_TO_SEEK:
return HTTP_RESP_INTERNAL_SERVER_ERROR;
}
}
buffer_json_member_add_uint64(wb, "status", HTTP_RESP_OK);
buffer_json_member_add_boolean(wb, "partial", status != ND_SD_JOURNAL_OK);
buffer_json_member_add_string(wb, "type", "table");
@ -372,7 +398,7 @@ static int netdata_systemd_journal_query(BUFFER *wb, FACETS *facets,
buffer_json_member_add_time_t(wb, "expires", now_realtime_sec() + (data_only ? 3600 : 0));
buffer_json_finalize(wb);
return status == ND_SD_JOURNAL_FAILED_TO_SEEK ? HTTP_RESP_INTERNAL_SERVER_ERROR : HTTP_RESP_OK;
return HTTP_RESP_OK;
}
static void netdata_systemd_journal_function_help(const char *transaction) {
@ -484,8 +510,8 @@ static FACET_ROW_SEVERITY syslog_priority_to_facet_severity(int priority) {
}
static char *uid_to_username(uid_t uid, char *buffer, size_t buffer_size) {
static __thread char tmp[1024 + 1];
struct passwd pw, *result;
char tmp[1024 + 1];
if (getpwuid_r(uid, &pw, tmp, 1024, &result) != 0 || result == NULL)
return NULL;
@ -495,8 +521,8 @@ static char *uid_to_username(uid_t uid, char *buffer, size_t buffer_size) {
}
static char *gid_to_groupname(gid_t gid, char* buffer, size_t buffer_size) {
static __thread char tmp[1024 + 1];
struct group grp, *result;
char tmp[1024 + 1];
if (getgrgid_r(gid, &grp, tmp, 1024, &result) != 0 || result == NULL)
return NULL;
@ -531,46 +557,106 @@ static void netdata_systemd_journal_transform_priority(FACETS *facets __maybe_un
}
}
static void netdata_systemd_journal_transform_uid(FACETS *facets __maybe_unused, BUFFER *wb, void *data) {
DICTIONARY *cache = data;
// ----------------------------------------------------------------------------
// UID and GID transformation
#define UID_GID_HASHTABLE_SIZE 1000
struct word_t2str_hashtable_entry {
struct word_t2str_hashtable_entry *next;
Word_t hash;
size_t len;
char str[];
};
struct word_t2str_hashtable {
SPINLOCK spinlock;
size_t size;
struct word_t2str_hashtable_entry *hashtable[UID_GID_HASHTABLE_SIZE];
};
struct word_t2str_hashtable uid_hashtable = {
.size = UID_GID_HASHTABLE_SIZE,
};
struct word_t2str_hashtable gid_hashtable = {
.size = UID_GID_HASHTABLE_SIZE,
};
struct word_t2str_hashtable_entry **word_t2str_hashtable_slot(struct word_t2str_hashtable *ht, Word_t hash) {
size_t slot = hash % ht->size;
struct word_t2str_hashtable_entry **e = &ht->hashtable[slot];
while(*e && (*e)->hash != hash)
e = &((*e)->next);
return e;
}
const char *uid_to_username_cached(uid_t uid, size_t *length) {
spinlock_lock(&uid_hashtable.spinlock);
struct word_t2str_hashtable_entry **e = word_t2str_hashtable_slot(&uid_hashtable, uid);
if(!(*e)) {
static __thread char buf[1024 + 1];
const char *name = uid_to_username(uid, buf, 1024);
size_t size = strlen(name) + 1;
*e = callocz(1, sizeof(struct word_t2str_hashtable_entry) + size);
(*e)->len = size - 1;
(*e)->hash = uid;
memcpy((*e)->str, name, size);
}
spinlock_unlock(&uid_hashtable.spinlock);
*length = (*e)->len;
return (*e)->str;
}
const char *gid_to_groupname_cached(gid_t gid, size_t *length) {
spinlock_lock(&gid_hashtable.spinlock);
struct word_t2str_hashtable_entry **e = word_t2str_hashtable_slot(&gid_hashtable, gid);
if(!(*e)) {
static __thread char buf[1024 + 1];
const char *name = gid_to_groupname(gid, buf, 1024);
size_t size = strlen(name) + 1;
*e = callocz(1, sizeof(struct word_t2str_hashtable_entry) + size);
(*e)->len = size - 1;
(*e)->hash = gid;
memcpy((*e)->str, name, size);
}
spinlock_unlock(&gid_hashtable.spinlock);
*length = (*e)->len;
return (*e)->str;
}
static void netdata_systemd_journal_transform_uid(FACETS *facets __maybe_unused, BUFFER *wb, void *data __maybe_unused) {
const char *v = buffer_tostring(wb);
if(*v && isdigit(*v)) {
const char *sv = dictionary_get(cache, v);
if(!sv) {
char buf[1024 + 1];
int uid = str2i(buffer_tostring(wb));
const char *name = uid_to_username(uid, buf, 1024);
if (!name)
name = v;
sv = dictionary_set(cache, v, (void *)name, strlen(name) + 1);
}
buffer_flush(wb);
buffer_strcat(wb, sv);
uid_t uid = str2i(buffer_tostring(wb));
size_t len;
const char *name = uid_to_username_cached(uid, &len);
buffer_contents_replace(wb, name, len);
}
}
static void netdata_systemd_journal_transform_gid(FACETS *facets __maybe_unused, BUFFER *wb, void *data) {
DICTIONARY *cache = data;
static void netdata_systemd_journal_transform_gid(FACETS *facets __maybe_unused, BUFFER *wb, void *data __maybe_unused) {
const char *v = buffer_tostring(wb);
if(*v && isdigit(*v)) {
const char *sv = dictionary_get(cache, v);
if(!sv) {
char buf[1024 + 1];
int gid = str2i(buffer_tostring(wb));
const char *name = gid_to_groupname(gid, buf, 1024);
if (!name)
name = v;
sv = dictionary_set(cache, v, (void *)name, strlen(name) + 1);
}
buffer_flush(wb);
buffer_strcat(wb, sv);
gid_t gid = str2i(buffer_tostring(wb));
size_t len;
const char *name = gid_to_groupname_cached(gid, &len);
buffer_contents_replace(wb, name, len);
}
}
// ----------------------------------------------------------------------------
static void netdata_systemd_journal_dynamic_row_id(FACETS *facets __maybe_unused, BUFFER *json_array, FACET_ROW_KEY_VALUE *rkv, FACET_ROW *row, void *data __maybe_unused) {
FACET_ROW_KEY_VALUE *pid_rkv = dictionary_get(row->dict, "_PID");
const char *pid = pid_rkv ? buffer_tostring(pid_rkv->wb) : FACET_VALUE_UNSET;
@ -593,47 +679,13 @@ static void netdata_systemd_journal_dynamic_row_id(FACETS *facets __maybe_unused
buffer_json_add_array_item_string(json_array, buffer_tostring(rkv->wb));
}
static void netdata_systemd_journal_rich_message(FACETS *facets __maybe_unused, BUFFER *json_array, FACET_ROW_KEY_VALUE *rkv, FACET_ROW *row, void *data __maybe_unused) {
static void netdata_systemd_journal_rich_message(FACETS *facets __maybe_unused, BUFFER *json_array, FACET_ROW_KEY_VALUE *rkv, FACET_ROW *row __maybe_unused, void *data __maybe_unused) {
buffer_json_add_array_item_object(json_array);
buffer_json_member_add_string(json_array, "value", buffer_tostring(rkv->wb));
buffer_json_object_close(json_array);
}
static void function_systemd_journal(const char *transaction, char *function, char *line_buffer __maybe_unused, int line_max __maybe_unused, int timeout __maybe_unused) {
static struct {
BUFFER *tmp;
BUFFER *wb;
BUFFER *function;
int response;
time_t expires;
} cache = {
.tmp = NULL,
.wb = NULL,
.function = NULL,
.response = 0,
.expires = 0,
};
if(unlikely(!cache.wb)) {
cache.tmp = buffer_create(0, NULL);
cache.wb = buffer_create(0, NULL);
cache.function = buffer_create(0, NULL);
}
if(buffer_strlen(cache.function) && buffer_strlen(cache.wb) && strcmp(buffer_tostring(cache.function), function) == 0) {
// repeated the same request
netdata_mutex_lock(&stdout_mutex);
if(cache.response == HTTP_RESP_OK)
pluginsd_function_result_to_stdout(transaction, cache.response, "application/json", cache.expires, cache.wb);
else
pluginsd_function_json_error_to_stdout(transaction, cache.response, "failed");
netdata_mutex_unlock(&stdout_mutex);
return;
}
buffer_flush(cache.tmp);
buffer_strcat(cache.tmp, function);
static void function_systemd_journal(const char *transaction, char *function, int timeout, bool *cancelled) {
BUFFER *wb = buffer_create(0, NULL);
buffer_flush(wb);
buffer_json_initialize(wb, "\"", "\"", 0, true, BUFFER_JSON_OPTIONS_NEWLINE_ON_ARRAY_ITEMS);
@ -682,10 +734,10 @@ static void function_systemd_journal(const char *transaction, char *function, ch
facets_register_key_name(facets, "USER_UNIT", FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS);
facets_register_key_name_transformation(facets, "_UID", FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS,
netdata_systemd_journal_transform_uid, uids);
netdata_systemd_journal_transform_uid, NULL);
facets_register_key_name_transformation(facets, "_GID", FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS,
netdata_systemd_journal_transform_gid, gids);
netdata_systemd_journal_transform_gid, NULL);
bool info = false;
bool data_only = false;
@ -858,12 +910,19 @@ static void function_systemd_journal(const char *transaction, char *function, ch
facets_set_items(facets, last);
facets_set_anchor(facets, anchor, direction);
facets_set_query(facets, query);
facets_set_histogram(facets, chart ? chart : "PRIORITY", after_s * USEC_PER_SEC, before_s * USEC_PER_SEC);
if(chart && *chart)
facets_set_histogram_by_id(facets, chart,
after_s * USEC_PER_SEC, before_s * USEC_PER_SEC);
else
facets_set_histogram_by_name(facets, "PRIORITY",
after_s * USEC_PER_SEC, before_s * USEC_PER_SEC);
response = netdata_systemd_journal_query(wb, facets, after_s * USEC_PER_SEC, before_s * USEC_PER_SEC,
anchor, direction, last,
if_modified_since, data_only,
now_monotonic_usec() + (timeout - 1) * USEC_PER_SEC);
now_monotonic_usec() + (timeout - 1) * USEC_PER_SEC,
cancelled);
if(response != HTTP_RESP_OK) {
netdata_mutex_lock(&stdout_mutex);
@ -872,14 +931,6 @@ static void function_systemd_journal(const char *transaction, char *function, ch
goto cleanup;
}
// keep this response in the cache
cache.response = response;
cache.expires = expires;
buffer_flush(cache.wb);
buffer_fast_strcat(cache.wb, buffer_tostring(wb), buffer_strlen(wb));
buffer_flush(cache.function);
buffer_fast_strcat(cache.function, buffer_tostring(cache.tmp), buffer_strlen(cache.tmp));
output:
netdata_mutex_lock(&stdout_mutex);
pluginsd_function_result_to_stdout(transaction, response, "application/json", expires, wb);
@ -890,54 +941,7 @@ cleanup:
buffer_free(wb);
}
static void *reader_main(void *arg __maybe_unused) {
char buffer[PLUGINSD_LINE_MAX + 1];
char *s = NULL;
while(!plugin_should_exit && (s = fgets(buffer, PLUGINSD_LINE_MAX, stdin))) {
char *words[PLUGINSD_MAX_WORDS] = { NULL };
size_t num_words = quoted_strings_splitter_pluginsd(buffer, words, PLUGINSD_MAX_WORDS);
const char *keyword = get_word(words, num_words, 0);
if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0) {
char *transaction = get_word(words, num_words, 1);
char *timeout_s = get_word(words, num_words, 2);
char *function = get_word(words, num_words, 3);
if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
netdata_log_error("Received incomplete %s (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
keyword,
transaction?transaction:"(unset)",
timeout_s?timeout_s:"(unset)",
function?function:"(unset)");
}
else {
int timeout = str2i(timeout_s);
if(timeout <= 0) timeout = SYSTEMD_JOURNAL_DEFAULT_TIMEOUT;
if(strncmp(function, SYSTEMD_JOURNAL_FUNCTION_NAME, strlen(SYSTEMD_JOURNAL_FUNCTION_NAME)) == 0)
function_systemd_journal(transaction, function, buffer, PLUGINSD_LINE_MAX + 1, timeout);
else {
netdata_mutex_lock(&stdout_mutex);
pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_NOT_FOUND,
"No function with this name found in systemd-journal.plugin.");
netdata_mutex_unlock(&stdout_mutex);
}
}
}
else
netdata_log_error("Received unknown command: %s", keyword?keyword:"(unset)");
}
if(!s || feof(stdin) || ferror(stdin)) {
plugin_should_exit = true;
netdata_log_error("Received error on stdin.");
}
exit(1);
}
// ----------------------------------------------------------------------------
int main(int argc __maybe_unused, char **argv __maybe_unused) {
stderror = stderr;
@ -952,9 +956,6 @@ int main(int argc __maybe_unused, char **argv __maybe_unused) {
error_log_errors_per_period = 100;
error_log_throttle_period = 3600;
uids = dictionary_create(0);
gids = dictionary_create(0);
netdata_configured_host_prefix = getenv("NETDATA_HOST_PREFIX");
if(verify_netdata_host_prefix() == -1) exit(1);
@ -962,22 +963,28 @@ int main(int argc __maybe_unused, char **argv __maybe_unused) {
// debug
if(argc == 2 && strcmp(argv[1], "debug") == 0) {
char buf[] = "systemd-journal after:-864000 before:0 last:500";
bool cancelled = false;
char buf[] = "systemd-journal after:-2592000 before:0 last:500";
// char buf[] = "systemd-journal after:1694511062 before:1694514662 anchor:1694514122024403";
function_systemd_journal("123", buf, "", 0, 30);
function_systemd_journal("123", buf, 30, &cancelled);
exit(1);
}
// ------------------------------------------------------------------------
// the event loop for functions
struct functions_evloop_globals *wg =
functions_evloop_init(SYSTEMD_JOURNAL_WORKER_THREADS, "SDJ", &stdout_mutex, &plugin_should_exit);
functions_evloop_add_function(wg, SYSTEMD_JOURNAL_FUNCTION_NAME, function_systemd_journal,
SYSTEMD_JOURNAL_DEFAULT_TIMEOUT);
netdata_thread_t reader_thread;
netdata_thread_create(&reader_thread, "SDJ_READER", NETDATA_THREAD_OPTION_DONT_LOG, reader_main, NULL);
// ------------------------------------------------------------------------
time_t started_t = now_monotonic_sec();
size_t iteration;
size_t iteration = 0;
usec_t step = 1000 * USEC_PER_MS;
bool tty = isatty(fileno(stderr)) == 1;
@ -987,7 +994,9 @@ int main(int argc __maybe_unused, char **argv __maybe_unused) {
heartbeat_t hb;
heartbeat_init(&hb);
for(iteration = 0; 1 ; iteration++) {
while(!plugin_should_exit) {
iteration++;
netdata_mutex_unlock(&stdout_mutex);
heartbeat_next(&hb, step);
netdata_mutex_lock(&stdout_mutex);
@ -1002,8 +1011,5 @@ int main(int argc __maybe_unused, char **argv __maybe_unused) {
break;
}
dictionary_destroy(uids);
dictionary_destroy(gids);
exit(0);
}

View file

@ -2023,6 +2023,7 @@ AC_CONFIG_FILES([
libnetdata/ebpf/Makefile
libnetdata/eval/Makefile
libnetdata/facets/Makefile
libnetdata/functions_evloop/Makefile
libnetdata/july/Makefile
libnetdata/locks/Makefile
libnetdata/log/Makefile

View file

@ -1904,6 +1904,8 @@ int main(int argc, char **argv) {
replication_initialize();
rrd_functions_inflight_init();
// --------------------------------------------------------------------
// get the certificate and start security

View file

@ -1,3 +1,4 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#define NETDATA_RRD_INTERNALS
#include "rrd.h"
@ -277,16 +278,15 @@ typedef enum __attribute__((packed)) {
// this is 8-bit
} RRD_FUNCTION_OPTIONS;
struct rrd_collector_function {
struct rrd_host_function {
bool sync; // when true, the function is called synchronously
RRD_FUNCTION_OPTIONS options; // RRD_FUNCTION_OPTIONS
STRING *help;
int timeout; // the default timeout of the function
int (*function)(BUFFER *wb, int timeout, const char *function, void *collector_data,
function_data_ready_callback callback, void *callback_data);
rrd_function_execute_cb_t execute_cb;
void *collector_data;
void *execute_cb_data;
struct rrd_collector *collector;
};
@ -299,6 +299,7 @@ struct rrd_collector_function {
struct rrd_collector {
int32_t refcount;
int32_t refcount_canceller;
pid_t tid;
bool running;
};
@ -330,7 +331,7 @@ void rrd_collector_started(void) {
thread_rrd_collector = callocz(1, sizeof(struct rrd_collector));
thread_rrd_collector->tid = gettid();
thread_rrd_collector->running = true;
__atomic_store_n(&thread_rrd_collector->running, true, __ATOMIC_RELAXED);
}
// called once per collector
@ -338,17 +339,26 @@ void rrd_collector_finished(void) {
if(!thread_rrd_collector)
return;
thread_rrd_collector->running = false;
__atomic_store_n(&thread_rrd_collector->running, false, __ATOMIC_RELAXED);
int32_t expected = 0;
while(!__atomic_compare_exchange_n(&thread_rrd_collector->refcount_canceller, &expected, -1, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) {
expected = 0;
sleep_usec(1 * USEC_PER_MS);
}
rrd_collector_free(thread_rrd_collector);
thread_rrd_collector = NULL;
}
#define rrd_collector_running(c) __atomic_load_n(&(c)->running, __ATOMIC_RELAXED)
static struct rrd_collector *rrd_collector_acquire(void) {
rrd_collector_started();
int32_t expected = __atomic_load_n(&thread_rrd_collector->refcount, __ATOMIC_RELAXED), wanted = 0;
do {
if(expected < 0 || !thread_rrd_collector->running) {
if(expected < 0 || !rrd_collector_running(thread_rrd_collector)) {
internal_fatal(true, "FUNCTIONS: Trying to acquire a collector that is exiting.");
return thread_rrd_collector;
}
@ -385,7 +395,7 @@ static void rrd_collector_release(struct rrd_collector *rdc) {
static void rrd_functions_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func, void *rrdhost) {
RRDHOST *host = rrdhost; (void)host;
struct rrd_collector_function *rdcf = func;
struct rrd_host_function *rdcf = func;
rrd_collector_started();
rdcf->collector = rrd_collector_acquire();
@ -397,33 +407,42 @@ static void rrd_functions_insert_callback(const DICTIONARY_ITEM *item __maybe_un
static void rrd_functions_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func,
void *rrdhost __maybe_unused) {
struct rrd_collector_function *rdcf = func;
struct rrd_host_function *rdcf = func;
rrd_collector_release(rdcf->collector);
}
static bool rrd_functions_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func,
void *new_func, void *rrdhost) {
RRDHOST *host = rrdhost; (void)host;
struct rrd_collector_function *rdcf = func;
struct rrd_collector_function *new_rdcf = new_func;
struct rrd_host_function *rdcf = func;
struct rrd_host_function *new_rdcf = new_func;
rrd_collector_started();
bool changed = false;
if(rdcf->collector != thread_rrd_collector) {
netdata_log_info("FUNCTIONS: function '%s' of host '%s' changed collector from %d to %d",
dictionary_acquired_item_name(item), rrdhost_hostname(host), rdcf->collector->tid, thread_rrd_collector->tid);
struct rrd_collector *old_rdc = rdcf->collector;
rdcf->collector = rrd_collector_acquire();
rrd_collector_release(old_rdc);
changed = true;
}
if(rdcf->function != new_rdcf->function) {
rdcf->function = new_rdcf->function;
if(rdcf->execute_cb != new_rdcf->execute_cb) {
netdata_log_info("FUNCTIONS: function '%s' of host '%s' changed execute callback",
dictionary_acquired_item_name(item), rrdhost_hostname(host));
rdcf->execute_cb = new_rdcf->execute_cb;
changed = true;
}
if(rdcf->help != new_rdcf->help) {
netdata_log_info("FUNCTIONS: function '%s' of host '%s' changed help text",
dictionary_acquired_item_name(item), rrdhost_hostname(host));
STRING *old = rdcf->help;
rdcf->help = new_rdcf->help;
string_freez(old);
@ -433,17 +452,26 @@ static bool rrd_functions_conflict_callback(const DICTIONARY_ITEM *item __maybe_
string_freez(new_rdcf->help);
if(rdcf->timeout != new_rdcf->timeout) {
netdata_log_info("FUNCTIONS: function '%s' of host '%s' changed timeout",
dictionary_acquired_item_name(item), rrdhost_hostname(host));
rdcf->timeout = new_rdcf->timeout;
changed = true;
}
if(rdcf->sync != new_rdcf->sync) {
netdata_log_info("FUNCTIONS: function '%s' of host '%s' changed sync/async mode",
dictionary_acquired_item_name(item), rrdhost_hostname(host));
rdcf->sync = new_rdcf->sync;
changed = true;
}
if(rdcf->collector_data != new_rdcf->collector_data) {
rdcf->collector_data = new_rdcf->collector_data;
if(rdcf->execute_cb_data != new_rdcf->execute_cb_data) {
netdata_log_info("FUNCTIONS: function '%s' of host '%s' changed execute callback data",
dictionary_acquired_item_name(item), rrdhost_hostname(host));
rdcf->execute_cb_data = new_rdcf->execute_cb_data;
changed = true;
}
@ -454,24 +482,23 @@ static bool rrd_functions_conflict_callback(const DICTIONARY_ITEM *item __maybe_
return changed;
}
void rrdfunctions_init(RRDHOST *host) {
void rrdfunctions_host_init(RRDHOST *host) {
if(host->functions) return;
host->functions = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE,
&dictionary_stats_category_functions, sizeof(struct rrd_collector_function));
&dictionary_stats_category_functions, sizeof(struct rrd_host_function));
dictionary_register_insert_callback(host->functions, rrd_functions_insert_callback, host);
dictionary_register_delete_callback(host->functions, rrd_functions_delete_callback, host);
dictionary_register_conflict_callback(host->functions, rrd_functions_conflict_callback, host);
}
void rrdfunctions_destroy(RRDHOST *host) {
void rrdfunctions_host_destroy(RRDHOST *host) {
dictionary_destroy(host->functions);
}
void rrd_collector_add_function(RRDHOST *host, RRDSET *st, const char *name, int timeout, const char *help,
bool sync, function_execute_at_collector function, void *collector_data) {
void rrd_function_add(RRDHOST *host, RRDSET *st, const char *name, int timeout, const char *help,
bool sync, rrd_function_execute_cb_t execute_cb, void *execute_cb_data) {
// RRDSET *st may be NULL in this function
// to create a GLOBAL function
@ -482,12 +509,12 @@ void rrd_collector_add_function(RRDHOST *host, RRDSET *st, const char *name, int
char key[PLUGINSD_LINE_MAX + 1];
sanitize_function_text(key, name, PLUGINSD_LINE_MAX);
struct rrd_collector_function tmp = {
struct rrd_host_function tmp = {
.sync = sync,
.timeout = timeout,
.options = (st)?RRD_FUNCTION_LOCAL:RRD_FUNCTION_GLOBAL,
.function = function,
.collector_data = collector_data,
.execute_cb = execute_cb,
.execute_cb_data = execute_cb_data,
.help = string_strdupz(help),
};
const DICTIONARY_ITEM *item = dictionary_set_and_acquire_item(host->functions, key, &tmp, sizeof(tmp));
@ -504,7 +531,7 @@ void rrd_functions_expose_rrdpush(RRDSET *st, BUFFER *wb) {
if(!st->functions_view)
return;
struct rrd_collector_function *tmp;
struct rrd_host_function *tmp;
dfe_start_read(st->functions_view, tmp) {
buffer_sprintf(wb
, PLUGINSD_KEYWORD_FUNCTION " \"%s\" %d \"%s\"\n"
@ -519,7 +546,7 @@ void rrd_functions_expose_rrdpush(RRDSET *st, BUFFER *wb) {
void rrd_functions_expose_global_rrdpush(RRDHOST *host, BUFFER *wb) {
rrdhost_flag_clear(host, RRDHOST_FLAG_GLOBAL_FUNCTIONS_UPDATED);
struct rrd_collector_function *tmp;
struct rrd_host_function *tmp;
dfe_start_read(host->functions, tmp) {
if(!(tmp->options & RRD_FUNCTION_GLOBAL))
continue;
@ -534,20 +561,6 @@ void rrd_functions_expose_global_rrdpush(RRDHOST *host, BUFFER *wb) {
dfe_done(tmp);
}
struct rrd_function_call_wait {
bool free_with_signal;
bool data_are_ready;
netdata_mutex_t mutex;
pthread_cond_t cond;
int code;
};
static void rrd_function_call_wait_free(struct rrd_function_call_wait *tmp) {
pthread_cond_destroy(&tmp->cond);
netdata_mutex_destroy(&tmp->mutex);
freez(tmp);
}
struct {
const char *format;
HTTP_CONTENT_TYPE content_type;
@ -596,17 +609,28 @@ int rrd_call_function_error(BUFFER *wb, const char *msg, int code) {
return code;
}
static int rrd_call_function_find(RRDHOST *host, BUFFER *wb, const char *name, size_t key_length, struct rrd_collector_function **rdcf) {
static int rrd_call_function_find(RRDHOST *host, BUFFER *wb, const char *name, size_t key_length, const DICTIONARY_ITEM **item) {
char buffer[MAX_FUNCTION_LENGTH + 1];
strncpyz(buffer, name, MAX_FUNCTION_LENGTH);
char *s = NULL;
*rdcf = NULL;
bool found = false;
*item = NULL;
if(host->functions) {
while (!(*rdcf) && buffer[0]) {
*rdcf = dictionary_get(host->functions, buffer);
if (*rdcf) break;
while (buffer[0]) {
if((*item = dictionary_get_and_acquire_item(host->functions, buffer))) {
found = true;
struct rrd_host_function *rdcf = dictionary_acquired_item_value(*item);
if(rrd_collector_running(rdcf->collector)) {
break;
}
else {
dictionary_acquired_item_release(host->functions, *item);
*item = NULL;
}
}
// if s == NULL, set it to the end of the buffer
// this should happen only the first time
@ -623,16 +647,131 @@ static int rrd_call_function_find(RRDHOST *host, BUFFER *wb, const char *name, s
buffer_flush(wb);
if(!(*rdcf))
return rrd_call_function_error(wb, "No collector is supplying this function on this host at this time.", HTTP_RESP_NOT_FOUND);
if(!(*rdcf)->collector->running)
return rrd_call_function_error(wb, "The collector that registered this function, is not currently running.", HTTP_RESP_SERVICE_UNAVAILABLE);
if(!(*item)) {
if(found)
return rrd_call_function_error(wb,
"The collector that registered this function, is not currently running.",
HTTP_RESP_SERVICE_UNAVAILABLE);
else
return rrd_call_function_error(wb,
"No collector is supplying this function on this host at this time.",
HTTP_RESP_NOT_FOUND);
}
return HTTP_RESP_OK;
}
static void rrd_call_function_signal_when_ready(BUFFER *temp_wb __maybe_unused, int code, void *callback_data) {
// ----------------------------------------------------------------------------
struct rrd_function_inflight {
bool used;
RRDHOST *host;
const char *transaction;
const char *cmd;
const char *sanitized_cmd;
size_t sanitized_cmd_length;
int timeout;
bool cancelled;
const DICTIONARY_ITEM *host_function_acquired;
// the collector
// we acquire this structure at the beginning,
// and we release it at the end
struct rrd_host_function *rdcf;
struct {
BUFFER *wb;
// in async mode,
// the function to call to send the result back
rrd_function_result_callback_t cb;
void *data;
} result;
struct {
// to be called in sync mode
// while the function is running
// to check if the function has been cancelled
rrd_function_is_cancelled_cb_t cb;
void *data;
} is_cancelled;
struct {
// to be registered by the function itself
// used to signal the function to cancel
rrd_function_canceller_cb_t cb;
void *data;
} canceller;
};
static DICTIONARY *rrd_functions_inflight_requests = NULL;
static void rrd_functions_inflight_delete_cb(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *data __maybe_unused) {
struct rrd_function_inflight *r = value;
// internal_error(true, "FUNCTIONS: transaction '%s' finished", r->transaction);
freez((void *)r->transaction);
freez((void *)r->cmd);
freez((void *)r->sanitized_cmd);
dictionary_acquired_item_release(r->host->functions, r->host_function_acquired);
}
void rrd_functions_inflight_init(void) {
if(rrd_functions_inflight_requests)
return;
rrd_functions_inflight_requests = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE, NULL, sizeof(struct rrd_function_inflight));
dictionary_register_delete_callback(rrd_functions_inflight_requests, rrd_functions_inflight_delete_cb, NULL);
}
void rrd_functions_inflight_destroy(void) {
if(!rrd_functions_inflight_requests)
return;
dictionary_destroy(rrd_functions_inflight_requests);
rrd_functions_inflight_requests = NULL;
}
static void rrd_inflight_async_function_register_canceller_cb(void *register_canceller_cb_data, rrd_function_canceller_cb_t canceller_cb, void *canceller_cb_data) {
struct rrd_function_inflight *r = register_canceller_cb_data;
r->canceller.cb = canceller_cb;
r->canceller.data = canceller_cb_data;
}
// ----------------------------------------------------------------------------
// waiting for async function completion
struct rrd_function_call_wait {
RRDHOST *host;
const DICTIONARY_ITEM *host_function_acquired;
char *transaction;
bool free_with_signal;
bool data_are_ready;
netdata_mutex_t mutex;
pthread_cond_t cond;
int code;
};
static void rrd_inflight_function_cleanup(RRDHOST *host, const DICTIONARY_ITEM *host_function_acquired, const char *transaction) {
dictionary_del(rrd_functions_inflight_requests, transaction);
dictionary_garbage_collect(rrd_functions_inflight_requests);
}
static void rrd_function_call_wait_free(struct rrd_function_call_wait *tmp) {
rrd_inflight_function_cleanup(tmp->host, tmp->host_function_acquired, tmp->transaction);
freez(tmp->transaction);
pthread_cond_destroy(&tmp->cond);
netdata_mutex_destroy(&tmp->mutex);
freez(tmp);
}
static void rrd_async_function_signal_when_ready(BUFFER *temp_wb __maybe_unused, int code, void *callback_data) {
struct rrd_function_call_wait *tmp = callback_data;
bool we_should_free = false;
@ -658,115 +797,300 @@ static void rrd_call_function_signal_when_ready(BUFFER *temp_wb __maybe_unused,
}
}
int rrd_call_function_and_wait(RRDHOST *host, BUFFER *wb, int timeout, const char *name) {
int code;
static void rrd_inflight_async_function_nowait_finished(BUFFER *wb, int code, void *data) {
struct rrd_function_inflight *r = data;
struct rrd_collector_function *rdcf = NULL;
if(r->result.cb)
r->result.cb(wb, code, r->result.data);
char key[PLUGINSD_LINE_MAX + 1];
size_t key_length = sanitize_function_text(key, name, PLUGINSD_LINE_MAX);
code = rrd_call_function_find(host, wb, key, key_length, &rdcf);
if(code != HTTP_RESP_OK)
return code;
if(timeout <= 0)
timeout = rdcf->timeout;
struct timespec tp;
clock_gettime(CLOCK_REALTIME, &tp);
tp.tv_sec += (time_t)timeout;
if(rdcf->sync) {
code = rdcf->function(wb, timeout, key, rdcf->collector_data, NULL, NULL);
}
else {
struct rrd_function_call_wait *tmp = mallocz(sizeof(struct rrd_function_call_wait));
tmp->free_with_signal = false;
tmp->data_are_ready = false;
netdata_mutex_init(&tmp->mutex);
pthread_cond_init(&tmp->cond, NULL);
bool we_should_free = true;
BUFFER *temp_wb = buffer_create(PLUGINSD_LINE_MAX + 1, &netdata_buffers_statistics.buffers_functions); // we need it because we may give up on it
temp_wb->content_type = wb->content_type;
code = rdcf->function(temp_wb, timeout, key, rdcf->collector_data, rrd_call_function_signal_when_ready, tmp);
if (code == HTTP_RESP_OK) {
netdata_mutex_lock(&tmp->mutex);
int rc = 0;
while (rc == 0 && !tmp->data_are_ready) {
// the mutex is unlocked within pthread_cond_timedwait()
rc = pthread_cond_timedwait(&tmp->cond, &tmp->mutex, &tp);
// the mutex is again ours
}
if (tmp->data_are_ready) {
// we have a response
buffer_fast_strcat(wb, buffer_tostring(temp_wb), buffer_strlen(temp_wb));
wb->content_type = temp_wb->content_type;
wb->expires = temp_wb->expires;
if(wb->expires)
buffer_cacheable(wb);
else
buffer_no_cacheable(wb);
code = tmp->code;
}
else if (rc == ETIMEDOUT) {
// timeout
// we will go away and let the callback free the structure
tmp->free_with_signal = true;
we_should_free = false;
code = rrd_call_function_error(wb, "Timeout while waiting for a response from the collector.", HTTP_RESP_GATEWAY_TIMEOUT);
}
else
code = rrd_call_function_error(wb, "Failed to get the response from the collector.", HTTP_RESP_INTERNAL_SERVER_ERROR);
netdata_mutex_unlock(&tmp->mutex);
}
else {
if(!buffer_strlen(wb))
rrd_call_function_error(wb, "Failed to send request to the collector.", code);
}
if (we_should_free) {
rrd_function_call_wait_free(tmp);
buffer_free(temp_wb);
}
}
return code;
rrd_inflight_function_cleanup(r->host, r->host_function_acquired, r->transaction);
}
int rrd_call_function_async(RRDHOST *host, BUFFER *wb, int timeout, const char *name,
rrd_call_function_async_callback callback, void *callback_data) {
int code;
static bool rrd_inflight_async_function_is_cancelled(void *data) {
struct rrd_function_inflight *r = data;
return __atomic_load_n(&r->cancelled, __ATOMIC_RELAXED);
}
struct rrd_collector_function *rdcf = NULL;
char key[PLUGINSD_LINE_MAX + 1];
size_t key_length = sanitize_function_text(key, name, PLUGINSD_LINE_MAX);
code = rrd_call_function_find(host, wb, key, key_length, &rdcf);
if(code != HTTP_RESP_OK)
return code;
if(timeout <= 0)
timeout = rdcf->timeout;
code = rdcf->function(wb, timeout, key, rdcf->collector_data, callback, callback_data);
static inline int rrd_call_function_async_and_dont_wait(struct rrd_function_inflight *r) {
int code = r->rdcf->execute_cb(r->result.wb, r->timeout, r->sanitized_cmd, r->rdcf->execute_cb_data,
rrd_inflight_async_function_nowait_finished, r,
rrd_inflight_async_function_is_cancelled, r,
rrd_inflight_async_function_register_canceller_cb, r);
if(code != HTTP_RESP_OK) {
if (!buffer_strlen(wb))
rrd_call_function_error(wb, "Failed to send request to the collector.", code);
if (!buffer_strlen(r->result.wb))
rrd_call_function_error(r->result.wb, "Failed to send request to the collector.", code);
rrd_inflight_function_cleanup(r->host, r->host_function_acquired, r->transaction);
}
return code;
}
static int rrd_call_function_async_and_wait(struct rrd_function_inflight *r) {
struct timespec tp;
clock_gettime(CLOCK_REALTIME, &tp);
usec_t now_ut = tp.tv_sec * USEC_PER_SEC + tp.tv_nsec / NSEC_PER_USEC;
usec_t end_ut = now_ut + r->timeout * USEC_PER_SEC;
struct rrd_function_call_wait *tmp = mallocz(sizeof(struct rrd_function_call_wait));
tmp->free_with_signal = false;
tmp->data_are_ready = false;
tmp->host = r->host;
tmp->host_function_acquired = r->host_function_acquired;
tmp->transaction = strdupz(r->transaction);
netdata_mutex_init(&tmp->mutex);
pthread_cond_init(&tmp->cond, NULL);
// we need a temporary BUFFER, because we may time out and the caller supplied one may vanish
// so, we create a new one we guarantee will survive until the collector finishes...
bool we_should_free = true;
BUFFER *temp_wb = buffer_create(PLUGINSD_LINE_MAX + 1, &netdata_buffers_statistics.buffers_functions); // we need it because we may give up on it
temp_wb->content_type = r->result.wb->content_type;
int code = r->rdcf->execute_cb(temp_wb, r->timeout, r->sanitized_cmd, r->rdcf->execute_cb_data,
// we overwrite the result callbacks,
// so that we can clean up the allocations made
rrd_async_function_signal_when_ready, tmp,
rrd_inflight_async_function_is_cancelled, r,
rrd_inflight_async_function_register_canceller_cb, r);
if (code == HTTP_RESP_OK) {
netdata_mutex_lock(&tmp->mutex);
bool cancelled = false;
int rc = 0;
while (rc == 0 && !cancelled && !tmp->data_are_ready) {
clock_gettime(CLOCK_REALTIME, &tp);
now_ut = tp.tv_sec * USEC_PER_SEC + tp.tv_nsec / NSEC_PER_USEC;
if(now_ut >= end_ut) {
rc = ETIMEDOUT;
break;
}
tp.tv_nsec += 10 * NSEC_PER_MSEC;
if(tp.tv_nsec > (long)(1 * NSEC_PER_SEC)) {
tp.tv_sec++;
tp.tv_nsec -= 1 * NSEC_PER_SEC;
}
// the mutex is unlocked within pthread_cond_timedwait()
rc = pthread_cond_timedwait(&tmp->cond, &tmp->mutex, &tp);
// the mutex is again ours
if(rc == ETIMEDOUT) {
rc = 0;
if (!tmp->data_are_ready && r->is_cancelled.cb &&
r->is_cancelled.cb(r->is_cancelled.data)) {
// internal_error(true, "FUNCTIONS: transaction '%s' is cancelled while waiting for response",
// r->transaction);
rc = 0;
cancelled = true;
rrd_function_cancel(r->transaction);
break;
}
}
}
if (tmp->data_are_ready) {
// we have a response
buffer_fast_strcat(r->result.wb, buffer_tostring(temp_wb), buffer_strlen(temp_wb));
r->result.wb->content_type = temp_wb->content_type;
r->result.wb->expires = temp_wb->expires;
if(r->result.wb->expires)
buffer_cacheable(r->result.wb);
else
buffer_no_cacheable(r->result.wb);
code = tmp->code;
}
else if (rc == ETIMEDOUT || cancelled) {
// timeout
// we will go away and let the callback free the structure
tmp->free_with_signal = true;
we_should_free = false;
if(cancelled)
code = rrd_call_function_error(r->result.wb,
"Request cancelled",
HTTP_RESP_CLIENT_CLOSED_REQUEST);
else
code = rrd_call_function_error(r->result.wb,
"Timeout while waiting for a response from the collector.",
HTTP_RESP_GATEWAY_TIMEOUT);
}
else
code = rrd_call_function_error(r->result.wb,
"Internal error while communicating with the collector",
HTTP_RESP_INTERNAL_SERVER_ERROR);
netdata_mutex_unlock(&tmp->mutex);
}
else {
if(!buffer_strlen(r->result.wb))
rrd_call_function_error(r->result.wb, "The collector returned an error.", code);
}
if (we_should_free) {
rrd_function_call_wait_free(tmp);
buffer_free(temp_wb);
}
return code;
}
static inline int rrd_call_function_async(struct rrd_function_inflight *r, bool wait) {
if(wait)
return rrd_call_function_async_and_wait(r);
else
return rrd_call_function_async_and_dont_wait(r);
}
// ----------------------------------------------------------------------------
int rrd_function_run(RRDHOST *host, BUFFER *result_wb, int timeout, const char *cmd,
bool wait, const char *transaction,
rrd_function_result_callback_t result_cb, void *result_cb_data,
rrd_function_is_cancelled_cb_t is_cancelled_cb, void *is_cancelled_cb_data) {
int code;
char sanitized_cmd[PLUGINSD_LINE_MAX + 1];
const DICTIONARY_ITEM *host_function_acquired = NULL;
// ------------------------------------------------------------------------
// find the function
size_t sanitized_cmd_length = sanitize_function_text(sanitized_cmd, cmd, PLUGINSD_LINE_MAX);
code = rrd_call_function_find(host, result_wb, sanitized_cmd, sanitized_cmd_length, &host_function_acquired);
if(code != HTTP_RESP_OK)
return code;
struct rrd_host_function *rdcf = dictionary_acquired_item_value(host_function_acquired);
if(timeout <= 0)
timeout = rdcf->timeout;
// ------------------------------------------------------------------------
// the function can only be executed in sync mode
if(rdcf->sync) {
// the caller has to wait
code = rdcf->execute_cb(result_wb, timeout, sanitized_cmd, rdcf->execute_cb_data,
NULL, NULL, // no callback needed, it is synchronous
is_cancelled_cb, is_cancelled_cb_data, // it is ok to pass these, we block the caller
NULL, NULL); // no need to pass, we will wait
if (code != HTTP_RESP_OK && !buffer_strlen(result_wb))
rrd_call_function_error(result_wb, "Collector reported error.", code);
dictionary_acquired_item_release(host->functions, host_function_acquired);
return code;
}
// ------------------------------------------------------------------------
// the function can only be executed in async mode
// put the function into the inflight requests
char uuid_str[UUID_STR_LEN];
if(!transaction) {
uuid_t uuid;
uuid_generate_random(uuid);
uuid_unparse_lower(uuid, uuid_str);
transaction = uuid_str;
}
// put the request into the inflight requests
struct rrd_function_inflight t = {
.used = false,
.host = host,
.cmd = strdupz(cmd),
.sanitized_cmd = strdupz(sanitized_cmd),
.sanitized_cmd_length = sanitized_cmd_length,
.transaction = strdupz(transaction),
.timeout = timeout,
.cancelled = false,
.host_function_acquired = host_function_acquired,
.rdcf = rdcf,
.result = {
.wb = result_wb,
.cb = result_cb,
.data = result_cb_data,
},
.is_cancelled = {
.cb = is_cancelled_cb,
.data = is_cancelled_cb_data,
}
};
struct rrd_function_inflight *r = dictionary_set(rrd_functions_inflight_requests, transaction, &t, sizeof(t));
if(r->used) {
netdata_log_info("FUNCTIONS: duplicate transaction '%s', function: '%s'", t.transaction, t.cmd);
code = rrd_call_function_error(result_wb, "duplicate transaction", HTTP_RESP_BAD_REQUEST);
freez((void *)t.transaction);
freez((void *)t.cmd);
freez((void *)t.sanitized_cmd);
dictionary_acquired_item_release(r->host->functions, t.host_function_acquired);
return code;
}
r->used = true;
// internal_error(true, "FUNCTIONS: transaction '%s' started", r->transaction);
return rrd_call_function_async(r, wait);
}
void rrd_function_cancel(const char *transaction) {
// internal_error(true, "FUNCTIONS: request to cancel transaction '%s'", transaction);
const DICTIONARY_ITEM *item = dictionary_get_and_acquire_item(rrd_functions_inflight_requests, transaction);
if(!item) {
netdata_log_info("FUNCTIONS: received a cancel request for transaction '%s', but the transaction is not running.",
transaction);
return;
}
struct rrd_function_inflight *r = dictionary_acquired_item_value(item);
bool cancelled = __atomic_load_n(&r->cancelled, __ATOMIC_RELAXED);
if(cancelled) {
netdata_log_info("FUNCTIONS: received a cancel request for transaction '%s', but it is already cancelled.",
transaction);
goto cleanup;
}
__atomic_store_n(&r->cancelled, true, __ATOMIC_RELAXED);
int32_t expected = __atomic_load_n(&r->rdcf->collector->refcount_canceller, __ATOMIC_RELAXED);
int32_t wanted;
do {
if(expected < 0) {
netdata_log_info("FUNCTIONS: received a cancel request for transaction '%s', but the collector is not running.",
transaction);
goto cleanup;
}
wanted = expected + 1;
} while(!__atomic_compare_exchange_n(&r->rdcf->collector->refcount_canceller, &expected, wanted, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
if(r->canceller.cb)
r->canceller.cb(r->canceller.data);
__atomic_sub_fetch(&r->rdcf->collector->refcount_canceller, 1, __ATOMIC_RELAXED);
cleanup:
dictionary_acquired_item_release(rrd_functions_inflight_requests, item);
}
// ----------------------------------------------------------------------------
static void functions2json(DICTIONARY *functions, BUFFER *wb, const char *ident, const char *kq, const char *sq) {
struct rrd_collector_function *t;
struct rrd_host_function *t;
dfe_start_read(functions, t) {
if(!t->collector->running) continue;
if(!rrd_collector_running(t->collector)) continue;
if(t_dfe.counter)
buffer_strcat(wb, ",\n");
@ -799,9 +1123,9 @@ void host_functions2json(RRDHOST *host, BUFFER *wb) {
buffer_json_member_add_object(wb, "functions");
struct rrd_collector_function *t;
struct rrd_host_function *t;
dfe_start_read(host->functions, t) {
if(!t->collector->running) continue;
if(!rrd_collector_running(t->collector)) continue;
buffer_json_member_add_object(wb, t_dfe.name);
buffer_json_member_add_string(wb, "help", string2str(t->help));
@ -822,9 +1146,9 @@ void host_functions2json(RRDHOST *host, BUFFER *wb) {
void chart_functions_to_dict(DICTIONARY *rrdset_functions_view, DICTIONARY *dst, void *value, size_t value_size) {
if(!rrdset_functions_view || !dst) return;
struct rrd_collector_function *t;
struct rrd_host_function *t;
dfe_start_read(rrdset_functions_view, t) {
if(!t->collector->running) continue;
if(!rrd_collector_running(t->collector)) continue;
dictionary_set(dst, t_dfe.name, value, value_size);
}
@ -834,9 +1158,9 @@ void chart_functions_to_dict(DICTIONARY *rrdset_functions_view, DICTIONARY *dst,
void host_functions_to_dict(RRDHOST *host, DICTIONARY *dst, void *value, size_t value_size, STRING **help) {
if(!host || !host->functions || !dictionary_entries(host->functions) || !dst) return;
struct rrd_collector_function *t;
struct rrd_host_function *t;
dfe_start_read(host->functions, t) {
if(!t->collector->running) continue;
if(!rrd_collector_running(t->collector)) continue;
if(help)
*help = t->help;
@ -846,10 +1170,14 @@ void host_functions_to_dict(RRDHOST *host, DICTIONARY *dst, void *value, size_t
dfe_done(t);
}
// ----------------------------------------------------------------------------
int rrdhost_function_streaming(BUFFER *wb, int timeout __maybe_unused, const char *function __maybe_unused,
void *collector_data __maybe_unused,
function_data_ready_callback callback __maybe_unused, void *callback_data __maybe_unused) {
rrd_function_result_callback_t result_cb, void *result_cb_data,
rrd_function_is_cancelled_cb_t is_cancelled_cb, void *is_cancelled_cb_data,
rrd_function_register_canceller_cb_t register_cancel_cb, void *register_cancel_db_data) {
time_t now = now_realtime_sec();
buffer_flush(wb);
@ -1468,8 +1796,14 @@ int rrdhost_function_streaming(BUFFER *wb, int timeout __maybe_unused, const cha
buffer_json_member_add_time_t(wb, "expires", now_realtime_sec() + 1);
buffer_json_finalize(wb);
if(callback)
callback(wb, HTTP_RESP_OK, callback_data);
int response = HTTP_RESP_OK;
if(is_cancelled_cb && is_cancelled_cb(is_cancelled_cb_data)) {
buffer_flush(wb);
response = HTTP_RESP_CLIENT_CLOSED_REQUEST;
}
return HTTP_RESP_OK;
if(result_cb)
result_cb(wb, response, result_cb_data);
return response;
}

View file

@ -1,26 +1,39 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef NETDATA_RRDFUNCTIONS_H
#define NETDATA_RRDFUNCTIONS_H 1
// ----------------------------------------------------------------------------
#include "rrd.h"
void rrdfunctions_init(RRDHOST *host);
void rrdfunctions_destroy(RRDHOST *host);
typedef void (*rrd_function_result_callback_t)(BUFFER *wb, int code, void *result_cb_data);
typedef bool (*rrd_function_is_cancelled_cb_t)(void *is_cancelled_cb_data);
typedef void (*rrd_function_canceller_cb_t)(void *data);
typedef void (*rrd_function_register_canceller_cb_t)(void *register_cancel_cb_data, rrd_function_canceller_cb_t cancel_cb, void *cancel_cb_data);
typedef int (*rrd_function_execute_cb_t)(BUFFER *wb, int timeout, const char *function, void *collector_data,
rrd_function_result_callback_t result_cb, void *result_cb_data,
rrd_function_is_cancelled_cb_t is_cancelled_cb, void *is_cancelled_cb_data,
rrd_function_register_canceller_cb_t register_cancel_cb, void *register_cancel_db_data);
void rrd_functions_inflight_init(void);
void rrdfunctions_host_init(RRDHOST *host);
void rrdfunctions_host_destroy(RRDHOST *host);
void rrd_collector_started(void);
void rrd_collector_finished(void);
typedef void (*function_data_ready_callback)(BUFFER *wb, int code, void *callback_data);
// add a function, to be run from the collector
void rrd_function_add(RRDHOST *host, RRDSET *st, const char *name, int timeout, const char *help,
bool sync, rrd_function_execute_cb_t execute_cb, void *execute_cb_data);
typedef int (*function_execute_at_collector)(BUFFER *wb, int timeout, const char *function, void *collector_data,
function_data_ready_callback callback, void *callback_data);
// call a function, to be run from anywhere
int rrd_function_run(RRDHOST *host, BUFFER *result_wb, int timeout, const char *cmd,
bool wait, const char *transaction,
rrd_function_result_callback_t result_cb, void *result_cb_data,
rrd_function_is_cancelled_cb_t is_cancelled_cb, void *is_cancelled_cb_data);
void rrd_collector_add_function(RRDHOST *host, RRDSET *st, const char *name, int timeout, const char *help,
bool sync, function_execute_at_collector function, void *collector_data);
int rrd_call_function_and_wait(RRDHOST *host, BUFFER *wb, int timeout, const char *name);
typedef void (*rrd_call_function_async_callback)(BUFFER *wb, int code, void *callback_data);
int rrd_call_function_async(RRDHOST *host, BUFFER *wb, int timeout, const char *name, rrd_call_function_async_callback, void *callback_data);
// cancel a running function, to be run from anywhere
void rrd_function_cancel(const char *transaction);
void rrd_functions_expose_rrdpush(RRDSET *st, BUFFER *wb);
void rrd_functions_expose_global_rrdpush(RRDHOST *host, BUFFER *wb);
@ -35,7 +48,9 @@ const char *functions_content_type_to_format(HTTP_CONTENT_TYPE content_type);
int rrd_call_function_error(BUFFER *wb, const char *msg, int code);
int rrdhost_function_streaming(BUFFER *wb, int timeout, const char *function, void *collector_data,
function_data_ready_callback callback, void *callback_data);
rrd_function_result_callback_t result_cb, void *result_cb_data,
rrd_function_is_cancelled_cb_t is_cancelled_cb, void *is_cancelled_cb_data,
rrd_function_register_canceller_cb_t register_cancel_cb, void *register_cancel_db_data);
#define RRDFUNCTIONS_STREAMING_HELP "Streaming status for parents and children."

View file

@ -335,7 +335,7 @@ int is_legacy = 1;
netdata_mutex_init(&host->receiver_lock);
if (likely(!archived)) {
rrdfunctions_init(host);
rrdfunctions_host_init(host);
host->rrdlabels = rrdlabels_create();
rrdhost_initialize_rrdpush_sender(
host, rrdpush_enabled, rrdpush_destination, rrdpush_api_key, rrdpush_send_charts_matching);
@ -665,7 +665,7 @@ static void rrdhost_update(RRDHOST *host
if (rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED)) {
rrdhost_flag_clear(host, RRDHOST_FLAG_ARCHIVED);
rrdfunctions_init(host);
rrdfunctions_host_init(host);
if(!host->rrdlabels)
host->rrdlabels = rrdlabels_create();
@ -1070,9 +1070,9 @@ int rrd_init(char *hostname, struct rrdhost_system_info *system_info, bool unitt
// we register this only on localhost
// for the other nodes, the origin server should register it
rrd_collector_started(); // this creates a collector that runs for as long as netdata runs
rrd_collector_add_function(localhost, NULL, "streaming", 10,
RRDFUNCTIONS_STREAMING_HELP, true,
rrdhost_function_streaming, NULL);
rrd_function_add(localhost, NULL, "streaming", 10,
RRDFUNCTIONS_STREAMING_HELP, true,
rrdhost_function_streaming, NULL);
#endif
if (likely(system_info)) {
@ -1160,9 +1160,11 @@ static void rrdhost_streaming_sender_structures_free(RRDHOST *host)
rrdpush_sender_thread_stop(host, STREAM_HANDSHAKE_DISCONNECT_HOST_CLEANUP, true); // stop a possibly running thread
cbuffer_free(host->sender->buffer);
#ifdef ENABLE_RRDPUSH_COMPRESSION
rrdpush_compressor_destroy(&host->sender->compressor);
#endif
replication_cleanup_sender(host->sender);
__atomic_sub_fetch(&netdata_buffers_statistics.rrdhost_senders, sizeof(*host->sender), __ATOMIC_RELAXED);
@ -1266,7 +1268,7 @@ void rrdhost_free___while_having_rrd_wrlock(RRDHOST *host, bool force) {
freez(host->node_id);
rrdfamily_index_destroy(host);
rrdfunctions_destroy(host);
rrdfunctions_host_destroy(host);
rrdvariables_destroy(host->rrdvars);
if (host == localhost)
rrdvariables_destroy(health_rrdvars);

View file

@ -15,6 +15,7 @@ SUBDIRS = \
ebpf \
eval \
facets \
functions_evloop \
json \
july \
health \

View file

@ -249,6 +249,17 @@ static inline void buffer_strcat(BUFFER *wb, const char *txt) {
buffer_overflow_check(wb);
}
static inline void buffer_contents_replace(BUFFER *wb, const char *txt, size_t len) {
wb->len = 0;
buffer_need_bytes(wb, len + 1);
memcpy(wb->buffer, txt, len);
wb->len = len;
wb->buffer[wb->len] = '\0';
buffer_overflow_check(wb);
}
static inline void buffer_strncat(BUFFER *wb, const char *txt, size_t len) {
if(unlikely(!txt || !*txt)) return;

File diff suppressed because it is too large Load diff

View file

@ -1,3 +1,5 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef FACETS_H
#define FACETS_H 1
@ -46,9 +48,6 @@ typedef struct facet_row {
typedef struct facets FACETS;
typedef struct facet_key FACET_KEY;
#define FACET_STRING_HASH_SIZE 23
void facets_string_hash(const char *src, size_t len, char *out);
typedef void (*facets_key_transformer_t)(FACETS *facets __maybe_unused, BUFFER *wb, void *data);
typedef void (*facet_dynamic_row_t)(FACETS *facets, BUFFER *json_array, FACET_ROW_KEY_VALUE *rkv, FACET_ROW *row, void *data);
FACET_KEY *facets_register_dynamic_key_name(FACETS *facets, const char *key, FACET_KEY_OPTIONS options, facet_dynamic_row_t cb, void *data);
@ -75,8 +74,9 @@ void facets_set_query(FACETS *facets, const char *query);
void facets_set_items(FACETS *facets, uint32_t items);
void facets_set_anchor(FACETS *facets, usec_t anchor, FACETS_ANCHOR_DIRECTION direction);
FACET_KEY *facets_register_facet_id(FACETS *facets, const char *key_id, FACET_KEY_OPTIONS options);
void facets_register_facet_id_filter(FACETS *facets, const char *key_id, char *value_ids, FACET_KEY_OPTIONS options);
void facets_set_histogram(FACETS *facets, const char *chart, usec_t after_ut, usec_t before_ut);
void facets_register_facet_id_filter(FACETS *facets, const char *key_id, char *value_id, FACET_KEY_OPTIONS options);
void facets_set_histogram_by_id(FACETS *facets, const char *key_id, usec_t after_ut, usec_t before_ut);
void facets_set_histogram_by_name(FACETS *facets, const char *key_name, usec_t after_ut, usec_t before_ut);
void facets_add_key_value(FACETS *facets, const char *key, const char *value);
void facets_add_key_value_length(FACETS *facets, const char *key, size_t key_len, const char *value, size_t value_len);

View file

@ -0,0 +1,8 @@
# SPDX-License-Identifier: GPL-3.0-or-later
AUTOMAKE_OPTIONS = subdir-objects
MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
dist_noinst_DATA = \
README.md \
$(NULL)

View file

View file

@ -0,0 +1,207 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "functions_evloop.h"
#define MAX_FUNCTION_PARAMETERS 1024
struct functions_evloop_worker_job {
bool used;
bool running;
bool cancelled;
char *cmd;
const char *transaction;
time_t timeout;
functions_evloop_worker_execute_t cb;
};
struct rrd_functions_expectation {
const char *function;
size_t function_length;
functions_evloop_worker_execute_t cb;
time_t default_timeout;
struct rrd_functions_expectation *prev, *next;
};
struct functions_evloop_globals {
const char *tag;
DICTIONARY *worker_queue;
pthread_mutex_t worker_mutex;
pthread_cond_t worker_cond_var;
size_t workers;
netdata_mutex_t *stdout_mutex;
bool *plugin_should_exit;
netdata_thread_t reader_thread;
netdata_thread_t *worker_threads;
struct rrd_functions_expectation *expectations;
};
static void *rrd_functions_worker_globals_worker_main(void *arg) {
struct functions_evloop_globals *wg = arg;
while (true) {
pthread_mutex_lock(&wg->worker_mutex);
while (dictionary_entries(wg->worker_queue) == 0) {
pthread_cond_wait(&wg->worker_cond_var, &wg->worker_mutex);
}
const DICTIONARY_ITEM *acquired = NULL;
struct functions_evloop_worker_job *j;
dfe_start_write(wg->worker_queue, j) {
if(j->running || j->cancelled)
continue;
acquired = dictionary_acquired_item_dup(wg->worker_queue, j_dfe.item);
j->running = true;
break;
}
dfe_done(j);
pthread_mutex_unlock(&wg->worker_mutex);
if(acquired) {
j = dictionary_acquired_item_value(acquired);
j->cb(j->transaction, j->cmd, j->timeout, &j->cancelled);
dictionary_del(wg->worker_queue, j->transaction);
dictionary_acquired_item_release(wg->worker_queue, acquired);
dictionary_garbage_collect(wg->worker_queue);
}
}
return NULL;
}
static void *rrd_functions_worker_globals_reader_main(void *arg) {
struct functions_evloop_globals *wg = arg;
char buffer[PLUGINSD_LINE_MAX + 1];
char *s = NULL;
while(!(*wg->plugin_should_exit) && (s = fgets(buffer, PLUGINSD_LINE_MAX, stdin))) {
char *words[MAX_FUNCTION_PARAMETERS] = { NULL };
size_t num_words = quoted_strings_splitter_pluginsd(buffer, words, MAX_FUNCTION_PARAMETERS);
const char *keyword = get_word(words, num_words, 0);
if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0) {
char *transaction = get_word(words, num_words, 1);
char *timeout_s = get_word(words, num_words, 2);
char *function = get_word(words, num_words, 3);
if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
netdata_log_error("Received incomplete %s (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
keyword,
transaction?transaction:"(unset)",
timeout_s?timeout_s:"(unset)",
function?function:"(unset)");
}
else {
int timeout = str2i(timeout_s);
bool found = false;
struct rrd_functions_expectation *we;
for(we = wg->expectations; we ;we = we->next) {
if(strncmp(function, we->function, we->function_length) == 0) {
struct functions_evloop_worker_job t = {
.cmd = strdupz(function),
.transaction = strdupz(transaction),
.running = false,
.cancelled = false,
.timeout = timeout > 0 ? timeout : we->default_timeout,
.used = false,
.cb = we->cb,
};
struct functions_evloop_worker_job *j = dictionary_set(wg->worker_queue, transaction, &t, sizeof(t));
if(j->used) {
netdata_log_error("Received duplicate function transaction '%s'", transaction);
freez((void *)t.cmd);
freez((void *)t.transaction);
}
else {
found = true;
j->used = true;
pthread_cond_signal(&wg->worker_cond_var);
}
}
}
if(!found) {
netdata_mutex_lock(wg->stdout_mutex);
pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_NOT_FOUND,
"No function with this name found.");
netdata_mutex_unlock(wg->stdout_mutex);
}
}
}
else if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_CANCEL) == 0) {
char *transaction = get_word(words, num_words, 1);
const DICTIONARY_ITEM *acquired = dictionary_get_and_acquire_item(wg->worker_queue, transaction);
if(acquired) {
struct functions_evloop_worker_job *j = dictionary_acquired_item_value(acquired);
__atomic_store_n(&j->cancelled, true, __ATOMIC_RELAXED);
dictionary_acquired_item_release(wg->worker_queue, acquired);
dictionary_del(wg->worker_queue, transaction);
dictionary_garbage_collect(wg->worker_queue);
}
else
netdata_log_error("Received CANCEL for transaction '%s', but it not available here", transaction);
}
else
netdata_log_error("Received unknown command: %s", keyword?keyword:"(unset)");
}
if(!s || feof(stdin) || ferror(stdin)) {
*wg->plugin_should_exit = true;
netdata_log_error("Received error on stdin.");
}
exit(1);
}
void worker_queue_delete_cb(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *data __maybe_unused) {
struct functions_evloop_worker_job *j = value;
freez((void *)j->cmd);
freez((void *)j->transaction);
}
struct functions_evloop_globals *functions_evloop_init(size_t worker_threads, const char *tag, netdata_mutex_t *stdout_mutex, bool *plugin_should_exit) {
struct functions_evloop_globals *wg = callocz(1, sizeof(struct functions_evloop_globals));
wg->worker_queue = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE);
dictionary_register_delete_callback(wg->worker_queue, worker_queue_delete_cb, NULL);
pthread_mutex_init(&wg->worker_mutex, NULL);
pthread_cond_init(&wg->worker_cond_var, NULL);
wg->plugin_should_exit = plugin_should_exit;
wg->stdout_mutex = stdout_mutex;
wg->workers = worker_threads;
wg->worker_threads = callocz(wg->workers, sizeof(netdata_thread_t ));
wg->tag = tag;
char tag_buffer[NETDATA_THREAD_TAG_MAX + 1];
snprintfz(tag_buffer, NETDATA_THREAD_TAG_MAX, "%s_READER", wg->tag);
netdata_thread_create(&wg->reader_thread, tag_buffer, NETDATA_THREAD_OPTION_DONT_LOG,
rrd_functions_worker_globals_reader_main, wg);
for(size_t i = 0; i < wg->workers ; i++) {
snprintfz(tag_buffer, NETDATA_THREAD_TAG_MAX, "%s_WORK[%zu]", wg->tag, i+1);
netdata_thread_create(&wg->worker_threads[i], tag_buffer, NETDATA_THREAD_OPTION_DONT_LOG,
rrd_functions_worker_globals_worker_main, wg);
}
return wg;
}
void functions_evloop_add_function(struct functions_evloop_globals *wg, const char *function, functions_evloop_worker_execute_t cb, time_t default_timeout) {
struct rrd_functions_expectation *we = callocz(1, sizeof(*we));
we->function = function;
we->function_length = strlen(we->function);
we->cb = cb;
we->default_timeout = default_timeout;
DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(wg->expectations, we, prev, next);
}

View file

@ -0,0 +1,98 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef NETDATA_FUNCTIONS_EVLOOP_H
#define NETDATA_FUNCTIONS_EVLOOP_H
#include "../libnetdata.h"
#define PLUGINSD_KEYWORD_CHART "CHART"
#define PLUGINSD_KEYWORD_CHART_DEFINITION_END "CHART_DEFINITION_END"
#define PLUGINSD_KEYWORD_DIMENSION "DIMENSION"
#define PLUGINSD_KEYWORD_BEGIN "BEGIN"
#define PLUGINSD_KEYWORD_SET "SET"
#define PLUGINSD_KEYWORD_END "END"
#define PLUGINSD_KEYWORD_FLUSH "FLUSH"
#define PLUGINSD_KEYWORD_DISABLE "DISABLE"
#define PLUGINSD_KEYWORD_VARIABLE "VARIABLE"
#define PLUGINSD_KEYWORD_LABEL "LABEL"
#define PLUGINSD_KEYWORD_OVERWRITE "OVERWRITE"
#define PLUGINSD_KEYWORD_CLABEL "CLABEL"
#define PLUGINSD_KEYWORD_CLABEL_COMMIT "CLABEL_COMMIT"
#define PLUGINSD_KEYWORD_FUNCTION "FUNCTION"
#define PLUGINSD_KEYWORD_FUNCTION_CANCEL "FUNCTION_CANCEL"
#define PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN "FUNCTION_RESULT_BEGIN"
#define PLUGINSD_KEYWORD_FUNCTION_RESULT_END "FUNCTION_RESULT_END"
#define PLUGINSD_KEYWORD_REPLAY_CHART "REPLAY_CHART"
#define PLUGINSD_KEYWORD_REPLAY_BEGIN "RBEGIN"
#define PLUGINSD_KEYWORD_REPLAY_SET "RSET"
#define PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE "RDSTATE"
#define PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE "RSSTATE"
#define PLUGINSD_KEYWORD_REPLAY_END "REND"
#define PLUGINSD_KEYWORD_BEGIN_V2 "BEGIN2"
#define PLUGINSD_KEYWORD_SET_V2 "SET2"
#define PLUGINSD_KEYWORD_END_V2 "END2"
#define PLUGINSD_KEYWORD_HOST_DEFINE "HOST_DEFINE"
#define PLUGINSD_KEYWORD_HOST_DEFINE_END "HOST_DEFINE_END"
#define PLUGINSD_KEYWORD_HOST_LABEL "HOST_LABEL"
#define PLUGINSD_KEYWORD_HOST "HOST"
#define PLUGINSD_KEYWORD_DYNCFG_ENABLE "DYNCFG_ENABLE"
#define PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE "DYNCFG_REGISTER_MODULE"
#define PLUGINSD_KEYWORD_REPORT_JOB_STATUS "REPORT_JOB_STATUS"
#define PLUGINSD_KEYWORD_EXIT "EXIT"
#define PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT 10 // seconds
typedef void (*functions_evloop_worker_execute_t)(const char *transaction, char *function, int timeout, bool *cancelled);
struct functions_evloop_worker_job;
struct functions_evloop_globals *functions_evloop_init(size_t worker_threads, const char *tag, netdata_mutex_t *stdout_mutex, bool *plugin_should_exit);
void functions_evloop_add_function(struct functions_evloop_globals *wg, const char *function, functions_evloop_worker_execute_t cb, time_t default_timeout);
#define pluginsd_function_result_begin_to_buffer(wb, transaction, code, content_type, expires) \
buffer_sprintf(wb \
, PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " \"%s\" %d \"%s\" %ld\n" \
, (transaction) ? (transaction) : "" \
, (int)(code) \
, (content_type) ? (content_type) : "" \
, (long int)(expires) \
)
#define pluginsd_function_result_end_to_buffer(wb) \
buffer_strcat(wb, "\n" PLUGINSD_KEYWORD_FUNCTION_RESULT_END "\n")
#define pluginsd_function_result_begin_to_stdout(transaction, code, content_type, expires) \
fprintf(stdout \
, PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " \"%s\" %d \"%s\" %ld\n" \
, (transaction) ? (transaction) : "" \
, (int)(code) \
, (content_type) ? (content_type) : "" \
, (long int)(expires) \
)
#define pluginsd_function_result_end_to_stdout() \
fprintf(stdout, "\n" PLUGINSD_KEYWORD_FUNCTION_RESULT_END "\n")
static inline void pluginsd_function_json_error_to_stdout(const char *transaction, int code, const char *msg) {
char buffer[PLUGINSD_LINE_MAX + 1];
json_escape_string(buffer, msg, PLUGINSD_LINE_MAX);
pluginsd_function_result_begin_to_stdout(transaction, code, "application/json", now_realtime_sec());
fprintf(stdout, "{\"status\":%d,\"error_message\":\"%s\"}", code, buffer);
pluginsd_function_result_end_to_stdout();
fflush(stdout);
}
static inline void pluginsd_function_result_to_stdout(const char *transaction, int code, const char *content_type, time_t expires, BUFFER *result) {
pluginsd_function_result_begin_to_stdout(transaction, code, content_type, expires);
fwrite(buffer_tostring(result), buffer_strlen(result), 1, stdout);
pluginsd_function_result_end_to_stdout();
fflush(stdout);
}
#endif //NETDATA_FUNCTIONS_EVLOOP_H

View file

@ -394,10 +394,10 @@ static inline NETDATA_DOUBLE str2ndd_encoded(const char *src, char **endptr) {
return str2ndd(src, endptr) * sign;
}
static inline char *strncpyz(char *dst, const char *src, size_t n) {
static inline char *strncpyz(char *dst, const char *src, size_t dst_size_minus_1) {
char *p = dst;
while (*src && n--)
while (*src && dst_size_minus_1--)
*dst++ = *src++;
*dst = '\0';

View file

@ -842,6 +842,7 @@ extern char *netdata_configured_host_prefix;
#include "gorilla/gorilla.h"
#include "facets/facets.h"
#include "dyn_conf/dyn_conf.h"
#include "functions_evloop/functions_evloop.h"
// BEWARE: this exists in alarm-notify.sh
#define DEFAULT_CLOUD_BASE_URL "https://app.netdata.cloud"

View file

@ -6,6 +6,7 @@
#include "libnetdata/libnetdata.h"
#include "daemon/common.h"
#include "web/server/web_client.h"
#include "database/rrdfunctions.h"
#include "database/rrd.h"
#define CONNECTED_TO_SIZE 100

View file

@ -940,6 +940,7 @@ void stream_execute_function_callback(BUFFER *func_wb, int code, void *data) {
buffer_strlen(func_wb),
now_realtime_usec() - tmp->received_ut);
}
string_freez(tmp->transaction);
buffer_free(func_wb);
freez(tmp);
@ -989,7 +990,9 @@ void execute_commands(struct sender_state *s) {
tmp->transaction = string_strdupz(transaction);
BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX + 1, &netdata_buffers_statistics.buffers_functions);
int code = rrd_call_function_async(s->host, wb, timeout, function, stream_execute_function_callback, tmp);
int code = rrd_function_run(s->host, wb, timeout, function, false, transaction,
stream_execute_function_callback, tmp, NULL, NULL);
if(code != HTTP_RESP_OK) {
if (!buffer_strlen(wb))
rrd_call_function_error(wb, "Failed to route request to collector", code);
@ -998,6 +1001,13 @@ void execute_commands(struct sender_state *s) {
}
}
}
else if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_CANCEL) == 0) {
worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST);
char *transaction = get_word(words, num_words, 1);
if(transaction && *transaction)
rrd_function_cancel(transaction);
}
else if (keyword && strcmp(keyword, PLUGINSD_KEYWORD_REPLAY_CHART) == 0) {
worker_is_busy(WORKER_SENDER_JOB_REPLAY_REQUEST);

View file

@ -1412,7 +1412,9 @@ int web_client_api_request_v1_function(RRDHOST *host, struct web_client *w, char
wb->content_type = CT_APPLICATION_JSON;
buffer_no_cacheable(wb);
return rrd_call_function_and_wait(host, wb, timeout, function);
return rrd_function_run(host, wb, timeout, function, true, NULL,
NULL, NULL,
web_client_interrupt_callback, w);
}
int web_client_api_request_v1_functions(RRDHOST *host, struct web_client *w, char *url __maybe_unused) {

View file

@ -370,7 +370,7 @@ static int web_server_snd_callback(POLLINFO *pi, short int *events) {
netdata_log_debug(D_WEB_CLIENT, "%llu: sending data on fd %d.", w->id, fd);
int ret = web_client_send(w);
ssize_t ret = web_client_send(w);
if(unlikely(ret < 0)) {
retval = -1;