0
0
Fork 0
mirror of https://github.com/netdata/netdata.git synced 2025-04-17 03:02:41 +00:00
netdata_netdata/logsmanagement/flb_plugin.c
Dimitris P 4e512411ec
Logs Management ()
This PR adds the logs-management external plugin. 
See the included README for an extensive list of features. 
-------------------------------------------------------------------------------------

* Add proper status return in JSON response of functions

* Add column info to functions

* Escape special characters when returning JSON response

* Add proper functions help and defaults. Fix help not working

* Add 'logs_management_meta' object in functions results

* Fix compiler warnings

* Replace tabs with 3 spaces in web_client_api_request_v1_logsmanagement_sources()

* Add 'sources' in functions to display list of log sources

* Update functions column values for logs

* Update chart titles and remove '/s' from units

* Add support for compound queries in circular buffers

* Refactor circ_buff_search() to get rid of circ_buff_search_compound()

* Fix incorrect docker events nano timestamp padding

* Fixed botched rebasing

* Replace get_unix_time_ms() with now_realtime_msec()

* Remove binary generation from Fluent-Bit lib build

* Fix compiler warnings due to new timestamp type

* Remove STDIN and STDOUT support from Fluent-Bit library

* Initial support for FLB_KMSG kernel logs collection

* Add kernel logs charts

* Add kernel logs subsystem and device charts

* Skip collection of pre-existing logs in kmsg ring buffer

* Add example of custom kmsg charts

* Add extra initialization error logs

* Fix bug of Docker Events collector failure disabling whole logs management engine

* Remove redundant FLB output code

* Remove some obsolete TODO comments

* Remove some commented out error/debug prints

* Disable some Fluent-Bit config options not required

* Make circular buffer spare items option configurable

* Add DB mode configuration option

* Replace p_file_infos_arr->data[i] with p_file_info in db_api.c

* Remove db_loop due to all function calls being synchronous

* Add initial README.md

* Add DB mode = none changes

* Add a simple webpage to visualize log query results

* Add support for source selection to logs_query.html

* Add option to query multiple log sources

* Mark non-queryable sources as such in logs_query.html

* Add option to use either GET or functions request in logs_query.html

* Install logs_query.html when running stress tests

* Update README.md requirements

* Change installer behavior to build logs management by default

* Disable logs management at runtime by default

* Add global db mode configuration in 'logs management' config section

* Split logsmanagement.conf into required & optional sections

* Remove --enable-logsmanagement from stress test script

* Add global config option for 'circular buffer max size MiB'

* Add global config option for 'circular buffer drop logs if full'

* Update 'General Configuration' in README.md

* Add global config option for remaining optional settings

* Add systemd collector requirements to TOC

* README: Convert general configuration to table

* README: Fix previous botched commit

* Enable logs management by default when building for stress testing

* Move logging to collector.log from error.log

* Fix contenttype compilation errors

* Move logging to collector.log in plugin_logsmanagement.c

* Rename 'rows' to 'records' in charts

* Add Netdata error.log parsing

* Add more dashboard descriptions

* Sanitize chart ids

* Attempt to fix failing CI

* Update README.md

* Update README.md

* Another attempt to fix CI failures

* Fix undefined reference to 'uv_sleep' on certain platforms

* Support FLB forward input and FLB output plugins.

Squashed commit of the following:

commit 55e2bf4fb34a2e02ffd0b280790197310a5299f3
Author: Dim-P <dimitris1703@gmail.com>
Date:   Thu Apr 13 16:41:09 2023 +0300

    Remove error.log from stock config

commit bbdc62c2c9727359bc3c8ef8c33ee734d0039be7
Author: Dim-P <dimitris1703@gmail.com>
Date:   Thu Apr 13 16:37:48 2023 +0300

    Add cleanup of Fluent Bit outputs in p_file_info_destroy()

commit 09b0aa4268ec1ccef160c99c5d5f31b6388edd28
Author: Dim-P <dimitris1703@gmail.com>
Date:   Thu Apr 13 14:34:17 2023 +0300

    Some code and config cleanup

commit 030d074667d5ee2cad10f85cd836ca90e29346ad
Author: Dim-P <dimitris1703@gmail.com>
Date:   Thu Apr 13 13:04:08 2023 +0300

    Enable additional Fluent Bit output plugins for shared library

commit 490aa5d44caa38042521d24c6b886b8b4a59a73c
Author: Dim-P <dimitris1703@gmail.com>
Date:   Thu Apr 13 01:33:19 2023 +0300

    Add initialization of Fluent Bit user-configured outputs

commit c96e9fe9cea96549aa5eae09d0deeb130da02793
Author: Dim-P <dimitris1703@gmail.com>
Date:   Tue Apr 4 23:13:16 2023 +0100

    Complete read of parameters for FLB outputs config

commit 00988897f9b86d1ecc5c141b19df7ad7d74f7e96
Author: Dim-P <dimitris1703@gmail.com>
Date:   Mon Apr 3 19:43:31 2023 +0100

    Update README.md

commit 6deea5399c2707942aeaa51408f999ca45dfd351
Author: Dim-P <dimitris1703@gmail.com>
Date:   Mon Apr 3 16:02:28 2023 +0100

    Refactor Syslog_parser_config_t and add Flb_socket_config_t

commit 7bf998a4c298bbd489ef735c56a6e85a137772c9
Author: Dim-P <dimitris1703@gmail.com>
Date:   Mon Apr 3 14:19:57 2023 +0100

    Update README.md

commit c353d194b12c54f134936072ebaded0424d73cc0
Author: Dim-P <dimitris1703@gmail.com>
Date:   Fri Mar 31 14:52:57 2023 +0100

    Update README.md

commit 6be726eaff3738ba7884de799aa52949833af65a
Author: Dim-P <dimitris1703@gmail.com>
Date:   Fri Mar 31 13:06:29 2023 +0100

    Update README. Fix docker_events streaming

commit 6aabfb0f1ef0529a7a0ecbaf940bc0952bf42518
Author: Dim-P <dimitris1703@gmail.com>
Date:   Thu Mar 30 21:27:45 2023 +0100

    Fix stuck in infinite loop bug for FLB_GENERIC, FLB_WEB_LOG and FLB_SERIAL remote log sources

commit eea6346b708cc7a5ce6e2249366870f4924eabae
Author: Dim-P <dimitris1703@gmail.com>
Date:   Thu Mar 30 21:04:12 2023 +0100

    Remove callback that searches for streamed p_file_info match

commit bc9c5a523b0b0ab5588adbff391a43ba8d9a0cdf
Author: Dim-P <dimitris1703@gmail.com>
Date:   Thu Mar 30 15:51:39 2023 +0100

    Basic streaming works

commit 4c80f59f0214bc07895f0b2edca47cb02bc06420
Author: Dim-P <dimitris1703@gmail.com>
Date:   Tue Mar 28 22:05:22 2023 +0100

    WIP

commit eeb37a71b602fb0738fe8077ccddc0a8ce632304
Author: Dim-P <dimitris1703@gmail.com>
Date:   Mon Mar 27 22:52:09 2023 +0100

    Add generic forward streaming input

commit 1459b91847c80c4d97de96b75b00771039458ad6
Author: Dim-P <dimitris1703@gmail.com>
Date:   Thu Mar 23 18:50:14 2023 +0000

    FLB_FORWARD: WIP

* Add number of logs per item in DB and in queries response

* Fix wrong number of lines stored in DB for web logs

* Refactor number of logs parsers and charts code

* Add option to toggle number of collected logs metrics and charts

* Disable kmsg log collector by default

* Fix logs_query.html to work with any server ip

* Fix regressed wrong number of web log lines bug

* Change query quota type from size_t to long long

* Update alpine version when searching for fts-dev requirements

* Update query results to return both requested and actual quota

* Fix bug of circ buffs not being read if head == read but not empty

* Squashed commit of the following:

commit 34edb316a737f3edcffcf8fa88a3801599011495
Author: Dim-P <dimitris1703@gmail.com>
Date:   Thu May 4 20:02:36 2023 +0100

    Comment out some debug prints

commit 51b9b87a88516186530f5b4b65f785b543fefe8c
Author: Dim-P <dimitris1703@gmail.com>
Date:   Fri Apr 28 19:21:54 2023 +0100

    Fix wrong filenames in BLOBS_TABLE after rotation

commit 6055fc2893b48661af324f20ee61511a40abbc02
Author: Dim-P <dimitris1703@gmail.com>
Date:   Fri Apr 28 12:22:04 2023 +0100

    Add chart showing number of circular buffer items

commit 0bb5210b0847f4b7596f633ec96fc10aa8ebc791
Author: Dim-P <dimitris1703@gmail.com>
Date:   Tue Apr 25 16:47:29 2023 +0300

    Various fixes.

    Fix num_lines calculation.
    Add debug prints for circ buffers.
    Remove circ buff spare items option.
    Fix calculation of circ buff memory consumption.
    Add buff_realloc_rwlock for db_mode = none case.
    Fix circ buff read to be done correctly when buff is full.

commit f494af8c95be84404c7d854494d26da3bcbd3ad7
Author: Dim-P <dimitris1703@gmail.com>
Date:   Fri Apr 21 16:03:50 2023 +0300

    Fix freez() on non-malloced address

commit cce6d09e9cf9b847aface7309643e2c0a6041390
Author: Dim-P <dimitris1703@gmail.com>
Date:   Fri Apr 21 15:41:25 2023 +0300

    Add option to dynamically expand circ buffs when full

* Use log timestamps when possible, instead of collection timestamps.
Also, add config options for Fluent Bit engine and remove tail_plugin.

Squashed commit of the following:

commit b16a02eb6e3a90565c90e0a274b87b123e7b18e5
Author: Dim-P <dimitris1703@gmail.com>
Date:   Tue May 16 19:38:57 2023 +0100

    Add Fluent Bit service config options to netdata.conf. Add monitoring of new log file fluentbit.log

commit ab77c286294548ea62a3879ac0f8b8bbfe6a0687
Author: Dim-P <dimitris1703@gmail.com>
Date:   Mon May 15 21:25:17 2023 +0100

    Remove some debug prints

commit 46d64ad2434e69b1d20720297aec1ddb869e1f84
Author: Dim-P <dimitris1703@gmail.com>
Date:   Mon May 15 21:19:32 2023 +0100

    Fix null values in charts

commit 8ec96821d6a882f28cbd19244ebdfc86c807d2f4
Author: Dim-P <dimitris1703@gmail.com>
Date:   Mon May 15 17:43:04 2023 +0100

    Update README.md to reflect log timestamp changes

commit 079a91858cf9db2f74711581235bc17eb97c7dad
Author: Dim-P <dimitris1703@gmail.com>
Date:   Mon May 15 16:23:14 2023 +0100

    Add configurable option for 'update timeout'

commit 72b5e2505d4657fcbb5ccb6eeee00c45eb0b51ff
Author: Dim-P <dimitris1703@gmail.com>
Date:   Mon May 15 16:05:08 2023 +0100

    Revert logsmanagement.conf to logs-manag-master one

commit 70d0ea6f8d272fff318aa3095d90a78dcc3411a7
Author: Dim-P <dimitris1703@gmail.com>
Date:   Mon May 15 16:02:00 2023 +0100

    Fix bug of circ buff items not marked as done

commit 5716420838771edb7842be4669bf96235b15cf71
Author: Dim-P <dimitris1703@gmail.com>
Date:   Mon May 15 16:01:41 2023 +0100

    Fix do_custom_charts_update() to work for all log sources

commit a8def8f53fd25c3efa56ef27e267df3261913a8e
Author: Dim-P <dimitris1703@gmail.com>
Date:   Fri May 12 18:20:20 2023 +0100

    Remove GENERIC and WEB_LOG cases. Remove tail_plugin.c/h. Remove generic_parser().

commit 1cf05966e33491dbeb9b877f18d1ea8643aabeba
Author: Dim-P <dimitris1703@gmail.com>
Date:   Fri May 12 16:54:59 2023 +0100

    Fix FLB_GENERIC and FLB_SERIAL to work with new timestamp logic

commit df3266810531f1af5f99b666fbf44c503b304a39
Author: Dim-P <dimitris1703@gmail.com>
Date:   Fri May 12 14:55:04 2023 +0100

    Get rid of *_collect() functions and restructure plugin_logsmanagement workers

commit 3eee069842f3257fffe60dacfc274363bc43491c
Author: Dim-P <dimitris1703@gmail.com>
Date:   Fri May 12 14:28:33 2023 +0100

    Fix wrong order of #define _XOPEN_SOURCE 700 in parser.c

commit 941aa80cb55d5a7d6fe8926da930d9803be52312
Author: Dim-P <dimitris1703@gmail.com>
Date:   Thu May 11 22:27:39 2023 +0100

    Update plugin_logsmanagement_web_log to use new timestamp logic and to support delayed logs. Refactor req_method metrics code.

commit 427a7d0e2366d43cb5eab7daa1ed82dfc3bc8bc8
Author: Dim-P <dimitris1703@gmail.com>
Date:   Tue May 9 20:26:08 2023 +0100

    Update plugin_logsmanagement_kernel to use new timestamp logic and to support delayed charts

commit a7e95a6d3e5c8b62531b671fd3ec7b8a3196b5bb
Author: Dim-P <dimitris1703@gmail.com>
Date:   Tue May 9 15:22:14 2023 +0100

    Update plugin_logsmanagement_systemd to use new timestamp logic and support delayed charts

commit 48237ac2ce49c82abdf2783952fd9f0ef05d72e1
Author: Dim-P <dimitris1703@gmail.com>
Date:   Tue May 9 13:29:44 2023 +0100

    Refactor number of collected logs chart update code

commit a933c8fcae61c23fa0ec6d0074526ac5d243cf16
Author: Dim-P <dimitris1703@gmail.com>
Date:   Mon May 8 22:11:19 2023 +0100

    Update plugin_logsmanagement_docker_ev to use new timestamp logic and support delayed charts

