
journal management improvements ()

* different approach to distribute estimations evenly

* log the estimations calculation to spot the issue

* better log

* more logs

* more logs

* more logs

* more logs

* commented logs

* process journal files and directories in reverse order, so that the system becomes available immediately after start

* put the right filename in the dictionary

* fix scan dir recursion

* add missing fields when sending logs to systemd-journal-remote

* switch the standard datetime format to RFC 3339
This commit is contained in:
Costa Tsaousis 2023-11-26 02:22:47 +02:00 committed by GitHub
parent e08ce9a587
commit a59097ca50
14 changed files with 400 additions and 127 deletions


@ -156,6 +156,8 @@ LIBNETDATA_FILES = \
libnetdata/completion/completion.h \
libnetdata/datetime/iso8601.c \
libnetdata/datetime/iso8601.h \
libnetdata/datetime/rfc3339.c \
libnetdata/datetime/rfc3339.h \
libnetdata/datetime/rfc7231.c \
libnetdata/datetime/rfc7231.h \
libnetdata/dictionary/dictionary.c \


@ -13,14 +13,14 @@
#define SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION "View, search and analyze systemd journal entries."
#define SYSTEMD_JOURNAL_FUNCTION_NAME "systemd-journal"
#define SYSTEMD_JOURNAL_DEFAULT_TIMEOUT 60
#define SYSTEMD_JOURNAL_ENABLE_ESTIMATIONS_FILE_PERCENTAGE 0.01
#define SYSTEMD_JOURNAL_EXECUTE_WATCHER_PENDING_EVERY_MS 250
#define SYSTEMD_JOURNAL_ALL_FILES_SCAN_EVERY_USEC (5 * 60 * USEC_PER_SEC)
#define SYSTEMD_UNITS_FUNCTION_DESCRIPTION "View the status of systemd units"
#define SYSTEMD_UNITS_FUNCTION_NAME "systemd-list-units"
#define SYSTEMD_UNITS_DEFAULT_TIMEOUT 30
#define EXECUTE_WATCHER_PENDING_EVERY_MS 500
#define FULL_JOURNAL_SCAN_EVERY_USEC (5 * 60 * USEC_PER_SEC)
extern __thread size_t fstat_thread_calls;
extern __thread size_t fstat_thread_cached_responses;
void fstat_cache_enable_on_thread(void);
@ -28,7 +28,6 @@ void fstat_cache_disable_on_thread(void);
extern netdata_mutex_t stdout_mutex;
typedef enum {
ND_SD_JOURNAL_NO_FILE_MATCHED,
ND_SD_JOURNAL_FAILED_TO_OPEN,
@ -97,7 +96,8 @@ int journal_file_dict_items_forward_compar(const void *a, const void *b);
void buffer_json_journal_versions(BUFFER *wb);
void available_journal_file_sources_to_json_array(BUFFER *wb);
bool journal_files_completed_once(void);
void journal_files_updater_all_headers_sorted(void);
void journal_files_registry_update(void);
void journal_directory_scan_recursively(DICTIONARY *files, DICTIONARY *dirs, const char *dirname, int depth);
FACET_ROW_SEVERITY syslog_priority_to_facet_severity(FACETS *facets, FACET_ROW *row, void *data);
@ -116,14 +116,12 @@ usec_t journal_file_update_annotation_boot_id(sd_journal *j, struct journal_file
#define MAX_JOURNAL_DIRECTORIES 100
struct journal_directory {
char *path;
bool logged_failure;
};
extern struct journal_directory journal_directories[MAX_JOURNAL_DIRECTORIES];
void journal_init_files_and_directories(void);
void journal_init_query_status(void);
void function_systemd_journal(const char *transaction, char *function, int timeout, bool *cancelled);
void journal_files_registry_update(void);
void journal_file_update_header(const char *filename, struct journal_file *jf);
void netdata_systemd_journal_message_ids_init(void);


@ -427,8 +427,8 @@ void netdata_systemd_journal_transform_boot_id(FACETS *facets __maybe_unused, BU
ut = *p_ut;
if(ut && ut != UINT64_MAX) {
char buffer[ISO8601_MAX_LENGTH];
iso8601_datetime_ut(buffer, sizeof(buffer), ut, ISO8601_UTC);
char buffer[RFC3339_MAX_LENGTH];
rfc3339_datetime_ut(buffer, sizeof(buffer), ut, 0, true);
switch(scope) {
default:
@ -506,8 +506,8 @@ void netdata_systemd_journal_transform_timestamp_usec(FACETS *facets __maybe_unu
if(*v && isdigit(*v)) {
uint64_t ut = str2ull(buffer_tostring(wb), NULL);
if(ut) {
char buffer[ISO8601_MAX_LENGTH];
iso8601_datetime_ut(buffer, sizeof(buffer), ut, ISO8601_UTC | ISO8601_MICROSECONDS);
char buffer[RFC3339_MAX_LENGTH];
rfc3339_datetime_ut(buffer, sizeof(buffer), ut, 6, true);
buffer_sprintf(wb, " (%s)", buffer);
}
}
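For reference, the calls above switch timestamp rendering from the ISO 8601 helper (flag-driven) to the new RFC 3339 helper, which takes an explicit fractional-digit count and a UTC flag. A minimal usage sketch, assuming libnetdata is included and ut holds a usec_t timestamp (output strings illustrative):

char buf[RFC3339_MAX_LENGTH];

// no fractional digits, UTC -> e.g. "2023-11-26T00:22:47Z"
rfc3339_datetime_ut(buf, sizeof(buf), ut, 0, true);

// six fractional digits, UTC -> e.g. "2023-11-26T00:22:47.123456Z"
rfc3339_datetime_ut(buf, sizeof(buf), ut, 6, true);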


@ -602,17 +602,16 @@ static void files_registry_delete_cb(const DICTIONARY_ITEM *item, void *value, v
string_freez(jf->source);
}
void journal_directory_scan(const char *dirname, int depth, usec_t last_scan_monotonic_ut) {
void journal_directory_scan_recursively(DICTIONARY *files, DICTIONARY *dirs, const char *dirname, int depth) {
static const char *ext = ".journal";
static const size_t ext_len = sizeof(".journal") - 1;
static const ssize_t ext_len = sizeof(".journal") - 1;
if (depth > VAR_LOG_JOURNAL_MAX_DEPTH)
return;
DIR *dir;
struct dirent *entry;
struct stat info;
char absolute_path[FILENAME_MAX];
char full_path[FILENAME_MAX];
// Open the directory.
if ((dir = opendir(dirname)) == NULL) {
@ -621,33 +620,43 @@ void journal_directory_scan(const char *dirname, int depth, usec_t last_scan_mon
return;
}
bool existing = false;
bool *found = dictionary_set(dirs, dirname, &existing, sizeof(existing));
if(*found) return;
*found = true;
// Read each entry in the directory.
while ((entry = readdir(dir)) != NULL) {
snprintfz(absolute_path, sizeof(absolute_path), "%s/%s", dirname, entry->d_name);
if (stat(absolute_path, &info) != 0) {
netdata_log_error("Failed to stat() '%s", absolute_path);
if (strcmp(entry->d_name, ".") == 0 || strcmp(entry->d_name, "..") == 0)
continue;
ssize_t len = snprintfz(full_path, sizeof(full_path), "%s/%s", dirname, entry->d_name);
if (entry->d_type == DT_DIR) {
journal_directory_scan_recursively(files, dirs, full_path, depth++);
}
else if (entry->d_type == DT_REG && len > ext_len && strcmp(full_path + len - ext_len, ext) == 0) {
if(files)
dictionary_set(files, full_path, NULL, 0);
if (S_ISDIR(info.st_mode)) {
// If entry is a directory, call traverse recursively.
if (strcmp(entry->d_name, ".") != 0 && strcmp(entry->d_name, "..") != 0)
journal_directory_scan(absolute_path, depth + 1, last_scan_monotonic_ut);
send_newline_and_flush();
}
else if (S_ISREG(info.st_mode)) {
// If entry is a regular file, check if it ends with .journal.
char *filename = entry->d_name;
size_t len = strlen(filename);
else if (entry->d_type == DT_LNK) {
struct stat info;
if (stat(full_path, &info) == -1)
continue;
if (S_ISDIR(info.st_mode)) {
// The symbolic link points to a directory
char resolved_path[FILENAME_MAX + 1];
if (realpath(full_path, resolved_path) != NULL) {
journal_directory_scan_recursively(files, dirs, resolved_path, depth++);
}
}
else if(S_ISREG(info.st_mode) && len > ext_len && strcmp(full_path + len - ext_len, ext) == 0) {
if(files)
dictionary_set(files, full_path, NULL, 0);
if (len > ext_len && strcmp(filename + len - ext_len, ext) == 0) {
struct journal_file t = {
.file_last_modified_ut = info.st_mtim.tv_sec * USEC_PER_SEC + info.st_mtim.tv_nsec / NSEC_PER_USEC,
.last_scan_monotonic_ut = last_scan_monotonic_ut,
.size = info.st_size,
.max_journal_vs_realtime_delta_ut = JOURNAL_VS_REALTIME_DELTA_DEFAULT_UT,
};
dictionary_set(journal_files_registry, absolute_path, &t, sizeof(t));
send_newline_and_flush();
}
}
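The rewritten scanner no longer registers files itself; it fills caller-provided dictionaries, and the dirs dictionary doubles as a visited set so already-seen directories (including ones reached through symlinks) are not scanned twice. A minimal sketch of the two calling patterns used by this commit (path illustrative):

// collect .journal files and the directories that contain them
DICTIONARY *files = dictionary_create(DICT_OPTION_SINGLE_THREADED | DICT_OPTION_DONT_OVERWRITE_VALUE);
DICTIONARY *dirs  = dictionary_create(DICT_OPTION_SINGLE_THREADED | DICT_OPTION_DONT_OVERWRITE_VALUE);
journal_directory_scan_recursively(files, dirs, "/var/log/journal", 0);

// enumerate directories only (files == NULL), as the inotify watcher does
journal_directory_scan_recursively(NULL, dirs, "/var/log/journal", 0);

dictionary_destroy(files);
dictionary_destroy(dirs);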
@ -661,41 +670,38 @@ bool journal_files_completed_once(void) {
return journal_files_scans > 0;
}
static int journal_file_dict_items_last_modified_compar(const void *a, const void *b) {
const DICTIONARY_ITEM **ad = (const DICTIONARY_ITEM **)a, **bd = (const DICTIONARY_ITEM **)b;
struct journal_file *jfa = dictionary_acquired_item_value(*ad);
struct journal_file *jfb = dictionary_acquired_item_value(*bd);
int filenames_compar(const void *a, const void *b) {
const char *p1 = *(const char **)a;
const char *p2 = *(const char **)b;
if(jfa->file_last_modified_ut > jfb->file_last_modified_ut)
const char *at1 = strchr(p1, '@');
const char *at2 = strchr(p2, '@');
if(!at1 && at2)
return -1;
else if(jfa->file_last_modified_ut < jfb->file_last_modified_ut)
if(at1 && !at2)
return 1;
return 0;
}
if(!at1 && !at2)
return strcmp(p1, p2);
void journal_files_updater_all_headers_sorted(void) {
const DICTIONARY_ITEM *file_items[dictionary_entries(journal_files_registry)];
size_t files_used = 0;
const char *dash1 = strrchr(at1, '-');
const char *dash2 = strrchr(at2, '-');
struct journal_file *jf;
dfe_start_write(journal_files_registry, jf){
if(jf->last_scan_header_vs_last_modified_ut < jf->file_last_modified_ut)
file_items[files_used++] = dictionary_acquired_item_dup(journal_files_registry, jf_dfe.item);
}
dfe_done(jf);
if(!dash1 || !dash2)
return strcmp(p1, p2);
// sort them in reverse order (newer first)
qsort(file_items, files_used, sizeof(const DICTIONARY_ITEM *),
journal_file_dict_items_last_modified_compar);
uint64_t ts1 = strtoul(dash1 + 1, NULL, 16);
uint64_t ts2 = strtoul(dash2 + 1, NULL, 16);
// update the header (first and last message ut, sequence numbers, etc)
for(size_t i = 0; i < files_used ; i++) {
jf = dictionary_acquired_item_value(file_items[i]);
journal_file_update_header(jf->filename, jf);
dictionary_acquired_item_release(journal_files_registry, file_items[i]);
send_newline_and_flush();
}
if(ts1 > ts2)
return -1;
if(ts1 < ts2)
return 1;
return -strcmp(p1, p2);
}
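For context: entries without an '@' (active journals) sort first, and archived journals, whose names end in a hexadecimal head-realtime timestamp after the last dash, sort newest-first, so the registry processes recent data before older archives. A minimal sketch with hypothetical filenames:

const char *names[] = {
    "/var/log/journal/x/system@a1b2c3d4-0000000000000a10-000611223344aa00.journal", // newer archive
    "/var/log/journal/x/system.journal",                                            // active file
    "/var/log/journal/x/system@a1b2c3d4-0000000000000a00-000611220000bb00.journal", // older archive
};
qsort(names, sizeof(names) / sizeof(names[0]), sizeof(const char *), filenames_compar);
// resulting order: system.journal, the newer archive, then the older archive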
void journal_files_registry_update(void) {
@ -704,13 +710,46 @@ void journal_files_registry_update(void) {
if(spinlock_trylock(&spinlock)) {
usec_t scan_monotonic_ut = now_monotonic_usec();
for(unsigned i = 0; i < MAX_JOURNAL_DIRECTORIES; i++) {
if(!journal_directories[i].path)
break;
DICTIONARY *files = dictionary_create(DICT_OPTION_SINGLE_THREADED | DICT_OPTION_DONT_OVERWRITE_VALUE);
DICTIONARY *dirs = dictionary_create(DICT_OPTION_SINGLE_THREADED|DICT_OPTION_DONT_OVERWRITE_VALUE);
journal_directory_scan(journal_directories[i].path, 0, scan_monotonic_ut);
for(unsigned i = 0; i < MAX_JOURNAL_DIRECTORIES; i++) {
if(!journal_directories[i].path) break;
journal_directory_scan_recursively(files, dirs, journal_directories[i].path, 0);
}
const char **array = mallocz(sizeof(const char *) * dictionary_entries(files));
size_t used = 0;
void *x;
dfe_start_read(files, x) {
if(used >= dictionary_entries(files)) continue;
array[used++] = x_dfe.name;
}
dfe_done(x);
qsort(array, used, sizeof(const char *), filenames_compar);
for(size_t i = 0; i < used ;i++) {
const char *full_path = array[i];
struct stat info;
if (stat(full_path, &info) == -1)
continue;
struct journal_file t = {
.file_last_modified_ut = info.st_mtim.tv_sec * USEC_PER_SEC + info.st_mtim.tv_nsec / NSEC_PER_USEC,
.last_scan_monotonic_ut = scan_monotonic_ut,
.size = info.st_size,
.max_journal_vs_realtime_delta_ut = JOURNAL_VS_REALTIME_DELTA_DEFAULT_UT,
};
struct journal_file *jf = dictionary_set(journal_files_registry, full_path, &t, sizeof(t));
journal_file_update_header(jf->filename, jf);
}
freez(array);
dictionary_destroy(files);
dictionary_destroy(dirs);
struct journal_file *jf;
dfe_start_write(journal_files_registry, jf){
if(jf->last_scan_monotonic_ut < scan_monotonic_ut)
@ -718,8 +757,6 @@ void journal_files_registry_update(void) {
}
dfe_done(jf);
journal_files_updater_all_headers_sorted();
journal_files_scans++;
spinlock_unlock(&spinlock);


@ -136,29 +136,31 @@ static char* get_path_from_wd(Watcher *watcher, int wd) {
return NULL;
}
static void watch_directory_recursively(Watcher *watcher, int inotifyFd, const char *basePath) {
// First, add a watch for the top-level directory itself
add_watch(watcher, inotifyFd, basePath);
char path[PATH_MAX];
struct dirent *dp;
DIR *dir = opendir(basePath);
if (!dir)
return;
while ((dp = readdir(dir)) != NULL) {
if (strcmp(dp->d_name, ".") != 0 && strcmp(dp->d_name, "..") != 0) {
snprintfz(path, sizeof(path), "%s/%s", basePath, dp->d_name);
if (dp->d_type == DT_DIR) {
// Recursively watch this directory
watch_directory_recursively(watcher, inotifyFd, path);
}
static bool is_directory_watched(Watcher *watcher, const char *path) {
for (int i = 0; i < watcher->watchCount; ++i) {
if (watcher->watchList[i].wd != -1 && strcmp(watcher->watchList[i].path, path) == 0) {
return true;
}
}
return false;
}
closedir(dir);
static void watch_directory_and_subdirectories(Watcher *watcher, int inotifyFd, const char *basePath) {
DICTIONARY *dirs = dictionary_create(DICT_OPTION_SINGLE_THREADED | DICT_OPTION_DONT_OVERWRITE_VALUE);
journal_directory_scan_recursively(NULL, dirs, basePath, 0);
void *x;
dfe_start_read(dirs, x) {
const char *dirname = x_dfe.name;
// Check if this directory is already being watched
if (!is_directory_watched(watcher, dirname)) {
add_watch(watcher, inotifyFd, dirname);
}
}
dfe_done(x);
dictionary_destroy(dirs);
}
static bool is_subpath(const char *path, const char *subpath) {
@ -236,7 +238,7 @@ void process_event(Watcher *watcher, int inotifyFd, struct inotify_event *event)
fullPath);
// Start watching the new directory - recursive watch
watch_directory_recursively(watcher, inotifyFd, fullPath);
watch_directory_and_subdirectories(watcher, inotifyFd, fullPath);
}
else
nd_log(NDLS_COLLECTORS, NDLP_WARNING,
@ -310,14 +312,14 @@ void *journal_watcher_main(void *arg __maybe_unused) {
for (unsigned i = 0; i < MAX_JOURNAL_DIRECTORIES; i++) {
if (!journal_directories[i].path) break;
watch_directory_recursively(&watcher, inotifyFd, journal_directories[i].path);
watch_directory_and_subdirectories(&watcher, inotifyFd, journal_directories[i].path);
}
usec_t last_headers_update_ut = now_monotonic_usec();
struct buffered_reader reader;
while (1) {
buffered_reader_ret_t rc = buffered_reader_read_timeout(
&reader, inotifyFd, EXECUTE_WATCHER_PENDING_EVERY_MS, false);
&reader, inotifyFd, SYSTEMD_JOURNAL_EXECUTE_WATCHER_PENDING_EVERY_MS, false);
if (rc != BUFFERED_READER_READ_OK && rc != BUFFERED_READER_READ_POLL_TIMEOUT) {
nd_log(NDLS_COLLECTORS, NDLP_CRIT,
@ -353,7 +355,7 @@ void *journal_watcher_main(void *arg __maybe_unused) {
usec_t ut = now_monotonic_usec();
if (dictionary_entries(watcher.pending) && (rc == BUFFERED_READER_READ_POLL_TIMEOUT ||
last_headers_update_ut + (EXECUTE_WATCHER_PENDING_EVERY_MS * USEC_PER_MS) <= ut)) {
last_headers_update_ut + (SYSTEMD_JOURNAL_EXECUTE_WATCHER_PENDING_EVERY_MS * USEC_PER_MS) <= ut)) {
process_pending(&watcher);
last_headers_update_ut = ut;
}


@ -474,9 +474,9 @@ static size_t sampling_running_file_query_estimate_remaining_lines_by_time(FUNCT
size_t scanned_lines = sampling_file_lines_scanned_so_far(fqs);
// Calculate the proportion of time covered
usec_t total_time_ut;
usec_t total_time_ut, remaining_start_ut, remaining_end_ut;
usec_t remaining_time_ut = sampling_running_file_query_remaining_time(fqs, jf, direction, msg_ut, &total_time_ut,
NULL, NULL);
&remaining_start_ut, &remaining_end_ut);
if (total_time_ut == 0) total_time_ut = 1;
double proportion_by_time = (double) (total_time_ut - remaining_time_ut) / (double) total_time_ut;
@ -494,10 +494,30 @@ static size_t sampling_running_file_query_estimate_remaining_lines_by_time(FUNCT
size_t remaining_logs_by_time = expected_matching_logs_by_time - scanned_lines;
if (remaining_logs_by_time < 1) remaining_logs_by_time = 1;
// nd_log(NDLS_COLLECTORS, NDLP_INFO,
// "JOURNAL ESTIMATION: '%s' "
// "scanned_lines=%zu [sampled=%zu, unsampled=%zu, estimated=%zu], "
// "file [%"PRIu64" - %"PRIu64", duration %"PRId64", known lines in file %zu], "
// "query [%"PRIu64" - %"PRIu64", duration %"PRId64"], "
// "first message read from the file at %"PRIu64", current message at %"PRIu64", "
// "proportion of time %.2f %%, "
// "expected total lines in file %zu, "
// "remaining lines %zu, "
// "remaining time %"PRIu64" [%"PRIu64" - %"PRIu64", duration %"PRId64"]"
// , jf->filename
// , scanned_lines, fqs->samples_per_file.sampled, fqs->samples_per_file.unsampled, fqs->samples_per_file.estimated
// , jf->msg_first_ut, jf->msg_last_ut, jf->msg_last_ut - jf->msg_first_ut, jf->messages_in_file
// , fqs->query_file.start_ut, fqs->query_file.stop_ut, fqs->query_file.stop_ut - fqs->query_file.start_ut
// , fqs->query_file.first_msg_ut, msg_ut
// , proportion_by_time * 100.0
// , expected_matching_logs_by_time
// , remaining_logs_by_time
// , remaining_time_ut, remaining_start_ut, remaining_end_ut, remaining_end_ut - remaining_start_ut
// );
return remaining_logs_by_time;
}
static size_t sampling_running_file_query_estimate_remaining_lines(sd_journal *j, FUNCTION_QUERY_STATUS *fqs, struct journal_file *jf, FACETS_ANCHOR_DIRECTION direction, usec_t msg_ut) {
size_t expected_matching_logs_by_seqnum = 0;
double proportion_by_seqnum = 0.0;
@ -613,7 +633,7 @@ static inline sampling_t is_row_in_sample(sd_journal *j, FUNCTION_QUERY_STATUS *
if(fqs->samples_per_file.unsampled > fqs->samples_per_file.sampled) {
double progress_by_time = sampling_running_file_query_progress_by_time(fqs, jf, direction, msg_ut);
if(progress_by_time > 0.05)
if(progress_by_time > SYSTEMD_JOURNAL_ENABLE_ESTIMATIONS_FILE_PERCENTAGE)
return SAMPLING_STOP_AND_ESTIMATE;
}
@ -1166,6 +1186,18 @@ static int netdata_systemd_journal_query(BUFFER *wb, FACETS *facets, FUNCTION_QU
ND_SD_JOURNAL_STATUS tmp_status = netdata_systemd_journal_query_one_file(filename, wb, facets, jf, fqs);
// nd_log(NDLS_COLLECTORS, NDLP_INFO,
// "JOURNAL ESTIMATION FINAL: '%s' "
// "total lines %zu [sampled=%zu, unsampled=%zu, estimated=%zu], "
// "file [%"PRIu64" - %"PRIu64", duration %"PRId64", known lines in file %zu], "
// "query [%"PRIu64" - %"PRIu64", duration %"PRId64"], "
// , jf->filename
// , fqs->samples_per_file.sampled + fqs->samples_per_file.unsampled + fqs->samples_per_file.estimated
// , fqs->samples_per_file.sampled, fqs->samples_per_file.unsampled, fqs->samples_per_file.estimated
// , jf->msg_first_ut, jf->msg_last_ut, jf->msg_last_ut - jf->msg_first_ut, jf->messages_in_file
// , fqs->query_file.start_ut, fqs->query_file.stop_ut, fqs->query_file.stop_ut - fqs->query_file.start_ut
// );
rows_useful = fqs->rows_useful - rows_useful;
rows_read = fqs->rows_read - rows_read;
bytes_read = fqs->bytes_read - bytes_read;


@ -86,14 +86,14 @@ int main(int argc __maybe_unused, char **argv __maybe_unused) {
usec_t step_ut = 100 * USEC_PER_MS;
usec_t send_newline_ut = 0;
usec_t since_last_scan_ut = FULL_JOURNAL_SCAN_EVERY_USEC * 2; // something big to trigger scanning at start
usec_t since_last_scan_ut = SYSTEMD_JOURNAL_ALL_FILES_SCAN_EVERY_USEC * 2; // something big to trigger scanning at start
bool tty = isatty(fileno(stderr)) == 1;
heartbeat_t hb;
heartbeat_init(&hb);
while(!plugin_should_exit) {
if(since_last_scan_ut > FULL_JOURNAL_SCAN_EVERY_USEC) {
if(since_last_scan_ut > SYSTEMD_JOURNAL_ALL_FILES_SCAN_EVERY_USEC) {
journal_files_registry_update();
since_last_scan_ut = 0;
}


@ -0,0 +1,135 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "../libnetdata.h"
#include "rfc3339.h"
size_t rfc3339_datetime_ut(char *buffer, size_t len, usec_t now_ut, size_t fractional_digits, bool utc) {
if (!buffer || len == 0)
return 0;
time_t t = (time_t)(now_ut / USEC_PER_SEC);
struct tm *tmp, tmbuf;
if (utc)
tmp = gmtime_r(&t, &tmbuf);
else
tmp = localtime_r(&t, &tmbuf);
if (!tmp) {
buffer[0] = '\0';
return 0;
}
size_t used_length = strftime(buffer, len, "%Y-%m-%dT%H:%M:%S", tmp);
if (used_length == 0) {
buffer[0] = '\0';
return 0;
}
if (fractional_digits >= 0 && fractional_digits <= 9) {
int fractional_part = (int)(now_ut % USEC_PER_SEC);
if (fractional_part && len - used_length > fractional_digits + 1) {
char format[] = ".%01d";
format[3] = (char)('0' + fractional_digits);
// Adjust fractional part
fractional_part /= (int)pow(10, 6 - fractional_digits);
used_length += snprintf(buffer + used_length, len - used_length,
format, fractional_part);
}
}
if (utc) {
if (used_length + 1 < len) {
buffer[used_length++] = 'Z';
buffer[used_length] = '\0';
}
}
else {
long offset = tmbuf.tm_gmtoff;
int hours = (int)(offset / 3600);
int minutes = abs((int)((offset % 3600) / 60));
if (used_length + 7 < len) { // Space for "+HH:MM\0"
used_length += snprintf(buffer + used_length, len - used_length, "%+03d:%02d", hours, minutes);
}
}
return used_length;
}
usec_t rfc3339_parse_ut(const char *rfc3339, char **endptr) {
struct tm tm = { 0 };
int tz_hours = 0, tz_mins = 0;
char *s;
usec_t timestamp, usec = 0;
// Use strptime to parse up to seconds
s = strptime(rfc3339, "%Y-%m-%dT%H:%M:%S", &tm);
if (!s)
return 0; // Parsing error
// Parse fractional seconds if present
if (*s == '.') {
char *next;
usec = strtoul(s + 1, &next, 10);
int digits_parsed = (int)(next - (s + 1));
if (digits_parsed < 1 || digits_parsed > 9)
return 0; // parsing error
static const usec_t fix_usec[] = {
1000000, // 0 digits (not used)
100000, // 1 digit
10000, // 2 digits
1000, // 3 digits
100, // 4 digits
10, // 5 digits
1, // 6 digits
10, // 7 digits
100, // 8 digits
1000, // 9 digits
};
usec = digits_parsed <= 6 ? usec * fix_usec[digits_parsed] : usec / fix_usec[digits_parsed];
s = next;
}
// Check and parse timezone if present
int tz_offset = 0;
if (*s == '+' || *s == '-') {
// Parse the hours:mins part of the timezone
if (!isdigit(s[1]) || !isdigit(s[2]) || s[3] != ':' ||
!isdigit(s[4]) || !isdigit(s[5]))
return 0; // Parsing error
char tz_sign = *s;
tz_hours = (s[1] - '0') * 10 + (s[2] - '0');
tz_mins = (s[4] - '0') * 10 + (s[5] - '0');
tz_offset = tz_hours * 3600 + tz_mins * 60;
tz_offset *= (tz_sign == '+' ? 1 : -1);
s += 6; // Move past the timezone part
}
else if (*s == 'Z')
s++;
else
return 0; // Invalid RFC 3339 format
// Convert to time_t (assuming local time, then adjusting for timezone later)
time_t epoch_s = mktime(&tm);
if (epoch_s == -1)
return 0; // Error in time conversion
timestamp = (usec_t)epoch_s * USEC_PER_SEC + usec;
timestamp -= tz_offset * USEC_PER_SEC;
if(endptr)
*endptr = s;
return timestamp;
}
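A minimal parsing sketch, assuming libnetdata is included (input string illustrative):

char *end = NULL;
usec_t ut = rfc3339_parse_ut("2023-11-26T02:22:47.123456+02:00", &end);
// ut is the timestamp in microseconds (0 on a malformed string);
// end points just past the parsed portion of the input.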


@ -0,0 +1,12 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "../libnetdata.h"
#ifndef NETDATA_RFC3339_H
#define NETDATA_RFC3339_H
#define RFC3339_MAX_LENGTH 36
size_t rfc3339_datetime_ut(char *buffer, size_t len, usec_t now_ut, size_t fractional_digits, bool utc);
usec_t rfc3339_parse_ut(const char *rfc3339, char **endptr);
#endif //NETDATA_RFC3339_H


@ -931,7 +931,23 @@ static inline void facets_histogram_update_value(FACETS *facets, usec_t usec) {
facets_histogram_update_value_slot(facets, usec, v);
}
static usec_t overlap_duration_ut(usec_t start1, usec_t end1, usec_t start2, usec_t end2) {
usec_t overlap_start = MAX(start1, start2);
usec_t overlap_end = MIN(end1, end2);
if (overlap_start < overlap_end)
return overlap_end - overlap_start;
else
return 0; // No overlap
}
void facets_update_estimations(FACETS *facets, usec_t from_ut, usec_t to_ut, size_t entries) {
if(unlikely(!facets->histogram.enabled))
return;
if(unlikely(!overlap_duration_ut(facets->histogram.after_ut, facets->histogram.before_ut, from_ut, to_ut)))
return;
facets->operations.rows.evaluated += entries;
facets->operations.rows.matched += entries;
facets->operations.rows.estimated += entries;
@ -952,29 +968,28 @@ void facets_update_estimations(FACETS *facets, usec_t from_ut, usec_t to_ut, siz
FACET_VALUE *v = facets->histogram.key->estimated_value.v;
size_t from_slot = facets_histogram_slot_at_time_ut(facets, from_ut, v);
size_t to_slot = facets_histogram_slot_at_time_ut(facets, to_ut, v);
size_t slots = 0;
size_t total_ut = to_ut - from_ut;
ssize_t remaining_entries = (ssize_t)entries;
for (size_t slot = from_slot; slot <= to_slot; slot++) {
if (unlikely(slot >= facets->histogram.slots))
break;
size_t slot = facets_histogram_slot_at_time_ut(facets, from_ut, v);
for(; slot < facets->histogram.slots ;slot++) {
usec_t slot_start_ut = facets->histogram.after_ut + slot * facets->histogram.slot_width_ut;
usec_t slot_end_ut = slot_start_ut + facets->histogram.slot_width_ut;
usec_t overlap_start_ut = (from_ut > slot_start_ut) ? from_ut : slot_start_ut;
usec_t overlap_end_ut = (to_ut < slot_end_ut) ? to_ut : slot_end_ut;
usec_t overlap_ut = (overlap_end_ut > overlap_start_ut) ? (overlap_end_ut - overlap_start_ut) : 0;
if(slot_start_ut > to_ut)
break;
usec_t overlap_ut = overlap_duration_ut(from_ut, to_ut, slot_start_ut, slot_end_ut);
size_t slot_entries = (overlap_ut * entries) / total_ut;
v->histogram[slot] += slot_entries;
remaining_entries -= (ssize_t)slot_entries;
slots++;
}
// Check if all entries are assigned
// This should always be true if the distribution is correct
internal_fatal(remaining_entries < 0 || remaining_entries > (ssize_t)(to_slot - from_slot),
internal_fatal(remaining_entries < 0 || remaining_entries >= (ssize_t)(slots),
"distribution of estimations is not accurate - there are %zd remaining entries",
remaining_entries);
}
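In other words, the estimated entries are spread across the histogram slots in proportion to how much of each slot's time range overlaps the [from_ut, to_ut] window. A simplified, standalone sketch of that distribution, using hypothetical names instead of the FACETS structures:

static void distribute_entries(size_t *histogram, size_t slots,
                               usec_t after_ut, usec_t slot_width_ut,
                               usec_t from_ut, usec_t to_ut, size_t entries) {
    usec_t total_ut = to_ut - from_ut;
    if (!total_ut) total_ut = 1;                    // avoid division by zero

    for (size_t slot = 0; slot < slots; slot++) {
        usec_t slot_start_ut = after_ut + slot * slot_width_ut;
        usec_t slot_end_ut = slot_start_ut + slot_width_ut;

        if (slot_start_ut > to_ut)
            break;                                  // past the query window

        usec_t overlap_ut = overlap_duration_ut(from_ut, to_ut, slot_start_ut, slot_end_ut);
        histogram[slot] += (overlap_ut * entries) / total_ut;  // proportional share
    }
}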


@ -723,6 +723,7 @@ extern char *netdata_configured_host_prefix;
#include "line_splitter/line_splitter.h"
#include "clocks/clocks.h"
#include "datetime/iso8601.h"
#include "datetime/rfc3339.h"
#include "datetime/rfc7231.h"
#include "completion/completion.h"
#include "popen/popen.h"


@ -1498,8 +1498,8 @@ static void timestamp_usec_annotator(BUFFER *wb, const char *key, struct log_fie
if(!ut)
return;
char datetime[ISO8601_MAX_LENGTH];
iso8601_datetime_ut(datetime, sizeof(datetime), ut, ISO8601_LOCAL_TIMEZONE | ISO8601_MILLISECONDS);
char datetime[RFC3339_MAX_LENGTH];
rfc3339_datetime_ut(datetime, sizeof(datetime), ut, 3, false);
if(buffer_strlen(wb))
buffer_fast_strcat(wb, " ", 1);


@ -135,7 +135,12 @@ static inline void buffer_memcat_replacing_newlines(BUFFER *wb, const char *src,
char global_hostname[HOST_NAME_MAX] = "";
char global_boot_id[UUID_COMPACT_STR_LEN] = "";
char global_machine_id[UUID_COMPACT_STR_LEN] = "";
char global_stream_id[UUID_COMPACT_STR_LEN] = "";
char global_namespace[1024] = "";
char global_systemd_invocation_id[1024] = "";
#define BOOT_ID_PATH "/proc/sys/kernel/random/boot_id"
#define MACHINE_ID_PATH "/etc/machine-id"
#define DEFAULT_PRIVATE_KEY "/etc/ssl/private/journal-upload.pem"
#define DEFAULT_PUBLIC_KEY "/etc/ssl/certs/journal-upload.pem"
@ -197,16 +202,25 @@ static void journal_remote_complete_event(BUFFER *msg, usec_t *monotonic_ut) {
*monotonic_ut = ut;
buffer_sprintf(msg,
""
"__REALTIME_TIMESTAMP=%llu\n"
"__MONOTONIC_TIMESTAMP=%llu\n"
"_BOOT_ID=%s\n"
"_HOSTNAME=%s\n"
"\n"
""
"__REALTIME_TIMESTAMP=%llu\n"
"__MONOTONIC_TIMESTAMP=%llu\n"
"_MACHINE_ID=%s\n"
"_BOOT_ID=%s\n"
"_HOSTNAME=%s\n"
"_TRANSPORT=stdout\n"
"_LINE_BREAK=nul\n"
"_STREAM_ID=%s\n"
"_RUNTIME_SCOPE=system\n"
"%s%s\n"
, now_realtime_usec()
, ut
, global_machine_id
, global_boot_id
, global_hostname
, global_stream_id
, global_namespace
, global_systemd_invocation_id
);
}
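With the added fields, every event forwarded to systemd-journal-remote now starts with a header block along these lines (values illustrative; _NAMESPACE and _SYSTEMD_INVOCATION_ID appear only when a namespace is given or INVOCATION_ID is set in the environment):

__REALTIME_TIMESTAMP=1700957000000000
__MONOTONIC_TIMESTAMP=1234567890
_MACHINE_ID=0123456789abcdef0123456789abcdef
_BOOT_ID=fedcba9876543210fedcba9876543210
_HOSTNAME=myhost
_TRANSPORT=stdout
_LINE_BREAK=nul
_STREAM_ID=00112233445566778899aabbccddeeff
_RUNTIME_SCOPE=system
_NAMESPACE=netdata
_SYSTEMD_INVOCATION_ID=8899aabbccddeeff0011223344556677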
@ -245,10 +259,10 @@ static log_to_journal_remote_ret_t log_input_to_journal_remote(const char *url,
timeout_ms = 10;
global_boot_id[0] = '\0';
char boot_id[1024];
if(read_file(BOOT_ID_PATH, boot_id, sizeof(boot_id)) == 0) {
char buffer[1024];
if(read_file(BOOT_ID_PATH, buffer, sizeof(buffer)) == 0) {
uuid_t uuid;
if(uuid_parse_flexi(boot_id, uuid) == 0)
if(uuid_parse_flexi(buffer, uuid) == 0)
uuid_unparse_lower_compact(uuid, global_boot_id);
else
fprintf(stderr, "WARNING: cannot parse the UUID found in '%s'.\n", BOOT_ID_PATH);
@ -261,6 +275,27 @@ static log_to_journal_remote_ret_t log_input_to_journal_remote(const char *url,
uuid_unparse_lower_compact(uuid, global_boot_id);
}
if(read_file(MACHINE_ID_PATH, buffer, sizeof(buffer)) == 0) {
uuid_t uuid;
if(uuid_parse_flexi(buffer, uuid) == 0)
uuid_unparse_lower_compact(uuid, global_machine_id);
else
fprintf(stderr, "WARNING: cannot parse the UUID found in '%s'.\n", MACHINE_ID_PATH);
}
if(global_machine_id[0] == '\0') {
fprintf(stderr, "WARNING: cannot read '%s'. Will generate a random _MACHINE_ID.\n", MACHINE_ID_PATH);
uuid_t uuid;
uuid_generate_random(uuid);
uuid_unparse_lower_compact(uuid, global_boot_id);
}
if(global_stream_id[0] == '\0') {
uuid_t uuid;
uuid_generate_random(uuid);
uuid_unparse_lower_compact(uuid, global_stream_id);
}
if(global_hostname[0] == '\0') {
if(gethostname(global_hostname, sizeof(global_hostname)) != 0) {
fprintf(stderr, "WARNING: cannot get system's hostname. Will use internal default.\n");
@ -268,6 +303,9 @@ static log_to_journal_remote_ret_t log_input_to_journal_remote(const char *url,
}
}
if(global_systemd_invocation_id[0] == '\0' && getenv("INVOCATION_ID"))
snprintfz(global_systemd_invocation_id, sizeof(global_systemd_invocation_id), "_SYSTEMD_INVOCATION_ID=%s\n", getenv("INVOCATION_ID"));
if(!key)
key = DEFAULT_PRIVATE_KEY;
@ -483,6 +521,9 @@ static int help(void) {
" The default is: " DEFAULT_CA_CERT "\n"
" The keyword 'all' can be used to trust all CAs.\n"
"\n"
" --namespace=NAMESPACE\n"
" Set the namespace of the messages sent.\n"
"\n"
" --keep-trying\n"
" Keep trying to send the message, if the remote journal is not there.\n"
#endif
@ -752,11 +793,6 @@ int main(int argc, char *argv[]) {
return 1;
}
if(url && namespace) {
fprintf(stderr, "Cannot log to a systemd-journal-remote URL using a namespace. "
"Please either give --url or --namespace, not both.\n");
return 1;
}
#endif
if(log_as_netdata && namespace) {
@ -770,6 +806,9 @@ int main(int argc, char *argv[]) {
#ifdef HAVE_CURL
if(url) {
if(url && namespace && *namespace)
snprintfz(global_namespace, sizeof(global_namespace), "_NAMESPACE=%s\n", namespace);
log_to_journal_remote_ret_t rc;
do {
rc = log_input_to_journal_remote(url, key, cert, trust, newline, timeout_ms);


@ -557,8 +557,8 @@ static inline bool rrdpush_sender_validate_response(RRDHOST *host, struct sender
};
ND_LOG_STACK_PUSH(lgs);
char buf[ISO8601_MAX_LENGTH];
iso8601_datetime_ut(buf, sizeof(buf), host->destination->postpone_reconnection_until * USEC_PER_SEC, 0);
char buf[RFC3339_MAX_LENGTH];
rfc3339_datetime_ut(buf, sizeof(buf), host->destination->postpone_reconnection_until * USEC_PER_SEC, 0, false);
nd_log(NDLS_DAEMON, priority,
"STREAM %s [send to %s]: %s - will retry in %d secs, at %s",