mirror of https://github.com/netdata/netdata.git synced 2025-04-12 16:58:10 +00:00
netdata_netdata/libnetdata/functions_evloop/functions_evloop.c
Costa Tsaousis 3e508c8f95
New logging layer ()
* cleanup of logging - wip

* first working iteration

* add errno annotator

* replace old logging functions with netdata_logger()

* cleanup

* update error_limit

* fix remaining error_limit references

* work on fatal()

* started working on structured logs

* full cleanup

* default logging to files; fix all plugins initialization

* fix formatting of numbers

* cleanup and reorg

* fix coverity issues

* cleanup obsolete code

* fix formatting of numbers

* fix log rotation

* fix for older systems

* add detection of systemd journal via stderr

* finished on access.log

* remove left-over transport

* do not add empty fields to the logs

* journal gets compact UUIDs; the X-Transaction-ID header is added to web responses

* allow compiling on systems without memfd sealing

* added libnetdata/uuid directory

* move datetime formatters to libnetdata

* add missing files

* link the makefiles in libnetdata

* added uuid_parse_flexi() to parse UUIDs with and without hyphens; the web server now reads X-Transaction-ID and uses it for functions and web responses

* added stream receiver, sender, proc plugin and pluginsd log stack

* iso8601 advanced usage; line_splitter module in libnetdata; code cleanup

* add message ids to streaming inbound and outbound connections

* cleanup line_splitter between lines to avoid logging garbage; when killing children, kill them with SIGABRT if internal checks are enabled

* send SIGABRT to external plugins only if we are not shutting down

* fix cross cleanup in pluginsd parser

* fatal when there is a stack error in logs

* compile netdata with -fexceptions

* do not kill external plugins with SIGABRT

* metasync info logs to debug level

* added severity to logs

* added json output; added options per log output; added documentation; fixed issues mentioned

* allow memfd only on linux

* moved journal low level functions to journal.c/h

* move health logs to daemon.log with proper priorities

* fixed a couple of bugs; health log in journal

* updated docs

* systemd-cat-native command to push structured logs to journal from the command line

* fix makefiles

* restored NETDATA_LOG_SEVERITY_LEVEL

* fix makefiles

* systemd-cat-native can also work as the logger of Netdata scripts

* do not require a socket to systemd-journal to log-as-netdata

* alarm notify logs in native format

* properly compare log ids

* fatals log alerts; alarm-notify.sh working

* fix overflow warning

* alarm-notify.sh now logs the request (command line)

* annotate external plugins' logs with the function cmd they run

* added context, component and type to alarm-notify.sh; shell sanitization removes control characters and characters that may be expanded by bash

* reformatted alarm-notify logs

* unify cgroup-network-helper.sh

* added quotes around params

* charts.d.plugin switched logging to journal native

* quotes for logfmt

* unify the status codes of streaming receivers and senders

* alarm-notify: don't log anything if there is nothing to do

* all external plugins log to stderr when running outside netdata; alarm-notify now shows an error when notification methods are needed but are not available

* migrate cgroup-name.sh to new logging

* systemd-cat-native now supports messages with newlines

* socket.c logs use priority

* cleanup log field types

* inherit the systemd set INVOCATION_ID if found

* allow systemd-cat-native to send messages to a systemd-journal-remote URL

* log2journal command that can convert structured logs to journal export format

* various fixes and documentation of log2journal

* updated log2journal docs

* updated log2journal docs

* updated documentation of fields

* allow compiling without libcurl

* do not use socket as format string

* added version information to newly added tools

* updated documentation and help messages

* fix the namespace socket path

* print errno with error

* do not timeout

* updated docs

* updated docs

* updated docs

* log2journal updated docs and params

* when talking to a remote journal, systemd-cat-native batches the messages

* enable lz4 compression for systemd-cat-native when sending messages to a systemd-journal-remote

* Revert "enable lz4 compression for systemd-cat-native when sending messages to a systemd-journal-remote"

This reverts commit b079d53c11.

* note about uncompressed traffic

* log2journal: code reorg and cleanup to make modular

* finished rewriting log2journal

* more comments

* rewriting rules support

* increased limits

* updated docs

* updated docs

* fix old log call

* use journal only when stderr is connected to journal

* update netdata.spec for libcurl, libpcre2 and log2journal

* pcre2-devel

* do not require pcre2 in centos < 8, amazonlinux < 2023, open suse

* log2journal only on systems pcre2 is available

* ignore log2journal in .gitignore

* avoid log2journal on centos 7, amazonlinux 2 and opensuse

* add pcre2-8 to static build

* undo last commit

* Bundle to static

Signed-off-by: Tasos Katsoulas <tasos@netdata.cloud>

* Add build deps for deb packages

Signed-off-by: Tasos Katsoulas <tasos@netdata.cloud>

* Add dependencies; build from source

Signed-off-by: Tasos Katsoulas <tasos@netdata.cloud>

* Test build for Amazon Linux and CentOS; expect it to fail for SUSE

Signed-off-by: Tasos Katsoulas <tasos@netdata.cloud>

* fix minor oversight

Signed-off-by: Tasos Katsoulas <tasos@netdata.cloud>

* Reorg code

* Add the install from source (deps) as a TODO
* Do not enable the build on the SUSE ecosystem

Signed-off-by: Tasos Katsoulas <tasos@netdata.cloud>

---------

Signed-off-by: Tasos Katsoulas <tasos@netdata.cloud>
Co-authored-by: Tasos Katsoulas <tasos@netdata.cloud>
2023-11-22 10:27:25 +02:00


// SPDX-License-Identifier: GPL-3.0-or-later

#include "functions_evloop.h"

#define MAX_FUNCTION_PARAMETERS 1024

struct functions_evloop_worker_job {
    bool used;
    bool running;
    bool cancelled;
    char *cmd;
    const char *transaction;
    time_t timeout;
    functions_evloop_worker_execute_t cb;
};

struct rrd_functions_expectation {
    const char *function;
    size_t function_length;
    functions_evloop_worker_execute_t cb;
    time_t default_timeout;
    struct rrd_functions_expectation *prev, *next;
};

struct functions_evloop_globals {
    const char *tag;

    DICTIONARY *worker_queue;
    pthread_mutex_t worker_mutex;
    pthread_cond_t worker_cond_var;
    size_t workers;

    netdata_mutex_t *stdout_mutex;
    bool *plugin_should_exit;

    netdata_thread_t reader_thread;
    netdata_thread_t *worker_threads;

    struct rrd_functions_expectation *expectations;
};

static void *rrd_functions_worker_globals_worker_main(void *arg) {
    struct functions_evloop_globals *wg = arg;

    bool last_acquired = true;
    while (true) {
        pthread_mutex_lock(&wg->worker_mutex);

        // block until signalled when the queue is empty, or when the previous pass found nothing runnable
        if(dictionary_entries(wg->worker_queue) == 0 || !last_acquired)
            pthread_cond_wait(&wg->worker_cond_var, &wg->worker_mutex);

        // pick the first queued job that is neither running nor cancelled
        const DICTIONARY_ITEM *acquired = NULL;
        struct functions_evloop_worker_job *j;
        dfe_start_write(wg->worker_queue, j) {
            if(j->running || j->cancelled)
                continue;

            acquired = dictionary_acquired_item_dup(wg->worker_queue, j_dfe.item);
            j->running = true;
            break;
        }
        dfe_done(j);

        pthread_mutex_unlock(&wg->worker_mutex);

        if(acquired) {
            ND_LOG_STACK lgs[] = {
                ND_LOG_FIELD_TXT(NDF_REQUEST, j->cmd),
                ND_LOG_FIELD_END(),
            };
            ND_LOG_STACK_PUSH(lgs);

            last_acquired = true;
            j = dictionary_acquired_item_value(acquired);
            j->cb(j->transaction, j->cmd, j->timeout, &j->cancelled);
            dictionary_del(wg->worker_queue, j->transaction);
            dictionary_acquired_item_release(wg->worker_queue, acquired);
            dictionary_garbage_collect(wg->worker_queue);
        }
        else
            last_acquired = false;
    }
    return NULL;
}

static void *rrd_functions_worker_globals_reader_main(void *arg) {
    struct functions_evloop_globals *wg = arg;

    char buffer[PLUGINSD_LINE_MAX + 1];

    char *s = NULL;
    while(!(*wg->plugin_should_exit) && (s = fgets(buffer, PLUGINSD_LINE_MAX, stdin))) {

        char *words[MAX_FUNCTION_PARAMETERS] = { NULL };
        size_t num_words = quoted_strings_splitter_pluginsd(buffer, words, MAX_FUNCTION_PARAMETERS);

        const char *keyword = get_word(words, num_words, 0);

        if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0) {
            char *transaction = get_word(words, num_words, 1);
            char *timeout_s = get_word(words, num_words, 2);
            char *function = get_word(words, num_words, 3);

            if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
                netdata_log_error("Received incomplete %s (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
                                  keyword,
                                  transaction?transaction:"(unset)",
                                  timeout_s?timeout_s:"(unset)",
                                  function?function:"(unset)");
            }
            else {
                int timeout = str2i(timeout_s);

                // find a registered function matching this request (prefix match) and queue a job for it
                bool found = false;
                struct rrd_functions_expectation *we;
                for(we = wg->expectations; we ;we = we->next) {
                    if(strncmp(function, we->function, we->function_length) == 0) {
                        struct functions_evloop_worker_job t = {
                            .cmd = strdupz(function),
                            .transaction = strdupz(transaction),
                            .running = false,
                            .cancelled = false,
                            .timeout = timeout > 0 ? timeout : we->default_timeout,
                            .used = false,
                            .cb = we->cb,
                        };
                        struct functions_evloop_worker_job *j = dictionary_set(wg->worker_queue, transaction, &t, sizeof(t));
                        if(j->used) {
                            netdata_log_error("Received duplicate function transaction '%s'", transaction);
                            freez((void *)t.cmd);
                            freez((void *)t.transaction);
                        }
                        else {
                            found = true;
                            j->used = true;
                            pthread_cond_signal(&wg->worker_cond_var);
                        }
                    }
                }

                if(!found) {
                    netdata_mutex_lock(wg->stdout_mutex);
                    pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_NOT_FOUND,
                                                           "No function with this name found.");
                    netdata_mutex_unlock(wg->stdout_mutex);
                }
            }
        }
        else if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_CANCEL) == 0) {
            char *transaction = get_word(words, num_words, 1);
            const DICTIONARY_ITEM *acquired = dictionary_get_and_acquire_item(wg->worker_queue, transaction);
            if(acquired) {
                struct functions_evloop_worker_job *j = dictionary_acquired_item_value(acquired);
                __atomic_store_n(&j->cancelled, true, __ATOMIC_RELAXED);
                dictionary_acquired_item_release(wg->worker_queue, acquired);
                dictionary_del(wg->worker_queue, transaction);
                dictionary_garbage_collect(wg->worker_queue);
            }
            else
                netdata_log_error("Received CANCEL for transaction '%s', but it is not available here", transaction);
        }
        else
            netdata_log_error("Received unknown command: %s", keyword?keyword:"(unset)");
    }

    if(!s || feof(stdin) || ferror(stdin)) {
        *wg->plugin_should_exit = true;
        netdata_log_error("Received error on stdin.");
    }

    exit(1);
}
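
/*
 * Example of the stdin traffic the reader above handles (an illustrative sketch only;
 * the literal keyword strings come from PLUGINSD_KEYWORD_FUNCTION and
 * PLUGINSD_KEYWORD_FUNCTION_CANCEL, assumed here to expand to "FUNCTION" and
 * "FUNCTION_CANCEL"; the transaction id and function name are hypothetical):
 *
 *   FUNCTION tx-1 10 "my-function param1"
 *   FUNCTION_CANCEL tx-1
 *
 * The word after the keyword is the transaction id, then the timeout in seconds,
 * then the quoted function command that a worker thread will execute.
 */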

void worker_queue_delete_cb(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *data __maybe_unused) {
    struct functions_evloop_worker_job *j = value;
    freez((void *)j->cmd);
    freez((void *)j->transaction);
}

struct functions_evloop_globals *functions_evloop_init(size_t worker_threads, const char *tag, netdata_mutex_t *stdout_mutex, bool *plugin_should_exit) {
    struct functions_evloop_globals *wg = callocz(1, sizeof(struct functions_evloop_globals));

    wg->worker_queue = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE);
    dictionary_register_delete_callback(wg->worker_queue, worker_queue_delete_cb, NULL);

    pthread_mutex_init(&wg->worker_mutex, NULL);
    pthread_cond_init(&wg->worker_cond_var, NULL);

    wg->plugin_should_exit = plugin_should_exit;
    wg->stdout_mutex = stdout_mutex;
    wg->workers = worker_threads;
    wg->worker_threads = callocz(wg->workers, sizeof(netdata_thread_t));
    wg->tag = tag;

    char tag_buffer[NETDATA_THREAD_TAG_MAX + 1];
    snprintfz(tag_buffer, NETDATA_THREAD_TAG_MAX, "%s_READER", wg->tag);
    netdata_thread_create(&wg->reader_thread, tag_buffer, NETDATA_THREAD_OPTION_DONT_LOG,
                          rrd_functions_worker_globals_reader_main, wg);

    for(size_t i = 0; i < wg->workers ; i++) {
        snprintfz(tag_buffer, NETDATA_THREAD_TAG_MAX, "%s_WORK[%zu]", wg->tag, i+1);
        netdata_thread_create(&wg->worker_threads[i], tag_buffer, NETDATA_THREAD_OPTION_DONT_LOG,
                              rrd_functions_worker_globals_worker_main, wg);
    }

    return wg;
}

void functions_evloop_add_function(struct functions_evloop_globals *wg, const char *function, functions_evloop_worker_execute_t cb, time_t default_timeout) {
    struct rrd_functions_expectation *we = callocz(1, sizeof(*we));
    we->function = function;
    we->function_length = strlen(we->function);
    we->cb = cb;
    we->default_timeout = default_timeout;
    DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(wg->expectations, we, prev, next);
}
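
/*
 * Minimal usage sketch (not part of this file): how a plugin might wire up the
 * event loop above. The names my_plugin_*, "my-status" and my_status_cb are
 * hypothetical; the callback signature mirrors the worker call site
 * j->cb(j->transaction, j->cmd, j->timeout, &j->cancelled), and the mutex
 * initializer is assumed to be the usual NETDATA_MUTEX_INITIALIZER from
 * libnetdata locks.
 *
 *   static netdata_mutex_t my_plugin_stdout_mutex = NETDATA_MUTEX_INITIALIZER;
 *   static bool my_plugin_should_exit = false;
 *
 *   static void my_status_cb(const char *transaction, char *function,
 *                            time_t timeout, bool *cancelled) {
 *       // build the response for `function` and write it to stdout,
 *       // holding my_plugin_stdout_mutex while writing; check *cancelled
 *       // periodically for long-running work
 *   }
 *
 *   // start 2 worker threads plus the stdin reader thread
 *   struct functions_evloop_globals *wg =
 *       functions_evloop_init(2, "MYPLUGIN", &my_plugin_stdout_mutex, &my_plugin_should_exit);
 *
 *   // register "my-status" with a 10 second default timeout
 *   functions_evloop_add_function(wg, "my-status", my_status_cb, 10);
 */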