commit 5d8db057155affd5cb721399a639d75a81801b7f
Author: Dim-P <dimitris1703@gmail.com>
Date:   Fri May 5 15:18:06 2023 +0100

    Change some Fluent Bit collectors to use log timestamps instead of collection timestamps

* Remove some unused defines and typedefs

* Improve flb_init()

* Update file-level doxygen. Add SPDX license declaration.

* Better handling of termination of Fluent Bit

* Better handling of DB errors. Various fixes.

Squashed commit of the following:

commit f55feea1274c3857eda1e9d899743db6e3eb5bf5
Author: Dim-P <dimitris1703@gmail.com>
Date:   Tue Jun 6 13:28:00 2023 +0100

    Fix web log parsing in case of lines terminated by \r

commit 9e05758a4ecfac57a0db14757cff9536deda51d8
Author: Dim-P <dimitris1703@gmail.com>
Date:   Mon Jun 5 20:42:05 2023 +0100

    Fix warnings due to -Wformat-truncation=2

commit 63477666fa42446d74693aae542580d4e1e81f03
Author: Dim-P <dimitris1703@gmail.com>
Date:   Mon Jun 5 16:48:45 2023 +0100

    Autodiscovery of Netdata error.log based on netdata_configured_log_dir

commit cab5e6d6061f4259172bbf72666e8b4a3a35dd66
Author: Dim-P <dimitris1703@gmail.com>
Date:   Mon Jun 5 16:24:39 2023 +0100

    Replace Forward config default string literals with macros

commit 4213398031dbb53afbc943d76bf7df202d12bf6f
Author: Dim-P <dimitris1703@gmail.com>
Date:   Mon Jun 5 15:56:29 2023 +0100

    Proper cleanup of flb_lib_out_cb *callback in case of error

commit f76fd7cc7bc2d0241e4d3517f61ae192d4246300
Author: Dim-P <dimitris1703@gmail.com>
Date:   Mon Jun 5 15:36:07 2023 +0100

    Proper termination of Forward input and respective log sources in case of error

commit 3739fd96c29e13298eb3a6e943a63172cdf39d5f
Author: Dim-P <dimitris1703@gmail.com>
Date:   Thu Jun 1 21:19:56 2023 +0100

    Merge db_search() and db_search_compound()

commit fcface90cb0a6df3c3a2de5e1908b1b3467dd579
Author: Dim-P <dimitris1703@gmail.com>
Date:   Thu Jun 1 19:17:26 2023 +0100

    Proper error handling in db_search() and db_search_compound(). Refactor the code too.

commit c10667ebee2510a1af77114b3a7e18a0054b5dae
Author: Dim-P <dimitris1703@gmail.com>
Date:   Thu Jun 1 14:23:34 2023 +0100

    Update DB mode and dir when switching to db_mode_none

commit d37d4c3d79333bb9fa430650c13ad625458620e8
Author: Dim-P <dimitris1703@gmail.com>
Date:   Thu Jun 1 12:56:13 2023 +0100

    Fix flb_stop() SIGSEGV

commit 892e231c68775ff1a1f052d292d26384f1ef54b1
Author: Dim-P <dimitris1703@gmail.com>
Date:   Tue May 30 21:14:58 2023 +0100

    Switch to db_writer_db_mode_none if db_writer_db_mode_full encounters error

commit f7a0c2135ff61d3a5b0460ec5964eb6bce164bd6
Author: Dim-P <dimitris1703@gmail.com>
Date:   Mon May 29 21:41:21 2023 +0100

    Complete error handling changes to db_init(). Add some const type qualifiers. Refactor some code for readability

commit 13dbeac936d22958394cb1aaec394384f5a93fdd
Author: Dim-P <dimitris1703@gmail.com>
Date:   Mon May 29 17:14:17 2023 +0100

    More error handling changes in db_init(). Change some global default settings if stress testing.

commit eb0691c269cd09054190bf0ee9c4e9247b4a2548
Author: Dim-P <dimitris1703@gmail.com>
Date:   Fri May 26 23:29:12 2023 +0100

    Better handling of db writer threads errors. Add db timings charts

* Fix mystrsep() replaced by strsep_skip_consecutive_separators()

* Fix older GCC failure due to label before declaration

* Fix failed builds when using libuv <= v1.19

* Fix some Codacy warnings

* Fix warning: implicit declaration of function ‘strsep’

* Use USEC_PER_SEC instead of 1000000ULL

* Use UUID_STR_LEN instead of GUID_LEN + 1

* Combine multiple 'ln -sf' Docker instructions to one

* Update README with systemd development libraries requirement

* Comment out mallocz() success checks in parser_csv()

* Fix shellcheck warnings

* Remove asserts for empty SYSLOG_IDENTIFIER or PID

* Fix FreeBSD failing builds

* Fix some more shellcheck warnings

* Update Alpine fts-dev required packages

* First changes to use web log timestamp for correct metrics timings

* Initial work to add test_parse_web_log_line() unit test

* Complete test_parse_web_log_line() tests

* Improve parse_web_log_line() for better handling of \n, \r, double quotes etc.

* Fix 'Invalid TIME' error when timezone sign is negative

* Add more logs to compression unit test case

* Misc read_last_line() improvements

* Fix failing test_auto_detect_web_log_parser_config() when test case terminated without '\n'

* Remove unused preprocessor macro

* Factor out setup of parse_config_expected_num_fields

* Add test for count_fields()

* Add unit test for read_last_line()

* Fix a read_last_line() bug

* Remove PLUGIN[logsmanagement] static thread and update charts synchronously, right before data buffering

* Fix web log parser potential SIGSEGV

* Fix web log metrics bug where they could show delayed by 1 collection interval

* WIP: Add multiline support to kmsg logs and fix metric timings

* Fix kmsg subsystem and device parsing and metrics

* Add option 'use log timestamp' to select between log timestamps or collection timestamps

* Add 'Getting Started' docs section

* Move logs management functions code to separate source files

* Add 'Nginx access.log' chart description

* Remove logsmanagement.plugin source files

* Fix some memory leaks

* Improve cleanup of logsmanagement_main()

* Fix a potential memory leak of fwd_input_out_cb

* Better termination and cleanup of main_loop and its handles

* Fix main_db_dir access() check bug

* Avoid uv_walk() SIGSEGV

* Remove main_db_dir access() check

* Better termination and cleanup of DB code

* Remove flb_socket_config_destroy() that could cause a segmentation fault

* Disable unique client IPs - all-time chart by default

* Update README.md

* Fix debug() -> netdata_log_debug()

* Fix read_last_line()

* Fix timestamp sign adjustment and wrong unit tests

* Change WEB_CLIENT_ACL_DASHBOARD to WEB_CLIENT_ACL_DASHBOARD_ACLK_WEBRTC

* Do not parse web log timestamps if 'use_log_timestamp = no'

* Add Logs Management back into buildinfo.c

* Update README.md

* Do not build Fluent Bit executable binary

* Change logs rate chart to RRDSET_TYPE_LINE

* Add kludge to prevent metrics breaking due to out of order logs

* Fix wrong flb_tmp_buff_cpy_timer expiration

* Refactor initialization of input plugin for local log sources.

* Rename FLB_GENERIC collector to FLB_TAIL.

* Switch 'Netdata fluentbit.log' to disabled by default

* Add 'use inotify' configuration option

* Update README.md

* Add docker event actions metrics

* Update README.md to include event action chart

* Remove commented out PLUGIN[logsmanagement] code block

* Fix some warnings

* Add documentation for outgoing log streaming and exporting

* Fix some code block formatting in README.md

* Refactor code related to error status of log query results and add new invalid timestamp case

* Reduce query mem allocs and fix end timestamp == 0 bug

* Add support for duplicate timestamps in db_search()

* Add support for duplicate timestamps in circ_buff_search()

* Fix docker events contexts

* Various query fixes prior to reverse order search.

- Add reverse qsort() function in circ buffers.
- Fix issues to properly support of duplicate timestamps.
- Separate requested from actual timestamps in query parameters.
- Rename results buffer variable name to be consistent between DB and
  buffers.
- Remove default start and end timestamp from functions.
- Improve handling of invalid quotas provided by users.
- Rename 'until' timestamp name to 'to'.
- Increase default quota to 10MB from 1MB.
- Allow start timestamp to be > than end timestamp.

* Complete descending timestamp search for circular buffers

* Complete descending timestamp search for DB

* Remove MEASURE_QUERY_TIME code block

* Complete descending timestamp search when data resides in both DB and circular buffers

* Use pointer instead of copying res_hdr in query results

* Refactor web log timezone parsing to use static memory allocation

* Add stats for CPU user & system time per MiB of query results

* Micro-optimization to slightly speed up queries

* More micro-optimizations and some code cleanup

* Remove LOGS_QUERY_DATA_FORMAT_NEW_LINE option

* Escape iscntrl() chars at collection rather at query

* Reduce number of buffer_strcat() calls

* Complete descending timestamp order queries for web_api_v1

* Complete descending timestamp order queries for functions

* Fix functions query timings to match web_api_v1 ones

* Add MQTT message collector

Squashed commit of the following:

commit dbe515372ee04880b1841ef7800abe9385b12e1c
Author: Dim-P <dimitris1703@gmail.com>
Date:   Mon Aug 21 15:18:46 2023 +0100

    Update README.md with MQTT information

commit c0b5dbcb7cdef8c6fbd5e72e7bdd08957a0fd3de
Author: Dim-P <dimitris1703@gmail.com>
Date:   Mon Aug 21 14:59:36 2023 +0100

    Tidy up before merge

commit 9a69c4f17eac858532918a8f850a770b12710f80
Author: Dim-P <dimitris1703@gmail.com>
Date:   Mon Aug 21 12:54:33 2023 +0100

    Fix issue with duplicate Log_Source_Path in DB, introduced in commit e417af3

commit 48213e9713216d62fca8a5bc1bbc41a3883fdc14
Author: Dim-P <dimitris1703@gmail.com>
Date:   Sat Aug 19 05:05:36 2023 +0100

    WIP

commit e417af3b947f11bd61e3255306bc95953863998d
Author: Dim-P <dimitris1703@gmail.com>
Date:   Thu Aug 17 18:03:39 2023 +0100

    Update functions logsmanagement help output

* Inhibit Fluent Bit build warnings

* Fix missing allow_subpaths value in api_commands_v1[].

* Fix missing HTTP_RESP_BACKEND_FETCH_FAILED error

* Fix an enum print warning

* Remove systemd-devel requirement from README and fix codacy warnings

* Update Alpine versions for musl-fts-dev

* Update Fluent Bit to v2.1.8

Squashed commit of the following:

commit faf6fc4b7919cc2611124acc67cb1973ce705530
Author: Dim-P <dimitris1703@gmail.com>
Date:   Fri Aug 25 17:13:30 2023 +0100

    Fix wrong default CORE_STACK_SIZE on Alpine

commit a810238fe7830ce626f6d57245d68035b29723f7
Author: Dim-P <dimitris1703@gmail.com>
Date:   Fri Aug 25 00:40:02 2023 +0100

    Update Fluent Bit patches for musl

commit 8bed3b611dba94a053e22c2b4aa1d46f7787d9b4
Author: Dim-P <dimitris1703@gmail.com>
Date:   Thu Aug 24 21:54:38 2023 +0100

    Fix an edge case crash when web log method is '-'

commit b29b48ea230363142697f9749508cd926e18ee19
Author: Dim-P <dimitris1703@gmail.com>
Date:   Thu Aug 24 16:26:13 2023 +0100

    Disable FLB_OUT_CALYPTIA to fix Alpine dlsym() error

commit eabe0d0523ffe98ff881675c21b0763a49c05f16
Author: Dim-P <dimitris1703@gmail.com>
Date:   Tue Aug 22 21:25:54 2023 +0100

    Add 'use inotify = no' troubleshooting Q&A in README

commit 7f7ae85bdb0def63b4fc05ab88f6572db948e0e7
Author: Dim-P <dimitris1703@gmail.com>
Date:   Tue Aug 22 18:06:36 2023 +0100

    Update README.md links to latest version

commit 610c5ac7b920d4a1dfe364ad48f1ca14a0acc346
Author: Dim-P <dimitris1703@gmail.com>
Date:   Tue Aug 22 16:23:30 2023 +0100

    Update flb_parser_create() definition

commit f99608ff524b6f3462264e626a1073f9c2fdfdf5
Author: Dim-P <dimitris1703@gmail.com>
Date:   Tue Aug 22 16:23:04 2023 +0100

    Add new config.cmake options

commit 446b0d564626055a0a125f525d0bd3754184b830
Author: Dim-P <dimitris1703@gmail.com>
Date:   Tue Aug 22 12:21:25 2023 +0100

    Update Fluent Bit submodule to v2.1.8

* Add logs_management_unittest() to CI 'unittest'

* Remove obsolete query testing files

* Patch Fluent Bit log format to match netdata's format

* Update README with instructions on how to monitor Podman events logs

* Fix core dump in case of flb_lib_path dlopen()

* Fix some potential compiler warnings

* Fix queries crash if logs manag engine not running

* Much faster termination of LOGS MANAGEMENT

* Add facets support and other minor fixes.

logsmanagement_function_execute_cb() is replaced by
logsmanagement_function_facets() which adds facets support to logs
management queries.

Internal query results header now includes additional fields
(log_source, log_type, basename, filename, chartname), that are used as facets.

Queries now support timeout as a query parameter.

A web log timestamp bug is fixed (by using timegm() instead of mktime()).

web_api_v1 logsmanagement API is only available in debugging now.

Squashed commit of the following:

commit 32cf0381283029d793ec3af30d96e6cd77ee9149
Author: Dim-P <dimitris1703@gmail.com>
Date:   Tue Sep 19 16:21:32 2023 +0300

    Tidy up

commit f956b5846451c6b955a150b5d071947037e935f0
Author: Dim-P <dimitris1703@gmail.com>
Date:   Tue Sep 19 13:30:54 2023 +0300

    Add more accepted params. Add data_only option. Add if_modified_since option.

