
[WIP] Windows-Events Logs Explorer ()

* preparations

* work on systemd-journal to abstract the components we need to use in windows events logs

* more code reorg to reuse systemd journal

* abstracted logs query parsing and preparation

* fix help printing

* moved non-LQS fields outside logs_query_status.h

* code re-organization and cleanup

* move lqs cleanup to lqs

* code re-organization and cleanup

* support printing accurate and approximate data sizes

* support for printing durations with spaces, for clarity

* progress on windows events sources

* windows events sources, with a count of logs

* windows events sources, with duration

* windows events sources, with file size

* support for converting between unix and windows timestamps (see the conversion sketch after this list)

* fixed conditions

* internal api for querying windows event logs

* extract basic fields from windows events

* fix sid and guid

* users lookup with caching

* basic structure for the netdata plugin

* decode more message fields

* event log sources are now periodically scanned

* progress on building the query for windows events

* first working version

* support external plugins with .plugin.exe suffix

* fix compilation warnings

* fix thread safety

* fixes in severity and buffers used

* match all rows and do not pass empty data to facets

* keyword should be hex

* strict type checking and more fields, including the full XML view

* restructure the code to keep caches between channel queries

* format a message using the XML event data

* message is now formatted as a dynamic field at presentation time

* extracted UserData and applied basic XML formatting on the message

* properly indent nested nodes
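
The unix/windows timestamp conversion mentioned in the list above reduces to a fixed epoch offset: FILETIME counts 100ns ticks since 1601-01-01, while unix time starts 11644473600 seconds later, at 1970-01-01. A minimal sketch of that arithmetic (the shipped helpers live in the new src/libnetdata/os/timestamps.c, which this page does not show; the names below are illustrative only):

#include <stdint.h>

#define EPOCH_DIFF_SEC     11644473600ULL  // seconds between 1601-01-01 and 1970-01-01
#define FILETIME_TICKS_SEC 10000000ULL     // FILETIME ticks (100ns each) per second

// FILETIME (as a 64-bit tick count) -> nanoseconds since the unix epoch
static inline uint64_t filetime_to_unix_epoch_ns(uint64_t ticks) {
    return (ticks - EPOCH_DIFF_SEC * FILETIME_TICKS_SEC) * 100ULL;
}

// microseconds since the unix epoch -> FILETIME ticks
static inline uint64_t unix_epoch_ut_to_filetime_ticks(uint64_t ut) {
    return ut * 10ULL + EPOCH_DIFF_SEC * FILETIME_TICKS_SEC;
}
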
This commit is contained in:
Costa Tsaousis 2024-09-10 20:03:54 +03:00 committed by GitHub
parent a6fea21303
commit 2c9bca448e
43 changed files with 4654 additions and 1489 deletions

View file

@@ -789,6 +789,9 @@ set(LIBNETDATA_FILES
src/libnetdata/config/appconfig_api_text.h
src/libnetdata/config/appconfig_api_boolean.c
src/libnetdata/config/appconfig_api_boolean.h
src/libnetdata/facets/logs_query_status.h
src/libnetdata/os/timestamps.c
src/libnetdata/os/timestamps.h
)
if(ENABLE_PLUGIN_EBPF)
@@ -1271,6 +1274,7 @@ set(SYSTEMD_JOURNAL_PLUGIN_FILES
src/libnetdata/maps/system-users.h
src/libnetdata/maps/system-groups.h
src/libnetdata/maps/system-services.h
src/collectors/systemd-journal.plugin/systemd-journal-sampling.h
)
set(STREAMING_PLUGIN_FILES
@@ -1393,6 +1397,21 @@ set(FREEBSD_PLUGIN_FILES
src/collectors/proc.plugin/zfs_common.h
)
set(WINDOWS_EVENTS_PLUGIN_FILES
src/collectors/windows-events.plugin/windows-events.c
src/collectors/windows-events.plugin/windows-events.h
src/collectors/windows-events.plugin/windows-events-query.h
src/collectors/windows-events.plugin/windows-events-query.c
src/collectors/windows-events.plugin/windows-events-sources.c
src/collectors/windows-events.plugin/windows-events-sources.h
src/collectors/windows-events.plugin/windows-events-sid.c
src/collectors/windows-events.plugin/windows-events-sid.h
src/collectors/windows-events.plugin/windows-events-unicode.c
src/collectors/windows-events.plugin/windows-events-unicode.h
src/collectors/windows-events.plugin/windows-events-xml.c
src/collectors/windows-events.plugin/windows-events-xml.h
)
set(WINDOWS_PLUGIN_FILES
src/collectors/windows.plugin/windows_plugin.c
src/collectors/windows.plugin/windows_plugin.h
@@ -2081,6 +2100,15 @@ if(ENABLE_PLUGIN_SYSTEMD_JOURNAL)
endif()
endif()
if(OS_WINDOWS)
add_executable(windows-events.plugin ${WINDOWS_EVENTS_PLUGIN_FILES})
target_link_libraries(windows-events.plugin libnetdata wevtapi)
install(TARGETS windows-events.plugin
COMPONENT plugin-windows-events
DESTINATION usr/libexec/netdata/plugins.d)
endif()
if(ENABLE_PLUGIN_EBPF)
set(EBPF_PLUGIN_FILES
src/collectors/ebpf.plugin/ebpf.c

View file

@@ -51,28 +51,33 @@ fi
set -exu -o pipefail
if [ -d "${build}" ]
if [ ! -d "${build}" ]
then
rm -rf "${build}"
/usr/bin/cmake -S "${WT_ROOT}" -B "${build}" \
-G Ninja \
-DCMAKE_INSTALL_PREFIX="/opt/netdata" \
-DCMAKE_BUILD_TYPE="${BUILD_TYPE}" \
-DCMAKE_C_FLAGS="-fstack-protector-all -O0 -ggdb -Wall -Wextra -Wno-char-subscripts -Wa,-mbig-obj -pipe -DNETDATA_INTERNAL_CHECKS=1 -D_FILE_OFFSET_BITS=64 -D__USE_MINGW_ANSI_STDIO=1" \
-DBUILD_FOR_PACKAGING=${BUILD_FOR_PACKAGING} \
-DUSE_MOLD=Off \
-DNETDATA_USER="${USER}" \
-DDEFAULT_FEATURE_STATE=Off \
-DENABLE_H2O=Off \
-DENABLE_ML=On \
-DENABLE_BUNDLED_JSONC=On \
-DENABLE_BUNDLED_PROTOBUF=Off \
${NULL}
fi
/usr/bin/cmake -S "${WT_ROOT}" -B "${build}" \
-G Ninja \
-DCMAKE_INSTALL_PREFIX="/opt/netdata" \
-DCMAKE_BUILD_TYPE="${BUILD_TYPE}" \
-DCMAKE_C_FLAGS="-fstack-protector-all -O0 -ggdb -Wall -Wextra -Wno-char-subscripts -Wa,-mbig-obj -pipe -DNETDATA_INTERNAL_CHECKS=1 -D_FILE_OFFSET_BITS=64 -D__USE_MINGW_ANSI_STDIO=1" \
-DBUILD_FOR_PACKAGING=${BUILD_FOR_PACKAGING} \
-DUSE_MOLD=Off \
-DNETDATA_USER="${USER}" \
-DDEFAULT_FEATURE_STATE=Off \
-DENABLE_H2O=Off \
-DENABLE_ML=On \
-DENABLE_BUNDLED_JSONC=On \
-DENABLE_BUNDLED_PROTOBUF=Off \
${NULL}
ninja -v -C "${build}" install || ninja -v -C "${build}" -j 1
echo
echo "Compile with:"
echo "ninja -v -C \"${build}\" install || ninja -v -C \"${build}\" -j 1"
#echo
#echo "Compile with:"
#echo "ninja -v -C \"${build}\" install || ninja -v -C \"${build}\" -j 1"
echo "starting netdata..."
# enable JIT debug with gdb
export MSYS="error_start:$(cygpath -w /usr/bin/gdb)"
rm -rf /opt/netdata/var/log/netdata/*.log || echo
/opt/netdata/usr/bin/netdata -D

View file

@@ -45,7 +45,10 @@ static void apps_plugin_function_processes_help(const char *transaction) {
"Filters can be combined. Each filter can be given only one time.\n"
);
pluginsd_function_result_to_stdout(transaction, HTTP_RESP_OK, "text/plain", now_realtime_sec() + 3600, wb);
wb->response_code = HTTP_RESP_OK;
wb->content_type = CT_TEXT_PLAIN;
wb->expires = now_realtime_sec() + 3600;
pluginsd_function_result_to_stdout(transaction, wb);
buffer_free(wb);
}
@@ -922,7 +925,10 @@ close_and_send:
buffer_json_member_add_time_t(wb, "expires", now_s + update_every);
buffer_json_finalize(wb);
pluginsd_function_result_to_stdout(transaction, HTTP_RESP_OK, "application/json", now_s + update_every, wb);
wb->response_code = HTTP_RESP_OK;
wb->content_type = CT_APPLICATION_JSON;
wb->expires = now_s + update_every;
pluginsd_function_result_to_stdout(transaction, wb);
buffer_free(wb);
}
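
This hunk shows the API change the commit applies to every plugin function handler: pluginsd_function_result_to_stdout() no longer takes the HTTP code, content type and expiry as arguments; callers set them on the BUFFER first. A hedged sketch of the convention as a wrapper (hypothetical helper, not part of this commit; field types assumed from the constants used here):

// hypothetical convenience wrapper: the response metadata now travels on the BUFFER itself
static void send_function_result(const char *transaction, BUFFER *wb,
                                 int code, HTTP_CONTENT_TYPE type, time_t expires) {
    wb->response_code = code;
    wb->content_type  = type;
    wb->expires       = expires;
    pluginsd_function_result_to_stdout(transaction, wb);
}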

View file

@@ -1629,7 +1629,10 @@ close_and_send:
buffer_json_member_add_time_t(wb, "expires", now_s + update_every);
buffer_json_finalize(wb);
pluginsd_function_result_to_stdout(transaction, HTTP_RESP_OK, "application/json", now_s + update_every, wb);
wb->response_code = HTTP_RESP_OK;
wb->content_type = CT_APPLICATION_JSON;
wb->expires = now_s + update_every;
pluginsd_function_result_to_stdout(transaction, wb);
buffer_free(wb);
}
@@ -1637,7 +1640,6 @@ close_and_send:
// ----------------------------------------------------------------------------
// main, command line arguments parsing
static void plugin_exit(int code) NORETURN;
static void plugin_exit(int code) {
fflush(stdout);
function_plugin_should_exit = true;
@@ -2042,11 +2044,13 @@ int main (int argc, char **argv) {
collector_error("%s(): sensors failed to initialize. Calling DISABLE.", __FUNCTION__);
fprintf(stdout, "DISABLE\n");
plugin_exit(0);
break;
case ICS_FAILED:
collector_error("%s(): sensors fails repeatedly to collect metrics. Exiting to restart.", __FUNCTION__);
fprintf(stdout, "EXIT\n");
plugin_exit(0);
break;
}
if(netdata_do_sel) {

View file

@@ -945,7 +945,10 @@ close_and_send:
buffer_json_finalize(wb);
netdata_mutex_lock(&stdout_mutex);
pluginsd_function_result_to_stdout(transaction, HTTP_RESP_OK, "application/json", now_s + 1, wb);
wb->response_code = HTTP_RESP_OK;
wb->content_type = CT_APPLICATION_JSON;
wb->expires = now_s + 1;
pluginsd_function_result_to_stdout(transaction, wb);
netdata_mutex_unlock(&stdout_mutex);
}

View file

@@ -186,8 +186,8 @@ static void *pluginsd_worker_thread(void *arg) {
spawn_popen_write_fd(cd->unsafe.pi),
0);
nd_log(NDLS_DAEMON, NDLP_DEBUG,
"PLUGINSD: 'host:%s', '%s' (pid %d) disconnected after %zu successful data collections (ENDs).",
nd_log(NDLS_COLLECTORS, NDLP_WARNING,
"PLUGINSD: 'host:%s', '%s' (pid %d) disconnected after %zu successful data collections.",
rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, count);
int worker_ret_code = spawn_popen_kill(cd->unsafe.pi);
@@ -231,6 +231,33 @@ static void pluginsd_main_cleanup(void *pptr) {
worker_unregister();
}
static bool is_plugin(char *dst, size_t dst_size, const char *filename) {
size_t len = strlen(filename);
const char *suffix;
size_t suffix_len;
suffix = ".plugin";
suffix_len = strlen(suffix);
if (len > suffix_len &&
strcmp(suffix, &filename[len - suffix_len]) == 0) {
snprintfz(dst, dst_size, "%.*s", (int)(len - suffix_len), filename);
return true;
}
#if defined(OS_WINDOWS)
suffix = ".plugin.exe";
suffix_len = strlen(suffix);
if (len > suffix_len &&
strcmp(suffix, &filename[len - suffix_len]) == 0) {
snprintfz(dst, dst_size, "%.*s", (int)(len - suffix_len), filename);
return true;
}
#endif
return false;
}
void *pluginsd_main(void *ptr) {
CLEANUP_FUNCTION_REGISTER(pluginsd_main_cleanup) cleanup_ptr = ptr;
@@ -279,18 +306,13 @@ void *pluginsd_main(void *ptr) {
if (unlikely(strcmp(file->d_name, ".") == 0 || strcmp(file->d_name, "..") == 0))
continue;
int len = (int)strlen(file->d_name);
if (unlikely(len <= (int)PLUGINSD_FILE_SUFFIX_LEN))
continue;
if (unlikely(strcmp(PLUGINSD_FILE_SUFFIX, &file->d_name[len - (int)PLUGINSD_FILE_SUFFIX_LEN]) != 0)) {
netdata_log_debug(D_PLUGINSD, "file '%s' does not end in '%s'", file->d_name, PLUGINSD_FILE_SUFFIX);
char pluginname[CONFIG_MAX_NAME + 1];
if(!is_plugin(pluginname, sizeof(pluginname), file->d_name)) {
netdata_log_debug(D_PLUGINSD, "file '%s' does not look like a plugin", file->d_name);
continue;
}
char pluginname[CONFIG_MAX_NAME + 1];
snprintfz(pluginname, CONFIG_MAX_NAME, "%.*s", (int)(len - PLUGINSD_FILE_SUFFIX_LEN), file->d_name);
int enabled = config_get_boolean(CONFIG_SECTION_PLUGINS, pluginname, automatic_run);
if (unlikely(!enabled)) {
netdata_log_debug(D_PLUGINSD, "plugin '%s' is not enabled", file->d_name);
continue;

View file

@@ -5,9 +5,7 @@
#include "daemon/common.h"
#define PLUGINSD_FILE_SUFFIX ".plugin"
#define PLUGINSD_FILE_SUFFIX_LEN strlen(PLUGINSD_FILE_SUFFIX)
#define PLUGINSD_CMD_MAX (FILENAME_MAX*2)
#define PLUGINSD_CMD_MAX (FILENAME_MAX*2)
#define PLUGINSD_STOCK_PLUGINS_DIRECTORY_PATH 0
#define PLUGINSD_MAX_DIRECTORIES 20

View file

@@ -1215,8 +1215,10 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, int fd_input,
&parser->reader, parser->fd_input,
2 * 60 * MSEC_PER_SEC, true);
if(unlikely(ret != BUFFERED_READER_READ_OK))
if(unlikely(ret != BUFFERED_READER_READ_OK)) {
nd_log(NDLS_COLLECTORS, NDLP_INFO, "Buffered reader not OK");
break;
}
continue;
}

View file

@@ -475,19 +475,6 @@ struct journal_file_source {
uint64_t size;
};
static void human_readable_size_ib(uint64_t size, char *dst, size_t dst_len) {
if(size > 1024ULL * 1024 * 1024 * 1024)
snprintfz(dst, dst_len, "%0.2f TiB", (double)size / 1024.0 / 1024.0 / 1024.0 / 1024.0);
else if(size > 1024ULL * 1024 * 1024)
snprintfz(dst, dst_len, "%0.2f GiB", (double)size / 1024.0 / 1024.0 / 1024.0);
else if(size > 1024ULL * 1024)
snprintfz(dst, dst_len, "%0.2f MiB", (double)size / 1024.0 / 1024.0);
else if(size > 1024ULL)
snprintfz(dst, dst_len, "%0.2f KiB", (double)size / 1024.0);
else
snprintfz(dst, dst_len, "%"PRIu64" B", size);
}
#define print_duration(dst, dst_len, pos, remaining, duration, one, many, printed) do { \
if((remaining) > (duration)) { \
uint64_t _count = (remaining) / (duration); \
@@ -498,22 +485,6 @@ static void human_readable_size_ib(uint64_t size, char *dst, size_t dst_len) {
} \
} while(0)
static void human_readable_duration_s(time_t duration_s, char *dst, size_t dst_len) {
if(duration_s < 0)
duration_s = -duration_s;
size_t pos = 0;
dst[0] = 0 ;
bool printed = false;
print_duration(dst, dst_len, pos, duration_s, 86400 * 365, "year", "years", printed);
print_duration(dst, dst_len, pos, duration_s, 86400 * 30, "month", "months", printed);
print_duration(dst, dst_len, pos, duration_s, 86400 * 1, "day", "days", printed);
print_duration(dst, dst_len, pos, duration_s, 3600 * 1, "hour", "hours", printed);
print_duration(dst, dst_len, pos, duration_s, 60 * 1, "min", "mins", printed);
print_duration(dst, dst_len, pos, duration_s, 1, "sec", "secs", printed);
}
static int journal_file_to_json_array_cb(const DICTIONARY_ITEM *item, void *entry, void *data) {
struct journal_file_source *jfs = entry;
BUFFER *wb = data;
@@ -522,12 +493,12 @@ static int journal_file_to_json_array_cb(const DICTIONARY_ITEM *item, void *entry, void *data) {
buffer_json_add_array_item_object(wb);
{
char size_for_humans[100];
human_readable_size_ib(jfs->size, size_for_humans, sizeof(size_for_humans));
char size_for_humans[128];
size_snprintf(size_for_humans, sizeof(size_for_humans), jfs->size, "B", false);
char duration_for_humans[1024];
human_readable_duration_s((time_t)((jfs->last_ut - jfs->first_ut) / USEC_PER_SEC),
duration_for_humans, sizeof(duration_for_humans));
char duration_for_humans[128];
duration_snprintf(duration_for_humans, sizeof(duration_for_humans),
(time_t)((jfs->last_ut - jfs->first_ut) / USEC_PER_SEC), "s", true);
char info[1024];
snprintfz(info, sizeof(info), "%zu files, with a total size of %s, covering %s",
@@ -756,6 +727,7 @@ void journal_files_registry_update(void) {
dictionary_del(journal_files_registry, jf_dfe.name);
}
dfe_done(jf);
dictionary_garbage_collect(journal_files_registry);
journal_files_scans++;
spinlock_unlock(&spinlock);
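
In this file the plugin drops its private human_readable_size_ib() and human_readable_duration_s() helpers in favor of the shared libnetdata formatters. A usage sketch with the signatures exactly as they appear above (the rendered strings are assumptions):

char size_buf[128], dur_buf[128];
size_snprintf(size_buf, sizeof(size_buf), 3458764800ULL, "B", false); // e.g. "3.2 GiB" (assumed output)
duration_snprintf(dur_buf, sizeof(dur_buf), 93784, "s", true);        // e.g. "1d 2h 3m 4s" (assumed output)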

View file

@@ -0,0 +1,378 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef NETDATA_SYSTEMD_JOURNAL_SAMPLING_H
#define NETDATA_SYSTEMD_JOURNAL_SAMPLING_H
// ----------------------------------------------------------------------------
// sampling support
static inline void sampling_query_init(LOGS_QUERY_STATUS *lqs, FACETS *facets) {
if(!lqs->rq.sampling)
return;
if(!lqs->rq.slice) {
// the user is doing a full data query
// disable sampling
lqs->rq.sampling = 0;
return;
}
if(lqs->rq.data_only) {
// the user is doing a data query
// disable sampling
lqs->rq.sampling = 0;
return;
}
if(!lqs->c.files_matched) {
// no files have been matched
// disable sampling
lqs->rq.sampling = 0;
return;
}
lqs->c.samples.slots = facets_histogram_slots(facets);
if(lqs->c.samples.slots < 2)
lqs->c.samples.slots = 2;
if(lqs->c.samples.slots > SYSTEMD_JOURNAL_SAMPLING_SLOTS)
lqs->c.samples.slots = SYSTEMD_JOURNAL_SAMPLING_SLOTS;
if(!lqs->rq.after_ut || !lqs->rq.before_ut || lqs->rq.after_ut >= lqs->rq.before_ut) {
// we don't have enough information for sampling
lqs->rq.sampling = 0;
return;
}
usec_t delta = lqs->rq.before_ut - lqs->rq.after_ut;
usec_t step = delta / facets_histogram_slots(facets) - 1;
if(step < 1) step = 1;
lqs->c.samples_per_time_slot.start_ut = lqs->rq.after_ut;
lqs->c.samples_per_time_slot.end_ut = lqs->rq.before_ut;
lqs->c.samples_per_time_slot.step_ut = step;
// the minimum number of rows to enable sampling
lqs->c.samples.enable_after_samples = lqs->rq.sampling / 2;
size_t files_matched = lqs->c.files_matched;
if(!files_matched)
files_matched = 1;
// the minimum number of rows per file to enable sampling
lqs->c.samples_per_file.enable_after_samples = (lqs->rq.sampling / 4) / files_matched;
if(lqs->c.samples_per_file.enable_after_samples < lqs->rq.entries)
lqs->c.samples_per_file.enable_after_samples = lqs->rq.entries;
// the minimum number of rows per time slot to enable sampling
lqs->c.samples_per_time_slot.enable_after_samples = (lqs->rq.sampling / 4) / lqs->c.samples.slots;
if(lqs->c.samples_per_time_slot.enable_after_samples < lqs->rq.entries)
lqs->c.samples_per_time_slot.enable_after_samples = lqs->rq.entries;
}
static inline void sampling_file_init(LOGS_QUERY_STATUS *lqs, struct journal_file *jf __maybe_unused) {
lqs->c.samples_per_file.sampled = 0;
lqs->c.samples_per_file.unsampled = 0;
lqs->c.samples_per_file.estimated = 0;
lqs->c.samples_per_file.every = 0;
lqs->c.samples_per_file.skipped = 0;
lqs->c.samples_per_file.recalibrate = 0;
}
static inline size_t sampling_file_lines_scanned_so_far(LOGS_QUERY_STATUS *lqs) {
size_t sampled = lqs->c.samples_per_file.sampled + lqs->c.samples_per_file.unsampled;
if(!sampled) sampled = 1;
return sampled;
}
static inline void sampling_running_file_query_overlapping_timeframe_ut(
LOGS_QUERY_STATUS *lqs, struct journal_file *jf, FACETS_ANCHOR_DIRECTION direction,
usec_t msg_ut, usec_t *after_ut, usec_t *before_ut) {
// find the overlap of the query and file timeframes
// taking into account the first message we encountered
usec_t oldest_ut, newest_ut;
if(direction == FACETS_ANCHOR_DIRECTION_FORWARD) {
// the first message we know (oldest)
oldest_ut = lqs->c.query_file.first_msg_ut ? lqs->c.query_file.first_msg_ut : jf->msg_first_ut;
if(!oldest_ut) oldest_ut = lqs->c.query_file.start_ut;
if(jf->msg_last_ut)
newest_ut = MIN(lqs->c.query_file.stop_ut, jf->msg_last_ut);
else if(jf->file_last_modified_ut)
newest_ut = MIN(lqs->c.query_file.stop_ut, jf->file_last_modified_ut);
else
newest_ut = lqs->c.query_file.stop_ut;
if(msg_ut < oldest_ut)
oldest_ut = msg_ut - 1;
}
else /* BACKWARD */ {
// the latest message we know (newest)
newest_ut = lqs->c.query_file.first_msg_ut ? lqs->c.query_file.first_msg_ut : jf->msg_last_ut;
if(!newest_ut) newest_ut = lqs->c.query_file.start_ut;
if(jf->msg_first_ut)
oldest_ut = MAX(lqs->c.query_file.stop_ut, jf->msg_first_ut);
else
oldest_ut = lqs->c.query_file.stop_ut;
if(newest_ut < msg_ut)
newest_ut = msg_ut + 1;
}
*after_ut = oldest_ut;
*before_ut = newest_ut;
}
static inline double sampling_running_file_query_progress_by_time(
LOGS_QUERY_STATUS *lqs, struct journal_file *jf,
FACETS_ANCHOR_DIRECTION direction, usec_t msg_ut) {
usec_t after_ut, before_ut, elapsed_ut;
sampling_running_file_query_overlapping_timeframe_ut(lqs, jf, direction, msg_ut, &after_ut, &before_ut);
if(direction == FACETS_ANCHOR_DIRECTION_FORWARD)
elapsed_ut = msg_ut - after_ut;
else
elapsed_ut = before_ut - msg_ut;
usec_t total_ut = before_ut - after_ut;
double progress = (double)elapsed_ut / (double)total_ut;
return progress;
}
static inline usec_t sampling_running_file_query_remaining_time(
LOGS_QUERY_STATUS *lqs, struct journal_file *jf,
FACETS_ANCHOR_DIRECTION direction, usec_t msg_ut,
usec_t *total_time_ut, usec_t *remaining_start_ut,
usec_t *remaining_end_ut) {
usec_t after_ut, before_ut;
sampling_running_file_query_overlapping_timeframe_ut(lqs, jf, direction, msg_ut, &after_ut, &before_ut);
// since we have a timestamp in msg_ut
// this timestamp can extend the overlap
if(msg_ut <= after_ut)
after_ut = msg_ut - 1;
if(msg_ut >= before_ut)
before_ut = msg_ut + 1;
// return the remaining duration
usec_t remaining_from_ut, remaining_to_ut;
if(direction == FACETS_ANCHOR_DIRECTION_FORWARD) {
remaining_from_ut = msg_ut;
remaining_to_ut = before_ut;
}
else {
remaining_from_ut = after_ut;
remaining_to_ut = msg_ut;
}
usec_t remaining_ut = remaining_to_ut - remaining_from_ut;
if(total_time_ut)
*total_time_ut = (before_ut > after_ut) ? before_ut - after_ut : 1;
if(remaining_start_ut)
*remaining_start_ut = remaining_from_ut;
if(remaining_end_ut)
*remaining_end_ut = remaining_to_ut;
return remaining_ut;
}
static inline size_t sampling_running_file_query_estimate_remaining_lines_by_time(
LOGS_QUERY_STATUS *lqs,
struct journal_file *jf,
FACETS_ANCHOR_DIRECTION direction,
usec_t msg_ut) {
size_t scanned_lines = sampling_file_lines_scanned_so_far(lqs);
// Calculate the proportion of time covered
usec_t total_time_ut, remaining_start_ut, remaining_end_ut;
usec_t remaining_time_ut = sampling_running_file_query_remaining_time(
lqs, jf, direction, msg_ut, &total_time_ut, &remaining_start_ut, &remaining_end_ut);
if (total_time_ut == 0) total_time_ut = 1;
double proportion_by_time = (double) (total_time_ut - remaining_time_ut) / (double) total_time_ut;
if (proportion_by_time == 0 || proportion_by_time > 1.0 || !isfinite(proportion_by_time))
proportion_by_time = 1.0;
// Estimate the total number of lines in the file
size_t expected_matching_logs_by_time = (size_t)((double)scanned_lines / proportion_by_time);
if(jf->messages_in_file && expected_matching_logs_by_time > jf->messages_in_file)
expected_matching_logs_by_time = jf->messages_in_file;
// Calculate the estimated number of remaining lines
size_t remaining_logs_by_time = expected_matching_logs_by_time - scanned_lines;
if (remaining_logs_by_time < 1) remaining_logs_by_time = 1;
// nd_log(NDLS_COLLECTORS, NDLP_INFO,
// "JOURNAL ESTIMATION: '%s' "
// "scanned_lines=%zu [sampled=%zu, unsampled=%zu, estimated=%zu], "
// "file [%"PRIu64" - %"PRIu64", duration %"PRId64", known lines in file %zu], "
// "query [%"PRIu64" - %"PRIu64", duration %"PRId64"], "
// "first message read from the file at %"PRIu64", current message at %"PRIu64", "
// "proportion of time %.2f %%, "
// "expected total lines in file %zu, "
// "remaining lines %zu, "
// "remaining time %"PRIu64" [%"PRIu64" - %"PRIu64", duration %"PRId64"]"
// , jf->filename
// , scanned_lines, fqs->samples_per_file.sampled, fqs->samples_per_file.unsampled, fqs->samples_per_file.estimated
// , jf->msg_first_ut, jf->msg_last_ut, jf->msg_last_ut - jf->msg_first_ut, jf->messages_in_file
// , fqs->query_file.start_ut, fqs->query_file.stop_ut, fqs->query_file.stop_ut - fqs->query_file.start_ut
// , fqs->query_file.first_msg_ut, msg_ut
// , proportion_by_time * 100.0
// , expected_matching_logs_by_time
// , remaining_logs_by_time
// , remaining_time_ut, remaining_start_ut, remaining_end_ut, remaining_end_ut - remaining_start_ut
// );
return remaining_logs_by_time;
}
static inline size_t sampling_running_file_query_estimate_remaining_lines(
sd_journal *j __maybe_unused, LOGS_QUERY_STATUS *lqs, struct journal_file *jf,
FACETS_ANCHOR_DIRECTION direction, usec_t msg_ut) {
size_t remaining_logs_by_seqnum = 0;
#ifdef HAVE_SD_JOURNAL_GET_SEQNUM
size_t expected_matching_logs_by_seqnum = 0;
double proportion_by_seqnum = 0.0;
uint64_t current_msg_seqnum;
sd_id128_t current_msg_writer;
if(!lqs->c.query_file.first_msg_seqnum || sd_journal_get_seqnum(j, &current_msg_seqnum, &current_msg_writer) < 0) {
lqs->c.query_file.first_msg_seqnum = 0;
lqs->c.query_file.first_msg_writer = SD_ID128_NULL;
}
else if(jf->messages_in_file) {
size_t scanned_lines = sampling_file_lines_scanned_so_far(lqs);
double proportion_of_all_lines_so_far;
if(direction == FACETS_ANCHOR_DIRECTION_FORWARD)
proportion_of_all_lines_so_far = (double)scanned_lines / (double)(current_msg_seqnum - jf->first_seqnum);
else
proportion_of_all_lines_so_far = (double)scanned_lines / (double)(jf->last_seqnum - current_msg_seqnum);
if(proportion_of_all_lines_so_far > 1.0)
proportion_of_all_lines_so_far = 1.0;
expected_matching_logs_by_seqnum = (size_t)(proportion_of_all_lines_so_far * (double)jf->messages_in_file);
proportion_by_seqnum = (double)scanned_lines / (double)expected_matching_logs_by_seqnum;
if (proportion_by_seqnum == 0 || proportion_by_seqnum > 1.0 || !isfinite(proportion_by_seqnum))
proportion_by_seqnum = 1.0;
remaining_logs_by_seqnum = expected_matching_logs_by_seqnum - scanned_lines;
if(!remaining_logs_by_seqnum) remaining_logs_by_seqnum = 1;
}
#endif
if(remaining_logs_by_seqnum)
return remaining_logs_by_seqnum;
return sampling_running_file_query_estimate_remaining_lines_by_time(lqs, jf, direction, msg_ut);
}
static inline void sampling_decide_file_sampling_every(sd_journal *j,
LOGS_QUERY_STATUS *lqs, struct journal_file *jf, FACETS_ANCHOR_DIRECTION direction, usec_t msg_ut) {
size_t files_matched = lqs->c.files_matched;
if(!files_matched) files_matched = 1;
size_t remaining_lines = sampling_running_file_query_estimate_remaining_lines(j, lqs, jf, direction, msg_ut);
size_t wanted_samples = (lqs->rq.sampling / 2) / files_matched;
if(!wanted_samples) wanted_samples = 1;
lqs->c.samples_per_file.every = remaining_lines / wanted_samples;
if(lqs->c.samples_per_file.every < 1)
lqs->c.samples_per_file.every = 1;
}
typedef enum {
SAMPLING_STOP_AND_ESTIMATE = -1,
SAMPLING_FULL = 0,
SAMPLING_SKIP_FIELDS = 1,
} sampling_t;
static inline sampling_t is_row_in_sample(
sd_journal *j, LOGS_QUERY_STATUS *lqs, struct journal_file *jf,
usec_t msg_ut, FACETS_ANCHOR_DIRECTION direction, bool candidate_to_keep) {
if(!lqs->rq.sampling || candidate_to_keep)
return SAMPLING_FULL;
if(unlikely(msg_ut < lqs->c.samples_per_time_slot.start_ut))
msg_ut = lqs->c.samples_per_time_slot.start_ut;
if(unlikely(msg_ut > lqs->c.samples_per_time_slot.end_ut))
msg_ut = lqs->c.samples_per_time_slot.end_ut;
size_t slot = (msg_ut - lqs->c.samples_per_time_slot.start_ut) / lqs->c.samples_per_time_slot.step_ut;
if(slot >= lqs->c.samples.slots)
slot = lqs->c.samples.slots - 1;
bool should_sample = false;
if(lqs->c.samples.sampled < lqs->c.samples.enable_after_samples ||
lqs->c.samples_per_file.sampled < lqs->c.samples_per_file.enable_after_samples ||
lqs->c.samples_per_time_slot.sampled[slot] < lqs->c.samples_per_time_slot.enable_after_samples)
should_sample = true;
else if(lqs->c.samples_per_file.recalibrate >= SYSTEMD_JOURNAL_SAMPLING_RECALIBRATE || !lqs->c.samples_per_file.every) {
// this is the first to be unsampled for this file
sampling_decide_file_sampling_every(j, lqs, jf, direction, msg_ut);
lqs->c.samples_per_file.recalibrate = 0;
should_sample = true;
}
else {
// we sample 1 every fqs->samples_per_file.every
if(lqs->c.samples_per_file.skipped >= lqs->c.samples_per_file.every) {
lqs->c.samples_per_file.skipped = 0;
should_sample = true;
}
else
lqs->c.samples_per_file.skipped++;
}
if(should_sample) {
lqs->c.samples.sampled++;
lqs->c.samples_per_file.sampled++;
lqs->c.samples_per_time_slot.sampled[slot]++;
return SAMPLING_FULL;
}
lqs->c.samples_per_file.recalibrate++;
lqs->c.samples.unsampled++;
lqs->c.samples_per_file.unsampled++;
lqs->c.samples_per_time_slot.unsampled[slot]++;
if(lqs->c.samples_per_file.unsampled > lqs->c.samples_per_file.sampled) {
double progress_by_time = sampling_running_file_query_progress_by_time(lqs, jf, direction, msg_ut);
if(progress_by_time > SYSTEMD_JOURNAL_ENABLE_ESTIMATIONS_FILE_PERCENTAGE)
return SAMPLING_STOP_AND_ESTIMATE;
}
return SAMPLING_SKIP_FIELDS;
}
static inline void sampling_update_running_query_file_estimates(
FACETS *facets, sd_journal *j,
LOGS_QUERY_STATUS *lqs, struct journal_file *jf, usec_t msg_ut, FACETS_ANCHOR_DIRECTION direction) {
usec_t total_time_ut, remaining_start_ut, remaining_end_ut;
sampling_running_file_query_remaining_time(
lqs, jf, direction, msg_ut, &total_time_ut, &remaining_start_ut, &remaining_end_ut);
size_t remaining_lines = sampling_running_file_query_estimate_remaining_lines(j, lqs, jf, direction, msg_ut);
facets_update_estimations(facets, remaining_start_ut, remaining_end_ut, remaining_lines);
lqs->c.samples.estimated += remaining_lines;
lqs->c.samples_per_file.estimated += remaining_lines;
}
#endif //NETDATA_SYSTEMD_JOURNAL_SAMPLING_H
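
To make the thresholds in sampling_query_init() concrete, here is a worked example under assumed request values (sampling = 1,000,000, files_matched = 3, histogram slots = 100, entries = 200):

samples.enable_after_samples           = 1,000,000 / 2         = 500,000 rows, before any sampling starts
samples_per_file.enable_after_samples  = (1,000,000 / 4) / 3   ~  83,333 rows per file (kept, since > entries)
samples_per_time_slot enable_after     = (1,000,000 / 4) / 100 =   2,500 rows per histogram slot

Past those thresholds, sampling_decide_file_sampling_every() targets (1,000,000 / 2) / 3 ~ 166,666 samples per file, so "every" becomes the estimated remaining lines divided by that, and only one row in each group of "every" rows is fully processed; the rest are counted as unsampled or, late enough in a file, converted into estimations.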

File diff suppressed because it is too large.

View file

@@ -114,10 +114,10 @@ int main(int argc __maybe_unused, char **argv __maybe_unused) {
// ------------------------------------------------------------------------
usec_t step_ut = 100 * USEC_PER_MS;
const usec_t step_ut = 100 * USEC_PER_MS;
usec_t send_newline_ut = 0;
usec_t since_last_scan_ut = SYSTEMD_JOURNAL_ALL_FILES_SCAN_EVERY_USEC * 2; // something big to trigger scanning at start
bool tty = isatty(fileno(stdout)) == 1;
const bool tty = isatty(fileno(stdout)) == 1;
heartbeat_t hb;
heartbeat_init(&hb);

View file

@@ -1153,7 +1153,10 @@ static void netdata_systemd_units_function_help(const char *transaction) {
);
netdata_mutex_lock(&stdout_mutex);
pluginsd_function_result_to_stdout(transaction, HTTP_RESP_OK, "text/plain", now_realtime_sec() + 3600, wb);
wb->response_code = HTTP_RESP_OK;
wb->content_type = CT_TEXT_PLAIN;
wb->expires = now_realtime_sec() + 3600;
pluginsd_function_result_to_stdout(transaction, wb);
netdata_mutex_unlock(&stdout_mutex);
buffer_free(wb);
@@ -1169,7 +1172,10 @@ static void netdata_systemd_units_function_info(const char *transaction) {
buffer_json_finalize(wb);
netdata_mutex_lock(&stdout_mutex);
pluginsd_function_result_to_stdout(transaction, HTTP_RESP_OK, "text/plain", now_realtime_sec() + 3600, wb);
wb->response_code = HTTP_RESP_OK;
wb->content_type = CT_TEXT_PLAIN;
wb->expires = now_realtime_sec() + 3600;
pluginsd_function_result_to_stdout(transaction, wb);
netdata_mutex_unlock(&stdout_mutex);
buffer_free(wb);
@@ -1958,7 +1964,10 @@ void function_systemd_units(const char *transaction, char *function,
buffer_json_finalize(wb);
netdata_mutex_lock(&stdout_mutex);
pluginsd_function_result_to_stdout(transaction, HTTP_RESP_OK, "application/json", now_realtime_sec() + 1, wb);
wb->response_code = HTTP_RESP_OK;
wb->content_type = CT_APPLICATION_JSON;
wb->expires = now_realtime_sec() + 1;
pluginsd_function_result_to_stdout(transaction, wb);
netdata_mutex_unlock(&stdout_mutex);
buffer_free(wb);

View file

@@ -0,0 +1,674 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "windows-events-query.h"
static uint64_t wevt_log_file_size(const wchar_t *channel);
#define FIELD_CHANNEL (0)
#define FIELD_PROVIDER_NAME (1)
#define FIELD_EVENT_SOURCE_NAME (2)
#define FIELD_PROVIDER_GUID (3)
#define FIELD_RECORD_NUMBER (4)
#define FIELD_EVENT_ID (5)
#define FIELD_LEVEL (6)
#define FIELD_KEYWORDS (7)
#define FIELD_TIME_CREATED (8)
#define FIELD_COMPUTER_NAME (9)
#define FIELD_USER_ID (10)
#define FIELD_CORRELATION_ACTIVITY_ID (11)
#define FIELD_OPCODE (12)
#define FIELD_VERSION (13)
#define FIELD_TASK (14)
#define FIELD_PROCESS_ID (15)
#define FIELD_THREAD_ID (16)
#define FIELD_EVENT_DATA (17)
// These are the fields we extract from the logs
static const wchar_t *RENDER_ITEMS[] = {
L"/Event/System/Channel",
L"/Event/System/Provider/@Name",
L"/Event/System/Provider/@EventSourceName",
L"/Event/System/Provider/@Guid",
L"/Event/System/EventRecordID",
L"/Event/System/EventID",
L"/Event/System/Level",
L"/Event/System/Keywords",
L"/Event/System/TimeCreated/@SystemTime",
L"/Event/System/Computer",
L"/Event/System/Security/@UserID",
L"/Event/System/Correlation/@ActivityID",
L"/Event/System/Opcode",
L"/Event/System/Version",
L"/Event/System/Task",
L"/Event/System/Execution/@ProcessID",
L"/Event/System/Execution/@ThreadID",
L"/Event/EventData/Data"
};
static bool wevt_GUID_to_ND_UUID(ND_UUID *nd_uuid, const GUID *guid) {
if(guid && sizeof(GUID) == sizeof(ND_UUID)) {
memcpy(nd_uuid->uuid, guid, sizeof(ND_UUID));
return true;
}
else {
*nd_uuid = UUID_ZERO;
return false;
}
}
static uint64_t wevt_get_unsigned_by_type(WEVT_LOG *log, size_t field) {
switch(log->ops.content.data[field].Type & EVT_VARIANT_TYPE_MASK) {
case EvtVarTypeHexInt64: return log->ops.content.data[field].UInt64Val;
case EvtVarTypeHexInt32: return log->ops.content.data[field].UInt32Val;
case EvtVarTypeUInt64: return log->ops.content.data[field].UInt64Val;
case EvtVarTypeUInt32: return log->ops.content.data[field].UInt32Val;
case EvtVarTypeUInt16: return log->ops.content.data[field].UInt16Val;
case EvtVarTypeInt64: return ABS(log->ops.content.data[field].Int64Val);
case EvtVarTypeInt32: return ABS(log->ops.content.data[field].Int32Val);
case EvtVarTypeByte: return log->ops.content.data[field].ByteVal;
case EvtVarTypeInt16: return ABS(log->ops.content.data[field].Int16Val);
case EvtVarTypeSByte: return ABS(log->ops.content.data[field].SByteVal);
case EvtVarTypeSingle: return ABS(log->ops.content.data[field].SingleVal);
case EvtVarTypeDouble: return ABS(log->ops.content.data[field].DoubleVal);
case EvtVarTypeBoolean: return log->ops.content.data[field].BooleanVal ? 1 : 0;
case EvtVarTypeSizeT: return log->ops.content.data[field].SizeTVal;
default: return 0;
}
}
static uint64_t wevt_get_filetime_to_ns_by_type(WEVT_LOG *log, size_t field) {
switch(log->ops.content.data[field].Type & EVT_VARIANT_TYPE_MASK) {
case EvtVarTypeFileTime:
case EvtVarTypeUInt64:
return os_windows_ulonglong_to_unix_epoch_ns(log->ops.content.data[field].FileTimeVal);
default: return 0;
}
}
static bool wevt_get_uuid_by_type(WEVT_LOG *log, size_t field, ND_UUID *dst) {
switch(log->ops.content.data[field].Type & EVT_VARIANT_TYPE_MASK) {
case EvtVarTypeGuid:
return wevt_GUID_to_ND_UUID(dst, log->ops.content.data[field].GuidVal);
default:
return wevt_GUID_to_ND_UUID(dst, NULL);
}
}
static void wevt_empty_utf8(TXT_UTF8 *dst) {
txt_utf8_resize(dst, 1);
dst->data[0] = '\0';
dst->used = 1;
}
static bool wevt_get_message_utf8(WEVT_LOG *log, EVT_HANDLE event_handle, TXT_UTF8 *dst, EVT_FORMAT_MESSAGE_FLAGS what) {
DWORD size = 0;
if(!log->ops.unicode.data) {
EvtFormatMessage(NULL, event_handle, 0, 0, NULL, what, 0, NULL, &size);
if(!size) {
// nd_log(NDLS_COLLECTORS, NDLP_ERR, "EvtFormatMessage() to get message size failed.");
goto cleanup;
}
txt_unicode_resize(&log->ops.unicode, size);
}
// First, try to get the message using the existing buffer
if (!EvtFormatMessage(NULL, event_handle, 0, 0, NULL, what, log->ops.unicode.size, log->ops.unicode.data, &size) || !log->ops.unicode.data) {
if (log->ops.unicode.data && GetLastError() != ERROR_INSUFFICIENT_BUFFER) {
// nd_log(NDLS_COLLECTORS, NDLP_ERR, "EvtFormatMessage() failed.");
goto cleanup;
}
// Try again with the resized buffer
txt_unicode_resize(&log->ops.unicode, size);
if (!EvtFormatMessage(NULL, event_handle, 0, 0, NULL, what, log->ops.unicode.size, log->ops.unicode.data, &size)) {
// nd_log(NDLS_COLLECTORS, NDLP_ERR, "EvtFormatMessage() failed after resizing buffer.");
goto cleanup;
}
}
// EvtFormatMessage pads it with zeros at the end
while(size >= 2 && log->ops.unicode.data[size - 2] == 0)
size--;
log->ops.unicode.used = size;
internal_fatal(wcslen(log->ops.unicode.data) + 1 != (size_t)log->ops.unicode.used,
"Wrong unicode string length!");
return wevt_str_unicode_to_utf8(dst, &log->ops.unicode);
cleanup:
wevt_empty_utf8(dst);
return false;
}
static bool wevt_get_utf8_by_type(WEVT_LOG *log, size_t field, TXT_UTF8 *dst) {
switch(log->ops.content.data[field].Type & EVT_VARIANT_TYPE_MASK) {
case EvtVarTypeString:
return wevt_str_wchar_to_utf8(dst, log->ops.content.data[field].StringVal, -1);
default:
wevt_empty_utf8(dst);
return false;
}
}
static bool wevt_get_sid_by_type(WEVT_LOG *log, size_t field, TXT_UTF8 *dst) {
switch(log->ops.content.data[field].Type & EVT_VARIANT_TYPE_MASK) {
case EvtVarTypeSid:
return wevt_convert_user_id_to_name(log->ops.content.data[field].SidVal, dst);
default:
wevt_empty_utf8(dst);
return false;
}
}
void wevt_field_to_buffer_append(BUFFER *wb, EVT_VARIANT *d, const char *prefix, const char *suffix) {
buffer_strcat(wb, prefix);
switch(d->Type & EVT_VARIANT_TYPE_MASK) {
case EvtVarTypeNull:
buffer_strcat(wb, "(null)");
break;
case EvtVarTypeString:
buffer_sprintf(wb, "%ls", d->StringVal);
break;
case EvtVarTypeSByte:
buffer_sprintf(wb, "%d", (int) d->SByteVal);
break;
case EvtVarTypeByte:
buffer_sprintf(wb, "%u", (unsigned) d->ByteVal);
break;
case EvtVarTypeInt16:
buffer_sprintf(wb, "%d", (int) d->Int16Val);
break;
case EvtVarTypeUInt16:
buffer_sprintf(wb, "%u", (unsigned) d->UInt16Val);
break;
case EvtVarTypeInt32:
buffer_sprintf(wb, "%d", (int) d->Int32Val);
break;
case EvtVarTypeUInt32:
case EvtVarTypeHexInt32:
buffer_sprintf(wb, "%u", (unsigned) d->UInt32Val);
break;
case EvtVarTypeInt64:
buffer_sprintf(wb, "%" PRIi64, (int64_t)d->Int64Val);
break;
case EvtVarTypeUInt64:
case EvtVarTypeHexInt64:
buffer_sprintf(wb, "%" PRIu64, (uint64_t)d->UInt64Val);
break;
case EvtVarTypeSizeT:
buffer_sprintf(wb, "%zu", d->SizeTVal);
break;
case EvtVarTypeSysTime:
buffer_sprintf(wb, "%04d-%02d-%02dT%02d:%02d:%02d.%03d (SystemTime)",
d->SysTimeVal->wYear, d->SysTimeVal->wMonth, d->SysTimeVal->wDay,
d->SysTimeVal->wHour, d->SysTimeVal->wMinute, d->SysTimeVal->wSecond,
d->SysTimeVal->wMilliseconds);
break;
case EvtVarTypeGuid: {
char uuid_str[UUID_COMPACT_STR_LEN];
ND_UUID uuid;
if (wevt_GUID_to_ND_UUID(&uuid, d->GuidVal)) {
uuid_unparse_lower_compact(uuid.uuid, uuid_str);
buffer_strcat(wb, uuid_str);
buffer_strcat(wb, " (GUID)");
}
break;
}
case EvtVarTypeSingle:
buffer_print_netdata_double(wb, d->SingleVal);
break;
case EvtVarTypeDouble:
buffer_print_netdata_double(wb, d->DoubleVal);
break;
case EvtVarTypeBoolean:
buffer_strcat(wb, d->BooleanVal ? "true" : "false");
break;
case EvtVarTypeFileTime: {
nsec_t ns = os_windows_ulonglong_to_unix_epoch_ns(d->FileTimeVal);
char buf[RFC3339_MAX_LENGTH];
rfc3339_datetime_ut(buf, sizeof(buf), ns, 2, true);
buffer_strcat(wb, buf);
buffer_strcat(wb, " (FileTime)");
break;
}
case EvtVarTypeBinary:
buffer_strcat(wb, "(binary data)");
break;
case EvtVarTypeSid:
buffer_strcat(wb, "(user id data)");
break;
case EvtVarTypeAnsiString:
case EvtVarTypeEvtHandle:
case EvtVarTypeEvtXml:
default:
buffer_sprintf(wb, "(unsupported data type: %u)", d->Type);
break;
}
buffer_strcat(wb, suffix);
}
static bool wevt_format_summary_array_traversal(WEVT_LOG *log, size_t field, WEVT_EVENT *ev) {
EVT_VARIANT *d = &log->ops.content.data[field];
if (!log->ops.message)
log->ops.message = buffer_create(0, NULL);
BUFFER *wb = log->ops.message;
buffer_flush(wb);
// Check if there is an event description
if (log->ops.event.data && *log->ops.event.data) {
buffer_strcat(wb, log->ops.event.data);
buffer_strcat(wb, "\nRelated data:\n");
} else {
buffer_sprintf(wb, "Event %" PRIu64 ", with the following related data:\n", (uint64_t)ev->event_id);
}
// Check if the field contains an array or a single value
bool is_array = (d->Type & EVT_VARIANT_TYPE_ARRAY) != 0;
EVT_VARIANT_TYPE base_type = (EVT_VARIANT_TYPE)(d->Type & EVT_VARIANT_TYPE_MASK);
if (is_array) {
DWORD count = d->Count; // Number of elements in the array
for (DWORD i = 0; i < count; i++) {
EVT_VARIANT array_element = {
.Type = base_type,
.Count = 0,
};
// Point the array element to the correct data
switch (base_type) {
case EvtVarTypeBoolean:
array_element.BooleanVal = d->BooleanArr[i];
break;
case EvtVarTypeSByte:
array_element.SByteVal = d->SByteArr[i];
break;
case EvtVarTypeByte:
array_element.ByteVal = d->ByteArr[i];
break;
case EvtVarTypeInt16:
array_element.Int16Val = d->Int16Arr[i];
break;
case EvtVarTypeUInt16:
array_element.UInt16Val = d->UInt16Arr[i];
break;
case EvtVarTypeInt32:
array_element.Int32Val = d->Int32Arr[i];
break;
case EvtVarTypeUInt32:
array_element.UInt32Val = d->UInt32Arr[i];
break;
case EvtVarTypeInt64:
array_element.Int64Val = d->Int64Arr[i];
break;
case EvtVarTypeUInt64:
array_element.UInt64Val = d->UInt64Arr[i];
break;
case EvtVarTypeSingle:
array_element.SingleVal = d->SingleArr[i];
break;
case EvtVarTypeDouble:
array_element.DoubleVal = d->DoubleArr[i];
break;
case EvtVarTypeFileTime:
array_element.FileTimeVal = ((ULONGLONG)d->FileTimeArr[i].dwLowDateTime |
((ULONGLONG)d->FileTimeArr[i].dwHighDateTime << 32));
break;
case EvtVarTypeSysTime:
array_element.SysTimeVal = &d->SysTimeArr[i];
break;
case EvtVarTypeGuid:
array_element.GuidVal = &d->GuidArr[i];
break;
case EvtVarTypeString:
array_element.StringVal = d->StringArr[i];
break;
case EvtVarTypeAnsiString:
array_element.AnsiStringVal = d->AnsiStringArr[i];
break;
case EvtVarTypeSid:
array_element.SidVal = d->SidArr[i];
break;
case EvtVarTypeSizeT:
array_element.SizeTVal = d->SizeTArr[i];
break;
case EvtVarTypeEvtXml:
array_element.XmlVal = d->XmlValArr[i];
break;
default:
buffer_sprintf(wb, " - Unsupported array type: %u\n", base_type);
continue;
}
// Call the field appending function for each array element
wevt_field_to_buffer_append(wb, &array_element, " - ", "\n");
}
} else {
// Handle single values, pass the EVT_VARIANT directly
wevt_field_to_buffer_append(wb, d, " - ", "\n");
}
return true;
}
static bool wevt_format_summary(WEVT_LOG *log, WEVT_EVENT *ev) {
if (!log->ops.message)
log->ops.message = buffer_create(0, NULL);
BUFFER *wb = log->ops.message;
buffer_flush(wb);
// Check if there is an event description
if (log->ops.event.data && *log->ops.event.data)
buffer_strcat(wb, log->ops.event.data);
else
buffer_sprintf(wb, "Event %" PRIu64, (uint64_t) ev->event_id);
const char *xml = log->ops.xml.data;
const char *event_data_start = strstr(xml, "<EventData>");
if(event_data_start)
event_data_start = &event_data_start[11];
const char *event_data_end = event_data_start ? strstr(xml, "</EventData>") : NULL;
if(event_data_start && event_data_end) {
buffer_strcat(wb, "\n\nRelated data:\n\n");
// copy the event data block
buffer_fast_strcat(wb, event_data_start, event_data_end - event_data_start);
}
return true;
}
bool wevt_get_next_event(WEVT_LOG *log, WEVT_EVENT *ev, bool full) {
DWORD size_required_next = 0, size = 0, bookmarkedCount = 0;
EVT_HANDLE tmp_event_bookmark = NULL;
bool ret = false;
fatal_assert(log && log->event_query && log->render_context);
if (!EvtNext(log->event_query, 1, &tmp_event_bookmark, INFINITE, 0, &size_required_next))
goto cleanup; // no data available, return failure
// obtain the information from selected events
if (!EvtRender(log->render_context, tmp_event_bookmark, EvtRenderEventValues, log->ops.content.size, log->ops.content.data, &size, &bookmarkedCount)) {
// information exceeds the allocated space
if (GetLastError() != ERROR_INSUFFICIENT_BUFFER) {
nd_log(NDLS_COLLECTORS, NDLP_ERR, "EvtRender() failed");
goto cleanup;
}
freez(log->ops.content.data);
log->ops.content.size = size;
log->ops.content.data = (EVT_VARIANT *)mallocz(log->ops.content.size);
if (!EvtRender(log->render_context, tmp_event_bookmark, EvtRenderEventValues, log->ops.content.size, log->ops.content.data, &size, &bookmarkedCount)) {
nd_log(NDLS_COLLECTORS, NDLP_ERR, "EvtRender() failed, after size increase");
goto cleanup;
}
}
log->ops.content.len = size;
ev->id = wevt_get_unsigned_by_type(log, FIELD_RECORD_NUMBER);
ev->event_id = wevt_get_unsigned_by_type(log, FIELD_EVENT_ID);
ev->level = wevt_get_unsigned_by_type(log, FIELD_LEVEL);
ev->keyword = wevt_get_unsigned_by_type(log, FIELD_KEYWORDS);
ev->created_ns = wevt_get_filetime_to_ns_by_type(log, FIELD_TIME_CREATED);
ev->opcode = wevt_get_unsigned_by_type(log, FIELD_OPCODE);
ev->version = wevt_get_unsigned_by_type(log, FIELD_VERSION);
ev->task = wevt_get_unsigned_by_type(log, FIELD_TASK);
ev->process_id = wevt_get_unsigned_by_type(log, FIELD_PROCESS_ID);
ev->thread_id = wevt_get_unsigned_by_type(log, FIELD_THREAD_ID);
if(full) {
wevt_get_utf8_by_type(log, FIELD_CHANNEL, &log->ops.channel);
wevt_get_utf8_by_type(log, FIELD_PROVIDER_NAME, &log->ops.provider);
wevt_get_utf8_by_type(log, FIELD_EVENT_SOURCE_NAME, &log->ops.source);
wevt_get_uuid_by_type(log, FIELD_PROVIDER_GUID, &ev->provider);
wevt_get_message_utf8(log, tmp_event_bookmark, &log->ops.level, EvtFormatMessageLevel);
if(ev->event_id)
wevt_get_message_utf8(log, tmp_event_bookmark, &log->ops.event, EvtFormatMessageEvent);
else
wevt_empty_utf8(&log->ops.event);
if(ev->keyword)
wevt_get_message_utf8(log, tmp_event_bookmark, &log->ops.keyword, EvtFormatMessageKeyword);
else
wevt_empty_utf8(&log->ops.keyword);
if(ev->opcode)
wevt_get_message_utf8(log, tmp_event_bookmark, &log->ops.opcode, EvtFormatMessageOpcode);
else
wevt_empty_utf8(&log->ops.opcode);
// ComputerName
wevt_get_utf8_by_type(log, FIELD_COMPUTER_NAME, &log->ops.computer);
// User
wevt_get_sid_by_type(log, FIELD_USER_ID, &log->ops.user);
// CorrelationActivityID
wevt_get_uuid_by_type(log, FIELD_CORRELATION_ACTIVITY_ID, &ev->correlation_activity_id);
// Full XML of the entire message
wevt_get_message_utf8(log, tmp_event_bookmark, &log->ops.xml, EvtFormatMessageXml);
// Format a text message for the users to see
// wevt_format_summary(log, ev);
}
ret = true;
cleanup:
if (tmp_event_bookmark)
EvtClose(tmp_event_bookmark);
return ret;
}
void wevt_query_done(WEVT_LOG *log) {
if (!log->event_query) return;
EvtClose(log->event_query);
log->event_query = NULL;
}
void wevt_closelog6(WEVT_LOG *log) {
wevt_query_done(log);
if (log->render_context)
EvtClose(log->render_context);
freez(log->ops.content.data);
txt_unicode_cleanup(&log->ops.unicode);
txt_utf8_cleanup(&log->ops.channel);
txt_utf8_cleanup(&log->ops.provider);
txt_utf8_cleanup(&log->ops.source);
txt_utf8_cleanup(&log->ops.computer);
txt_utf8_cleanup(&log->ops.event);
txt_utf8_cleanup(&log->ops.user);
txt_utf8_cleanup(&log->ops.opcode);
txt_utf8_cleanup(&log->ops.level);
txt_utf8_cleanup(&log->ops.keyword);
txt_utf8_cleanup(&log->ops.xml);
buffer_free(log->ops.message);
freez(log);
}
bool wevt_channel_retention(WEVT_LOG *log, const wchar_t *channel, EVT_RETENTION *retention) {
bool ret = false;
// get the number of the oldest record in the log
// "EvtGetLogInfo()" does not work properly with "EvtLogOldestRecordNumber"
// we have to get it from the first EventRecordID
// query the eventlog
log->event_query = EvtQuery(NULL, channel, NULL, EvtQueryChannelPath);
if (!log->event_query) {
if (GetLastError() == ERROR_EVT_CHANNEL_NOT_FOUND)
nd_log(NDLS_COLLECTORS, NDLP_ERR, "EvtQuery() failed, channel '%s' not found, cannot open log", channel2utf8(channel));
else
nd_log(NDLS_COLLECTORS, NDLP_ERR, "EvtQuery() on channel '%s' failed, cannot open log", channel2utf8(channel));
goto cleanup;
}
if (!wevt_get_next_event(log, &retention->first_event, false))
goto cleanup;
if (!retention->first_event.id) {
// no data in the event log
retention->first_event = retention->last_event = WEVT_EVENT_EMPTY;
ret = true;
goto cleanup;
}
EvtClose(log->event_query);
log->event_query = EvtQuery(NULL, channel, NULL, EvtQueryChannelPath | EvtQueryReverseDirection);
if (!log->event_query) {
if (GetLastError() == ERROR_EVT_CHANNEL_NOT_FOUND)
nd_log(NDLS_COLLECTORS, NDLP_ERR, "EvtQuery() failed, channel '%s' not found", channel2utf8(channel));
else
nd_log(NDLS_COLLECTORS, NDLP_ERR, "EvtQuery() on channel '%s' failed", channel2utf8(channel));
goto cleanup;
}
if (!wevt_get_next_event(log, &retention->last_event, false) || retention->last_event.id == 0) {
// no data in eventlog
retention->last_event = retention->first_event;
}
retention->last_event.id += 1; // we should read the last record
ret = true;
cleanup:
wevt_query_done(log);
if(ret) {
retention->entries = retention->last_event.id - retention->first_event.id;
if(retention->last_event.created_ns >= retention->first_event.created_ns)
retention->duration_ns = retention->last_event.created_ns - retention->first_event.created_ns;
else
retention->duration_ns = retention->first_event.created_ns - retention->last_event.created_ns;
retention->size_bytes = wevt_log_file_size(channel);
}
else
memset(retention, 0, sizeof(*retention));
return ret;
}
WEVT_LOG *wevt_openlog6(void) {
size_t RENDER_ITEMS_count = (sizeof(RENDER_ITEMS) / sizeof(const wchar_t *));
WEVT_LOG *log = callocz(1, sizeof(*log));
// create the system render
log->render_context = EvtCreateRenderContext(RENDER_ITEMS_count, RENDER_ITEMS, EvtRenderContextValues);
if (!log->render_context) {
nd_log(NDLS_COLLECTORS, NDLP_ERR, "EvtCreateRenderContext failed.");
goto cleanup;
}
cleanup:
return log;
}
static uint64_t wevt_log_file_size(const wchar_t *channel) {
EVT_HANDLE hLog = NULL;
EVT_VARIANT evtVariant;
DWORD bufferUsed = 0;
uint64_t file_size = 0;
// Open the event log channel
hLog = EvtOpenLog(NULL, channel, EvtOpenChannelPath);
if (!hLog) {
nd_log(NDLS_COLLECTORS, NDLP_ERR, "EvtOpenLog() on channel '%s' failed", channel2utf8(channel));
goto cleanup;
}
// Get the file size of the log
if (!EvtGetLogInfo(hLog, EvtLogFileSize, sizeof(evtVariant), &evtVariant, &bufferUsed)) {
nd_log(NDLS_COLLECTORS, NDLP_ERR, "EvtGetLogInfo() on channel '%s' failed", channel2utf8(channel));
goto cleanup;
}
// Extract the file size from the EVT_VARIANT structure
file_size = evtVariant.UInt64Val;
cleanup:
if (hLog)
EvtClose(hLog);
return file_size;
}
EVT_HANDLE wevt_query(LPCWSTR channel, usec_t seek_to, bool backward) {
// for backward queries, add a millisecond to the seek time,
// so that the event at the seek timestamp itself is included
if(backward) seek_to += USEC_PER_MS;
// Convert the microseconds since Unix epoch to FILETIME (used in Windows APIs)
FILETIME fileTime = os_unix_epoch_ut_to_filetime(seek_to);
// Convert FILETIME to SYSTEMTIME for use in XPath
SYSTEMTIME systemTime;
if (!FileTimeToSystemTime(&fileTime, &systemTime)) {
nd_log(NDLS_COLLECTORS, NDLP_ERR, "FileTimeToSystemTime() failed");
return NULL;
}
// Format SYSTEMTIME into ISO 8601 format (YYYY-MM-DDTHH:MM:SS.sssZ)
static __thread WCHAR query[4096];
swprintf(query, 4096,
L"Event/System[TimeCreated[@SystemTime%ls\"%04d-%02d-%02dT%02d:%02d:%02d.%03dZ\"]]",
backward ? L"<=" : L">=", // Use <= if backward, >= if forward
systemTime.wYear, systemTime.wMonth, systemTime.wDay,
systemTime.wHour, systemTime.wMinute, systemTime.wSecond, systemTime.wMilliseconds);
// Execute the query
EVT_HANDLE hQuery = EvtQuery(NULL, channel, query, EvtQueryChannelPath | (backward ? EvtQueryReverseDirection : EvtQueryForwardDirection));
if (!hQuery) {
wchar_t wbuf[1024];
DWORD wbuf_used;
EvtGetExtendedStatus(sizeof(wbuf), wbuf, &wbuf_used);
char buf[1024];
rfc3339_datetime_ut(buf, sizeof(buf), seek_to, 3, true);
nd_log(NDLS_COLLECTORS, NDLP_ERR, "EvtQuery() failed, seek to '%s', query: %s | extended info: %ls", buf, query2utf8(query), wbuf);
}
return hQuery;
}
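
For reference, a backward seek builds an XPath filter like the following (timestamp illustrative) and combines it with EvtQueryReverseDirection, so EvtNext() walks from that point towards older records:

Event/System[TimeCreated[@SystemTime<="2024-09-10T17:03:54.000Z"]]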

View file

@@ -0,0 +1,81 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef NETDATA_WINDOWS_EVENTS_QUERY_H
#define NETDATA_WINDOWS_EVENTS_QUERY_H
#include "windows-events.h"
typedef struct wevt_event {
uint64_t id; // EventRecordId (unique and sequential per channel)
uint16_t event_id; // This is the template that defines the message to be shown
uint16_t opcode;
uint8_t level; // The severity of event
uint8_t version;
uint16_t task;
uint32_t process_id;
uint32_t thread_id;
uint64_t keyword; // Categorization of the event
ND_UUID provider;
ND_UUID correlation_activity_id;
nsec_t created_ns;
} WEVT_EVENT;
#define WEVT_EVENT_EMPTY (WEVT_EVENT){ .id = 0, .created_ns = 0, }
typedef struct {
WEVT_EVENT first_event;
WEVT_EVENT last_event;
uint64_t entries;
nsec_t duration_ns;
uint64_t size_bytes;
} EVT_RETENTION;
typedef struct wevt_log {
EVT_HANDLE event_query;
EVT_HANDLE render_context;
struct {
// temp buffer used for rendering event log messages
// never use directly
struct {
EVT_VARIANT *data;
size_t size;
size_t len;
} content;
// temp buffer used for fetching and converting UNICODE and UTF-8
// every string operation overwrites it, multiple times per event log entry
// it can be used within any function, for its own purposes,
// but never share between functions
TXT_UNICODE unicode;
// string attributes of the current event log entry
// valid until another event is fetched
TXT_UTF8 channel;
TXT_UTF8 provider;
TXT_UTF8 source;
TXT_UTF8 computer;
TXT_UTF8 event;
TXT_UTF8 user;
TXT_UTF8 opcode;
TXT_UTF8 level;
TXT_UTF8 keyword;
TXT_UTF8 xml;
BUFFER *message;
} ops;
} WEVT_LOG;
WEVT_LOG *wevt_openlog6(void);
void wevt_closelog6(WEVT_LOG *log);
bool wevt_channel_retention(WEVT_LOG *log, const wchar_t *channel, EVT_RETENTION *retention);
EVT_HANDLE wevt_query(LPCWSTR channel, usec_t seek_to, bool backward);
void wevt_query_done(WEVT_LOG *log);
bool wevt_get_next_event(WEVT_LOG *log, WEVT_EVENT *ev, bool full);
#endif //NETDATA_WINDOWS_EVENTS_QUERY_H
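
Putting the declarations above together, a hedged sketch of a consumer loop, following the same pattern wevt_channel_retention() uses in the .c file (error handling trimmed, output illustrative):

WEVT_LOG *log = wevt_openlog6();
if(log && log->render_context) {
    // seek to "now" and walk backwards, like the facets queries do
    log->event_query = wevt_query(L"System", now_realtime_usec(), true);
    if(log->event_query) {
        WEVT_EVENT ev;
        while(wevt_get_next_event(log, &ev, true)) {
            // ev carries the numeric System fields; log->ops.* the rendered strings
            printf("record %llu, event id %u, provider '%s'\n",
                   (unsigned long long)ev.id, (unsigned)ev.event_id,
                   log->ops.provider.data ? log->ops.provider.data : "");
        }
        wevt_query_done(log);
    }
    wevt_closelog6(log);
}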

View file

@@ -0,0 +1,120 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "windows-events-sid.h"
#include <sddl.h>
typedef struct {
size_t len;
uint8_t sid[];
} SID_KEY;
typedef struct {
const char *user;
size_t user_len;
// this needs to be last, because of its variable size
SID_KEY key;
} SID_VALUE;
#define SIMPLE_HASHTABLE_NAME _SID
#define SIMPLE_HASHTABLE_VALUE_TYPE SID_VALUE
#define SIMPLE_HASHTABLE_KEY_TYPE SID_KEY
#define SIMPLE_HASHTABLE_VALUE2KEY_FUNCTION sid_value_to_key
#define SIMPLE_HASHTABLE_COMPARE_KEYS_FUNCTION sid_cache_compar
#define SIMPLE_HASHTABLE_SAMPLE_IMPLEMENTATION 1
#include "libnetdata/simple_hashtable.h"
static struct {
SPINLOCK spinlock;
bool initialized;
struct simple_hashtable_SID hashtable;
} sid_globals = {
.spinlock = NETDATA_SPINLOCK_INITIALIZER,
.hashtable = { 0 },
};
static inline SID_KEY *sid_value_to_key(SID_VALUE *s) {
return &s->key;
}
static inline bool sid_cache_compar(SID_KEY *a, SID_KEY *b) {
return a->len == b->len && memcmp(&a->sid, &b->sid, a->len) == 0;
}
static bool update_user(SID_VALUE *found, TXT_UTF8 *dst) {
if(found && found->user) {
txt_utf8_resize(dst, found->user_len + 1);
memcpy(dst->data, found->user, found->user_len + 1);
dst->used = found->user_len + 1;
return true;
}
txt_utf8_resize(dst, 1);
dst->data[0] = '\0';
dst->used = 1;
return false;
}
static void lookup_user(PSID *sid, TXT_UTF8 *dst) {
static __thread wchar_t account_unicode[256];
static __thread wchar_t domain_unicode[256];
DWORD account_name_size = sizeof(account_unicode) / sizeof(account_unicode[0]);
DWORD domain_name_size = sizeof(domain_unicode) / sizeof(domain_unicode[0]);
SID_NAME_USE sid_type;
txt_utf8_resize(dst, 1024);
if (LookupAccountSidW(NULL, sid, account_unicode, &account_name_size, domain_unicode, &domain_name_size, &sid_type)) {
const char *user = account2utf8(account_unicode);
const char *domain = domain2utf8(domain_unicode);
dst->used = snprintfz(dst->data, dst->size, "%s\\%s", domain, user) + 1;
}
else {
wchar_t *sid_string = NULL;
if (ConvertSidToStringSidW(sid, &sid_string)) {
const char *user = account2utf8(sid_string);
dst->used = snprintfz(dst->data, dst->size, "%s", user) + 1;
}
else
dst->used = snprintfz(dst->data, dst->size, "[invalid]") + 1;
}
}
bool wevt_convert_user_id_to_name(PSID sid, TXT_UTF8 *dst) {
if(!sid || !IsValidSid(sid))
return update_user(NULL, dst);
size_t size = GetLengthSid(sid);
size_t tmp_size = sizeof(SID_VALUE) + size;
size_t tmp_key_size = sizeof(SID_KEY) + size;
uint8_t buf[tmp_size];
SID_VALUE *tmp = (SID_VALUE *)&buf;
memcpy(&tmp->key.sid, sid, size);
tmp->key.len = size;
spinlock_lock(&sid_globals.spinlock);
if(!sid_globals.initialized) {
simple_hashtable_init_SID(&sid_globals.hashtable, 100);
sid_globals.initialized = true;
}
SID_VALUE *found = simple_hashtable_get_SID(&sid_globals.hashtable, &tmp->key, tmp_key_size);
spinlock_unlock(&sid_globals.spinlock);
if(found) return update_user(found, dst);
// allocate the SID_VALUE
found = mallocz(tmp_size);
memcpy(found, buf, tmp_size);
// lookup the user
lookup_user(sid, dst);
found->user = strdupz(dst->data);
found->user_len = dst->used - 1;
// add it to the cache
spinlock_lock(&sid_globals.spinlock);
simple_hashtable_set_SID(&sid_globals.hashtable, &found->key, tmp_key_size, found);
spinlock_unlock(&sid_globals.spinlock);
return update_user(found, dst);
}
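
The cache above pays one LookupAccountSidW() round-trip per distinct SID and serves repeats from a spinlock-guarded hashtable keyed on the raw SID bytes; entries are never evicted, which is acceptable for the bounded set of accounts a host logs. A hedged usage sketch:

TXT_UTF8 user = { 0 };
if(wevt_convert_user_id_to_name(sid, &user))  // 'sid' is a PSID taken from an event
    printf("resolved user: %s\n", user.data); // e.g. "DOMAIN\user", or the SID string if unresolvable
txt_utf8_cleanup(&user);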

View file

@@ -0,0 +1,11 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef NETDATA_WINDOWS_EVENTS_SID_H
#define NETDATA_WINDOWS_EVENTS_SID_H
#include "windows-events.h"
struct wevt_log;
bool wevt_convert_user_id_to_name(PSID sid, TXT_UTF8 *dst);
#endif //NETDATA_WINDOWS_EVENTS_SID_H

View file

@@ -0,0 +1,267 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "windows-events-sources.h"
DICTIONARY *wevt_sources = NULL;
DICTIONARY *used_hashes_registry = NULL;
static usec_t wevt_session = 0;
WEVT_SOURCE_TYPE wevt_internal_source_type(const char *value) {
if(strcmp(value, WEVT_SOURCE_ALL_NAME) == 0)
return WEVTS_ALL;
return WEVTS_NONE;
}
void wevt_sources_del_cb(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *data __maybe_unused) {
LOGS_QUERY_SOURCE *src = value;
freez((void *)src->fullname);
string_freez(src->source);
src->fullname = NULL;
src->source = NULL;
}
static bool wevt_sources_conflict_cb(const DICTIONARY_ITEM *item __maybe_unused, void *old_value, void *new_value, void *data __maybe_unused) {
LOGS_QUERY_SOURCE *src_old = old_value;
LOGS_QUERY_SOURCE *src_new = new_value;
bool ret = false;
if(src_new->last_scan_monotonic_ut > src_old->last_scan_monotonic_ut) {
src_old->last_scan_monotonic_ut = src_new->last_scan_monotonic_ut;
if (src_old->source != src_new->source) {
string_freez(src_old->source);
src_old->source = src_new->source;
src_new->source = NULL;
}
src_old->source_type = src_new->source_type;
src_old->msg_first_ut = src_new->msg_first_ut;
src_old->msg_last_ut = src_new->msg_last_ut;
src_old->msg_first_id = src_new->msg_first_id;
src_old->msg_last_id = src_new->msg_last_id;
src_old->entries = src_new->entries;
src_old->size = src_new->size;
ret = true;
}
freez((void *)src_new->fullname);
string_freez(src_new->source);
src_new->fullname = NULL;
src_new->source = NULL;
return ret;
}
void wevt_sources_init(void) {
wevt_session = now_realtime_usec();
used_hashes_registry = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE);
wevt_sources = dictionary_create_advanced(DICT_OPTION_FIXED_SIZE | DICT_OPTION_DONT_OVERWRITE_VALUE,
NULL, sizeof(LOGS_QUERY_SOURCE));
dictionary_register_delete_callback(wevt_sources, wevt_sources_del_cb, NULL);
dictionary_register_conflict_callback(wevt_sources, wevt_sources_conflict_cb, NULL);
}
void buffer_json_wevt_versions(BUFFER *wb __maybe_unused) {
buffer_json_member_add_object(wb, "versions");
{
buffer_json_member_add_uint64(wb, "sources",
wevt_session + dictionary_version(wevt_sources));
}
buffer_json_object_close(wb);
}
// --------------------------------------------------------------------------------------------------------------------
int wevt_sources_dict_items_backward_compar(const void *a, const void *b) {
const DICTIONARY_ITEM **da = (const DICTIONARY_ITEM **)a, **db = (const DICTIONARY_ITEM **)b;
LOGS_QUERY_SOURCE *sa = dictionary_acquired_item_value(*da);
LOGS_QUERY_SOURCE *sb = dictionary_acquired_item_value(*db);
// compare the last message timestamps
if(sa->msg_last_ut < sb->msg_last_ut)
return 1;
if(sa->msg_last_ut > sb->msg_last_ut)
return -1;
// compare the first message timestamps
if(sa->msg_first_ut < sb->msg_first_ut)
return 1;
if(sa->msg_first_ut > sb->msg_first_ut)
return -1;
return 0;
}
int wevt_sources_dict_items_forward_compar(const void *a, const void *b) {
return -wevt_sources_dict_items_backward_compar(a, b);
}
// --------------------------------------------------------------------------------------------------------------------
struct wevt_source {
usec_t first_ut;
usec_t last_ut;
size_t count;
uint64_t size;
};
static int wevt_source_to_json_array_cb(const DICTIONARY_ITEM *item, void *entry, void *data) {
const struct wevt_source *s = entry;
BUFFER *wb = data;
const char *name = dictionary_acquired_item_name(item);
buffer_json_add_array_item_object(wb);
{
char size_for_humans[128];
size_snprintf(size_for_humans, sizeof(size_for_humans), s->size, "B", false);
char duration_for_humans[128];
duration_snprintf(duration_for_humans, sizeof(duration_for_humans),
(time_t)((s->last_ut - s->first_ut) / USEC_PER_SEC), "s", true);
char info[1024];
snprintfz(info, sizeof(info), "%zu channels, with a total size of %s, covering %s",
s->count, size_for_humans, duration_for_humans);
buffer_json_member_add_string(wb, "id", name);
buffer_json_member_add_string(wb, "name", name);
buffer_json_member_add_string(wb, "pill", size_for_humans);
buffer_json_member_add_string(wb, "info", info);
}
buffer_json_object_close(wb); // options object
return 1;
}
static bool wevt_source_merge_sizes(const DICTIONARY_ITEM *item __maybe_unused, void *old_value, void *new_value , void *data __maybe_unused) {
struct wevt_source *old_v = old_value;
const struct wevt_source *new_v = new_value;
old_v->count += new_v->count;
old_v->size += new_v->size;
if(new_v->first_ut && new_v->first_ut < old_v->first_ut)
old_v->first_ut = new_v->first_ut;
if(new_v->last_ut && new_v->last_ut > old_v->last_ut)
old_v->last_ut = new_v->last_ut;
return false;
}
void wevt_sources_to_json_array(BUFFER *wb) {
DICTIONARY *dict = dictionary_create(DICT_OPTION_SINGLE_THREADED|DICT_OPTION_NAME_LINK_DONT_CLONE|DICT_OPTION_DONT_OVERWRITE_VALUE);
dictionary_register_conflict_callback(dict, wevt_source_merge_sizes, NULL);
struct wevt_source t = { 0 };
LOGS_QUERY_SOURCE *src;
dfe_start_read(wevt_sources, src) {
t.first_ut = src->msg_first_ut;
t.last_ut = src->msg_last_ut;
t.count = 1;
t.size = src->size;
dictionary_set(dict, WEVT_SOURCE_ALL_NAME, &t, sizeof(t));
if(src->source)
dictionary_set(dict, string2str(src->source), &t, sizeof(t));
}
dfe_done(src);
dictionary_sorted_walkthrough_read(dict, wevt_source_to_json_array_cb, wb);
}
void wevt_sources_scan(void) {
static SPINLOCK spinlock = NETDATA_SPINLOCK_INITIALIZER;
LPWSTR channel = NULL;
EVT_HANDLE hChannelEnum = NULL;
if(spinlock_trylock(&spinlock)) {
const usec_t now_monotonic_ut = now_monotonic_usec();
DWORD dwChannelBufferSize = 0;
DWORD dwChannelBufferUsed = 0;
DWORD status = ERROR_SUCCESS;
// Open a handle to enumerate the event channels
hChannelEnum = EvtOpenChannelEnum(NULL, 0);
if (!hChannelEnum) {
nd_log(NDLS_COLLECTORS, NDLP_ERR, "WINDOWS EVENTS: EvtOpenChannelEnum() failed with %" PRIu64 "\n",
(uint64_t)GetLastError());
goto cleanup;
}
WEVT_LOG *log = wevt_openlog6();
if(!log) goto cleanup;
while (true) {
if (!EvtNextChannelPath(hChannelEnum, dwChannelBufferSize, channel, &dwChannelBufferUsed)) {
status = GetLastError();
if (status == ERROR_NO_MORE_ITEMS)
break; // No more channels
else if (status == ERROR_INSUFFICIENT_BUFFER) {
dwChannelBufferSize = dwChannelBufferUsed;
freez(channel);
channel = mallocz(dwChannelBufferSize * sizeof(WCHAR));
continue;
} else {
nd_log(NDLS_COLLECTORS, NDLP_ERR,
"WINDOWS EVENTS: EvtNextChannelPath() failed\n");
break;
}
}
EVT_RETENTION retention;
if(!wevt_channel_retention(log, channel, &retention))
continue;
        char *name = channel2utf8(channel);
        const char *fullname = strdupz(name);
        char *slash = strchr(name, '/');
        if(slash) *slash = '\0';
LOGS_QUERY_SOURCE src = {
.entries = retention.entries,
.fullname = fullname,
.fullname_len = strlen(fullname),
.last_scan_monotonic_ut = now_monotonic_usec(),
.msg_first_id = retention.first_event.id,
.msg_last_id = retention.last_event.id,
.msg_first_ut = retention.first_event.created_ns / NSEC_PER_USEC,
.msg_last_ut = retention.last_event.created_ns / NSEC_PER_USEC,
.size = retention.size_bytes,
.source_type = WEVTS_ALL,
.source = string_strdupz(name),
};
dictionary_set(wevt_sources, src.fullname, &src, sizeof(src));
}
wevt_closelog6(log);
LOGS_QUERY_SOURCE *src;
dfe_start_write(wevt_sources, src)
{
if(src->last_scan_monotonic_ut < now_monotonic_ut)
dictionary_del(wevt_sources, src->fullname);
}
dfe_done(src);
dictionary_garbage_collect(wevt_sources);
    cleanup:
        spinlock_unlock(&spinlock);
    }

    freez(channel);
    if(hChannelEnum)
        EvtClose(hChannelEnum);
}
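
A sketch of the intended call pattern; the thread function, exit flag and one-minute interval below are hypothetical, not part of this file:

static volatile bool plugin_should_exit = false; // hypothetical exit flag

static void *wevt_sources_housekeeping(void *arg __maybe_unused) {
    wevt_sources_init();

    while (!plugin_should_exit) {
        wevt_sources_scan();            // refresh channels, retention and sizes
        sleep_usec(60 * USEC_PER_SEC);  // rescan periodically; stale channels are deleted above
    }

    return NULL;
}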

View file

@ -0,0 +1,45 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef NETDATA_WINDOWS_EVENTS_SOURCES_H
#define NETDATA_WINDOWS_EVENTS_SOURCES_H
#include "windows-events.h"
typedef enum {
WEVTS_NONE = 0,
WEVTS_ALL = (1 << 0),
} WEVT_SOURCE_TYPE;
typedef struct {
const char *fullname;
size_t fullname_len;
STRING *source;
WEVT_SOURCE_TYPE source_type;
usec_t msg_first_ut;
usec_t msg_last_ut;
size_t size;
usec_t last_scan_monotonic_ut;
uint64_t msg_first_id;
uint64_t msg_last_id;
uint64_t entries;
} LOGS_QUERY_SOURCE;
extern DICTIONARY *wevt_sources;
extern DICTIONARY *used_hashes_registry;
#define WEVT_SOURCE_ALL_NAME "All"
void wevt_sources_init(void);
void wevt_sources_scan(void);
void buffer_json_wevt_versions(BUFFER *wb);
void wevt_sources_to_json_array(BUFFER *wb);
WEVT_SOURCE_TYPE wevt_internal_source_type(const char *value);
int wevt_sources_dict_items_backward_compar(const void *a, const void *b);
int wevt_sources_dict_items_forward_compar(const void *a, const void *b);
#endif //NETDATA_WINDOWS_EVENTS_SOURCES_H

View file

@ -0,0 +1,121 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "windows-events-unicode.h"
inline void utf82unicode(wchar_t *dst, size_t dst_size, const char *src) {
if (src) {
// Convert from UTF-8 to wide char (UTF-16)
if (MultiByteToWideChar(CP_UTF8, 0, src, -1, dst, (int)dst_size) == 0)
wcsncpy(dst, L"[failed conv.]", dst_size - 1);
}
else
wcsncpy(dst, L"[null]", dst_size - 1);
}
inline void unicode2utf8(char *dst, size_t dst_size, const wchar_t *src) {
if (src) {
if(WideCharToMultiByte(CP_UTF8, 0, src, -1, dst, (int)dst_size, NULL, NULL) == 0)
strncpyz(dst, "[failed conv.]", dst_size - 1);
}
else
strncpyz(dst, "[null]", dst_size - 1);
}
wchar_t *channel2unicode(const char *utf8str) {
static __thread wchar_t buffer[1024];
utf82unicode(buffer, sizeof(buffer) / sizeof(buffer[0]), utf8str);
return buffer;
}
char *channel2utf8(const wchar_t *channel) {
static __thread char buffer[1024];
unicode2utf8(buffer, sizeof(buffer), channel);
return buffer;
}
char *account2utf8(const wchar_t *user) {
static __thread char buffer[1024];
unicode2utf8(buffer, sizeof(buffer), user);
return buffer;
}
char *domain2utf8(const wchar_t *domain) {
static __thread char buffer[1024];
unicode2utf8(buffer, sizeof(buffer), domain);
return buffer;
}
char *query2utf8(const wchar_t *query) {
static __thread char buffer[16384];
unicode2utf8(buffer, sizeof(buffer), query);
return buffer;
}
bool wevt_str_wchar_to_utf8(TXT_UTF8 *utf8, const wchar_t *src, int src_len_with_null) {
if(!src || !src_len_with_null)
goto cleanup;
// make sure the input is null terminated at the exact point we need it
// (otherwise, the output will not be null terminated either)
fatal_assert(src_len_with_null == -1 || (src_len_with_null >= 1 && src[src_len_with_null - 1] == 0));
// Try to convert using the existing buffer (if it exists, otherwise get the required buffer size)
int size = WideCharToMultiByte(CP_UTF8, 0, src, src_len_with_null, utf8->data, (int)utf8->size, NULL, NULL);
if(size <= 0 || !utf8->data) {
// we have to set a buffer, or increase it
if(utf8->data) {
// we need to increase it the buffer size
if(GetLastError() != ERROR_INSUFFICIENT_BUFFER) {
nd_log(NDLS_COLLECTORS, NDLP_ERR, "WideCharToMultiByte() failed.");
goto cleanup;
}
// we have to find the required buffer size
size = WideCharToMultiByte(CP_UTF8, 0, src, src_len_with_null, NULL, 0, NULL, NULL);
if(size <= 0)
goto cleanup;
}
// Retry conversion with the new buffer
txt_utf8_resize(utf8, size);
size = WideCharToMultiByte(CP_UTF8, 0, src, src_len_with_null, utf8->data, (int)utf8->size, NULL, NULL);
if (size <= 0) {
nd_log(NDLS_COLLECTORS, NDLP_ERR, "WideCharToMultiByte() failed after resizing.");
goto cleanup;
}
}
// Make sure it is not zero padded at the end
while(size >= 2 && utf8->data[size - 2] == 0)
size--;
utf8->used = (size_t)size;
internal_fatal(strlen(utf8->data) + 1 != utf8->used,
"Wrong UTF8 string length");
return true;
cleanup:
txt_utf8_resize(utf8, 128);
if(src)
utf8->used = snprintfz(utf8->data, utf8->size, "[failed conv.]") + 1;
else {
utf8->data[0] = '\0';
utf8->used = 1;
}
return false;
}
bool wevt_str_unicode_to_utf8(TXT_UTF8 *utf8, TXT_UNICODE *unicode) {
fatal_assert(utf8 && ((utf8->data && utf8->size) || (!utf8->data && !utf8->size)));
fatal_assert(unicode && ((unicode->data && unicode->size) || (!unicode->data && !unicode->size)));
// pass the entire unicode size, including the null terminator
// so that the resulting utf8 message will be null terminated too.
return wevt_str_wchar_to_utf8(utf8, unicode->data, (int)unicode->used);
}
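
A minimal sketch of the growing-buffer conversion above; the channel name is illustrative:

static void example_wide_to_utf8(void) {
    TXT_UTF8 utf8 = { 0 };

    // -1 means "the source is null-terminated"; the result is null-terminated too
    if (wevt_str_wchar_to_utf8(&utf8, L"Microsoft-Windows-Sysmon/Operational", -1))
        printf("%s (strlen %zu)\n", utf8.data, utf8.used - 1);

    txt_utf8_cleanup(&utf8);
}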

View file

@ -0,0 +1,72 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef NETDATA_WINDOWS_EVENTS_UNICODE_H
#define NETDATA_WINDOWS_EVENTS_UNICODE_H
#include "libnetdata/libnetdata.h"
#include <windows.h>
#include <wchar.h>
typedef struct {
char *data;
size_t size; // the allocated size of data buffer
size_t used; // the used size of the data buffer (including null terminators, if any)
} TXT_UTF8;
typedef struct {
wchar_t *data;
size_t size; // the allocated size of data buffer
size_t used; // the used size of the data buffer (including null terminators, if any)
} TXT_UNICODE;
static inline size_t compute_new_size(size_t old_size, size_t required_size) {
size_t size = (required_size % 2048 == 0) ? required_size : required_size + 2048;
size = (size / 2048) * 2048;
if(size < old_size * 2)
size = old_size * 2;
return size;
}
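// Illustrative values for the rounding policy above (a sketch, assuming the
// 2048-byte step used here):
//   compute_new_size(0, 100)      -> 2048   small requests jump to the next 2KiB step
//   compute_new_size(2048, 2100)  -> 4096   rounding up and doubling coincide
//   compute_new_size(4096, 16500) -> 18432  the rounded requirement wins over doubling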
static inline void txt_utf8_cleanup(TXT_UTF8 *utf8) {
freez(utf8->data);
}
static inline void txt_utf8_resize(TXT_UTF8 *utf8, size_t required_size) {
if(required_size < utf8->size)
return;
txt_utf8_cleanup(utf8);
utf8->size = compute_new_size(utf8->size, required_size);
utf8->data = mallocz(utf8->size);
}
static inline void txt_unicode_cleanup(TXT_UNICODE *unicode) {
freez(unicode->data);
}
static inline void txt_unicode_resize(TXT_UNICODE *unicode, size_t required_size) {
if(required_size < unicode->size)
return;
txt_unicode_cleanup(unicode);
unicode->size = compute_new_size(unicode->size, required_size);
unicode->data = mallocz(unicode->size * sizeof(wchar_t));
}
bool wevt_str_unicode_to_utf8(TXT_UTF8 *utf8, TXT_UNICODE *unicode);
bool wevt_str_wchar_to_utf8(TXT_UTF8 *utf8, const wchar_t *src, int src_len_with_null);
void unicode2utf8(char *dst, size_t dst_size, const wchar_t *src);
void utf82unicode(wchar_t *dst, size_t dst_size, const char *src);
char *account2utf8(const wchar_t *user);
char *domain2utf8(const wchar_t *domain);
char *channel2utf8(const wchar_t *channel);
wchar_t *channel2unicode(const char *utf8str);
char *query2utf8(const wchar_t *query);
#endif //NETDATA_WINDOWS_EVENTS_UNICODE_H

View file

@ -0,0 +1,211 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "windows-events-xml.h"
#include <string.h>
#include <stdio.h>
#define INDENT_STEP 2
#define A_LOT_OF_SPACES " "
// Helper: Add indentation
static inline void buffer_add_xml_indent(BUFFER *buffer, const int level) {
size_t total_spaces = (size_t)level * INDENT_STEP;
const size_t step = sizeof(A_LOT_OF_SPACES) - 1;
while (total_spaces > 0) {
const size_t spaces_to_add = (total_spaces > step) ? step : total_spaces;
buffer_fast_strcat(buffer, A_LOT_OF_SPACES, spaces_to_add);
total_spaces -= spaces_to_add;
}
}
const char *append_the_rest(BUFFER *buffer, const char *xml, const char *end) {
if(xml >= end) return end;
buffer_fast_strcat(buffer, xml, end - xml);
return end;
}
static const char *parse_node(BUFFER *buffer, const char *xml, const char *end, int level);
// Helper: Parse the value (between > and <) and return the next position to parse
const char *parse_value_and_closing_tag(BUFFER *buffer, const char *xml, const char *end, int level) {
const char *start = xml;
bool has_subnodes = false;
while (xml < end) {
if(*xml == '<') {
if(xml + 1 < end && *(xml + 1) == '/') {
// a closing tag
xml += 2;
while(xml < end && *xml != '>')
xml++;
if(xml < end && *xml == '>')
xml++;
if(has_subnodes) {
buffer_putc(buffer, '\n');
buffer_add_xml_indent(buffer, level);
}
buffer_fast_strcat(buffer, start, xml - start);
return xml;
}
else {
// an opening tag
buffer_fast_strcat(buffer, start, xml - start);
xml = start = parse_node(buffer, xml, end, level + 1);
while(xml < end && isspace(*xml))
xml++;
has_subnodes = true;
}
}
else
xml++;
}
return append_the_rest(buffer, start, end);
}
// Parse a field value and return the next position to parse
const char *parse_field_value(BUFFER *buffer, const char *xml, const char *end) {
const char quote = *xml;
if(quote != '"' && quote != '\'')
return append_the_rest(buffer, xml, end);
const char *start = xml++;
while (xml < end && *xml != quote) {
if (*xml == '\\') {
xml++; // Skip escape character
if(xml < end)
xml++;
continue;
}
xml++;
}
if(xml < end && *xml == quote) {
xml++; // Move past the closing quote
buffer_fast_strcat(buffer, start, xml - start);
return xml;
}
return append_the_rest(buffer, start, end);
}
// Parse a field name and return the next position to parse
const char *parse_field(BUFFER *buffer, const char *xml, const char *end) {
    while(xml < end && isspace(*xml)) xml++;
const char *start = xml;
    while (xml < end && *xml != '=')
xml++;
// Append the field name
buffer_fast_strcat(buffer, start, xml - start);
if(xml < end && *xml == '=') {
xml++;
buffer_putc(buffer, '=');
if(xml < end && (*xml == '"' || *xml == '\''))
xml = parse_field_value(buffer, xml, end);
return xml; // Return the next character to parse
}
return append_the_rest(buffer, start, end);
}
// Parse a node (handles fields and subnodes) and return the next position to parse
static inline const char *parse_node(BUFFER *buffer, const char *xml, const char *end, int level) {
if(*xml != '<')
return append_the_rest(buffer, xml, end);
const char *start = xml++; // skip the <
buffer_putc(buffer, '\n');
buffer_add_xml_indent(buffer, level);
// skip spaces before the tag name
while(xml < end && isspace(*xml)) xml++;
// Parse the tag name
while (xml < end && *xml != '>' && *xml != '/') {
xml++;
if(xml < end && isspace(*xml)) {
xml++;
while(xml < end && isspace(*xml))
xml++;
if(xml < end && *xml == '/') {
// an opening tag that is self-closing
xml++;
if(xml < end && *xml == '>') {
xml++;
buffer_fast_strcat(buffer, start, xml - start);
return xml;
}
else
return append_the_rest(buffer, start, end);
}
else if(xml < end && *xml == '>') {
// the end of an opening tag
xml++;
buffer_fast_strcat(buffer, start, xml - start);
return parse_value_and_closing_tag(buffer, xml, end, level);
}
else {
buffer_fast_strcat(buffer, start, xml - start);
xml = start = parse_field(buffer, xml, end);
while(xml < end && isspace(*xml))
xml++;
}
}
}
bool self_closing_tag = false;
if(xml < end && *xml == '/') {
self_closing_tag = true;
xml++;
}
if(xml < end && *xml == '>') {
xml++;
buffer_fast_strcat(buffer, start, xml - start);
if(self_closing_tag)
return xml;
return parse_value_and_closing_tag(buffer, xml, end, level);
}
return append_the_rest(buffer, start, end);
}
// Main pretty-print XML function
void buffer_pretty_print_xml(BUFFER *buffer, const char *xml, size_t xml_len) {
const char *end = xml + xml_len;
while(xml < end) {
while(xml < end && isspace(*xml))
xml++;
if(xml < end && *xml == '<')
xml = parse_node(buffer, xml, end, 1);
else {
append_the_rest(buffer, xml, end);
return;
}
}
}
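
A minimal sketch of the pretty-printer; the XML snippet is illustrative and would normally be the UTF-8 conversion of what EvtRender() returns:

static void example_pretty_print(void) {
    const char *xml = "<Event><System><EventID>4624</EventID></System></Event>";

    BUFFER *wb = buffer_create(0, NULL);
    buffer_pretty_print_xml(wb, xml, strlen(xml));
    printf("%s\n", buffer_tostring(wb)); // one tag per line, indented INDENT_STEP spaces per level
    buffer_free(wb);
}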

View file

@ -0,0 +1,10 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef WINDOWS_EVENTS_XML_H
#define WINDOWS_EVENTS_XML_H
#include "libnetdata/libnetdata.h"
void buffer_pretty_print_xml(BUFFER *buffer, const char *xml, size_t xml_len);
#endif //WINDOWS_EVENTS_XML_H

File diff suppressed because it is too large

View file

@ -0,0 +1,28 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef NETDATA_WINDOWS_EVENTS_H
#define NETDATA_WINDOWS_EVENTS_H
#include "libnetdata/libnetdata.h"
#include "collectors/all.h"
#include <windows.h>
#include <winevt.h>
#include <wchar.h>
typedef enum {
WEVT_NO_CHANNEL_MATCHED,
WEVT_FAILED_TO_OPEN,
WEVT_FAILED_TO_SEEK,
WEVT_TIMED_OUT,
WEVT_OK,
WEVT_NOT_MODIFIED,
WEVT_CANCELLED,
} WEVT_QUERY_STATUS;
#include "windows-events-unicode.h"
#include "windows-events-query.h"
#include "windows-events-sources.h"
#include "windows-events-sid.h"
#include "windows-events-xml.h"
#endif //NETDATA_WINDOWS_EVENTS_H

View file

@ -247,7 +247,7 @@ void buffer_free(BUFFER *b) {
buffer_overflow_check(b);
netdata_log_debug(D_WEB_BUFFER, "Freeing web buffer of size %zu.", b->size);
netdata_log_debug(D_WEB_BUFFER, "Freeing web buffer of size %zu.", (size_t)b->size);
if(b->statistics)
__atomic_sub_fetch(b->statistics, b->size + sizeof(BUFFER) + sizeof(BUFFER_OVERFLOW_EOF) + 2, __ATOMIC_RELAXED);
@ -269,7 +269,7 @@ void buffer_increase(BUFFER *b, size_t free_size_required) {
size_t optimal = (b->size > 5*1024*1024) ? b->size / 2 : b->size;
if(optimal > increase) increase = optimal;
netdata_log_debug(D_WEB_BUFFER, "Increasing data buffer from size %zu to %zu.", b->size, b->size + increase);
netdata_log_debug(D_WEB_BUFFER, "Increasing data buffer from size %zu to %zu.", (size_t)b->size, (size_t)(b->size + increase));
b->buffer = reallocz(b->buffer, b->size + increase + sizeof(BUFFER_OVERFLOW_EOF) + 2);
b->size += increase;

View file

@ -39,14 +39,15 @@ typedef enum __attribute__ ((__packed__)) {
} BUFFER_JSON_OPTIONS;
typedef struct web_buffer {
- size_t size;     // allocation size of buffer, in bytes
- size_t len;      // current data length in buffer, in bytes
- char *buffer;    // the buffer itself
+ uint32_t size;   // allocation size of buffer, in bytes
+ uint32_t len;    // current data length in buffer, in bytes
HTTP_CONTENT_TYPE content_type; // the content type of the data in the buffer
BUFFER_OPTIONS options; // options related to the content
uint16_t response_code;
time_t date; // the timestamp this content has been generated
time_t expires; // the timestamp this content expires
size_t *statistics;
+ char *buffer;    // the buffer itself
struct {
char key_quote[BUFFER_QUOTE_MAX_SIZE + 1];
@ -62,7 +63,7 @@ typedef struct web_buffer {
#define buffer_cacheable(wb) do { (wb)->options |= WB_CONTENT_CACHEABLE; if((wb)->options & WB_CONTENT_NO_CACHEABLE) (wb)->options &= ~WB_CONTENT_NO_CACHEABLE; } while(0)
#define buffer_no_cacheable(wb) do { (wb)->options |= WB_CONTENT_NO_CACHEABLE; if((wb)->options & WB_CONTENT_CACHEABLE) (wb)->options &= ~WB_CONTENT_CACHEABLE; (wb)->expires = 0; } while(0)
- #define buffer_strlen(wb) ((wb)->len)
+ #define buffer_strlen(wb) (size_t)((wb)->len)
#define BUFFER_OVERFLOW_EOF "EOF"
@ -488,42 +489,54 @@ static inline int print_netdata_double(char *dst, NETDATA_DOUBLE value) {
return (int)(d - dst);
}
- static inline void buffer_print_uint64(BUFFER *wb, uint64_t value) {
-     buffer_need_bytes(wb, 50);
-     char *s = &wb->buffer[wb->len];
+ static inline size_t print_uint64(char *dst, uint64_t value) {
+     char *s = dst;
char *d = print_uint64_reversed(s, value);
char_array_reverse(s, d - 1);
*d = '\0';
-     wb->len += d - s;
+     return d - s;
}
static inline size_t print_int64(char *dst, int64_t value) {
size_t len = 0;
if(value < 0) {
*dst++ = '-';
value = -value;
len++;
}
return print_uint64(dst, value) + len;
}
static inline void buffer_print_uint64(BUFFER *wb, uint64_t value) {
buffer_need_bytes(wb, 50);
wb->len += print_uint64(&wb->buffer[wb->len], value);
buffer_overflow_check(wb);
}
static inline void buffer_print_int64(BUFFER *wb, int64_t value) {
buffer_need_bytes(wb, 50);
-     if(value < 0) {
-         buffer_fast_strcat(wb, "-", 1);
-         value = -value;
-     }
-     buffer_print_uint64(wb, (uint64_t)value);
+     wb->len += print_int64(&wb->buffer[wb->len], value);
buffer_overflow_check(wb);
}
#define UINT64_HEX_LENGTH ((sizeof(HEX_PREFIX) - 1) + (sizeof(uint64_t) * 2) + 2 + 1)
static inline size_t print_uint64_hex(char *dst, uint64_t value) {
char *d = dst;
const char *s = HEX_PREFIX;
while(*s) *d++ = *s++;
char *e = print_uint64_hex_reversed(d, value);
char_array_reverse(d, e - 1);
*e = '\0';
return e - dst;
}
static inline void buffer_print_uint64_hex(BUFFER *wb, uint64_t value) {
-     buffer_need_bytes(wb, sizeof(uint64_t) * 2 + 2 + 1);
-     buffer_fast_strcat(wb, HEX_PREFIX, sizeof(HEX_PREFIX) - 1);
-     char *s = &wb->buffer[wb->len];
-     char *d = print_uint64_hex_reversed(s, value);
-     char_array_reverse(s, d - 1);
-     *d = '\0';
-     wb->len += d - s;
+     buffer_need_bytes(wb, UINT64_HEX_LENGTH);
+     wb->len += print_uint64_hex(&wb->buffer[wb->len], value);
buffer_overflow_check(wb);
}
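
A sketch of the buffer-independent printers this hunk extracts; the outputs in the comments are indicative:

static void example_print_helpers(void) {
    char hex[UINT64_HEX_LENGTH];
    size_t len = print_uint64_hex(hex, 0xDEADBEEFULL);
    printf("%s (%zu chars)\n", hex, len); // HEX_PREFIX followed by the hex digits

    char num[50];
    len = print_int64(num, -12345);
    printf("%s (%zu chars)\n", num, len); // "-12345"
}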

View file

@ -1618,6 +1618,8 @@ FACETS *facets_create(uint32_t items_to_return, FACETS_OPTIONS options, const ch
}
void facets_destroy(FACETS *facets) {
if(!facets) return;
dictionary_destroy(facets->accepted_params);
FACETS_KEYS_INDEX_DESTROY(facets);
simple_pattern_free(facets->visible_keys);
@ -1879,6 +1881,10 @@ void facets_add_key_value(FACETS *facets, const char *key, const char *value) {
}
void facets_add_key_value_length(FACETS *facets, const char *key, size_t key_len, const char *value, size_t value_len) {
if(!key || !*key || !key_len || !value || !*value || !value_len)
        // adding empty values makes the rows unmatched
return;
FACET_KEY *k = facets_register_key_name_length(facets, key, key_len, 0);
k->current_value.raw = value;
k->current_value.raw_len = value_len;
@ -2142,6 +2148,9 @@ void facets_rows_begin(FACETS *facets) {
}
bool facets_row_finished(FACETS *facets, usec_t usec) {
// char buf[RFC3339_MAX_LENGTH];
// rfc3339_datetime_ut(buf, sizeof(buf), usec, 3, false);
facets->operations.rows.evaluated++;
if(unlikely((facets->query && facets->keys_filtered_by_query &&

View file

@ -0,0 +1,850 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef NETDATA_LOGS_QUERY_STATUS_H
#define NETDATA_LOGS_QUERY_STATUS_H
#include "../libnetdata.h"
#define LQS_PARAMETER_HELP "help"
#define LQS_PARAMETER_AFTER "after"
#define LQS_PARAMETER_BEFORE "before"
#define LQS_PARAMETER_ANCHOR "anchor"
#define LQS_PARAMETER_LAST "last"
#define LQS_PARAMETER_QUERY "query"
#define LQS_PARAMETER_FACETS "facets"
#define LQS_PARAMETER_HISTOGRAM "histogram"
#define LQS_PARAMETER_DIRECTION "direction"
#define LQS_PARAMETER_IF_MODIFIED_SINCE "if_modified_since"
#define LQS_PARAMETER_DATA_ONLY "data_only"
#define LQS_PARAMETER_SOURCE "source"
#define LQS_PARAMETER_INFO "info"
#define LQS_PARAMETER_SLICE "slice"
#define LQS_PARAMETER_DELTA "delta"
#define LQS_PARAMETER_TAIL "tail"
#define LQS_PARAMETER_SAMPLING "sampling"
#define LQS_MAX_PARAMS 1000
#define LQS_DEFAULT_QUERY_DURATION (1 * 3600)
#undef LQS_SLICE_PARAMETER
#if LQS_DEFAULT_SLICE_MODE == 1
#define LQS_SLICE_PARAMETER 1
#endif
typedef struct {
const char *transaction;
FACET_KEY_OPTIONS default_facet; // the option to be used for internal fields.
// when the requests set facets, we disable all default facets,
// so that the UI has full control over them.
bool fields_are_ids; // POST works with field names, GET works with field hashes (IDs)
bool info; // the request is an INFO request, do not execute a query.
bool data_only; // return as fast as possible, with the requested amount of data,
// without scanning the entire duration.
bool slice; // apply native backend filters to slice the events database.
bool delta; // return incremental data for the histogram (used with data_only)
bool tail; // return NOT MODIFIED if no more data are available after the anchor given.
time_t after_s; // the starting timestamp of the query
time_t before_s; // the ending timestamp of the query
usec_t after_ut; // in microseconds
usec_t before_ut; // in microseconds
usec_t anchor; // the anchor to seek to
FACETS_ANCHOR_DIRECTION direction; // the direction based on the anchor (or the query timeframe)
usec_t if_modified_since; // the timestamp to check with tail == true
size_t entries; // the number of log events to return in a single response
const char *query; // full text search query string
const char *histogram; // the field to use for the histogram
SIMPLE_PATTERN *sources; // custom log sources to query
LQS_SOURCE_TYPE source_type; // pre-defined log sources to query
size_t filters; // the number of filters (facets selected) in the query
size_t sampling; // the number of log events to sample, when the query is too big
time_t now_s; // the timestamp the query was received
time_t expires_s; // the timestamp the response expires
} LOGS_QUERY_REQUEST;
#define LOGS_QUERY_REQUEST_DEFAULTS(function_transaction, default_slice, default_direction) \
(LOGS_QUERY_REQUEST) { \
.transaction = (function_transaction), \
.default_facet = FACET_KEY_OPTION_FACET, \
.info = false, \
.data_only = false, \
.slice = (default_slice), \
.delta = false, \
.tail = false, \
.after_s = 0, \
.before_s = 0, \
.anchor = 0, \
.if_modified_since = 0, \
.entries = 0, \
.direction = (default_direction), \
.query = NULL, \
.histogram = NULL, \
.sources = NULL, \
.source_type = LQS_SOURCE_TYPE_ALL, \
.filters = 0, \
.sampling = LQS_DEFAULT_ITEMS_SAMPLING, \
}
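// A sketch of the expected use by an including plugin; this header assumes the
// includer has already defined LQS_SOURCE_TYPE, LQS_SOURCE_TYPE_ALL,
// LQS_DEFAULT_ITEMS_SAMPLING and the other LQS_* hooks before including it:
//
//     LOGS_QUERY_REQUEST rq = LOGS_QUERY_REQUEST_DEFAULTS(
//         transaction, true, FACETS_ANCHOR_DIRECTION_BACKWARD);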
typedef struct {
FACETS *facets;
LOGS_QUERY_REQUEST rq;
bool *cancelled; // a pointer to the cancelling boolean
usec_t *stop_monotonic_ut;
struct {
usec_t start_ut;
usec_t stop_ut;
} anchor;
usec_t last_modified;
struct lqs_extension c;
} LOGS_QUERY_STATUS;
struct logs_query_data {
const char *transaction;
FACETS *facets;
LOGS_QUERY_REQUEST *rq;
BUFFER *wb;
};
static inline FACETS_ANCHOR_DIRECTION lgs_get_direction(const char *value) {
return strcasecmp(value, "forward") == 0 ? FACETS_ANCHOR_DIRECTION_FORWARD : FACETS_ANCHOR_DIRECTION_BACKWARD;
}
static inline void lqs_log_error(LOGS_QUERY_STATUS *lqs, const char *msg) {
nd_log(NDLS_COLLECTORS, NDLP_ERR,
"LOGS QUERY ERROR: %s, on query "
"timeframe [%"PRIu64" - %"PRIu64"], "
"anchor [%"PRIu64" - %"PRIu64"], "
"if_modified_since %"PRIu64", "
"data_only:%s, delta:%s, tail:%s, direction:%s"
, msg
, lqs->rq.after_ut
, lqs->rq.before_ut
, lqs->anchor.start_ut
, lqs->anchor.stop_ut
, lqs->rq.if_modified_since
, lqs->rq.data_only ? "true" : "false"
, lqs->rq.delta ? "true" : "false"
, lqs->rq.tail ? "tail" : "false"
, lqs->rq.direction == FACETS_ANCHOR_DIRECTION_FORWARD ? "forward" : "backward");
}
static inline void lqs_function_help(LOGS_QUERY_STATUS *lqs, BUFFER *wb) {
buffer_reset(wb);
wb->content_type = CT_TEXT_PLAIN;
wb->response_code = HTTP_RESP_OK;
buffer_sprintf(wb,
"%s / %s\n"
"\n"
"%s\n"
"\n"
"The following parameters are supported:\n"
"\n"
, program_name
, LQS_FUNCTION_NAME
, LQS_FUNCTION_DESCRIPTION
);
buffer_sprintf(wb,
" " LQS_PARAMETER_HELP "\n"
" Shows this help message.\n"
"\n"
);
buffer_sprintf(wb,
" " LQS_PARAMETER_INFO "\n"
" Request initial configuration information about the plugin.\n"
" The key entity returned is the required_params array, which includes\n"
" all the available log sources.\n"
" When `" LQS_PARAMETER_INFO "` is requested, all other parameters are ignored.\n"
"\n"
);
buffer_sprintf(wb,
" " LQS_PARAMETER_DATA_ONLY ":true or " LQS_PARAMETER_DATA_ONLY ":false\n"
" Quickly respond with data requested, without generating a\n"
" `histogram`, `facets` counters and `items`.\n"
"\n"
);
buffer_sprintf(wb,
" " LQS_PARAMETER_DELTA ":true or " LQS_PARAMETER_DELTA ":false\n"
" When doing data only queries, include deltas for histogram, facets and items.\n"
"\n"
);
buffer_sprintf(wb,
" " LQS_PARAMETER_TAIL ":true or " LQS_PARAMETER_TAIL ":false\n"
" When doing data only queries, respond with the newest messages,\n"
" and up to the anchor, but calculate deltas (if requested) for\n"
" the duration [anchor - before].\n"
"\n"
);
#ifdef LQS_SLICE_PARAMETER
buffer_sprintf(wb,
" " LQS_PARAMETER_SLICE ":true or " LQS_PARAMETER_SLICE ":false\n"
" When it is turned on, the plugin is is slicing the logs database,\n"
" utilizing the underlying available indexes.\n"
" When it is off, all filtering is done by the plugin.\n"
" The default is: %s\n"
"\n"
, lqs->rq.slice ? "true" : "false"
);
#endif
buffer_sprintf(wb,
" " LQS_PARAMETER_SOURCE ":SOURCE\n"
" Query only the specified log sources.\n"
" Do an `" LQS_PARAMETER_INFO "` query to find the sources.\n"
"\n"
);
buffer_sprintf(wb,
" " LQS_PARAMETER_BEFORE ":TIMESTAMP_IN_SECONDS\n"
" Absolute or relative (to now) timestamp in seconds, to start the query.\n"
" The query is always executed from the most recent to the oldest log entry.\n"
" If not given the default is: now.\n"
"\n"
);
buffer_sprintf(wb,
" " LQS_PARAMETER_AFTER ":TIMESTAMP_IN_SECONDS\n"
" Absolute or relative (to `before`) timestamp in seconds, to end the query.\n"
" If not given, the default is %d.\n"
"\n"
, -LQS_DEFAULT_QUERY_DURATION
);
buffer_sprintf(wb,
" " LQS_PARAMETER_LAST ":ITEMS\n"
" The number of items to return.\n"
" The default is %zu.\n"
"\n"
, lqs->rq.entries
);
buffer_sprintf(wb,
" " LQS_PARAMETER_SAMPLING ":ITEMS\n"
" The number of log entries to sample to estimate facets counters and histogram.\n"
" The default is %zu.\n"
"\n"
, lqs->rq.sampling
);
buffer_sprintf(wb,
" " LQS_PARAMETER_ANCHOR ":TIMESTAMP_IN_MICROSECONDS\n"
" Return items relative to this timestamp.\n"
" The exact items to be returned depend on the query `" LQS_PARAMETER_DIRECTION "`.\n"
"\n"
);
buffer_sprintf(wb,
" " LQS_PARAMETER_DIRECTION ":forward or " LQS_PARAMETER_DIRECTION ":backward\n"
" When set to `backward` (default) the items returned are the newest before the\n"
" `" LQS_PARAMETER_ANCHOR "`, (or `" LQS_PARAMETER_BEFORE "` if `" LQS_PARAMETER_ANCHOR "` is not set)\n"
" When set to `forward` the items returned are the oldest after the\n"
" `" LQS_PARAMETER_ANCHOR "`, (or `" LQS_PARAMETER_AFTER "` if `" LQS_PARAMETER_ANCHOR "` is not set)\n"
" The default is: %s\n"
"\n"
, lqs->rq.direction == FACETS_ANCHOR_DIRECTION_FORWARD ? "forward" : "backward"
);
buffer_sprintf(wb,
" " LQS_PARAMETER_QUERY ":SIMPLE_PATTERN\n"
" Do a full text search to find the log entries matching the pattern given.\n"
" The plugin is searching for matches on all fields of the database.\n"
"\n"
);
buffer_sprintf(wb,
" " LQS_PARAMETER_IF_MODIFIED_SINCE ":TIMESTAMP_IN_MICROSECONDS\n"
" Each successful response, includes a `last_modified` field.\n"
" By providing the timestamp to the `" LQS_PARAMETER_IF_MODIFIED_SINCE "` parameter,\n"
" the plugin will return 200 with a successful response, or 304 if the source has not\n"
" been modified since that timestamp.\n"
"\n"
);
buffer_sprintf(wb,
" " LQS_PARAMETER_HISTOGRAM ":facet_id\n"
" Use the given `facet_id` for the histogram.\n"
" This parameter is ignored in `" LQS_PARAMETER_DATA_ONLY "` mode.\n"
"\n"
);
buffer_sprintf(wb,
" " LQS_PARAMETER_FACETS ":facet_id1,facet_id2,facet_id3,...\n"
" Add the given facets to the list of fields for which analysis is required.\n"
" The plugin will offer both a histogram and facet value counters for its values.\n"
" This parameter is ignored in `" LQS_PARAMETER_DATA_ONLY "` mode.\n"
"\n"
);
buffer_sprintf(wb,
" facet_id:value_id1,value_id2,value_id3,...\n"
" Apply filters to the query, based on the facet IDs returned.\n"
" Each `facet_id` can be given once, but multiple `facet_ids` can be given.\n"
"\n"
);
}
static inline bool lqs_request_parse_json_payload(json_object *jobj, const char *path, void *data, BUFFER *error) {
struct logs_query_data *qd = data;
LOGS_QUERY_REQUEST *rq = qd->rq;
BUFFER *wb = qd->wb;
FACETS *facets = qd->facets;
// const char *transaction = qd->transaction;
buffer_flush(error);
JSONC_PARSE_BOOL_OR_ERROR_AND_RETURN(jobj, path, LQS_PARAMETER_INFO, rq->info, error, false);
JSONC_PARSE_BOOL_OR_ERROR_AND_RETURN(jobj, path, LQS_PARAMETER_DELTA, rq->delta, error, false);
JSONC_PARSE_BOOL_OR_ERROR_AND_RETURN(jobj, path, LQS_PARAMETER_TAIL, rq->tail, error, false);
JSONC_PARSE_BOOL_OR_ERROR_AND_RETURN(jobj, path, LQS_PARAMETER_SLICE, rq->slice, error, false);
JSONC_PARSE_BOOL_OR_ERROR_AND_RETURN(jobj, path, LQS_PARAMETER_DATA_ONLY, rq->data_only, error, false);
JSONC_PARSE_UINT64_OR_ERROR_AND_RETURN(jobj, path, LQS_PARAMETER_SAMPLING, rq->sampling, error, false);
JSONC_PARSE_INT64_OR_ERROR_AND_RETURN(jobj, path, LQS_PARAMETER_AFTER, rq->after_s, error, false);
JSONC_PARSE_INT64_OR_ERROR_AND_RETURN(jobj, path, LQS_PARAMETER_BEFORE, rq->before_s, error, false);
JSONC_PARSE_UINT64_OR_ERROR_AND_RETURN(jobj, path, LQS_PARAMETER_IF_MODIFIED_SINCE, rq->if_modified_since, error, false);
JSONC_PARSE_UINT64_OR_ERROR_AND_RETURN(jobj, path, LQS_PARAMETER_ANCHOR, rq->anchor, error, false);
JSONC_PARSE_UINT64_OR_ERROR_AND_RETURN(jobj, path, LQS_PARAMETER_LAST, rq->entries, error, false);
JSONC_PARSE_TXT2ENUM_OR_ERROR_AND_RETURN(jobj, path, LQS_PARAMETER_DIRECTION, lgs_get_direction, rq->direction, error, false);
JSONC_PARSE_TXT2STRDUPZ_OR_ERROR_AND_RETURN(jobj, path, LQS_PARAMETER_QUERY, rq->query, error, false);
JSONC_PARSE_TXT2STRDUPZ_OR_ERROR_AND_RETURN(jobj, path, LQS_PARAMETER_HISTOGRAM, rq->histogram, error, false);
json_object *sources;
if (json_object_object_get_ex(jobj, LQS_PARAMETER_SOURCE, &sources)) {
if (json_object_get_type(sources) != json_type_array) {
buffer_sprintf(error, "member '%s' is not an array", LQS_PARAMETER_SOURCE);
return false;
}
buffer_json_member_add_array(wb, LQS_PARAMETER_SOURCE);
CLEAN_BUFFER *sources_list = buffer_create(0, NULL);
rq->source_type = LQS_SOURCE_TYPE_NONE;
size_t sources_len = json_object_array_length(sources);
for (size_t i = 0; i < sources_len; i++) {
json_object *src = json_object_array_get_idx(sources, i);
if (json_object_get_type(src) != json_type_string) {
buffer_sprintf(error, "sources array item %zu is not a string", i);
return false;
}
const char *value = json_object_get_string(src);
buffer_json_add_array_item_string(wb, value);
LQS_SOURCE_TYPE t = LQS_FUNCTION_GET_INTERNAL_SOURCE_TYPE(value);
if(t != LQS_SOURCE_TYPE_NONE) {
rq->source_type |= t;
value = NULL;
}
else {
// else, match the source, whatever it is
if(buffer_strlen(sources_list))
buffer_putc(sources_list, '|');
buffer_strcat(sources_list, value);
}
}
if(buffer_strlen(sources_list)) {
simple_pattern_free(rq->sources);
rq->sources = simple_pattern_create(buffer_tostring(sources_list), "|", SIMPLE_PATTERN_EXACT, false);
}
buffer_json_array_close(wb); // source
}
json_object *fcts;
if (json_object_object_get_ex(jobj, LQS_PARAMETER_FACETS, &fcts)) {
        if (json_object_get_type(fcts) != json_type_array) {
buffer_sprintf(error, "member '%s' is not an array", LQS_PARAMETER_FACETS);
return false;
}
rq->default_facet = FACET_KEY_OPTION_NONE;
facets_reset_and_disable_all_facets(facets);
buffer_json_member_add_array(wb, LQS_PARAMETER_FACETS);
size_t facets_len = json_object_array_length(fcts);
for (size_t i = 0; i < facets_len; i++) {
json_object *fct = json_object_array_get_idx(fcts, i);
if (json_object_get_type(fct) != json_type_string) {
buffer_sprintf(error, "facets array item %zu is not a string", i);
return false;
}
const char *value = json_object_get_string(fct);
facets_register_facet(facets, value, FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS|FACET_KEY_OPTION_REORDER);
buffer_json_add_array_item_string(wb, value);
}
buffer_json_array_close(wb); // facets
}
json_object *selections;
if (json_object_object_get_ex(jobj, "selections", &selections)) {
if (json_object_get_type(selections) != json_type_object) {
buffer_sprintf(error, "member 'selections' is not an object");
return false;
}
buffer_json_member_add_object(wb, "selections");
json_object_object_foreach(selections, key, val) {
if (json_object_get_type(val) != json_type_array) {
buffer_sprintf(error, "selection '%s' is not an array", key);
return false;
}
buffer_json_member_add_array(wb, key);
size_t values_len = json_object_array_length(val);
for (size_t i = 0; i < values_len; i++) {
json_object *value_obj = json_object_array_get_idx(val, i);
if (json_object_get_type(value_obj) != json_type_string) {
buffer_sprintf(error, "selection '%s' array item %zu is not a string", key, i);
return false;
}
const char *value = json_object_get_string(value_obj);
// Call facets_register_facet_id_filter for each value
facets_register_facet_filter(
facets, key, value, FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_REORDER);
buffer_json_add_array_item_string(wb, value);
rq->filters++;
}
buffer_json_array_close(wb); // key
}
buffer_json_object_close(wb); // selections
}
rq->fields_are_ids = false;
return true;
}
static inline bool lqs_request_parse_POST(LOGS_QUERY_STATUS *lqs, BUFFER *wb, BUFFER *payload, const char *transaction) {
FACETS *facets = lqs->facets;
LOGS_QUERY_REQUEST *rq = &lqs->rq;
struct logs_query_data qd = {
.transaction = transaction,
.facets = facets,
.rq = rq,
.wb = wb,
};
int code;
CLEAN_JSON_OBJECT *jobj =
json_parse_function_payload_or_error(wb, payload, &code, lqs_request_parse_json_payload, &qd);
wb->response_code = code;
return (jobj && code == HTTP_RESP_OK);
}
static inline bool lqs_request_parse_GET(LOGS_QUERY_STATUS *lqs, BUFFER *wb, char *function) {
FACETS *facets = lqs->facets;
LOGS_QUERY_REQUEST *rq = &lqs->rq;
buffer_json_member_add_object(wb, "_request");
char func_copy[strlen(function) + 1];
strcpy(func_copy, function);
char *words[LQS_MAX_PARAMS] = { NULL };
size_t num_words = quoted_strings_splitter_pluginsd(func_copy, words, LQS_MAX_PARAMS);
for(int i = 1; i < LQS_MAX_PARAMS;i++) {
char *keyword = get_word(words, num_words, i);
if(!keyword) break;
if(strcmp(keyword, LQS_PARAMETER_HELP) == 0) {
lqs_function_help(lqs, wb);
return false;
}
else if(strcmp(keyword, LQS_PARAMETER_INFO) == 0) {
rq->info = true;
}
else if(strncmp(keyword, LQS_PARAMETER_DELTA ":", sizeof(LQS_PARAMETER_DELTA ":") - 1) == 0) {
char *v = &keyword[sizeof(LQS_PARAMETER_DELTA ":") - 1];
if(strcmp(v, "false") == 0 || strcmp(v, "no") == 0 || strcmp(v, "0") == 0)
rq->delta = false;
else
rq->delta = true;
}
else if(strncmp(keyword, LQS_PARAMETER_TAIL ":", sizeof(LQS_PARAMETER_TAIL ":") - 1) == 0) {
char *v = &keyword[sizeof(LQS_PARAMETER_TAIL ":") - 1];
if(strcmp(v, "false") == 0 || strcmp(v, "no") == 0 || strcmp(v, "0") == 0)
rq->tail = false;
else
rq->tail = true;
}
else if(strncmp(keyword, LQS_PARAMETER_SAMPLING ":", sizeof(LQS_PARAMETER_SAMPLING ":") - 1) == 0) {
rq->sampling = str2ul(&keyword[sizeof(LQS_PARAMETER_SAMPLING ":") - 1]);
}
else if(strncmp(keyword, LQS_PARAMETER_DATA_ONLY ":", sizeof(LQS_PARAMETER_DATA_ONLY ":") - 1) == 0) {
char *v = &keyword[sizeof(LQS_PARAMETER_DATA_ONLY ":") - 1];
if(strcmp(v, "false") == 0 || strcmp(v, "no") == 0 || strcmp(v, "0") == 0)
rq->data_only = false;
else
rq->data_only = true;
}
else if(strncmp(keyword, LQS_PARAMETER_SLICE ":", sizeof(LQS_PARAMETER_SLICE ":") - 1) == 0) {
char *v = &keyword[sizeof(LQS_PARAMETER_SLICE ":") - 1];
if(strcmp(v, "false") == 0 || strcmp(v, "no") == 0 || strcmp(v, "0") == 0)
rq->slice = false;
else
rq->slice = true;
}
else if(strncmp(keyword, LQS_PARAMETER_SOURCE ":", sizeof(LQS_PARAMETER_SOURCE ":") - 1) == 0) {
const char *value = &keyword[sizeof(LQS_PARAMETER_SOURCE ":") - 1];
buffer_json_member_add_array(wb, LQS_PARAMETER_SOURCE);
CLEAN_BUFFER *sources_list = buffer_create(0, NULL);
rq->source_type = LQS_SOURCE_TYPE_NONE;
while(value) {
char *sep = strchr(value, ',');
if(sep)
*sep++ = '\0';
buffer_json_add_array_item_string(wb, value);
LQS_SOURCE_TYPE t = LQS_FUNCTION_GET_INTERNAL_SOURCE_TYPE(value);
if(t != LQS_SOURCE_TYPE_NONE) {
rq->source_type |= t;
value = NULL;
}
else {
// else, match the source, whatever it is
if(buffer_strlen(sources_list))
buffer_putc(sources_list, '|');
buffer_strcat(sources_list, value);
}
value = sep;
}
if(buffer_strlen(sources_list)) {
simple_pattern_free(rq->sources);
rq->sources = simple_pattern_create(buffer_tostring(sources_list), "|", SIMPLE_PATTERN_EXACT, false);
}
buffer_json_array_close(wb); // source
}
else if(strncmp(keyword, LQS_PARAMETER_AFTER ":", sizeof(LQS_PARAMETER_AFTER ":") - 1) == 0) {
rq->after_s = str2l(&keyword[sizeof(LQS_PARAMETER_AFTER ":") - 1]);
}
else if(strncmp(keyword, LQS_PARAMETER_BEFORE ":", sizeof(LQS_PARAMETER_BEFORE ":") - 1) == 0) {
rq->before_s = str2l(&keyword[sizeof(LQS_PARAMETER_BEFORE ":") - 1]);
}
else if(strncmp(keyword, LQS_PARAMETER_IF_MODIFIED_SINCE ":", sizeof(LQS_PARAMETER_IF_MODIFIED_SINCE ":") - 1) == 0) {
rq->if_modified_since = str2ull(&keyword[sizeof(LQS_PARAMETER_IF_MODIFIED_SINCE ":") - 1], NULL);
}
else if(strncmp(keyword, LQS_PARAMETER_ANCHOR ":", sizeof(LQS_PARAMETER_ANCHOR ":") - 1) == 0) {
rq->anchor = str2ull(&keyword[sizeof(LQS_PARAMETER_ANCHOR ":") - 1], NULL);
}
else if(strncmp(keyword, LQS_PARAMETER_DIRECTION ":", sizeof(LQS_PARAMETER_DIRECTION ":") - 1) == 0) {
rq->direction = lgs_get_direction(&keyword[sizeof(LQS_PARAMETER_DIRECTION ":") - 1]);
}
else if(strncmp(keyword, LQS_PARAMETER_LAST ":", sizeof(LQS_PARAMETER_LAST ":") - 1) == 0) {
rq->entries = str2ul(&keyword[sizeof(LQS_PARAMETER_LAST ":") - 1]);
}
else if(strncmp(keyword, LQS_PARAMETER_QUERY ":", sizeof(LQS_PARAMETER_QUERY ":") - 1) == 0) {
freez((void *)rq->query);
            rq->query = strdupz(&keyword[sizeof(LQS_PARAMETER_QUERY ":") - 1]);
}
else if(strncmp(keyword, LQS_PARAMETER_HISTOGRAM ":", sizeof(LQS_PARAMETER_HISTOGRAM ":") - 1) == 0) {
freez((void *)rq->histogram);
rq->histogram = strdupz(&keyword[sizeof(LQS_PARAMETER_HISTOGRAM ":") - 1]);
}
else if(strncmp(keyword, LQS_PARAMETER_FACETS ":", sizeof(LQS_PARAMETER_FACETS ":") - 1) == 0) {
rq->default_facet = FACET_KEY_OPTION_NONE;
facets_reset_and_disable_all_facets(facets);
char *value = &keyword[sizeof(LQS_PARAMETER_FACETS ":") - 1];
if(*value) {
buffer_json_member_add_array(wb, LQS_PARAMETER_FACETS);
while(value) {
char *sep = strchr(value, ',');
if(sep)
*sep++ = '\0';
facets_register_facet_id(facets, value, FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS|FACET_KEY_OPTION_REORDER);
buffer_json_add_array_item_string(wb, value);
value = sep;
}
buffer_json_array_close(wb); // facets
}
}
else {
char *value = strchr(keyword, ':');
if(value) {
*value++ = '\0';
buffer_json_member_add_array(wb, keyword);
while(value) {
char *sep = strchr(value, ',');
if(sep)
*sep++ = '\0';
facets_register_facet_filter_id(
facets, keyword, value,
FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_REORDER);
buffer_json_add_array_item_string(wb, value);
rq->filters++;
value = sep;
}
buffer_json_array_close(wb); // keyword
}
}
}
rq->fields_are_ids = true;
return true;
}
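// For reference, a GET request arrives as one space-separated function string;
// word 0 is the function name itself and is skipped. A hypothetical example:
//
//     windows-events after:-3600 last:200 direction:backward source:All EventID:4624,4625
//
// Any unrecognized "key:v1,v2,..." keyword falls into the last branch above and
// becomes a facet filter with one array of values per key.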
static inline void lqs_info_response(BUFFER *wb, FACETS *facets) {
// the buffer already has the request in it
// DO NOT FLUSH IT
buffer_json_member_add_uint64(wb, "v", 3);
facets_accepted_parameters_to_json_array(facets, wb, false);
buffer_json_member_add_array(wb, "required_params");
{
buffer_json_add_array_item_object(wb);
{
buffer_json_member_add_string(wb, "id", "source");
buffer_json_member_add_string(wb, "name", "source");
buffer_json_member_add_string(wb, "help", "Select the logs source to query");
buffer_json_member_add_string(wb, "type", "multiselect");
buffer_json_member_add_array(wb, "options");
{
LQS_FUNCTION_SOURCE_TO_JSON_ARRAY(wb);
}
buffer_json_array_close(wb); // options array
}
buffer_json_object_close(wb); // required params object
}
buffer_json_array_close(wb); // required_params array
facets_table_config(wb);
buffer_json_member_add_uint64(wb, "status", HTTP_RESP_OK);
buffer_json_member_add_string(wb, "type", "table");
buffer_json_member_add_string(wb, "help", LQS_FUNCTION_DESCRIPTION);
buffer_json_finalize(wb);
wb->content_type = CT_APPLICATION_JSON;
wb->response_code = HTTP_RESP_OK;
}
static inline BUFFER *lqs_create_output_buffer(void) {
BUFFER *wb = buffer_create(0, NULL);
buffer_flush(wb);
buffer_json_initialize(wb, "\"", "\"", 0, true, BUFFER_JSON_OPTIONS_MINIFY);
return wb;
}
static inline FACETS *lqs_facets_create(uint32_t items_to_return, FACETS_OPTIONS options, const char *visible_keys, const char *facet_keys, const char *non_facet_keys, bool have_slice) {
FACETS *facets = facets_create(items_to_return, options,
visible_keys, facet_keys, non_facet_keys);
facets_accepted_param(facets, LQS_PARAMETER_INFO);
facets_accepted_param(facets, LQS_PARAMETER_SOURCE);
facets_accepted_param(facets, LQS_PARAMETER_AFTER);
facets_accepted_param(facets, LQS_PARAMETER_BEFORE);
facets_accepted_param(facets, LQS_PARAMETER_ANCHOR);
facets_accepted_param(facets, LQS_PARAMETER_DIRECTION);
facets_accepted_param(facets, LQS_PARAMETER_LAST);
facets_accepted_param(facets, LQS_PARAMETER_QUERY);
facets_accepted_param(facets, LQS_PARAMETER_FACETS);
facets_accepted_param(facets, LQS_PARAMETER_HISTOGRAM);
facets_accepted_param(facets, LQS_PARAMETER_IF_MODIFIED_SINCE);
facets_accepted_param(facets, LQS_PARAMETER_DATA_ONLY);
facets_accepted_param(facets, LQS_PARAMETER_DELTA);
facets_accepted_param(facets, LQS_PARAMETER_TAIL);
facets_accepted_param(facets, LQS_PARAMETER_SAMPLING);
if(have_slice)
facets_accepted_param(facets, LQS_PARAMETER_SLICE);
return facets;
}
static inline bool lqs_request_parse_and_validate(LOGS_QUERY_STATUS *lqs, BUFFER *wb, char *function, BUFFER *payload, bool have_slice, const char *default_histogram) {
LOGS_QUERY_REQUEST *rq = &lqs->rq;
FACETS *facets = lqs->facets;
if( (payload && !lqs_request_parse_POST(lqs, wb, payload, rq->transaction)) ||
(!payload && !lqs_request_parse_GET(lqs, wb, function)) )
return false;
// ----------------------------------------------------------------------------------------------------------------
// validate parameters
if(rq->query && !*rq->query) {
freez((void *)rq->query);
rq->query = NULL;
}
if(rq->histogram && !*rq->histogram) {
freez((void *)rq->histogram);
rq->histogram = NULL;
}
if(!rq->data_only)
rq->delta = false;
if(!rq->data_only || !rq->if_modified_since)
rq->tail = false;
rq->now_s = now_realtime_sec();
rq->expires_s = rq->now_s + 1;
wb->expires = rq->expires_s;
if(!rq->after_s && !rq->before_s) {
rq->before_s = rq->now_s;
rq->after_s = rq->before_s - LQS_DEFAULT_QUERY_DURATION;
}
else
rrdr_relative_window_to_absolute(&rq->after_s, &rq->before_s, rq->now_s);
if(rq->after_s > rq->before_s) {
time_t tmp = rq->after_s;
rq->after_s = rq->before_s;
rq->before_s = tmp;
}
if(rq->after_s == rq->before_s)
rq->after_s = rq->before_s - LQS_DEFAULT_QUERY_DURATION;
rq->after_ut = rq->after_s * USEC_PER_SEC;
rq->before_ut = (rq->before_s * USEC_PER_SEC) + USEC_PER_SEC - 1;
if(!rq->entries)
rq->entries = LQS_DEFAULT_ITEMS_PER_QUERY;
// ----------------------------------------------------------------------------------------------------------------
// validate the anchor
lqs->last_modified = 0;
lqs->anchor.start_ut = lqs->rq.anchor;
lqs->anchor.stop_ut = 0;
if(lqs->anchor.start_ut && lqs->rq.tail) {
// a tail request
// we need the top X entries from BEFORE
// but, we need to calculate the facets and the
// histogram up to the anchor
lqs->rq.direction = FACETS_ANCHOR_DIRECTION_BACKWARD;
lqs->anchor.start_ut = 0;
lqs->anchor.stop_ut = lqs->rq.anchor;
}
if(lqs->rq.anchor && lqs->rq.anchor < lqs->rq.after_ut) {
lqs_log_error(lqs, "received anchor is too small for query timeframe, ignoring anchor");
lqs->rq.anchor = 0;
lqs->anchor.start_ut = 0;
lqs->anchor.stop_ut = 0;
lqs->rq.direction = FACETS_ANCHOR_DIRECTION_BACKWARD;
}
else if(lqs->rq.anchor > lqs->rq.before_ut) {
lqs_log_error(lqs, "received anchor is too big for query timeframe, ignoring anchor");
lqs->rq.anchor = 0;
lqs->anchor.start_ut = 0;
lqs->anchor.stop_ut = 0;
lqs->rq.direction = FACETS_ANCHOR_DIRECTION_BACKWARD;
}
facets_set_anchor(facets, lqs->anchor.start_ut, lqs->anchor.stop_ut, lqs->rq.direction);
facets_set_additional_options(facets,
((lqs->rq.data_only) ? FACETS_OPTION_DATA_ONLY : 0) |
((lqs->rq.delta) ? FACETS_OPTION_SHOW_DELTAS : 0));
facets_set_items(facets, lqs->rq.entries);
facets_set_query(facets, lqs->rq.query);
if(lqs->rq.slice && have_slice)
facets_enable_slice_mode(facets);
else
lqs->rq.slice = false;
if(lqs->rq.histogram) {
if(lqs->rq.fields_are_ids)
facets_set_timeframe_and_histogram_by_id(facets, lqs->rq.histogram, lqs->rq.after_ut, lqs->rq.before_ut);
else
facets_set_timeframe_and_histogram_by_name(facets, lqs->rq.histogram, lqs->rq.after_ut, lqs->rq.before_ut);
}
else if(default_histogram)
facets_set_timeframe_and_histogram_by_name(facets, default_histogram, lqs->rq.after_ut, lqs->rq.before_ut);
// complete the request object
buffer_json_member_add_boolean(wb, LQS_PARAMETER_INFO, lqs->rq.info);
buffer_json_member_add_boolean(wb, LQS_PARAMETER_SLICE, lqs->rq.slice);
buffer_json_member_add_boolean(wb, LQS_PARAMETER_DATA_ONLY, lqs->rq.data_only);
buffer_json_member_add_boolean(wb, LQS_PARAMETER_DELTA, lqs->rq.delta);
buffer_json_member_add_boolean(wb, LQS_PARAMETER_TAIL, lqs->rq.tail);
buffer_json_member_add_uint64(wb, LQS_PARAMETER_SAMPLING, lqs->rq.sampling);
buffer_json_member_add_uint64(wb, "source_type", lqs->rq.source_type);
buffer_json_member_add_uint64(wb, LQS_PARAMETER_AFTER, lqs->rq.after_ut / USEC_PER_SEC);
buffer_json_member_add_uint64(wb, LQS_PARAMETER_BEFORE, lqs->rq.before_ut / USEC_PER_SEC);
buffer_json_member_add_uint64(wb, "if_modified_since", lqs->rq.if_modified_since);
buffer_json_member_add_uint64(wb, LQS_PARAMETER_ANCHOR, lqs->rq.anchor);
buffer_json_member_add_string(wb, LQS_PARAMETER_DIRECTION, lqs->rq.direction == FACETS_ANCHOR_DIRECTION_FORWARD ? "forward" : "backward");
buffer_json_member_add_uint64(wb, LQS_PARAMETER_LAST, lqs->rq.entries);
buffer_json_member_add_string(wb, LQS_PARAMETER_QUERY, lqs->rq.query);
buffer_json_member_add_string(wb, LQS_PARAMETER_HISTOGRAM, lqs->rq.histogram);
buffer_json_object_close(wb); // request
return true;
}
static inline void lqs_cleanup(LOGS_QUERY_STATUS *lqs) {
freez((void *)lqs->rq.query);
freez((void *)lqs->rq.histogram);
simple_pattern_free(lqs->rq.sources);
facets_destroy(lqs->facets);
}
#endif //NETDATA_LOGS_QUERY_STATUS_H

View file

@ -133,9 +133,13 @@ static inline void pluginsd_function_json_error_to_stdout(const char *transactio
fflush(stdout);
}
- static inline void pluginsd_function_result_to_stdout(const char *transaction, int code, const char *content_type, time_t expires, BUFFER *result) {
-     pluginsd_function_result_begin_to_stdout(transaction, code, content_type, expires);
+ static inline void pluginsd_function_result_to_stdout(const char *transaction, BUFFER *result) {
+     pluginsd_function_result_begin_to_stdout(transaction, result->response_code,
+                                              content_type_id2string(result->content_type),
+                                              result->expires);
fwrite(buffer_tostring(result), buffer_strlen(result), 1, stdout);
pluginsd_function_result_end_to_stdout();
fflush(stdout);
}

View file

@ -10,14 +10,14 @@ static struct {
const char *options;
} content_types[] = {
// primary - preferred during id-to-string conversions
{ .format = "text/html", CT_TEXT_HTML, true },
{ .format = "application/json", CT_APPLICATION_JSON, true },
{ .format = "text/plain", CT_TEXT_PLAIN, true },
{ .format = "text/html", CT_TEXT_HTML, true },
{ .format = "text/css", CT_TEXT_CSS, true },
{ .format = "text/yaml", CT_TEXT_YAML, true },
{ .format = "application/yaml", CT_APPLICATION_YAML, true },
{ .format = "text/xml", CT_TEXT_XML, true },
{ .format = "text/xsl", CT_TEXT_XSL, true },
{ .format = "application/json", CT_APPLICATION_JSON, true },
{ .format = "application/xml", CT_APPLICATION_XML, true },
{ .format = "application/javascript", CT_APPLICATION_X_JAVASCRIPT, true },
{ .format = "application/octet-stream", CT_APPLICATION_OCTET_STREAM, false },

View file

@ -8,6 +8,9 @@ int rrd_call_function_error(BUFFER *wb, const char *msg, int code) {
buffer_json_member_add_int64(wb, "status", code);
buffer_json_member_add_string(wb, "error_message", msg);
buffer_json_finalize(wb);
wb->date = now_realtime_sec();
wb->expires = wb->date + 1;
wb->response_code = code;
return code;
}

View file

@ -7,6 +7,7 @@
#include <sys/syscall.h>
#endif
#include "timestamps.h"
#include "setproctitle.h"
#include "close_range.h"
#include "setresuid.h"

View file

@ -0,0 +1,4 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "libnetdata/libnetdata.h"
#include "timestamps.h"

View file

@ -0,0 +1,42 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef LIBNETDATA_OS_TIMESTAMPS_H
#define LIBNETDATA_OS_TIMESTAMPS_H
// Windows file time starts on January 1, 1601, Unix epoch starts on January 1, 1970
// Difference in 100-nanosecond intervals between these two dates is 116444736000000000ULL
// Convert Windows file time (in 100-nanosecond intervals) to Unix epoch in nanoseconds
#define os_windows_ulonglong_to_unix_epoch_ns(ft) (((uint64_t)(ft) - 116444736000000000ULL) * 100ULL)
// Convert Unix epoch time (in nanoseconds) to Windows file time (in 100-nanosecond intervals)
#define os_unix_epoch_ns_to_windows_ulonglong(ns) (((uint64_t)(ns) / 100ULL) + 116444736000000000ULL)
#if defined(OS_WINDOWS)
// Convert FILETIME to Unix epoch in nanoseconds
#define os_filetime_to_unix_epoch_ns(ft) \
((((uint64_t)(ft).dwHighDateTime << 32 | (ft).dwLowDateTime) - 116444736000000000ULL) * 100ULL)
// Convert Unix epoch in nanoseconds to FILETIME (returns FILETIME)
#define os_unix_epoch_ns_to_filetime(ns) \
({ \
uint64_t temp = ((uint64_t)(ns) / 100ULL) + 116444736000000000ULL; \
FILETIME ft; \
ft.dwLowDateTime = (uint32_t)(temp & 0xFFFFFFFF); \
ft.dwHighDateTime = (uint32_t)(temp >> 32); \
ft; \
})
// Convert Unix epoch in microseconds to FILETIME (returns FILETIME)
#define os_unix_epoch_ut_to_filetime(ut) \
({ \
    uint64_t temp = ((uint64_t)(ut) * 10ULL) + 116444736000000000ULL; \
FILETIME ft; \
ft.dwLowDateTime = (uint32_t)(temp & 0xFFFFFFFF); \
ft.dwHighDateTime = (uint32_t)(temp >> 32); \
ft; \
})
#endif //OS_WINDOWS
#endif //LIBNETDATA_OS_TIMESTAMPS_H
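
As a quick sanity check, a minimal standalone sketch (hypothetical timestamp) of a round trip through the portable macros above:

#include <inttypes.h>
#include <stdio.h>
#include "timestamps.h"

int main(void) {
    uint64_t unix_ns = 1725926400000000000ULL; // 2024-09-10 00:00:00 UTC, in ns
    uint64_t ticks = os_unix_epoch_ns_to_windows_ulonglong(unix_ns);
    uint64_t back = os_windows_ulonglong_to_unix_epoch_ns(ticks);

    // the round trip is exact because the input is a multiple of 100ns
    printf("ticks=%" PRIu64 " ok=%d\n", ticks, back == unix_ns);
    return 0;
}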

View file

@ -166,7 +166,7 @@ bool duration_parse(const char *duration, int64_t *result, const char *default_u
// --------------------------------------------------------------------------------------------------------------------
// generate a string to represent a duration
- ssize_t duration_snprintf(char *dst, size_t dst_size, int64_t value, const char *unit) {
+ ssize_t duration_snprintf(char *dst, size_t dst_size, int64_t value, const char *unit, bool add_spaces) {
if (!dst || dst_size == 0) return -1;
if (dst_size == 1) {
dst[0] = '\0';
@ -204,8 +204,9 @@ ssize_t duration_snprintf(char *dst, size_t dst_size, int64_t value, const char
int64_t unit_count = rounded / multiplier;
if (unit_count > 0) {
const char *space = (add_spaces && offset) ? " " : "";
int written = snprintfz(dst + offset, dst_size - offset,
"%s%" PRIi64 "%s", sign, unit_count, units[i].unit);
"%s%s%" PRIi64 "%s", space, sign, unit_count, units[i].unit);
if (written < 0)
return -3;

View file

@ -18,14 +18,14 @@ bool duration_parse(const char *duration, int64_t *result, const char *default_u
#define duration_parse_days(duration, days_ptr) duration_parse(duration, days_ptr, "d")
// duration (number to string)
- ssize_t duration_snprintf(char *dst, size_t dst_size, int64_t value, const char *unit);
- #define duration_snprintf_nsec_t(dst, dst_size, ns) duration_snprintf(dst, dst_size, ns, "ns")
- #define duration_snprintf_usec_t(dst, dst_size, us) duration_snprintf(dst, dst_size, us, "us")
- #define duration_snprintf_msec_t(dst, dst_size, ms) duration_snprintf(dst, dst_size, ms, "ms")
- #define duration_snprintf_time_t(dst, dst_size, secs) duration_snprintf(dst, dst_size, secs, "s")
- #define duration_snprintf_mins(dst, dst_size, mins) duration_snprintf(dst, dst_size, mins, "m")
- #define duration_snprintf_hours(dst, dst_size, hours) duration_snprintf(dst, dst_size, hours, "h")
- #define duration_snprintf_days(dst, dst_size, days) duration_snprintf(dst, dst_size, days, "d")
+ ssize_t duration_snprintf(char *dst, size_t dst_size, int64_t value, const char *unit, bool add_spaces);
+ #define duration_snprintf_nsec_t(dst, dst_size, ns) duration_snprintf(dst, dst_size, ns, "ns", false)
+ #define duration_snprintf_usec_t(dst, dst_size, us) duration_snprintf(dst, dst_size, us, "us", false)
+ #define duration_snprintf_msec_t(dst, dst_size, ms) duration_snprintf(dst, dst_size, ms, "ms", false)
+ #define duration_snprintf_time_t(dst, dst_size, secs) duration_snprintf(dst, dst_size, secs, "s", false)
+ #define duration_snprintf_mins(dst, dst_size, mins) duration_snprintf(dst, dst_size, mins, "m", false)
+ #define duration_snprintf_hours(dst, dst_size, hours) duration_snprintf(dst, dst_size, hours, "h", false)
+ #define duration_snprintf_days(dst, dst_size, days) duration_snprintf(dst, dst_size, days, "d", false)
bool duration_parse_seconds(const char *str, int *result);

View file

@@ -148,7 +148,7 @@ bool size_parse(const char *size_str, uint64_t *result, const char *default_unit
// --------------------------------------------------------------------------------------------------------------------
// generate a string to represent a size
ssize_t size_snprintf(char *dst, size_t dst_size, uint64_t value, const char *unit) {
ssize_t size_snprintf(char *dst, size_t dst_size, uint64_t value, const char *unit, bool accurate) {
if (!dst || dst_size == 0) return -1;
if (dst_size == 1) {
dst[0] = '\0';
@@ -178,9 +178,16 @@ ssize_t size_snprintf(char *dst, size_t dst_size, uint64_t value, const char *un
uint64_t reversed_bytes = (uint64_t)(converted * (double)su->multiplier);
if (reversed_bytes == bytes)
// no precision loss, this is good to use
su_best = su;
if(accurate) {
// accurate mode: pick a larger unit only if it loses no precision
if (reversed_bytes == bytes)
// no precision loss, this is good to use
su_best = su;
}
else {
if(converted > 1.0)
su_best = su;
}
}
double converted = size_round_to_resolution_dbl2(bytes, su_best->multiplier);

View file

@@ -11,10 +11,10 @@ bool size_parse(const char *size_str, uint64_t *result, const char *default_unit
#define size_parse_mb(size_str, mb) size_parse(size_str, mb, "MiB")
#define size_parse_gb(size_str, gb) size_parse(size_str, gb, "GiB")
ssize_t size_snprintf(char *dst, size_t dst_size, uint64_t value, const char *unit);
#define size_snprintf_bytes(dst, dst_size, value) size_snprintf(dst, dst_size, value, "B")
#define size_snprintf_kb(dst, dst_size, value) size_snprintf(dst, dst_size, value, "KiB")
#define size_snprintf_mb(dst, dst_size, value) size_snprintf(dst, dst_size, value, "MiB")
#define size_snprintf_gb(dst, dst_size, value) size_snprintf(dst, dst_size, value, "GiB")
ssize_t size_snprintf(char *dst, size_t dst_size, uint64_t value, const char *unit, bool accurate);
#define size_snprintf_bytes(dst, dst_size, value) size_snprintf(dst, dst_size, value, "B", true)
#define size_snprintf_kb(dst, dst_size, value) size_snprintf(dst, dst_size, value, "KiB", true)
#define size_snprintf_mb(dst, dst_size, value) size_snprintf(dst, dst_size, value, "MiB", true)
#define size_snprintf_gb(dst, dst_size, value) size_snprintf(dst, dst_size, value, "GiB", true)
#endif //LIBNETDATA_PARSERS_SIZE_H

View file

@@ -520,13 +520,13 @@ static inline SIMPLE_HASHTABLE_VALUE_TYPE *simple_hashtable_set_named(SIMPLE_HAS
return SIMPLE_HASHTABLE_SLOT_DATA(sl);
}
static inline SIMPLE_HASHTABLE_VALUE_TYPE *simple_hashtable_get_named(SIMPLE_HASHTABLE_NAMED *ht, SIMPLE_HASHTABLE_KEY_TYPE *key, size_t key_len, SIMPLE_HASHTABLE_VALUE_TYPE *data) {
static inline SIMPLE_HASHTABLE_VALUE_TYPE *simple_hashtable_get_named(SIMPLE_HASHTABLE_NAMED *ht, SIMPLE_HASHTABLE_KEY_TYPE *key, size_t key_len) {
XXH64_hash_t hash = XXH3_64bits((void *)key, key_len);
SIMPLE_HASHTABLE_SLOT_NAMED *sl = simple_hashtable_get_slot_named(ht, hash, key, true);
return SIMPLE_HASHTABLE_SLOT_DATA(sl);
}
static inline bool simple_hashtable_del_named(SIMPLE_HASHTABLE_NAMED *ht, SIMPLE_HASHTABLE_KEY_TYPE *key, size_t key_len, SIMPLE_HASHTABLE_VALUE_TYPE *data) {
static inline bool simple_hashtable_del_named(SIMPLE_HASHTABLE_NAMED *ht, SIMPLE_HASHTABLE_KEY_TYPE *key, size_t key_len) {
XXH64_hash_t hash = XXH3_64bits((void *)key, key_len);
SIMPLE_HASHTABLE_SLOT_NAMED *sl = simple_hashtable_get_slot_named(ht, hash, key, true);
return simple_hashtable_del_slot_named(ht, sl);
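
With the unused data parameter gone, lookups and deletions take only the key and its length. A usage sketch built from the signatures above ("named" stands for the instantiated table name in this templated header; ht, key and key_len are hypothetical):

static void lookup_demo(SIMPLE_HASHTABLE_NAMED *ht, SIMPLE_HASHTABLE_KEY_TYPE *key, size_t key_len) {
    SIMPLE_HASHTABLE_VALUE_TYPE *v = simple_hashtable_get_named(ht, key, key_len);
    if (v) {
        // found: use v
    }
    if (simple_hashtable_del_named(ht, key, key_len)) {
        // the slot was occupied and has been deleted
    }
}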

View file

@@ -219,6 +219,57 @@ int spawn_server_exec_kill(SPAWN_SERVER *server, SPAWN_INSTANCE *si) {
return spawn_server_exec_wait(server, si);
}
static int spawn_server_waitpid(SPAWN_INSTANCE *si) {
int status;
pid_t pid;
pid = waitpid(si->child_pid, &status, 0);
if(pid != si->child_pid) {
nd_log(NDLS_COLLECTORS, NDLP_ERR,
"SPAWN PARENT: failed to wait for pid %d: %s",
si->child_pid, si->cmdline);
return -1;
}
errno_clear();
if(WIFEXITED(status)) {
if(WEXITSTATUS(status))
nd_log(NDLS_COLLECTORS, NDLP_INFO,
"SPAWN SERVER: child with pid %d (request %zu) exited with exit code %d: %s",
pid, si->request_id, WEXITSTATUS(status), si->cmdline);
}
else if(WIFSIGNALED(status)) {
if(WCOREDUMP(status))
nd_log(NDLS_COLLECTORS, NDLP_INFO,
"SPAWN SERVER: child with pid %d (request %zu) coredump'd due to signal %d: %s",
pid, si->request_id, WTERMSIG(status), si->cmdline);
else
nd_log(NDLS_COLLECTORS, NDLP_INFO,
"SPAWN SERVER: child with pid %d (request %zu) killed by signal %d: %s",
pid, si->request_id, WTERMSIG(status), si->cmdline);
}
else if(WIFSTOPPED(status)) {
nd_log(NDLS_COLLECTORS, NDLP_INFO,
"SPAWN SERVER: child with pid %d (request %zu) stopped due to signal %d: %s",
pid, si->request_id, WSTOPSIG(status), si->cmdline);
}
else if(WIFCONTINUED(status)) {
nd_log(NDLS_COLLECTORS, NDLP_INFO,
"SPAWN SERVER: child with pid %d (request %zu) continued due to signal %d: %s",
pid, si->request_id, SIGCONT, si->cmdline);
}
else {
nd_log(NDLS_COLLECTORS, NDLP_INFO,
"SPAWN SERVER: child with pid %d (request %zu) reports unhandled status: %s",
pid, si->request_id, si->cmdline);
}
return status;
}
int spawn_server_exec_wait(SPAWN_SERVER *server __maybe_unused, SPAWN_INSTANCE *si) {
if (!si) return -1;
@@ -229,19 +280,8 @@ int spawn_server_exec_wait(SPAWN_SERVER *server __maybe_unused, SPAWN_INSTANCE *
// Wait for the process to exit
int status = __atomic_load_n(&si->waitpid_status, __ATOMIC_RELAXED);
bool exited = __atomic_load_n(&si->exited, __ATOMIC_RELAXED);
if(!exited) {
if(waitpid(si->child_pid, &status, 0) != si->child_pid) {
nd_log(NDLS_COLLECTORS, NDLP_ERR,
"SPAWN PARENT: failed to wait for pid %d: %s",
si->child_pid, si->cmdline);
status = -1;
}
else {
nd_log(NDLS_COLLECTORS, NDLP_INFO,
"SPAWN PARENT: child with pid %d exited with status %d (waitpid): %s",
si->child_pid, status, si->cmdline);
}
}
if(!exited)
status = spawn_server_waitpid(si);
else
nd_log(NDLS_COLLECTORS, NDLP_INFO,
"SPAWN PARENT: child with pid %d exited with status %d (sighandler): %s",

View file

@@ -13,7 +13,7 @@ struct bearer_token_request {
STRING *client_name;
};
static bool parse_json_payload(json_object *jobj, const char *path, void *data, BUFFER *error) {
static bool bearer_parse_json_payload(json_object *jobj, const char *path, void *data, BUFFER *error) {
struct bearer_token_request *rq = data;
JSONC_PARSE_TXT2UUID_OR_ERROR_AND_RETURN(jobj, path, "claim_id", rq->claim_id, error, true);
JSONC_PARSE_TXT2UUID_OR_ERROR_AND_RETURN(jobj, path, "machine_guid", rq->machine_guid, error, true);
@@ -32,7 +32,7 @@ int function_bearer_get_token(BUFFER *wb, const char *function __maybe_unused, B
int code;
struct bearer_token_request rq = { 0 };
CLEAN_JSON_OBJECT *jobj = json_parse_function_payload_or_error(wb, payload, &code, parse_json_payload, &rq);
CLEAN_JSON_OBJECT *jobj = json_parse_function_payload_or_error(wb, payload, &code, bearer_parse_json_payload, &rq);
if(!jobj || code != HTTP_RESP_OK) {
string_freez(rq.client_name);
return code;

View file

@@ -1426,7 +1426,7 @@ void web_client_process_request_from_web_server(struct web_client *w) {
buffer_flush(w->url_as_received);
buffer_strcat(w->url_as_received, "too big request");
netdata_log_debug(D_WEB_CLIENT_ACCESS, "%llu: Received request is too big (%zu bytes).", w->id, w->response.data->len);
netdata_log_debug(D_WEB_CLIENT_ACCESS, "%llu: Received request is too big (%zu bytes).", w->id, (size_t)w->response.data->len);
size_t len = w->response.data->len;
buffer_flush(w->response.data);
@@ -1506,14 +1506,18 @@ break;
break;
case HTTP_REQUEST_MODE_OPTIONS:
netdata_log_debug(D_WEB_CLIENT, "%llu: Done preparing the OPTIONS response. Sending data (%zu bytes) to client.", w->id, w->response.data->len);
netdata_log_debug(D_WEB_CLIENT,
"%llu: Done preparing the OPTIONS response. Sending data (%zu bytes) to client.",
w->id, (size_t)w->response.data->len);
break;
case HTTP_REQUEST_MODE_POST:
case HTTP_REQUEST_MODE_GET:
case HTTP_REQUEST_MODE_PUT:
case HTTP_REQUEST_MODE_DELETE:
netdata_log_debug(D_WEB_CLIENT, "%llu: Done preparing the response. Sending data (%zu bytes) to client.", w->id, w->response.data->len);
netdata_log_debug(D_WEB_CLIENT,
"%llu: Done preparing the response. Sending data (%zu bytes) to client.",
w->id, (size_t)w->response.data->len);
break;
case HTTP_REQUEST_MODE_FILECOPY:
@@ -1620,8 +1624,9 @@ ssize_t web_client_send_deflate(struct web_client *w)
// when using compression,
// w->response.sent is the amount of bytes passed through compression
netdata_log_debug(D_DEFLATE, "%llu: web_client_send_deflate(): w->response.data->len = %zu, w->response.sent = %zu, w->response.zhave = %zu, w->response.zsent = %zu, w->response.zstream.avail_in = %u, w->response.zstream.avail_out = %u, w->response.zstream.total_in = %lu, w->response.zstream.total_out = %lu.",
w->id, w->response.data->len, w->response.sent, w->response.zhave, w->response.zsent, w->response.zstream.avail_in, w->response.zstream.avail_out, w->response.zstream.total_in, w->response.zstream.total_out);
netdata_log_debug(D_DEFLATE,
"%llu: web_client_send_deflate(): w->response.data->len = %zu, w->response.sent = %zu, w->response.zhave = %zu, w->response.zsent = %zu, w->response.zstream.avail_in = %u, w->response.zstream.avail_out = %u, w->response.zstream.total_in = %lu, w->response.zstream.total_out = %lu.",
w->id, (size_t)w->response.data->len, w->response.sent, w->response.zhave, w->response.zsent, w->response.zstream.avail_in, w->response.zstream.avail_out, w->response.zstream.total_in, w->response.zstream.total_out);
if(w->response.data->len - w->response.sent == 0 && w->response.zstream.avail_in == 0 && w->response.zhave == w->response.zsent && w->response.zstream.avail_out != 0) {
// there is nothing to send