commit 588c2425c60dcdd14349b7b346467dba32fda4e9
Author: Dim-P <dimitris1703@gmail.com>
Date:   Mon Sep 18 18:39:50 2023 +0300

    Add timeout to queries

commit da0f055fc47a36d9af4b7cc4cefb8eb6630e36d9
Author: Dim-P <dimitris1703@gmail.com>
Date:   Thu Sep 14 19:17:16 2023 +0300

    Fix histogram

commit 7149890974e0d26420ec1c5cfe1023801dc973fa
Author: Dim-P <dimitris1703@gmail.com>
Date:   Thu Sep 14 17:58:52 2023 +0300

    Add keyword query using simple patterns and fix descending timestamp values

commit 0bd068c5a76e694b876027e9fa5af6f333ab825b
Author: Dim-P <dimitris1703@gmail.com>
Date:   Thu Sep 14 13:54:05 2023 +0300

    Add basename, filename, chartname as facets

commit 023c2b5f758b2479a0e48da575cd59500a1373b6
Author: Dim-P <dimitris1703@gmail.com>
Date:   Thu Sep 14 13:26:06 2023 +0300

    Add info and sources functions options

commit ab4d555b7d445f7291af474847bd9177d3726a76
Author: Dim-P <dimitris1703@gmail.com>
Date:   Thu Sep 14 12:54:37 2023 +0300

    Fix facet id filter

commit a69c9e2732f5a6da1764bb57d1c06d8d65979225
Author: Dim-P <dimitris1703@gmail.com>
Date:   Thu Sep 14 12:07:13 2023 +0300

    WIP: Add facet id filters

commit 3c02b5de81fa8a20c712863c347539a52936ddd8
Author: Dim-P <dimitris1703@gmail.com>
Date:   Tue Sep 12 18:19:17 2023 +0300

    Add log source and log type to circ buff query results header

commit 8ca98672c4911c126e50f3cbdd69ac363abdb33d
Author: Dim-P <dimitris1703@gmail.com>
Date:   Tue Sep 12 18:18:13 2023 +0300

    Fix logsmanagement facet function after master rebasing

commit 3f1517ad56cda2473a279a8d130bec869fc2cbb8
Author: Dim-P <dimitris1703@gmail.com>
Date:   Tue Sep 12 18:14:25 2023 +0300

    Restrict /logsmanagement to ACL_DEV_OPEN_ACCESS only

commit 8ca98d69b08d006c682997268d5d2523ddde6be0
Author: Dim-P <dimitris1703@gmail.com>
Date:   Tue Sep 12 14:40:22 2023 +0300

    Fix incorrectly parsed timestamps due to DST

commit f9b0848037b29c7fcc46da951ca5cd9eb129066f
Author: Dim-P <dimitris1703@gmail.com>
Date:   Mon Sep 11 13:42:18 2023 +0300

    Add logs_management_meta object to facet query results

commit babc978f6c97107aaf8b337d8d31735d61761b6a
Author: Dim-P <dimitris1703@gmail.com>
Date:   Mon Sep 11 13:03:52 2023 +0300

    Query all sources if no arguments provided

commit 486d56de87af56aae6c0dc5d165341418222ce8b
Author: Dim-P <dimitris1703@gmail.com>
Date:   Thu Sep 7 18:38:04 2023 +0300

    Add log_source and log_type (only for DB logs) as facets. Add relative time support

commit b564c12843d355c4da6436af358d5f352cb58bfe
Author: Dim-P <dimitris1703@gmail.com>
Date:   Thu Sep 7 13:47:20 2023 +0300

    Working facet with descending timestamps

commit 68c6a5c64e8425cf28ec16adfb0c50289caa82a9
Author: Dim-P <dimitris1703@gmail.com>
Date:   Wed Sep 6 01:55:51 2023 +0300

    WIP

* Fix linking errors

* Convert logs management to external plugin.

Squashed commit of the following:

commit 16da6ba70ebde0859aed734087f04af497ce3a77
Author: Dim-P <dimitris1703@gmail.com>
Date:   Tue Oct 24 18:44:12 2023 +0100

    Use higher value of update every from netdata.conf or logsmanagement.d.conf

commit 88cc3497c403e07686e9fc0876ebb0c610a1404c
Author: Dim-P <dimitris1703@gmail.com>
Date:   Tue Oct 24 18:43:02 2023 +0100

    Tidy up

commit c3fca57aac169842637d210269519612b1a91e28
Author: Dim-P <dimitris1703@gmail.com>
Date:   Tue Oct 24 18:02:04 2023 +0100

    Use external update_every from agent, if available

commit f7470708ba82495b03297cdf8962a09b16617ddd
Author: Dim-P <dimitris1703@gmail.com>
Date:   Tue Oct 24 17:40:46 2023 +0100

    Re-enable debug logs

commit b34f5ac6a2228361ab41df7d7e5e713f724368c0
Author: Dim-P <dimitris1703@gmail.com>
Date:   Tue Oct 24 15:49:20 2023 +0100

    Remove old API calls from web_api_v1.c/h

commit 7fbc1e699a7785ec837233b9562199ee6c7684da
Author: Dim-P <dimitris1703@gmail.com>
Date:   Tue Oct 24 15:32:04 2023 +0100

    Add proper termination of stats charts thread

commit 4c0fc05c8b14593bd7a0aa68f75a8a1205e04db4
Author: Dim-P <dimitris1703@gmail.com>
Date:   Tue Oct 24 15:31:36 2023 +0100

    Add tests for logsmanag_config functions

commit 4dfdacb55707ab46ed6c2d5ce538ac012574b27e
Author: Dim-P <dimitris1703@gmail.com>
Date:   Mon Oct 23 22:01:19 2023 +0100

    Remove unused headers from logsmanagement.c

commit b324ef396207c5c32e40ea9ad462bf374470b230
Author: Dim-P <dimitris1703@gmail.com>
Date:   Mon Oct 23 21:56:26 2023 +0100

    Remove inline from get_X_dir() functions

commit e9656e8121b66cd7ef8b5daaa5d27a134427aa35
Author: Dim-P <dimitris1703@gmail.com>
Date:   Mon Oct 23 21:50:32 2023 +0100

    Proper termination when a signal is received

commit b09eec147bdeffae7b268b6335f6ba89f084e050
Author: Dim-P <dimitris1703@gmail.com>
Date:   Mon Oct 23 20:12:13 2023 +0100

    Refactor logs management config code in separate source files

commit 014b46a5008fd296f7d25854079c518d018abdec
Author: Dim-P <dimitris1703@gmail.com>
Date:   Mon Oct 23 14:54:47 2023 +0100

    Fix p_file_info_destroy() crash

commit e0bdfd182513bb8d5d4b4b5b8a4cc248ccf2d64e
Author: Dim-P <dimitris1703@gmail.com>
Date:   Mon Oct 23 14:18:27 2023 +0100

    Code refactoring and cleanup

commit 6a61cb6e2fd3a535db150b01d9450f44b3e27b30
Author: Dim-P <dimitris1703@gmail.com>
Date:   Fri Oct 20 14:08:43 2023 +0100

    Fix 'source:all' queries

commit 45b516aaf819ac142353e323209b7d01e487393f
Author: Dim-P <dimitris1703@gmail.com>
Date:   Thu Oct 19 21:51:05 2023 +0100

    Working 'source:...' queries and regular data queries (but not 'source:all')

commit 8064b0ee71c63da9803f79424802f860e96326e5
Author: Dim-P <dimitris1703@gmail.com>
Date:   Thu Oct 19 15:34:23 2023 +0100

    Fix issue due to p_file_info_destroy()

commit a0aacc9cd00cea60218c9bfd2b9f164918a1e3de
Author: Dim-P <dimitris1703@gmail.com>
Date:   Tue Oct 17 22:06:34 2023 +0100

    Work on facet API changes

commit 480584ff9040c07e996b14efb4d21970a347633f
Author: Dim-P <dimitris1703@gmail.com>
Date:   Mon Oct 16 21:43:06 2023 +0100

    Add stats charts, running as separate thread

commit 34d582dbe4bf2d8d048afab41681e337705bc611
Author: Dim-P <dimitris1703@gmail.com>
Date:   Mon Oct 16 16:24:02 2023 +0100

    Add SSL cipher charts

commit ced27ee4e2c981d291f498244f2eef2556a074fb
Author: Dim-P <dimitris1703@gmail.com>
Date:   Sun Oct 15 21:33:29 2023 +0100

    Add Response code family, Response code, Response code type, SSL protocol charts

commit 40c4a1d91892d49b1e4e18a1c3c43258ded4014d
Author: Dim-P <dimitris1703@gmail.com>
Date:   Sat Oct 14 00:48:48 2023 +0100

    Add more web log charts

commit 890ed3ff97153dd18d15df2d1b57a181bc498ca8
Author: Dim-P <dimitris1703@gmail.com>
Date:   Fri Oct 13 22:14:11 2023 +0100

    Add web log vhosts and ports charts

commit 84733b6b1d353aff70687603019443610a8500c3
Author: Dim-P <dimitris1703@gmail.com>
Date:   Thu Oct 12 21:40:16 2023 +0100

    Add systemd charts

commit 14673501e8f48560956f53d5b670bbe801b8f2ae
Author: Dim-P <dimitris1703@gmail.com>
Date:   Wed Oct 11 00:28:43 2023 +0100

    Add MQTT charts

commit 366eb63b0a27dde6f0f8ba65120f34c18c1b21fd
Author: Dim-P <dimitris1703@gmail.com>
Date:   Tue Oct 10 21:46:19 2023 +0100

    Complete kmsg changes. Reduce mem usage. Fix a dictionary key size bug

commit 3d0216365a526ffbc9ce13a20c45447bfccb47d9
Author: Dim-P <dimitris1703@gmail.com>
Date:   Tue Oct 10 19:18:41 2023 +0100

    Add kmsg Subsystem charts

commit e61af4bb130a5cf5a5a78133f1e44b2b4c457b24
Author: Dim-P <dimitris1703@gmail.com>
Date:   Tue Oct 10 16:21:29 2023 +0100

    Fix bug of wrong kmsg timestamps in case of use_log_timestamp == 0

commit 03d22e0b26bddf249aab431a4f977bbd5cde98ca
Author: Dim-P <dimitris1703@gmail.com>
Date:   Tue Oct 10 16:20:47 2023 +0100

    Add kmsg charts, except for Subsystem and Device

commit f60b0787537a21ed3c4cea5101fcddc50f3bc55a
Author: Dim-P <dimitris1703@gmail.com>
Date:   Tue Oct 10 13:12:13 2023 +0100

    Initialise all docker events chart dimensions at startup

commit 5d873d3439abaf3768530cb5b72c6b4ef6565353
Author: Dim-P <dimitris1703@gmail.com>
Date:   Tue Oct 10 00:53:35 2023 +0100

    WIP: Add Docker events logs

commit 2cc3d6d98f58fc3ab67a8da3014210b14d0926a1
Author: Dim-P <dimitris1703@gmail.com>
Date:   Mon Oct 9 18:52:27 2023 +0100

    Use macros for num_of_logs_charts and custom_charts functions

commit fbd48ad3c9af674601238990d74192427475f2e3
Author: Dim-P <dimitris1703@gmail.com>
Date:   Mon Oct 9 18:26:17 2023 +0100

    Refactor custom charts code for clarity and speed

commit a31d80b5dc91161c0d74b10d00bc4fd1e6da7965
Author: Dim-P <dimitris1703@gmail.com>
Date:   Thu Oct 5 23:58:27 2023 +0100

    Add first working iteration of custom charts

commit b1e4ab8a460f4b4c3e2804e2f775787d21fbee45
Author: Dim-P <dimitris1703@gmail.com>
Date:   Thu Oct 5 23:57:27 2023 +0100

    Add more custom charts for Netdata error.log

commit f1b7605e564da3e297942f073593cdd4c21f88e1
Author: Dim-P <dimitris1703@gmail.com>
Date:   Thu Oct 5 20:39:40 2023 +0100

    Convert collected_logs_* chart updates to macros

commit 1459bc2b8bcd5ba21e024b10a8a5101048938f71
Author: Dim-P <dimitris1703@gmail.com>
Date:   Thu Oct 5 19:11:54 2023 +0100

    Use rrdset_timed_done() instead of duration_since_last_update for correct chart timings

commit 876854c6ee7586a3eb9fdbf795bcc17a5fd1e6ad
Author: Dim-P <dimitris1703@gmail.com>
Date:   Tue Oct 3 21:53:14 2023 +0100

    Fix some bugs in chart updates

commit ae87508485499984bcb9b72bbc7d249c4168b380
Author: Dim-P <dimitris1703@gmail.com>
Date:   Tue Oct 3 21:32:55 2023 +0100

    Functioning generic_chart_init() and generic_chart_update()

commit 982a9c4108dbea9571c785b5ff8a9d1e5472066c
Author: Dim-P <dimitris1703@gmail.com>
Date:   Thu Sep 28 23:32:52 2023 +0100

    Add support for multiple .conf files. Add stock examples.

commit 8e8abd0731227eb3fb3c6bcd811349575160799e
Author: Dim-P <dimitris1703@gmail.com>
Date:   Thu Sep 28 17:38:30 2023 +0100

    Add support for logsmanagement.d/default.conf

commit 1bf0732217b1d9e9959e1507ea96fc2c92ffb2ff
Author: Dim-P <dimitris1703@gmail.com>
Date:   Thu Sep 28 14:31:03 2023 +0100

    Add capabilities. Fix paths in logsmanagement.d.conf

commit a849d5b405bb4e5d770726fe99413a4efa7df274
Author: Dim-P <dimitris1703@gmail.com>
Date:   Tue Sep 26 23:06:31 2023 +0100

    Change logs_manag_config_load()

commit b0d1783b996286cd87e0832bfb74c29a845d61fc
Author: Dim-P <dimitris1703@gmail.com>
Date:   Tue Sep 26 15:35:30 2023 +0100

    Working unit tests and argument parsing

commit 6da1b4267a4d58d3a7cbcca9507afe8158a2e324
Author: Dim-P <dimitris1703@gmail.com>
Date:   Fri Sep 22 00:32:47 2023 +0300

    Build logs-management.plugin successfully

commit 9e30efe0422e4941f99cc66998d9f42e00a24676
Author: Dim-P <dimitris1703@gmail.com>
Date:   Thu Sep 21 16:13:21 2023 +0300

    Fix print format specifier in web_client_api_request_v1_logsmanagement()

* Modify CODEOWNERS

* Update README.md

Fix indentation

* Change snprintfz() to strncpyz() in circ_buff_search(). Change remaining 'chart_name' to 'chartname'.

* Replace SQLite version function with macro

* Fix some codacy warnings

* Update README.md

* Update Fluent Bit to v2.1.10

* Remove some comments

* Fix Fluent Bit shared library linking for armv7l and FreeBSD

* Remove compression source files

* Add prefix to rrd_api.h functions

* Add more unit tests

* Fix kmsg capabilities

* Separate kmsg and systemd default paths

* Fix some memory leaks and better termination of DB

* Add iterative queries if quota is exceeded

* Fix centos7 builds

* Fix issue where SYSTEMD timestamps are not parsed

* Fix logs management packaging.

* Fix typo in DEB control file.

* Fix indentation and missing new line at EOF

* Clean up functions and update help

* Fix 400 error when no queryable sources are available

* Fix if_modified_since. Add FACET_MAX_VALUE_LENGTH

* Add delta parameter and use anchor points in queries

* Fix CodeQL warning

* Fix packaging issues.

* Fix postinstall script for DEB packages.

* Improve plugin shutdown speed

* Fix docker events chart grouping

* Fix functions evloop threads not terminating upon shutdown

* Fix coverity issues

* Fix logging

* Replace 'Netdata error.log' with 'Netdata daemon.log' in 'default.conf'

* Remove 'enabled = yes/no' config in logsmanagement.d.conf

* Remove 'enabled = X' unused config from logsmanagement.d.conf

---------

Co-authored-by: Austin S. Hemmelgarn <austin@netdata.cloud>
2023-11-27 16:55:14 +00:00

1435 lines
No EOL
65 KiB
C

// SPDX-License-Identifier: GPL-3.0-or-later
/** @file flb_plugin.c
* @brief This file includes all functions that act as an API to
* the Fluent Bit library.
*/
#include "flb_plugin.h"
#include <lz4.h>
#include "helper.h"
#include "defaults.h"
#include "circular_buffer.h"
#include "daemon/common.h"
#include "libnetdata/libnetdata.h"
#include "../fluent-bit/lib/msgpack-c/include/msgpack/unpack.h"
#include "../fluent-bit/lib/msgpack-c/include/msgpack/object.h"
#include "../fluent-bit/lib/monkey/include/monkey/mk_core/mk_list.h"
#include <dlfcn.h>
#define LOG_REC_KEY "msg" /**< key to represent log message field in most log sources **/
#define LOG_REC_KEY_SYSTEMD "MESSAGE" /**< key to represent log message field in systemd log source **/
#define SYSLOG_TIMESTAMP_SIZE 16 /**< buffer size for a syslog timestamp — presumably RFC 3164 style (15 chars + NUL); confirm against parser usage **/
#define UNKNOWN "unknown" /**< placeholder label for values that could not be determined **/
/* Including "../fluent-bit/include/fluent-bit/flb_macros.h" causes issues
* with CI, as it requires mk_core/mk_core_info.h which is generated only
* after Fluent Bit has been built. We can instead just redefine a couple
* of macros here: */
#define FLB_FALSE 0
#define FLB_TRUE !FLB_FALSE
/* For similar reasons, (re)define the following macros from "flb_lib.h": */
/* Lib engine status */
#define FLB_LIB_ERROR -1
#define FLB_LIB_NONE 0
#define FLB_LIB_OK 1
#define FLB_LIB_NO_CONFIG_MAP 2
/* Following structs are the same as defined in fluent-bit/flb_lib.h and
* fluent-bit/flb_time.h, but need to be redefined due to use of dlsym(). */
/* NOTE: The following struct layouts MUST stay byte-identical to the
 * definitions in fluent-bit/flb_lib.h, flb_time.h and flb_parser.h of the
 * Fluent Bit version this plugin is built against, because objects of
 * these types cross the dlsym()'d library boundary. Do not edit them
 * independently of a Fluent Bit upgrade. */

/** Timestamp wrapper, mirror of Fluent Bit's struct flb_time. */
struct flb_time {
struct timespec tm;
};
/* Library mode context data */
struct flb_lib_ctx {
int status;
struct mk_event_loop *event_loop;
struct mk_event *event_channel;
struct flb_config *config;
};
/** Per-key type-cast description used by Fluent Bit parsers. */
struct flb_parser_types {
char *key;
int key_len;
int type;
};
/** Mirror of Fluent Bit's parser definition (see flb_parser.h). */
struct flb_parser {
/* configuration */
int type; /* parser type */
char *name; /* format name */
char *p_regex; /* pattern for main regular expression */
int skip_empty; /* skip empty regex matches */
char *time_fmt; /* time format */
char *time_fmt_full; /* original given time format */
char *time_key; /* field name that contains the time */
int time_offset; /* fixed UTC offset */
int time_keep; /* keep time field */
int time_strict; /* parse time field strictly */
int logfmt_no_bare_keys; /* in logfmt parsers, require all keys to have values */
char *time_frac_secs; /* time format have fractional seconds ? */
struct flb_parser_types *types; /* type casting */
int types_len;
/* Field decoders */
struct mk_list *decoders;
/* internal */
int time_with_year; /* do time_fmt consider a year (%Y) ? */
char *time_fmt_year;
int time_with_tz; /* do time_fmt consider a timezone ? */
struct flb_regex *regex;
struct mk_list _head;
};
/** Output callback descriptor passed to flb_output() for "lib" outputs. */
struct flb_lib_out_cb {
int (*cb) (void *record, size_t size, void *data);
void *data;
};
typedef struct flb_lib_ctx flb_ctx_t;
/* Function pointers resolved at runtime from libfluent-bit.so via dlsym()
 * in flb_init(). The dl_msgpack_* pointers are prefixed to avoid clashing
 * with the msgpack symbols declared in the included msgpack headers. */
static flb_ctx_t *(*flb_create)(void);
static int (*flb_service_set)(flb_ctx_t *ctx, ...);
static int (*flb_start)(flb_ctx_t *ctx);
static int (*flb_stop)(flb_ctx_t *ctx);
static void (*flb_destroy)(flb_ctx_t *ctx);
static int (*flb_time_pop_from_msgpack)(struct flb_time *time, msgpack_unpacked *upk, msgpack_object **map);
static int (*flb_lib_free)(void *data);
static struct flb_parser *(*flb_parser_create)( const char *name, const char *format, const char *p_regex, int skip_empty,
const char *time_fmt, const char *time_key, const char *time_offset,
int time_keep, int time_strict, int logfmt_no_bare_keys,
struct flb_parser_types *types, int types_len,struct mk_list *decoders,
struct flb_config *config);
static int (*flb_input)(flb_ctx_t *ctx, const char *input, void *data);
static int (*flb_input_set)(flb_ctx_t *ctx, int ffd, ...);
// static int (*flb_filter)(flb_ctx_t *ctx, const char *filter, void *data);
// static int (*flb_filter_set)(flb_ctx_t *ctx, int ffd, ...);
static int (*flb_output)(flb_ctx_t *ctx, const char *output, struct flb_lib_out_cb *cb);
static int (*flb_output_set)(flb_ctx_t *ctx, int ffd, ...);
static msgpack_unpack_return (*dl_msgpack_unpack_next)(msgpack_unpacked* result, const char* data, size_t len, size_t* off);
static void (*dl_msgpack_zone_free)(msgpack_zone* zone);
static int (*dl_msgpack_object_print_buffer)(char *buffer, size_t buffer_size, msgpack_object o);
/* Global Fluent Bit library state, owned by flb_init()/flb_terminate(). */
static flb_ctx_t *ctx = NULL;
static void *flb_lib_handle = NULL;
static struct flb_lib_out_cb *fwd_input_out_cb = NULL;
/* Serializes writes to stdout (chart updates) across threads; defined elsewhere. */
extern netdata_mutex_t stdout_mut;
/**
 * @brief Load the Fluent Bit shared library and initialise its service.
 *
 * Opens libfluent-bit.so (expected one directory above
 * @p stock_config_dir), resolves all required Fluent Bit and msgpack
 * symbols via dlsym(), creates the library context and applies the
 * global service settings from @p flb_srvc_config.
 *
 * @param flb_srvc_config  Global Fluent Bit service configuration values.
 * @param stock_config_dir Netdata stock configuration directory path.
 * @return 0 on success, -1 on any error. On error, all partially
 *         acquired resources (context, library handle) are released and
 *         the globals are reset to NULL, so a subsequent
 *         flb_terminate() is a safe no-op.
 */
int flb_init(flb_srvc_config_t flb_srvc_config, const char *const stock_config_dir){
    int rc = 0;
    char *dl_error;

    char *flb_lib_path = strdupz_path_subpath(stock_config_dir, "/../libfluent-bit.so");
    if (unlikely(NULL == (flb_lib_handle = dlopen(flb_lib_path, RTLD_LAZY)))){
        if (NULL != (dl_error = dlerror()))
            collector_error("dlopen() libfluent-bit.so error: %s", dl_error);
        rc = -1;
        goto do_return;
    }
    dlerror(); /* Clear any existing error */

    /* Resolve a symbol by explicit name into a function pointer,
     * bailing out to cleanup on any dlsym() error. */
    #define load_symbol(FUNC_PTR, SYM_NAME){ \
        *(void **) (&FUNC_PTR) = dlsym(flb_lib_handle, SYM_NAME); \
        if ((dl_error = dlerror()) != NULL) { \
            collector_error("dlerror loading %s: %s", SYM_NAME, dl_error); \
            rc = -1; \
            goto do_return; \
        } \
    }
    /* Resolve a symbol whose name matches the pointer's identifier. */
    #define load_function(FUNC_NAME) load_symbol(FUNC_NAME, LOGS_MANAG_STR(FUNC_NAME))

    load_function(flb_create);
    load_function(flb_service_set);
    load_function(flb_start);
    load_function(flb_stop);
    load_function(flb_destroy);
    load_function(flb_time_pop_from_msgpack);
    load_function(flb_lib_free);
    load_function(flb_parser_create);
    load_function(flb_input);
    load_function(flb_input_set);
    // load_function(flb_filter);
    // load_function(flb_filter_set);
    load_function(flb_output);
    load_function(flb_output_set);
    /* The dl_msgpack_* pointers are renamed locally to avoid clashing
     * with the msgpack symbols declared in the included headers. */
    load_symbol(dl_msgpack_unpack_next, "msgpack_unpack_next");
    load_symbol(dl_msgpack_zone_free, "msgpack_zone_free");
    load_symbol(dl_msgpack_object_print_buffer, "msgpack_object_print_buffer");

    #undef load_function
    #undef load_symbol

    ctx = flb_create();
    if (unlikely(!ctx)){
        rc = -1;
        goto do_return;
    }

    /* Global service settings */
    if(unlikely(flb_service_set(ctx,
        "Flush" , flb_srvc_config.flush,
        "HTTP_Listen" , flb_srvc_config.http_listen,
        "HTTP_Port" , flb_srvc_config.http_port,
        "HTTP_Server" , flb_srvc_config.http_server,
        "Log_File" , flb_srvc_config.log_path,
        "Log_Level" , flb_srvc_config.log_level,
        "Coro_stack_size" , flb_srvc_config.coro_stack_size,
        NULL) != 0 )){
        rc = -1;
        goto do_return;
    }

do_return:
    freez(flb_lib_path);
    if(unlikely(rc)){
        /* Roll back partial initialisation: destroy the context (if any)
         * BEFORE unloading the library, and reset both globals so that a
         * later flb_terminate() cannot call flb_stop()/flb_destroy()
         * through an unloaded library or dlclose() the handle twice. */
        if(ctx){
            flb_destroy(ctx);
            ctx = NULL;
        }
        if(flb_lib_handle){
            dlclose(flb_lib_handle);
            flb_lib_handle = NULL;
        }
    }
    return rc;
}
/**
 * @brief Start the Fluent Bit engine on the initialised context.
 * @return 0 on success, -1 on error.
 *
 * Fix: the comparison against 0 belongs INSIDE likely(); the original
 * `likely(flb_start(ctx)) == 0` hinted the compiler that flb_start()
 * returns non-zero (i.e. predicts failure), which is the opposite of
 * the expected common case. Behavior was coincidentally correct, the
 * branch-prediction hint was not.
 */
int flb_run(void){
    if (likely(flb_start(ctx) == 0))
        return 0;
    return -1;
}
/**
 * @brief Stop and destroy the Fluent Bit context and unload the library.
 *
 * Safe to call multiple times: both globals are reset to NULL after
 * release. The original left flb_lib_handle dangling after dlclose(),
 * so a second call (or a call following a failed flb_init()) would
 * dlclose() an already-closed handle.
 */
void flb_terminate(void){
    if(ctx){
        flb_stop(ctx);
        flb_destroy(ctx);
        ctx = NULL;
    }
    if(flb_lib_handle){
        dlclose(flb_lib_handle);
        flb_lib_handle = NULL; /* prevent double dlclose on repeated calls */
    }
}
/**
 * @brief Finalise the circular buffer's current input item and publish it.
 *
 * Null-terminates the collected text, LZ4-compresses it in-place (the
 * compressed copy is appended right after the raw text inside the same
 * buffer), updates per-source parser metrics and custom-chart keyword
 * counters, pushes the chart update to stdout, inserts the item into the
 * circular buffer and re-arms the copy timer.
 *
 * Caller must hold p_file_info->flb_tmp_buff_mut (see the callers
 * flb_complete_item_timer_timeout_cb() and flb_collect_logs_cb()).
 */
static void flb_complete_buff_item(struct File_info *p_file_info){
Circ_buff_t *buff = p_file_info->circ_buff;
m_assert(buff->in->timestamp, "buff->in->timestamp cannot be 0");
m_assert(buff->in->data, "buff->in->text cannot be NULL");
m_assert(*buff->in->data, "*buff->in->text cannot be 0");
m_assert(buff->in->text_size, "buff->in->text_size cannot be 0");
/* Replace last '\n' with '\0' to null-terminate text */
buff->in->data[buff->in->text_size - 1] = '\0';
/* Store status (timestamp and text_size must have already been
* stored during flb_collect_logs_cb() ). */
buff->in->status = CIRC_BUFF_ITEM_STATUS_UNPROCESSED;
/* Load max size of compressed buffer, as calculated previously */
size_t text_compressed_buff_max_size = buff->in->text_compressed_size;
/* Do compression.
* TODO: Validate compression option? */
buff->in->text_compressed = buff->in->data + buff->in->text_size;
/* NOTE(review): LZ4_compress_fast() returns 0 on failure; that case is
* only caught by the assertion below, which compiles out in release
* builds — a zero compressed size would then propagate. Confirm whether
* circ_buff_insert() tolerates it. */
buff->in->text_compressed_size = LZ4_compress_fast( buff->in->data,
buff->in->text_compressed,
buff->in->text_size,
text_compressed_buff_max_size,
p_file_info->compression_accel);
m_assert(buff->in->text_compressed_size != 0, "Text_compressed_size should be != 0");
/* Timestamp is in msec; charts track last update in seconds. */
p_file_info->parser_metrics->last_update = buff->in->timestamp / MSEC_PER_SEC;
p_file_info->parser_metrics->num_lines += buff->in->num_lines;
/* Perform custom log chart parsing */
for(int i = 0; p_file_info->parser_cus_config[i]; i++){
p_file_info->parser_metrics->parser_cus[i]->count +=
search_keyword( buff->in->data, buff->in->text_size, NULL, NULL,
NULL, &p_file_info->parser_cus_config[i]->regex, 0);
}
/* Update charts — stdout is shared across collector threads, hence the
* stdout_mut lock around the write + flush. */
netdata_mutex_lock(&stdout_mut);
p_file_info->chart_meta->update(p_file_info);
fflush(stdout);
netdata_mutex_unlock(&stdout_mut);
circ_buff_insert(buff);
uv_timer_again(&p_file_info->flb_tmp_buff_cpy_timer);
}
/**
 * @brief uv_timer callback that periodically flushes the input buffer item.
 *
 * When the input slot holds collected text, the pending item is completed
 * via flb_complete_buff_item(); otherwise only the charts are refreshed
 * with the current wall-clock time. Either way the per-source temp-buffer
 * mutex is held for the duration of the work.
 */
void flb_complete_item_timer_timeout_cb(uv_timer_t *handle) {
    struct File_info *const p_file_info = handle->data;
    Circ_buff_t *const buff = p_file_info->circ_buff;

    uv_mutex_lock(&p_file_info->flb_tmp_buff_mut);

    if (buff->in->data && *buff->in->data && buff->in->text_size) {
        flb_complete_buff_item(p_file_info);
    }
    else {
        /* Nothing collected since the last flush: just bump the charts. */
        p_file_info->parser_metrics->last_update = now_realtime_sec();
        netdata_mutex_lock(&stdout_mut);
        p_file_info->chart_meta->update(p_file_info);
        fflush(stdout);
        netdata_mutex_unlock(&stdout_mut);
    }

    uv_mutex_unlock(&p_file_info->flb_tmp_buff_mut);
}
static int flb_collect_logs_cb(void *record, size_t size, void *data){
/* "data" is NULL for Forward-type sources and non-NULL for local sources */
struct File_info *p_file_info = (struct File_info *) data;
Circ_buff_t *buff = NULL;
msgpack_unpacked result;
size_t off = 0;
struct flb_time tmp_time;
msgpack_object *x;
char timestamp_str[TIMESTAMP_MS_STR_SIZE] = "";
msec_t timestamp = 0;
struct resizable_key_val_arr {
char **key;
char **val;
size_t *key_size;
size_t *val_size;
int size, max_size;
};
/* FLB_WEB_LOG case */
Log_line_parsed_t line_parsed = (Log_line_parsed_t) {0};
/* FLB_WEB_LOG case end */
/* FLB_KMSG case */
static int skip_kmsg_log_buffering = 1;
int kmsg_sever = -1; // -1 equals invalid
/* FLB_KMSG case end */
/* FLB_SYSTEMD or FLB_SYSLOG case */
char syslog_prival[4] = "";
size_t syslog_prival_size = 0;
char syslog_severity[2] = "";
char syslog_facility[3] = "";
char *syslog_timestamp = NULL;
size_t syslog_timestamp_size = 0;
char *hostname = NULL;
size_t hostname_size = 0;
char *syslog_identifier = NULL;
size_t syslog_identifier_size = 0;
char *pid = NULL;
size_t pid_size = 0;
char *message = NULL;
size_t message_size = 0;
/* FLB_SYSTEMD or FLB_SYSLOG case end */
/* FLB_DOCKER_EV case */
long docker_ev_time = 0;
long docker_ev_timeNano = 0;
char *docker_ev_type = NULL;
size_t docker_ev_type_size = 0;
char *docker_ev_action = NULL;
size_t docker_ev_action_size = 0;
char *docker_ev_id = NULL;
size_t docker_ev_id_size = 0;
static struct resizable_key_val_arr docker_ev_attr = {0};
docker_ev_attr.size = 0;
/* FLB_DOCKER_EV case end */
/* FLB_MQTT case */
char *mqtt_topic = NULL;
size_t mqtt_topic_size = 0;
static char *mqtt_message = NULL;
static size_t mqtt_message_size_max = 0;
/* FLB_MQTT case end */
size_t new_tmp_text_size = 0;
msgpack_unpacked_init(&result);
int iter = 0;
while (dl_msgpack_unpack_next(&result, record, size, &off) == MSGPACK_UNPACK_SUCCESS) {
iter++;
m_assert(iter == 1, "We do not expect more than one loop iteration here");
flb_time_pop_from_msgpack(&tmp_time, &result, &x);
if(likely(x->type == MSGPACK_OBJECT_MAP && x->via.map.size != 0)){
msgpack_object_kv* p = x->via.map.ptr;
msgpack_object_kv* pend = x->via.map.ptr + x->via.map.size;
/* ================================================================
* If p_file_info == NULL, it means it is a "Forward" source, so
* we need to search for the associated p_file_info. This code can
* be optimized further.
* ============================================================== */
if(p_file_info == NULL){
do{
if(!strncmp(p->key.via.str.ptr, "stream guid", (size_t) p->key.via.str.size)){
char *stream_guid = (char *) p->val.via.str.ptr;
size_t stream_guid_size = p->val.via.str.size;
debug_log( "stream guid:%.*s", (int) stream_guid_size, stream_guid);
for (int i = 0; i < p_file_infos_arr->count; i++) {
if(!strncmp(p_file_infos_arr->data[i]->stream_guid, stream_guid, stream_guid_size)){
p_file_info = p_file_infos_arr->data[i];
// debug_log( "p_file_info match found: %s type[%s]",
// p_file_info->stream_guid,
// log_src_type_t_str[p_file_info->log_type]);
break;
}
}
}
++p;
// continue;
} while(p < pend);
}
if(unlikely(p_file_info == NULL)) goto skip_collect_and_drop_logs;
uv_mutex_lock(&p_file_info->flb_tmp_buff_mut);
buff = p_file_info->circ_buff;
p = x->via.map.ptr;
pend = x->via.map.ptr + x->via.map.size;
do{
/* FLB_TAIL, FLB_WEB_LOG and FLB_SERIAL case */
if( p_file_info->log_type == FLB_TAIL ||
p_file_info->log_type == FLB_WEB_LOG ||
p_file_info->log_type == FLB_SERIAL){
if( !strncmp(p->key.via.str.ptr, LOG_REC_KEY, (size_t) p->key.via.str.size) ||
/* The following line is in case we collect systemd logs
* (tagged as "MESSAGE") or docker_events (tagged as
* "message") via a "Forward" source to an FLB_TAIL
* parent. */
!strncasecmp(p->key.via.str.ptr, LOG_REC_KEY_SYSTEMD, (size_t) p->key.via.str.size)){
message = (char *) p->val.via.str.ptr;
message_size = p->val.via.str.size;
if(p_file_info->log_type == FLB_WEB_LOG){
parse_web_log_line( (Web_log_parser_config_t *) p_file_info->parser_config->gen_config,
message, message_size, &line_parsed);
if(likely(p_file_info->use_log_timestamp)){
timestamp = line_parsed.timestamp * MSEC_PER_SEC; // convert to msec from sec
{ /* ------------------ FIXME ------------------------
* Temporary kludge so that metrics don't break when
* a new record has timestamp before the current one.
*/
static msec_t previous_timestamp = 0;
if((((long long) timestamp - (long long) previous_timestamp) < 0))
timestamp = previous_timestamp;
previous_timestamp = timestamp;
}
}
}
new_tmp_text_size = message_size + 1; // +1 for '\n'
m_assert(message_size, "message_size is 0");
m_assert(message, "message is NULL");
}
++p;
continue;
} /* FLB_TAIL, FLB_WEB_LOG and FLB_SERIAL case end */
/* FLB_KMSG case */
if(p_file_info->log_type == FLB_KMSG){
if(unlikely(skip_kmsg_log_buffering)){
static time_t netdata_start_time = 0;
if (!netdata_start_time) netdata_start_time = now_boottime_sec();
if(now_boottime_sec() - netdata_start_time < KERNEL_LOGS_COLLECT_INIT_WAIT)
goto skip_collect_and_drop_logs;
else skip_kmsg_log_buffering = 0;
}
/* NOTE/WARNING:
* kmsg timestamps are tricky. The timestamp will be
* *wrong** if the system has gone into hibernation since
* last boot and "p_file_info->use_log_timestamp" is set.
* Even if "p_file_info->use_log_timestamp" is NOT set, we
* need to use now_realtime_msec() as Fluent Bit timestamp
* will also be wrong. */
if( !strncmp(p->key.via.str.ptr, "sec", (size_t) p->key.via.str.size)){
if(p_file_info->use_log_timestamp){
timestamp += (now_realtime_sec() - now_boottime_sec() + p->val.via.i64) * MSEC_PER_SEC;
}
else if(!timestamp)
timestamp = now_realtime_msec();
}
else if(!strncmp(p->key.via.str.ptr, "usec", (size_t) p->key.via.str.size) &&
p_file_info->use_log_timestamp){
timestamp += p->val.via.i64 / USEC_PER_MS;
}
else if(!strncmp(p->key.via.str.ptr, LOG_REC_KEY, (size_t) p->key.via.str.size)){
message = (char *) p->val.via.str.ptr;
message_size = p->val.via.str.size;
m_assert(message, "message is NULL");
m_assert(message_size, "message_size is 0");
new_tmp_text_size += message_size + 1; // +1 for '\n'
}
else if(!strncmp(p->key.via.str.ptr, "priority", (size_t) p->key.via.str.size)){
kmsg_sever = (int) p->val.via.u64;
}
++p;
continue;
} /* FLB_KMSG case end */
/* FLB_SYSTEMD or FLB_SYSLOG case */
if( p_file_info->log_type == FLB_SYSTEMD ||
p_file_info->log_type == FLB_SYSLOG){
if( p_file_info->use_log_timestamp && !strncmp( p->key.via.str.ptr,
"SOURCE_REALTIME_TIMESTAMP",
(size_t) p->key.via.str.size)){
m_assert(p->val.via.str.size - 3 == TIMESTAMP_MS_STR_SIZE - 1,
"p->val.via.str.size - 3 != TIMESTAMP_MS_STR_SIZE");
strncpyz(timestamp_str, p->val.via.str.ptr, (size_t) p->val.via.str.size);
char *endptr = NULL;
timestamp = str2ll(timestamp_str, &endptr);
timestamp = *endptr ? 0 : timestamp / USEC_PER_MS;
}
else if(!strncmp(p->key.via.str.ptr, "PRIVAL", (size_t) p->key.via.str.size)){
m_assert(p->val.via.str.size <= 3, "p->val.via.str.size > 3");
strncpyz(syslog_prival, p->val.via.str.ptr, (size_t) p->val.via.str.size);
syslog_prival_size = (size_t) p->val.via.str.size;
m_assert(syslog_prival, "syslog_prival is NULL");
}
else if(!strncmp(p->key.via.str.ptr, "PRIORITY", (size_t) p->key.via.str.size)){
m_assert(p->val.via.str.size <= 1, "p->val.via.str.size > 1");
strncpyz(syslog_severity, p->val.via.str.ptr, (size_t) p->val.via.str.size);
m_assert(syslog_severity, "syslog_severity is NULL");
}
else if(!strncmp(p->key.via.str.ptr, "SYSLOG_FACILITY", (size_t) p->key.via.str.size)){
m_assert(p->val.via.str.size <= 2, "p->val.via.str.size > 2");
strncpyz(syslog_facility, p->val.via.str.ptr, (size_t) p->val.via.str.size);
m_assert(syslog_facility, "syslog_facility is NULL");
}
else if(!strncmp(p->key.via.str.ptr, "SYSLOG_TIMESTAMP", (size_t) p->key.via.str.size)){
syslog_timestamp = (char *) p->val.via.str.ptr;
syslog_timestamp_size = p->val.via.str.size;
m_assert(syslog_timestamp, "syslog_timestamp is NULL");
m_assert(syslog_timestamp_size, "syslog_timestamp_size is 0");
new_tmp_text_size += syslog_timestamp_size;
}
else if(!strncmp(p->key.via.str.ptr, "HOSTNAME", (size_t) p->key.via.str.size)){
hostname = (char *) p->val.via.str.ptr;
hostname_size = p->val.via.str.size;
m_assert(hostname, "hostname is NULL");
m_assert(hostname_size, "hostname_size is 0");
new_tmp_text_size += hostname_size + 1; // +1 for ' ' char
}
else if(!strncmp(p->key.via.str.ptr, "SYSLOG_IDENTIFIER", (size_t) p->key.via.str.size)){
syslog_identifier = (char *) p->val.via.str.ptr;
syslog_identifier_size = p->val.via.str.size;
new_tmp_text_size += syslog_identifier_size;
}
else if(!strncmp(p->key.via.str.ptr, "PID", (size_t) p->key.via.str.size)){
pid = (char *) p->val.via.str.ptr;
pid_size = p->val.via.str.size;
new_tmp_text_size += pid_size;
}
else if(!strncmp(p->key.via.str.ptr, LOG_REC_KEY_SYSTEMD, (size_t) p->key.via.str.size)){
message = (char *) p->val.via.str.ptr;
message_size = p->val.via.str.size;
m_assert(message, "message is NULL");
m_assert(message_size, "message_size is 0");
new_tmp_text_size += message_size;
}
++p;
continue;
} /* FLB_SYSTEMD or FLB_SYSLOG case end */
/* FLB_DOCKER_EV case */
if(p_file_info->log_type == FLB_DOCKER_EV){
if(!strncmp(p->key.via.str.ptr, "time", (size_t) p->key.via.str.size)){
docker_ev_time = p->val.via.i64;
m_assert(docker_ev_time, "docker_ev_time is 0");
}
else if(!strncmp(p->key.via.str.ptr, "timeNano", (size_t) p->key.via.str.size)){
docker_ev_timeNano = p->val.via.i64;
m_assert(docker_ev_timeNano, "docker_ev_timeNano is 0");
if(likely(p_file_info->use_log_timestamp))
timestamp = docker_ev_timeNano / NSEC_PER_MSEC;
}
else if(!strncmp(p->key.via.str.ptr, "Type", (size_t) p->key.via.str.size)){
docker_ev_type = (char *) p->val.via.str.ptr;
docker_ev_type_size = p->val.via.str.size;
m_assert(docker_ev_type, "docker_ev_type is NULL");
m_assert(docker_ev_type_size, "docker_ev_type_size is 0");
// debug_log("docker_ev_type: %.*s", docker_ev_type_size, docker_ev_type);
}
else if(!strncmp(p->key.via.str.ptr, "Action", (size_t) p->key.via.str.size)){
docker_ev_action = (char *) p->val.via.str.ptr;
docker_ev_action_size = p->val.via.str.size;
m_assert(docker_ev_action, "docker_ev_action is NULL");
m_assert(docker_ev_action_size, "docker_ev_action_size is 0");
// debug_log("docker_ev_action: %.*s", docker_ev_action_size, docker_ev_action);
}
else if(!strncmp(p->key.via.str.ptr, "id", (size_t) p->key.via.str.size)){
docker_ev_id = (char *) p->val.via.str.ptr;
docker_ev_id_size = p->val.via.str.size;
m_assert(docker_ev_id, "docker_ev_id is NULL");
m_assert(docker_ev_id_size, "docker_ev_id_size is 0");
// debug_log("docker_ev_id: %.*s", docker_ev_id_size, docker_ev_id);
}
else if(!strncmp(p->key.via.str.ptr, "Actor", (size_t) p->key.via.str.size)){
// debug_log( "msg key:[%.*s]val:[%.*s]", (int) p->key.via.str.size,
// p->key.via.str.ptr,
// (int) p->val.via.str.size,
// p->val.via.str.ptr);
if(likely(p->val.type == MSGPACK_OBJECT_MAP && p->val.via.map.size != 0)){
msgpack_object_kv* ac = p->val.via.map.ptr;
msgpack_object_kv* const ac_pend= p->val.via.map.ptr + p->val.via.map.size;
do{
if(!strncmp(ac->key.via.str.ptr, "ID", (size_t) ac->key.via.str.size)){
docker_ev_id = (char *) ac->val.via.str.ptr;
docker_ev_id_size = ac->val.via.str.size;
m_assert(docker_ev_id, "docker_ev_id is NULL");
m_assert(docker_ev_id_size, "docker_ev_id_size is 0");
// debug_log("docker_ev_id: %.*s", docker_ev_id_size, docker_ev_id);
}
else if(!strncmp(ac->key.via.str.ptr, "Attributes", (size_t) ac->key.via.str.size)){
if(likely(ac->val.type == MSGPACK_OBJECT_MAP && ac->val.via.map.size != 0)){
msgpack_object_kv* att = ac->val.via.map.ptr;
msgpack_object_kv* const att_pend = ac->val.via.map.ptr + ac->val.via.map.size;
do{
if(unlikely(++docker_ev_attr.size > docker_ev_attr.max_size)){
docker_ev_attr.max_size = docker_ev_attr.size;
docker_ev_attr.key = reallocz(docker_ev_attr.key,
docker_ev_attr.max_size * sizeof(char *));
docker_ev_attr.val = reallocz(docker_ev_attr.val,
docker_ev_attr.max_size * sizeof(char *));
docker_ev_attr.key_size = reallocz(docker_ev_attr.key_size,
docker_ev_attr.max_size * sizeof(size_t));
docker_ev_attr.val_size = reallocz(docker_ev_attr.val_size,
docker_ev_attr.max_size * sizeof(size_t));
}
docker_ev_attr.key[docker_ev_attr.size - 1] = (char *) att->key.via.str.ptr;
docker_ev_attr.val[docker_ev_attr.size - 1] = (char *) att->val.via.str.ptr;
docker_ev_attr.key_size[docker_ev_attr.size - 1] = (size_t) att->key.via.str.size;
docker_ev_attr.val_size[docker_ev_attr.size - 1] = (size_t) att->val.via.str.size;
att++;
continue;
} while(att < att_pend);
}
}
ac++;
continue;
} while(ac < ac_pend);
}
}
++p;
continue;
}
/* FLB_DOCKER_EV case end */
/* FLB_MQTT case */
if(p_file_info->log_type == FLB_MQTT){
if(!strncmp(p->key.via.str.ptr, "topic", (size_t) p->key.via.str.size)){
mqtt_topic = (char *) p->val.via.str.ptr;
mqtt_topic_size = (size_t) p->val.via.str.size;
while(0 == (message_size = dl_msgpack_object_print_buffer(mqtt_message, mqtt_message_size_max, *x)))
mqtt_message = reallocz(mqtt_message, (mqtt_message_size_max += 10));
new_tmp_text_size = message_size + 1; // +1 for '\n'
m_assert(message_size, "message_size is 0");
m_assert(mqtt_message, "mqtt_message is NULL");
break; // watch out, MQTT requires a 'break' here, as we parse the entire 'x' msgpack_object
}
else m_assert(0, "missing mqtt topic");
++p;
continue;
}
} while(p < pend);
}
}
/* If no log timestamp was found, use Fluent Bit collection timestamp. */
if(timestamp == 0)
timestamp = (msec_t) tmp_time.tm.tv_sec * MSEC_PER_SEC + (msec_t) tmp_time.tm.tv_nsec / (NSEC_PER_MSEC);
m_assert(TEST_MS_TIMESTAMP_VALID(timestamp), "timestamp is invalid");
/* If input buffer timestamp is not set, now is the time to set it,
* else just be done with the previous buffer */
if(unlikely(buff->in->timestamp == 0)) buff->in->timestamp = timestamp / 1000 * 1000; // rounding down
else if((timestamp - buff->in->timestamp) >= MSEC_PER_SEC) {
flb_complete_buff_item(p_file_info);
buff->in->timestamp = timestamp / 1000 * 1000; // rounding down
}
m_assert(TEST_MS_TIMESTAMP_VALID(buff->in->timestamp), "buff->in->timestamp is invalid");
new_tmp_text_size += buff->in->text_size;
/* ========================================================================
* Step 2: Extract metrics and reconstruct log record
* ====================================================================== */
/* Parse number of log lines - common for all log source types */
buff->in->num_lines++;
/* FLB_TAIL, FLB_WEB_LOG and FLB_SERIAL case */
if( p_file_info->log_type == FLB_TAIL ||
p_file_info->log_type == FLB_WEB_LOG ||
p_file_info->log_type == FLB_SERIAL){
if(p_file_info->log_type == FLB_WEB_LOG)
extract_web_log_metrics(p_file_info->parser_config, &line_parsed,
p_file_info->parser_metrics->web_log);
// TODO: Fix: Metrics will still be collected if circ_buff_prepare_write() returns 0.
if(unlikely(!circ_buff_prepare_write(buff, new_tmp_text_size))) goto skip_collect_and_drop_logs;
size_t tmp_item_off = buff->in->text_size;
memcpy_iscntrl_fix(&buff->in->data[tmp_item_off], message, message_size);
tmp_item_off += message_size;
buff->in->data[tmp_item_off++] = '\n';
m_assert(tmp_item_off == new_tmp_text_size, "tmp_item_off should be == new_tmp_text_size");
buff->in->text_size = new_tmp_text_size;
} /* FLB_TAIL, FLB_WEB_LOG and FLB_SERIAL case end */
/* FLB_KMSG case */
else if(p_file_info->log_type == FLB_KMSG){
char *c;
// see https://www.kernel.org/doc/Documentation/ABI/testing/dev-kmsg
if((c = memchr(message, '\n', message_size))){
const char subsys_str[] = "SUBSYSTEM=",
device_str[] = "DEVICE=";
const size_t subsys_str_len = sizeof(subsys_str) - 1,
device_str_len = sizeof(device_str) - 1;
size_t bytes_remain = message_size - (c - message);
/* Extract machine-readable info for charts, such as subsystem and device. */
while(bytes_remain){
size_t sz = 0;
while(--bytes_remain && c[++sz] != '\n');
if(bytes_remain) --sz;
*(c++) = '\\';
*(c++) = 'n';
sz--;
DICTIONARY *dict = NULL;
char *str = NULL;
size_t str_len = 0;
if(!strncmp(c, subsys_str, subsys_str_len)){
dict = p_file_info->parser_metrics->kernel->subsystem;
str = &c[subsys_str_len];
str_len = (sz - subsys_str_len);
}
else if (!strncmp(c, device_str, device_str_len)){
dict = p_file_info->parser_metrics->kernel->device;
str = &c[device_str_len];
str_len = (sz - device_str_len);
}
if(likely(str)){
char *const key = mallocz(str_len + 1);
memcpy(key, str, str_len);
key[str_len] = '\0';
metrics_dict_item_t item = {.dim_initialized = false, .num_new = 1};
dictionary_set_advanced(dict, key, str_len + 1, &item, sizeof(item), NULL);
}
c = &c[sz];
}
}
if(likely(kmsg_sever >= 0))
p_file_info->parser_metrics->kernel->sever[kmsg_sever]++;
// TODO: Fix: Metrics will still be collected if circ_buff_prepare_write() returns 0.
if(unlikely(!circ_buff_prepare_write(buff, new_tmp_text_size)))
goto skip_collect_and_drop_logs;
size_t tmp_item_off = buff->in->text_size;
memcpy_iscntrl_fix(&buff->in->data[tmp_item_off], message, message_size);
tmp_item_off += message_size;
buff->in->data[tmp_item_off++] = '\n';
m_assert(tmp_item_off == new_tmp_text_size, "tmp_item_off should be == new_tmp_text_size");
buff->in->text_size = new_tmp_text_size;
} /* FLB_KMSG case end */
/* FLB_SYSTEMD or FLB_SYSLOG case */
else if(p_file_info->log_type == FLB_SYSTEMD ||
p_file_info->log_type == FLB_SYSLOG){
int syslog_prival_d = SYSLOG_PRIOR_ARR_SIZE - 1; // Initialise to 'unknown'
int syslog_severity_d = SYSLOG_SEVER_ARR_SIZE - 1; // Initialise to 'unknown'
int syslog_facility_d = SYSLOG_FACIL_ARR_SIZE - 1; // Initialise to 'unknown'
/* FLB_SYSTEMD case has syslog_severity and syslog_facility values that
* are used to calculate syslog_prival from. FLB_SYSLOG is the opposite
* case, as it has a syslog_prival value that is used to calculate
* syslog_severity and syslog_facility from. */
if(p_file_info->log_type == FLB_SYSTEMD){
/* Parse syslog_severity char* field into int and extract metrics.
* syslog_severity_s will consist of 1 char (plus '\0'),
* see https://datatracker.ietf.org/doc/html/rfc5424#section-6.2.1 */
if(likely(syslog_severity[0])){
if(likely(str2int(&syslog_severity_d, syslog_severity, 10) == STR2XX_SUCCESS)){
p_file_info->parser_metrics->systemd->sever[syslog_severity_d]++;
} // else parsing errors ++ ??
} else p_file_info->parser_metrics->systemd->sever[SYSLOG_SEVER_ARR_SIZE - 1]++; // 'unknown'
/* Parse syslog_facility char* field into int and extract metrics.
* syslog_facility_s will consist of up to 2 chars (plus '\0'),
* see https://datatracker.ietf.org/doc/html/rfc5424#section-6.2.1 */
if(likely(syslog_facility[0])){
if(likely(str2int(&syslog_facility_d, syslog_facility, 10) == STR2XX_SUCCESS)){
p_file_info->parser_metrics->systemd->facil[syslog_facility_d]++;
} // else parsing errors ++ ??
} else p_file_info->parser_metrics->systemd->facil[SYSLOG_FACIL_ARR_SIZE - 1]++; // 'unknown'
if(likely(syslog_severity[0] && syslog_facility[0])){
/* Definition of syslog priority value == facility * 8 + severity */
syslog_prival_d = syslog_facility_d * 8 + syslog_severity_d;
syslog_prival_size = snprintfz(syslog_prival, 4, "%d", syslog_prival_d);
m_assert(syslog_prival_size < 4 && syslog_prival_size > 0, "error with snprintf()");
new_tmp_text_size += syslog_prival_size + 2; // +2 for '<' and '>'
p_file_info->parser_metrics->systemd->prior[syslog_prival_d]++;
} else {
new_tmp_text_size += 3; // +3 for "<->" string
p_file_info->parser_metrics->systemd->prior[SYSLOG_PRIOR_ARR_SIZE - 1]++; // 'unknown'
}
} else if(p_file_info->log_type == FLB_SYSLOG){
if(likely(syslog_prival[0])){
if(likely(str2int(&syslog_prival_d, syslog_prival, 10) == STR2XX_SUCCESS)){
syslog_severity_d = syslog_prival_d % 8;
syslog_facility_d = syslog_prival_d / 8;
p_file_info->parser_metrics->systemd->prior[syslog_prival_d]++;
p_file_info->parser_metrics->systemd->sever[syslog_severity_d]++;
p_file_info->parser_metrics->systemd->facil[syslog_facility_d]++;
new_tmp_text_size += syslog_prival_size + 2; // +2 for '<' and '>'
} // else parsing errors ++ ??
} else {
new_tmp_text_size += 3; // +3 for "<->" string
p_file_info->parser_metrics->systemd->prior[SYSLOG_PRIOR_ARR_SIZE - 1]++; // 'unknown'
p_file_info->parser_metrics->systemd->sever[SYSLOG_SEVER_ARR_SIZE - 1]++; // 'unknown'
p_file_info->parser_metrics->systemd->facil[SYSLOG_FACIL_ARR_SIZE - 1]++; // 'unknown'
}
} else m_assert(0, "shoudn't get here");
char syslog_time_from_flb_time[25]; // 25 just to be on the safe side, but 16 + 1 chars bytes needed only.
if(unlikely(!syslog_timestamp)){
const time_t ts = tmp_time.tm.tv_sec;
struct tm *const tm = localtime(&ts);
strftime(syslog_time_from_flb_time, sizeof(syslog_time_from_flb_time), "%b %d %H:%M:%S ", tm);
new_tmp_text_size += SYSLOG_TIMESTAMP_SIZE;
}
if(unlikely(!syslog_identifier)) new_tmp_text_size += sizeof(UNKNOWN) - 1;
if(unlikely(!pid)) new_tmp_text_size += sizeof(UNKNOWN) - 1;
new_tmp_text_size += 5; // +5 for '[', ']', ':' and ' ' characters around and after pid and '\n' at the end
/* Metrics extracted, now prepare circular buffer for write */
// TODO: Fix: Metrics will still be collected if circ_buff_prepare_write() returns 0.
if(unlikely(!circ_buff_prepare_write(buff, new_tmp_text_size))) goto skip_collect_and_drop_logs;
size_t tmp_item_off = buff->in->text_size;
buff->in->data[tmp_item_off++] = '<';
if(likely(syslog_prival[0])){
memcpy(&buff->in->data[tmp_item_off], syslog_prival, syslog_prival_size);
m_assert(syslog_prival_size, "syslog_prival_size cannot be 0");
tmp_item_off += syslog_prival_size;
} else buff->in->data[tmp_item_off++] = '-';
buff->in->data[tmp_item_off++] = '>';
if(likely(syslog_timestamp)){
memcpy(&buff->in->data[tmp_item_off], syslog_timestamp, syslog_timestamp_size);
// FLB_SYSLOG doesn't add space, but FLB_SYSTEMD does:
// if(buff->in->data[tmp_item_off] != ' ') buff->in->data[tmp_item_off++] = ' ';
tmp_item_off += syslog_timestamp_size;
} else {
memcpy(&buff->in->data[tmp_item_off], syslog_time_from_flb_time, SYSLOG_TIMESTAMP_SIZE);
tmp_item_off += SYSLOG_TIMESTAMP_SIZE;
}
if(likely(hostname)){
memcpy(&buff->in->data[tmp_item_off], hostname, hostname_size);
tmp_item_off += hostname_size;
buff->in->data[tmp_item_off++] = ' ';
}
if(likely(syslog_identifier)){
memcpy(&buff->in->data[tmp_item_off], syslog_identifier, syslog_identifier_size);
tmp_item_off += syslog_identifier_size;
} else {
memcpy(&buff->in->data[tmp_item_off], UNKNOWN, sizeof(UNKNOWN) - 1);
tmp_item_off += sizeof(UNKNOWN) - 1;
}
buff->in->data[tmp_item_off++] = '[';
if(likely(pid)){
memcpy(&buff->in->data[tmp_item_off], pid, pid_size);
tmp_item_off += pid_size;
} else {
memcpy(&buff->in->data[tmp_item_off], UNKNOWN, sizeof(UNKNOWN) - 1);
tmp_item_off += sizeof(UNKNOWN) - 1;
}
buff->in->data[tmp_item_off++] = ']';
buff->in->data[tmp_item_off++] = ':';
buff->in->data[tmp_item_off++] = ' ';
if(likely(message)){
memcpy_iscntrl_fix(&buff->in->data[tmp_item_off], message, message_size);
tmp_item_off += message_size;
}
buff->in->data[tmp_item_off++] = '\n';
m_assert(tmp_item_off == new_tmp_text_size, "tmp_item_off should be == new_tmp_text_size");
buff->in->text_size = new_tmp_text_size;
} /* FLB_SYSTEMD or FLB_SYSLOG case end */
/* FLB_DOCKER_EV case */
else if(p_file_info->log_type == FLB_DOCKER_EV){
const size_t docker_ev_datetime_size = sizeof "2022-08-26T15:33:20.802840200+0000" /* example datetime */;
char docker_ev_datetime[docker_ev_datetime_size];
docker_ev_datetime[0] = 0;
if(likely(docker_ev_time && docker_ev_timeNano)){
struct timespec ts;
ts.tv_sec = docker_ev_time;
if(unlikely(0 == strftime( docker_ev_datetime, docker_ev_datetime_size,
"%Y-%m-%dT%H:%M:%S.000000000%z", localtime(&ts.tv_sec)))) { /* TODO: do what if error? */};
const size_t docker_ev_timeNano_s_size = sizeof "802840200";
char docker_ev_timeNano_s[docker_ev_timeNano_s_size];
snprintfz( docker_ev_timeNano_s, docker_ev_timeNano_s_size, "%0*ld",
(int) docker_ev_timeNano_s_size, docker_ev_timeNano % 1000000000);
memcpy(&docker_ev_datetime[20], &docker_ev_timeNano_s, docker_ev_timeNano_s_size - 1);
new_tmp_text_size += docker_ev_datetime_size; // -1 for null terminator, +1 for ' ' character
}
if(likely(docker_ev_type && docker_ev_action)){
int ev_off = -1;
while(++ev_off < NUM_OF_DOCKER_EV_TYPES){
if(!strncmp(docker_ev_type, docker_ev_type_string[ev_off], docker_ev_type_size)){
p_file_info->parser_metrics->docker_ev->ev_type[ev_off]++;
int act_off = -1;
while(docker_ev_action_string[ev_off][++act_off] != NULL){
if(!strncmp(docker_ev_action, docker_ev_action_string[ev_off][act_off], docker_ev_action_size)){
p_file_info->parser_metrics->docker_ev->ev_action[ev_off][act_off]++;
break;
}
}
if(unlikely(docker_ev_action_string[ev_off][act_off] == NULL))
p_file_info->parser_metrics->docker_ev->ev_action[NUM_OF_DOCKER_EV_TYPES - 1][0]++; // 'unknown'
break;
}
}
if(unlikely(ev_off >= NUM_OF_DOCKER_EV_TYPES - 1)){
p_file_info->parser_metrics->docker_ev->ev_type[ev_off]++; // 'unknown'
p_file_info->parser_metrics->docker_ev->ev_action[NUM_OF_DOCKER_EV_TYPES - 1][0]++; // 'unknown'
}
new_tmp_text_size += docker_ev_type_size + docker_ev_action_size + 2; // +2 for ' ' chars
}
if(likely(docker_ev_id)){
// debug_log("docker_ev_id: %.*s", (int) docker_ev_id_size, docker_ev_id);
new_tmp_text_size += docker_ev_id_size + 1; // +1 for ' ' char
}
if(likely(docker_ev_attr.size)){
for(int i = 0; i < docker_ev_attr.size; i++){
new_tmp_text_size += docker_ev_attr.key_size[i] +
docker_ev_attr.val_size[i] + 3; // +3 for '=' ',' ' ' characters
}
/* new_tmp_text_size = -2 + 2;
* -2 due to missing ',' ' ' from last attribute and +2 for the two
* '(' and ')' characters, so no need to add or subtract */
}
new_tmp_text_size += 1; // +1 for '\n' character at the end
/* Metrics extracted, now prepare circular buffer for write */
// TODO: Fix: Metrics will still be collected if circ_buff_prepare_write() returns 0.
if(unlikely(!circ_buff_prepare_write(buff, new_tmp_text_size))) goto skip_collect_and_drop_logs;
size_t tmp_item_off = buff->in->text_size;
if(likely(*docker_ev_datetime)){
memcpy(&buff->in->data[tmp_item_off], docker_ev_datetime, docker_ev_datetime_size - 1);
tmp_item_off += docker_ev_datetime_size - 1; // -1 due to null terminator
buff->in->data[tmp_item_off++] = ' ';
}
if(likely(docker_ev_type)){
memcpy(&buff->in->data[tmp_item_off], docker_ev_type, docker_ev_type_size);
tmp_item_off += docker_ev_type_size;
buff->in->data[tmp_item_off++] = ' ';
}
if(likely(docker_ev_action)){
memcpy(&buff->in->data[tmp_item_off], docker_ev_action, docker_ev_action_size);
tmp_item_off += docker_ev_action_size;
buff->in->data[tmp_item_off++] = ' ';
}
if(likely(docker_ev_id)){
memcpy(&buff->in->data[tmp_item_off], docker_ev_id, docker_ev_id_size);
tmp_item_off += docker_ev_id_size;
buff->in->data[tmp_item_off++] = ' ';
}
if(likely(docker_ev_attr.size)){
buff->in->data[tmp_item_off++] = '(';
for(int i = 0; i < docker_ev_attr.size; i++){
memcpy(&buff->in->data[tmp_item_off], docker_ev_attr.key[i], docker_ev_attr.key_size[i]);
tmp_item_off += docker_ev_attr.key_size[i];
buff->in->data[tmp_item_off++] = '=';
memcpy(&buff->in->data[tmp_item_off], docker_ev_attr.val[i], docker_ev_attr.val_size[i]);
tmp_item_off += docker_ev_attr.val_size[i];
buff->in->data[tmp_item_off++] = ',';
buff->in->data[tmp_item_off++] = ' ';
}
tmp_item_off -= 2; // overwrite last ',' and ' ' characters with a ')' character
buff->in->data[tmp_item_off++] = ')';
}
buff->in->data[tmp_item_off++] = '\n';
m_assert(tmp_item_off == new_tmp_text_size, "tmp_item_off should be == new_tmp_text_size");
buff->in->text_size = new_tmp_text_size;
} /* FLB_DOCKER_EV case end */
/* FLB_MQTT case */
else if(p_file_info->log_type == FLB_MQTT){
if(likely(mqtt_topic)){
char *const key = mallocz(mqtt_topic_size + 1);
memcpy(key, mqtt_topic, mqtt_topic_size);
key[mqtt_topic_size] = '\0';
metrics_dict_item_t item = {.dim_initialized = false, .num_new = 1};
dictionary_set_advanced(p_file_info->parser_metrics->mqtt->topic, key, mqtt_topic_size + 1, &item, sizeof(item), NULL);
// TODO: Fix: Metrics will still be collected if circ_buff_prepare_write() returns 0.
if(unlikely(!circ_buff_prepare_write(buff, new_tmp_text_size))) goto skip_collect_and_drop_logs;
size_t tmp_item_off = buff->in->text_size;
memcpy(&buff->in->data[tmp_item_off], mqtt_message, message_size);
tmp_item_off += message_size;
buff->in->data[tmp_item_off++] = '\n';
m_assert(tmp_item_off == new_tmp_text_size, "tmp_item_off should be == new_tmp_text_size");
buff->in->text_size = new_tmp_text_size;
}
else m_assert(0, "missing mqtt topic");
}
skip_collect_and_drop_logs:
/* Following code is equivalent to msgpack_unpacked_destroy(&result) due
* to that function call being unavailable when using dl_open() */
if(result.zone != NULL) {
dl_msgpack_zone_free(result.zone);
result.zone = NULL;
memset(&result.data, 0, sizeof(msgpack_object));
}
if(p_file_info) uv_mutex_unlock(&p_file_info->flb_tmp_buff_mut);
flb_lib_free(record);
// FLB_OUTPUT_RETURN(FLB_OK); // Watch out! This breaks output - won't flush all pending logs
return 0;
}
/**
 * @brief Add a Fluent-Bit input that outputs to the "lib" Fluent-Bit plugin.
 * @details Creates a Fluent-Bit input of the kind matching
 * p_file_info->log_type (tail, kmsg, systemd, docker_events, syslog, serial
 * or mqtt), links it via a unique incremental tag first to any
 * user-configured outputs and then to a "lib" output whose callback
 * (flb_collect_logs_cb) delivers the collected records back to this plugin.
 * @param[in] p_file_info Pointer to the log source struct where the input will
 * be registered to.
 * @return 0 on success, a negative number for any errors (see enum).
 */
int flb_add_input(struct File_info *const p_file_info){
    enum return_values {
        SUCCESS = 0,
        INVALID_LOG_TYPE = -1,
        CONFIG_READ_ERROR = -2,
        FLB_PARSER_CREATE_ERROR = -3,
        FLB_INPUT_ERROR = -4,
        FLB_INPUT_SET_ERROR = -5,
        FLB_OUTPUT_ERROR = -6,
        FLB_OUTPUT_SET_ERROR = -7,
        DEFAULT_ERROR = -8
    };

    /* Each registered log source gets a unique, incrementing numeric tag
     * ("0", "1", ...) used to match its input to its outputs below.
     * NOTE(review): tag_s holds at most 4 digits plus '\0'; a tag value
     * >= 10000 would be truncated by snprintfz() — presumably more sources
     * than ever occur in practice, but worth confirming. */
    const int tag_max_size = 5;
    static unsigned tag = 0; // incremental tag id to link flb inputs to outputs
    char tag_s[tag_max_size];
    snprintfz(tag_s, tag_max_size, "%u", tag++);

    switch(p_file_info->log_type){
        /* Plain file tailing; web logs are also collected via "tail". */
        case FLB_TAIL:
        case FLB_WEB_LOG: {
            char update_every_str[10];
            snprintfz(update_every_str, 10, "%d", p_file_info->update_every);
            debug_log("Setting up %s tail for %s (basename:%s)",
                p_file_info->log_type == FLB_TAIL ? "FLB_TAIL" : "FLB_WEB_LOG",
                p_file_info->filename, p_file_info->file_basename);
            Flb_tail_config_t *tail_config = (Flb_tail_config_t *) p_file_info->flb_config;
            if(unlikely(!tail_config)) return CONFIG_READ_ERROR;
            /* Set up input from log source */
            p_file_info->flb_input = flb_input(ctx, "tail", NULL);
            if(p_file_info->flb_input < 0 ) return FLB_INPUT_ERROR;
            if(flb_input_set(ctx, p_file_info->flb_input,
                "Tag", tag_s,
                "Path", p_file_info->filename,
                "Key", LOG_REC_KEY,
                "Refresh_Interval", update_every_str,
                "Skip_Long_Lines", "On",
                "Skip_Empty_Lines", "On",
                /* inotify-based watching is only available if Fluent Bit
                 * was built with it; otherwise stat-based polling is used. */
#if defined(FLB_HAVE_INOTIFY)
                "Inotify_Watcher", tail_config->use_inotify ? "true" : "false",
#endif
                NULL) != 0) return FLB_INPUT_SET_ERROR;
            break;
        }
        /* Kernel ring buffer (/dev/kmsg) logs. */
        case FLB_KMSG: {
            debug_log( "Setting up FLB_KMSG collector");
            /* Set up kmsg input */
            p_file_info->flb_input = flb_input(ctx, "kmsg", NULL);
            if(p_file_info->flb_input < 0 ) return FLB_INPUT_ERROR;
            if(flb_input_set(ctx, p_file_info->flb_input,
                "Tag", tag_s,
                NULL) != 0) return FLB_INPUT_SET_ERROR;
            break;
        }
        /* systemd journal logs; a non-default journal path is passed
         * through only when configured. */
        case FLB_SYSTEMD: {
            debug_log( "Setting up FLB_SYSTEMD collector");
            /* Set up systemd input */
            p_file_info->flb_input = flb_input(ctx, "systemd", NULL);
            if(p_file_info->flb_input < 0 ) return FLB_INPUT_ERROR;
            if(!strcmp(p_file_info->filename, SYSTEMD_DEFAULT_PATH)){
                if(flb_input_set(ctx, p_file_info->flb_input,
                    "Tag", tag_s,
                    "Read_From_Tail", "On",
                    "Strip_Underscores", "On",
                    NULL) != 0) return FLB_INPUT_SET_ERROR;
            } else {
                if(flb_input_set(ctx, p_file_info->flb_input,
                    "Tag", tag_s,
                    "Read_From_Tail", "On",
                    "Strip_Underscores", "On",
                    "Path", p_file_info->filename,
                    NULL) != 0) return FLB_INPUT_SET_ERROR;
            }
            break;
        }
        /* Docker events socket; events arrive as JSON, so a JSON parser
         * (shared by all docker_events inputs) is created first. */
        case FLB_DOCKER_EV: {
            debug_log( "Setting up FLB_DOCKER_EV collector");
            /* Set up Docker Events parser */
            if(flb_parser_create( "docker_events_parser", /* parser name */
                "json", /* backend type */
                NULL, /* regex */
                FLB_TRUE, /* skip_empty */
                NULL, /* time format */
                NULL, /* time key */
                NULL, /* time offset */
                FLB_TRUE, /* time keep */
                FLB_FALSE, /* time strict */
                FLB_FALSE, /* no bare keys */
                NULL, /* parser types */
                0, /* types len */
                NULL, /* decoders */
                ctx->config) == NULL) return FLB_PARSER_CREATE_ERROR;
            /* Set up Docker Events input */
            p_file_info->flb_input = flb_input(ctx, "docker_events", NULL);
            if(p_file_info->flb_input < 0 ) return FLB_INPUT_ERROR;
            if(flb_input_set(ctx, p_file_info->flb_input,
                "Tag", tag_s,
                "Parser", "docker_events_parser",
                "Unix_Path", p_file_info->filename,
                NULL) != 0) return FLB_INPUT_SET_ERROR;
            break;
        }
        /* Syslog over a unix or network socket, parsed with a
         * user-configured regex. */
        case FLB_SYSLOG: {
            debug_log( "Setting up FLB_SYSLOG collector");
            /* Set up syslog parser */
            const char syslog_parser_prfx[] = "syslog_parser_";
            size_t parser_name_size = sizeof(syslog_parser_prfx) + tag_max_size - 1;
            char parser_name[parser_name_size];
            /* NOTE(review): 'tag' was already post-incremented when tag_s was
             * built, so the parser name number is offset by one from tag_s.
             * Names remain unique per source, so this is cosmetic only. */
            snprintfz(parser_name, parser_name_size, "%s%u", syslog_parser_prfx, tag);
            Syslog_parser_config_t *syslog_config = (Syslog_parser_config_t *) p_file_info->parser_config->gen_config;
            if(unlikely(!syslog_config ||
                !syslog_config->socket_config ||
                !syslog_config->socket_config->mode ||
                !p_file_info->filename)) return CONFIG_READ_ERROR;
            if(flb_parser_create( parser_name, /* parser name */
                "regex", /* backend type */
                syslog_config->log_format, /* regex */
                FLB_TRUE, /* skip_empty */
                NULL, /* time format */
                NULL, /* time key */
                NULL, /* time offset */
                FLB_TRUE, /* time keep */
                FLB_TRUE, /* time strict */
                FLB_FALSE, /* no bare keys */
                NULL, /* parser types */
                0, /* types len */
                NULL, /* decoders */
                ctx->config) == NULL) return FLB_PARSER_CREATE_ERROR;
            /* Set up syslog input */
            p_file_info->flb_input = flb_input(ctx, "syslog", NULL);
            if(p_file_info->flb_input < 0 ) return FLB_INPUT_ERROR;
            /* Unix-domain sockets need a path and permissions; network
             * sockets need a listen address and port. */
            if( !strcmp(syslog_config->socket_config->mode, "unix_udp") ||
                !strcmp(syslog_config->socket_config->mode, "unix_tcp")){
                m_assert(syslog_config->socket_config->unix_perm, "unix_perm is not set");
                if(flb_input_set(ctx, p_file_info->flb_input,
                    "Tag", tag_s,
                    "Path", p_file_info->filename,
                    "Parser", parser_name,
                    "Mode", syslog_config->socket_config->mode,
                    "Unix_Perm", syslog_config->socket_config->unix_perm,
                    NULL) != 0) return FLB_INPUT_SET_ERROR;
            } else if( !strcmp(syslog_config->socket_config->mode, "udp") ||
                !strcmp(syslog_config->socket_config->mode, "tcp")){
                m_assert(syslog_config->socket_config->listen, "listen is not set");
                m_assert(syslog_config->socket_config->port, "port is not set");
                if(flb_input_set(ctx, p_file_info->flb_input,
                    "Tag", tag_s,
                    "Parser", parser_name,
                    "Mode", syslog_config->socket_config->mode,
                    "Listen", syslog_config->socket_config->listen,
                    "Port", syslog_config->socket_config->port,
                    NULL) != 0) return FLB_INPUT_SET_ERROR;
            } else return FLB_INPUT_SET_ERROR; // should never reach this line
            break;
        }
        /* Serial (tty) device logs. */
        case FLB_SERIAL: {
            debug_log( "Setting up FLB_SERIAL collector");
            Flb_serial_config_t *serial_config = (Flb_serial_config_t *) p_file_info->flb_config;
            if(unlikely(!serial_config || !serial_config->bitrate || !*serial_config->bitrate ||
                !serial_config->min_bytes || !p_file_info->filename)) return CONFIG_READ_ERROR;
            /* Set up serial input */
            p_file_info->flb_input = flb_input(ctx, "serial", NULL);
            if(p_file_info->flb_input < 0 ) return FLB_INPUT_ERROR;
            if(flb_input_set(ctx, p_file_info->flb_input,
                "Tag", tag_s,
                "File", p_file_info->filename,
                "Bitrate", serial_config->bitrate,
                "Separator", serial_config->separator,
                "Format", serial_config->format,
                NULL) != 0) return FLB_INPUT_SET_ERROR;
            break;
        }
        /* MQTT broker input listening on a configured address/port. */
        case FLB_MQTT: {
            debug_log( "Setting up FLB_MQTT collector");
            Flb_socket_config_t *socket_config = (Flb_socket_config_t *) p_file_info->flb_config;
            if(unlikely(!socket_config || !socket_config->listen || !*socket_config->listen ||
                !socket_config->port || !*socket_config->port)) return CONFIG_READ_ERROR;
            /* Set up MQTT input */
            p_file_info->flb_input = flb_input(ctx, "mqtt", NULL);
            if(p_file_info->flb_input < 0 ) return FLB_INPUT_ERROR;
            if(flb_input_set(ctx, p_file_info->flb_input,
                "Tag", tag_s,
                "Listen", socket_config->listen,
                "Port", socket_config->port,
                NULL) != 0) return FLB_INPUT_SET_ERROR;
            break;
        }
        default: {
            m_assert(0, "default: case in flb_add_input() error");
            return DEFAULT_ERROR; // Shouldn't reach here
        }
    }

    /* Set up user-configured outputs, all matched to this source's tag.
     * Plugin-specific parameters are applied one at a time. */
    for(Flb_output_config_t *output = p_file_info->flb_outputs; output; output = output->next){
        debug_log( "setting up user output [%s]", output->plugin);
        int out = flb_output(ctx, output->plugin, NULL);
        if(out < 0) return FLB_OUTPUT_ERROR;
        if(flb_output_set(ctx, out,
            "Match", tag_s,
            NULL) != 0) return FLB_OUTPUT_SET_ERROR;
        for(struct flb_output_config_param *param = output->param; param; param = param->next){
            debug_log( "setting up param [%s][%s] of output [%s]", param->key, param->val, output->plugin);
            if(flb_output_set(ctx, out,
                param->key, param->val,
                NULL) != 0) return FLB_OUTPUT_SET_ERROR;
        }
    }

    /* Set up "lib" output: this is what routes the collected records to
     * flb_collect_logs_cb() with this source's p_file_info as context.
     * The callback struct is owned by Fluent Bit on success, freed here
     * only on the failure path. */
    struct flb_lib_out_cb *callback = mallocz(sizeof(struct flb_lib_out_cb));
    callback->cb = flb_collect_logs_cb;
    callback->data = p_file_info;
    if(((p_file_info->flb_lib_output = flb_output(ctx, "lib", callback)) < 0) ||
        (flb_output_set(ctx, p_file_info->flb_lib_output, "Match", tag_s, NULL) != 0)){
        freez(callback);
        return FLB_OUTPUT_ERROR;
    }
    return SUCCESS;
}
/**
 * @brief Add a Fluent-Bit Forward input.
 * @details This creates a unix or network socket to accept logs using
 * Fluent Bit's Forward protocol. For more information see:
 * https://docs.fluentbit.io/manual/pipeline/inputs/forward
 * The input is matched to a "lib" output that hands records to
 * flb_collect_logs_cb(); the callback struct is stored in the file-scope
 * fwd_input_out_cb so it can later be released by flb_free_fwd_input_out_cb().
 * @param[in] forward_in_config Configuration of the Forward input socket.
 * @return 0 on success (also when forward_in_config is NULL, which is a
 * no-op), -1 on error.
 */
int flb_add_fwd_input(Flb_socket_config_t *forward_in_config){
    if(forward_in_config == NULL){
        debug_log( "forward: forward_in_config is NULL");
        collector_info("forward_in_config is NULL");
        return 0;
    }
    do{
        debug_log( "forward: Setting up flb_add_fwd_input()");
        int input, output;
        if((input = flb_input(ctx, "forward", NULL)) < 0) break;
        /* A unix socket configuration (path + permissions) takes precedence;
         * otherwise a network (listen address + port) configuration is used. */
        if( forward_in_config->unix_path && *forward_in_config->unix_path &&
            forward_in_config->unix_perm && *forward_in_config->unix_perm){
            if(flb_input_set(ctx, input,
                "Tag_Prefix", "fwd",
                "Unix_Path", forward_in_config->unix_path,
                "Unix_Perm", forward_in_config->unix_perm,
                NULL) != 0) break;
        } else if( forward_in_config->listen && *forward_in_config->listen &&
            forward_in_config->port && *forward_in_config->port){
            if(flb_input_set(ctx, input,
                "Tag_Prefix", "fwd",
                "Listen", forward_in_config->listen,
                "Port", forward_in_config->port,
                NULL) != 0) break;
        } else break; // should never reach this line
        fwd_input_out_cb = mallocz(sizeof(struct flb_lib_out_cb));
        /* Set up output: data is NULL because the log source is resolved at
         * collection time from the "fwd*" tag, not at registration time. */
        fwd_input_out_cb->cb = flb_collect_logs_cb;
        fwd_input_out_cb->data = NULL;
        if((output = flb_output(ctx, "lib", fwd_input_out_cb)) < 0) break;
        if(flb_output_set(ctx, output,
            "Match", "fwd*",
            NULL) != 0) break;
        debug_log( "forward: Set up flb_add_fwd_input() with success");
        return 0;
    } while(0);
    /* Error: release the callback (freez() is NULL-safe, so no guard needed)
     * and reset the file-scope pointer, so that a subsequent call to
     * flb_free_fwd_input_out_cb() cannot double-free it. */
    freez(fwd_input_out_cb);
    fwd_input_out_cb = NULL;
    return -1;
}
void flb_free_fwd_input_out_cb(void){
freez(fwd_input_out_cb);
}