From 943ee2482b16a81afd54b426f4fb0952f99c48e7 Mon Sep 17 00:00:00 2001 From: Vladimir Kobal <vlad@prokk.net> Date: Thu, 5 Nov 2020 19:08:17 +0200 Subject: [PATCH] Add HTTP and HTTPS support to the simple exporting connector (#9911) --- .gitignore | 2 + CMakeLists.txt | 1 + Makefile.am | 1 + backends/backends.c | 8 +- exporting/README.md | 15 +- exporting/aws_kinesis/aws_kinesis.c | 2 +- exporting/clean_connectors.c | 43 +++ exporting/exporting_engine.h | 51 +++- exporting/graphite/README.md | 4 + exporting/graphite/graphite.c | 45 ++- exporting/graphite/graphite.h | 2 + exporting/init_connectors.c | 45 ++- exporting/json/README.md | 4 + exporting/json/json.c | 138 ++++++++- exporting/json/json.h | 6 + exporting/mongodb/mongodb.c | 11 +- exporting/mongodb/mongodb.h | 1 - exporting/opentsdb/README.md | 42 +-- exporting/opentsdb/opentsdb.c | 110 ++++--- exporting/opentsdb/opentsdb.h | 10 +- exporting/process_data.c | 58 +++- exporting/prometheus/remote_write/README.md | 3 + .../prometheus/remote_write/remote_write.c | 69 +++-- .../prometheus/remote_write/remote_write.h | 6 +- .../remote_write/remote_write_request.h | 4 - exporting/pubsub/pubsub.c | 2 +- exporting/read_config.c | 34 ++- exporting/send_data.c | 280 ++++++++++-------- exporting/tests/exporting_doubles.c | 7 + exporting/tests/test_exporting_engine.c | 172 ++++++----- exporting/tests/test_exporting_engine.h | 2 + libnetdata/config/appconfig.c | 10 +- libnetdata/socket/security.c | 26 +- libnetdata/socket/security.h | 4 +- web/api/exporters/allmetrics.c | 28 +- 35 files changed, 847 insertions(+), 399 deletions(-) diff --git a/.gitignore b/.gitignore index 79851707cf..07dd9f0088 100644 --- a/.gitignore +++ b/.gitignore @@ -146,6 +146,8 @@ cmake-build-release/ CMakeCache.txt CMakeFiles/ cmake_install.cmake +.cmake +compile_commands.json # jetbrains IDE .jetbrains* diff --git a/CMakeLists.txt b/CMakeLists.txt index ac72ec44fb..00f9f12b97 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1270,6 +1270,7 @@ endif() -Wl,--wrap=connect_to_one_of -Wl,--wrap=create_main_rusage_chart -Wl,--wrap=send_main_rusage + -Wl,--wrap=simple_connector_end_batch ${PROMETHEUS_REMOTE_WRITE_LINK_OPTIONS} ${KINESIS_LINK_OPTIONS} ${PUBSUB_LINK_OPTIONS} diff --git a/Makefile.am b/Makefile.am index 24de58e66f..d5daff71d3 100644 --- a/Makefile.am +++ b/Makefile.am @@ -955,6 +955,7 @@ if ENABLE_UNITTESTS -Wl,--wrap=connect_to_one_of \ -Wl,--wrap=create_main_rusage_chart \ -Wl,--wrap=send_main_rusage \ + -Wl,--wrap=simple_connector_end_batch \ $(TEST_LDFLAGS) \ $(NULL) exporting_tests_exporting_engine_testdriver_LDADD = $(NETDATA_COMMON_LIBS) $(TEST_LIBS) diff --git a/backends/backends.c b/backends/backends.c index 34df05f26c..074e18679d 100644 --- a/backends/backends.c +++ b/backends/backends.c @@ -551,7 +551,7 @@ void *backends_main(void *ptr) { case BACKEND_TYPE_OPENTSDB_USING_HTTP: { #ifdef ENABLE_HTTPS if (!strcmp(type, "opentsdb:https")) { - security_start_ssl(NETDATA_SSL_CONTEXT_OPENTSDB); + security_start_ssl(NETDATA_SSL_CONTEXT_EXPORTING); } #endif backend_set_opentsdb_http_variables(&default_port,&backend_response_checker,&backend_request_formatter); @@ -1001,9 +1001,9 @@ void *backends_main(void *ptr) { sock = connect_to_one_of(destination, default_port, &timeout, &reconnects, NULL, 0); #ifdef ENABLE_HTTPS if(sock != -1) { - if(netdata_opentsdb_ctx) { + if(netdata_exporting_ctx) { if(!opentsdb_ssl.conn) { - opentsdb_ssl.conn = SSL_new(netdata_opentsdb_ctx); + opentsdb_ssl.conn = SSL_new(netdata_exporting_ctx); if(!opentsdb_ssl.conn) { error("Failed to allocate SSL structure %d.", sock); opentsdb_ssl.flags = NETDATA_SSL_NO_HANDSHAKE; @@ -1229,7 +1229,7 @@ cleanup: buffer_free(response); #ifdef ENABLE_HTTPS - if(netdata_opentsdb_ctx) { + if(netdata_exporting_ctx) { if(opentsdb_ssl.conn) { SSL_free(opentsdb_ssl.conn); } diff --git a/exporting/README.md b/exporting/README.md index 1a04e4085d..8f490cc2ff 100644 --- a/exporting/README.md +++ b/exporting/README.md @@ -44,7 +44,7 @@ X seconds (though, it can send them per second if you need it to). also be configured). Learn more in our guide to [export and visualize Netdata metrics in Graphite](/docs/guides/export/export-netdata-metrics-graphite.md). - [**JSON** document databases](/exporting/json/README.md) - - [**OpenTSDB**](/exporting/opentsdb/README.md): Use a plaintext, HTTP, or HTTPS interfaces. Metrics are sent to + - [**OpenTSDB**](/exporting/opentsdb/README.md): Use a plaintext or HTTP interfaces. Metrics are sent to OpenTSDB as `prefix.chart.dimension` with tag `host=hostname`. - [**MongoDB**](/exporting/mongodb/README.md): Metrics are sent to the database in `JSON` format. - [**Prometheus**](/exporting/prometheus/README.md): Use an existing Prometheus installation to scrape metrics @@ -173,8 +173,10 @@ You can configure each connector individually using the available [options](#opt - `[prometheus:exporter]` defines settings for Prometheus exporter API queries (e.g.: `http://NODE:19999/api/v1/allmetrics?format=prometheus&help=yes&source=as-collected`). - `[<type>:<name>]` keeps settings for a particular exporting connector instance, where: - - `type` selects the exporting connector type: graphite | opentsdb:telnet | opentsdb:http | opentsdb:https | - prometheus_remote_write | json | kinesis | pubsub | mongodb + - `type` selects the exporting connector type: graphite | opentsdb:telnet | opentsdb:http | + prometheus_remote_write | json | kinesis | pubsub | mongodb. For graphite, opentsdb, + json, and prometheus_remote_write connectors you can also use `:http` or `:https` modifiers + (e.g.: `opentsdb:https`). - `name` can be arbitrary instance name you chose. ### Options @@ -270,6 +272,13 @@ Configure individual connectors and override any global settings with the follow > You can check how the host tags were parsed using the /api/v1/info API call. But, keep in mind that backends subsystem > is deprecated and will be deleted soon. Please move your existing tags to the `[host labels]` section. +## HTTPS + +Netdata can send metrics to external databases using the TLS/SSL protocol. Unfortunately, some of +them does not support encrypted connections, so you will have to configure a reverse proxy to enable +HTTPS communication between Netdata and an external database. You can set up a reverse proxy with +[Nginx](/docs/Running-behind-nginx.md). + ## Exporting engine monitoring Netdata creates five charts in the dashboard, under the **Netdata Monitoring** section, to help you monitor the health diff --git a/exporting/aws_kinesis/aws_kinesis.c b/exporting/aws_kinesis/aws_kinesis.c index 00600fa9bb..036afb49fc 100644 --- a/exporting/aws_kinesis/aws_kinesis.c +++ b/exporting/aws_kinesis/aws_kinesis.c @@ -48,7 +48,7 @@ int init_aws_kinesis_instance(struct instance *instance) instance->end_host_formatting = flush_host_labels; instance->end_batch_formatting = NULL; - instance->send_header = NULL; + instance->prepare_header = NULL; instance->check_response = NULL; instance->buffer = (void *)buffer_create(0); diff --git a/exporting/clean_connectors.c b/exporting/clean_connectors.c index 4ad644d69c..459777f0fd 100644 --- a/exporting/clean_connectors.c +++ b/exporting/clean_connectors.c @@ -35,3 +35,46 @@ void clean_instance(struct instance *instance) uv_cond_destroy(&instance->cond_var); // uv_mutex_destroy(&instance->mutex); } + +/** + * Clean up a simple connector instance on Netdata exit + * + * @param instance an instance data structure. + */ +void simple_connector_cleanup(struct instance *instance) +{ + info("EXPORTING: cleaning up instance %s ...", instance->config.name); + + struct simple_connector_data *simple_connector_data = + (struct simple_connector_data *)instance->connector_specific_data; + + buffer_free(instance->buffer); + buffer_free(simple_connector_data->buffer); + buffer_free(simple_connector_data->header); + + struct simple_connector_buffer *next_buffer = simple_connector_data->first_buffer; + for (int i = 0; i < instance->config.buffer_on_failures; i++) { + struct simple_connector_buffer *current_buffer = next_buffer; + next_buffer = next_buffer->next; + + buffer_free(current_buffer->header); + buffer_free(current_buffer->buffer); + freez(current_buffer); + } + +#ifdef ENABLE_HTTPS + if (simple_connector_data->conn) + SSL_free(simple_connector_data->conn); +#endif + + freez(simple_connector_data); + + struct simple_connector_config *simple_connector_config = + (struct simple_connector_config *)instance->config.connector_specific_config; + freez(simple_connector_config); + + info("EXPORTING: instance %s exited", instance->config.name); + instance->exited = 1; + + return; +} diff --git a/exporting/exporting_engine.h b/exporting/exporting_engine.h index b436f5d809..e0993c98f8 100644 --- a/exporting/exporting_engine.h +++ b/exporting/exporting_engine.h @@ -46,10 +46,12 @@ typedef enum exporting_options { typedef enum exporting_connector_types { EXPORTING_CONNECTOR_TYPE_UNKNOWN, // Invalid type EXPORTING_CONNECTOR_TYPE_GRAPHITE, // Send plain text to Graphite - EXPORTING_CONNECTOR_TYPE_OPENTSDB_USING_TELNET, // Send data to OpenTSDB using telnet API - EXPORTING_CONNECTOR_TYPE_OPENTSDB_USING_HTTP, // Send data to OpenTSDB using HTTP API - EXPORTING_CONNECTOR_TYPE_JSON, // Stores the data using JSON. - EXPORTING_CONNECTOR_TYPE_PROMETHEUS_REMOTE_WRITE, // The user selected to use Prometheus backend + EXPORTING_CONNECTOR_TYPE_GRAPHITE_HTTP, // Send data to Graphite using HTTP API + EXPORTING_CONNECTOR_TYPE_JSON, // Send data in JSON format + EXPORTING_CONNECTOR_TYPE_JSON_HTTP, // Send data in JSON format using HTTP API + EXPORTING_CONNECTOR_TYPE_OPENTSDB, // Send data to OpenTSDB using telnet API + EXPORTING_CONNECTOR_TYPE_OPENTSDB_HTTP, // Send data to OpenTSDB using HTTP API + EXPORTING_CONNECTOR_TYPE_PROMETHEUS_REMOTE_WRITE, // User selected to use Prometheus backend EXPORTING_CONNECTOR_TYPE_KINESIS, // Send message to AWS Kinesis EXPORTING_CONNECTOR_TYPE_PUBSUB, // Send message to Google Cloud Pub/Sub EXPORTING_CONNECTOR_TYPE_MONGODB, // Send data to MongoDB collection @@ -81,6 +83,38 @@ struct simple_connector_config { int default_port; }; +struct simple_connector_buffer { + BUFFER *header; + BUFFER *buffer; + + size_t buffered_metrics; + size_t buffered_bytes; + + int used; + + struct simple_connector_buffer *next; +}; + +struct simple_connector_data { + void *connector_specific_data; + + size_t total_buffered_metrics; + + BUFFER *header; + BUFFER *buffer; + size_t buffered_metrics; + size_t buffered_bytes; + + struct simple_connector_buffer *previous_buffer; + struct simple_connector_buffer *first_buffer; + struct simple_connector_buffer *last_buffer; + +#ifdef ENABLE_HTTPS + SSL *conn; //SSL connection + int flags; //The flags for SSL connection +#endif +}; + struct prometheus_remote_write_specific_config { char *remote_write_path; }; @@ -175,7 +209,7 @@ struct instance { int (*end_host_formatting)(struct instance *instance, RRDHOST *host); int (*end_batch_formatting)(struct instance *instance); - int (*send_header)(int *sock, struct instance *instance); + void (*prepare_header)(struct instance *instance); int (*check_response)(BUFFER *buffer, struct instance *instance); void *connector_specific_data; @@ -210,6 +244,7 @@ struct engine *read_exporting_config(); EXPORTING_CONNECTOR_TYPE exporting_select_type(const char *type); int init_connectors(struct engine *engine); +void simple_connector_init(struct instance *instance); int mark_scheduled_instances(struct engine *engine); void prepare_buffers(struct engine *engine); @@ -232,11 +267,12 @@ void end_chart_formatting(struct engine *engine, RRDSET *st); void end_host_formatting(struct engine *engine, RRDHOST *host); void end_batch_formatting(struct engine *engine); int flush_host_labels(struct instance *instance, RRDHOST *host); -int simple_connector_update_buffered_bytes(struct instance *instance); +int simple_connector_end_batch(struct instance *instance); int exporting_discard_response(BUFFER *buffer, struct instance *instance); void simple_connector_receive_response(int *sock, struct instance *instance); -void simple_connector_send_buffer(int *sock, int *failures, struct instance *instance); +void simple_connector_send_buffer( + int *sock, int *failures, struct instance *instance, BUFFER *header, BUFFER *buffer, size_t buffered_metrics); void simple_connector_worker(void *instance_p); void create_main_rusage_chart(RRDSET **st_rusage, RRDDIM **rd_user, RRDDIM **rd_system); @@ -244,6 +280,7 @@ void send_main_rusage(RRDSET *st_rusage, RRDDIM *rd_user, RRDDIM *rd_system); void send_internal_metrics(struct instance *instance); extern void clean_instance(struct instance *ptr); +void simple_connector_cleanup(struct instance *instance); static inline void disable_instance(struct instance *instance) { diff --git a/exporting/graphite/README.md b/exporting/graphite/README.md index 95b8ef954d..2ac7adc7af 100644 --- a/exporting/graphite/README.md +++ b/exporting/graphite/README.md @@ -3,6 +3,7 @@ title: "Export metrics to Graphite providers" sidebar_label: Graphite description: "Archive your Agent's metrics to a any Graphite database provider for long-term storage, further analysis, or correlation with data from other sources." custom_edit_url: https://github.com/netdata/netdata/edit/master/exporting/graphite/README.md +sidebar_label: Graphite --> # Export metrics to Graphite providers @@ -21,6 +22,9 @@ directory and set the following options: destination = localhost:2003 ``` +Add `:http` or `:https` modifiers to the connector type if you need to use other than a plaintext protocol. For example: `graphite:http:my_graphite_instance`, +`graphite:https:my_graphite_instance`. + The Graphite connector is further configurable using additional settings. See the [exporting reference doc](/exporting/README.md#options) for details. diff --git a/exporting/graphite/graphite.c b/exporting/graphite/graphite.c index 6aad2054db..23e40c369e 100644 --- a/exporting/graphite/graphite.c +++ b/exporting/graphite/graphite.c @@ -16,6 +16,17 @@ int init_graphite_instance(struct instance *instance) instance->config.connector_specific_config = (void *)connector_specific_config; connector_specific_config->default_port = 2003; + struct simple_connector_data *connector_specific_data = callocz(1, sizeof(struct simple_connector_data)); + instance->connector_specific_data = connector_specific_data; + +#ifdef ENABLE_HTTPS + connector_specific_data->flags = NETDATA_SSL_START; + connector_specific_data->conn = NULL; + if (instance->config.options & EXPORTING_OPTION_USE_TLS) { + security_start_ssl(NETDATA_SSL_CONTEXT_EXPORTING); + } +#endif + instance->start_batch_formatting = NULL; instance->start_host_formatting = format_host_labels_graphite_plaintext; instance->start_chart_formatting = NULL; @@ -27,9 +38,13 @@ int init_graphite_instance(struct instance *instance) instance->end_chart_formatting = NULL; instance->end_host_formatting = flush_host_labels; - instance->end_batch_formatting = simple_connector_update_buffered_bytes; + instance->end_batch_formatting = simple_connector_end_batch; + + if (instance->config.type == EXPORTING_CONNECTOR_TYPE_GRAPHITE_HTTP) + instance->prepare_header = graphite_http_prepare_header; + else + instance->prepare_header = NULL; - instance->send_header = NULL; instance->check_response = exporting_discard_response; instance->buffer = (void *)buffer_create(0); @@ -37,6 +52,9 @@ int init_graphite_instance(struct instance *instance) error("EXPORTING: cannot create buffer for graphite exporting connector instance %s", instance->config.name); return 1; } + + simple_connector_init(instance); + if (uv_mutex_init(&instance->mutex)) return 1; if (uv_cond_init(&instance->cond_var)) @@ -187,3 +205,26 @@ int format_dimension_stored_graphite_plaintext(struct instance *instance, RRDDIM return 0; } + +/** + * Ppepare HTTP header + * + * @param instance an instance data structure. + * @return Returns 0 on success, 1 on failure. + */ +void graphite_http_prepare_header(struct instance *instance) +{ + struct simple_connector_data *simple_connector_data = instance->connector_specific_data; + + buffer_sprintf( + simple_connector_data->last_buffer->header, + "POST /api/put HTTP/1.1\r\n" + "Host: %s\r\n" + "Content-Type: application/graphite\r\n" + "Content-Length: %lu\r\n" + "\r\n", + instance->config.destination, + buffer_strlen(simple_connector_data->last_buffer->buffer)); + + return; +} diff --git a/exporting/graphite/graphite.h b/exporting/graphite/graphite.h index edda498e85..993c12e57a 100644 --- a/exporting/graphite/graphite.h +++ b/exporting/graphite/graphite.h @@ -13,4 +13,6 @@ int format_host_labels_graphite_plaintext(struct instance *instance, RRDHOST *ho int format_dimension_collected_graphite_plaintext(struct instance *instance, RRDDIM *rd); int format_dimension_stored_graphite_plaintext(struct instance *instance, RRDDIM *rd); +void graphite_http_prepare_header(struct instance *instance); + #endif //NETDATA_EXPORTING_GRAPHITE_H diff --git a/exporting/init_connectors.c b/exporting/init_connectors.c index 4a68118970..57ac030d9a 100644 --- a/exporting/init_connectors.c +++ b/exporting/init_connectors.c @@ -40,15 +40,23 @@ int init_connectors(struct engine *engine) if (init_graphite_instance(instance) != 0) return 1; break; + case EXPORTING_CONNECTOR_TYPE_GRAPHITE_HTTP: + if (init_graphite_instance(instance) != 0) + return 1; + break; case EXPORTING_CONNECTOR_TYPE_JSON: if (init_json_instance(instance) != 0) return 1; break; - case EXPORTING_CONNECTOR_TYPE_OPENTSDB_USING_TELNET: + case EXPORTING_CONNECTOR_TYPE_JSON_HTTP: + if (init_json_http_instance(instance) != 0) + return 1; + break; + case EXPORTING_CONNECTOR_TYPE_OPENTSDB: if (init_opentsdb_telnet_instance(instance) != 0) return 1; break; - case EXPORTING_CONNECTOR_TYPE_OPENTSDB_USING_HTTP: + case EXPORTING_CONNECTOR_TYPE_OPENTSDB_HTTP: if (init_opentsdb_http_instance(instance) != 0) return 1; break; @@ -96,3 +104,36 @@ int init_connectors(struct engine *engine) return 0; } + +/** + * Initialize a ring buffer for a simple connector + * + * @param instance an instance data structure. + */ +void simple_connector_init(struct instance *instance) +{ + struct simple_connector_data *connector_specific_data = + (struct simple_connector_data *)instance->connector_specific_data; + + // create a ring buffer + struct simple_connector_buffer *first_buffer = NULL; + + if (instance->config.buffer_on_failures < 1) + instance->config.buffer_on_failures = 1; + + for (int i = 0; i < instance->config.buffer_on_failures; i++) { + struct simple_connector_buffer *current_buffer = callocz(1, sizeof(struct simple_connector_buffer)); + + if (!connector_specific_data->first_buffer) + first_buffer = current_buffer; + else + current_buffer->next = connector_specific_data->first_buffer; + + connector_specific_data->first_buffer = current_buffer; + } + + first_buffer->next = connector_specific_data->first_buffer; + connector_specific_data->last_buffer = connector_specific_data->first_buffer; + + return; +} diff --git a/exporting/json/README.md b/exporting/json/README.md index 8ef084cf22..1dd41e738e 100644 --- a/exporting/json/README.md +++ b/exporting/json/README.md @@ -3,6 +3,7 @@ title: "Export metrics to JSON document databases" sidebar_label: JSON description: "Archive your Agent's metrics to a JSON document database for long-term storage, further analysis, or correlation with data from other sources." custom_edit_url: https://github.com/netdata/netdata/edit/master/exporting/json/README.md +sidebar_label: JSON Document Databases --> # Export metrics to JSON document databases @@ -21,6 +22,9 @@ directory and set the following options: destination = localhost:5448 ``` +Add `:http` or `:https` modifiers to the connector type if you need to use other than a plaintext protocol. For example: `json:http:my_json_instance`, +`json:https:my_json_instance`. + The JSON connector is further configurable using additional settings. See the [exporting reference doc](/exporting/README.md#options) for details. diff --git a/exporting/json/json.c b/exporting/json/json.c index cea7eff07c..458e7e8cce 100644 --- a/exporting/json/json.c +++ b/exporting/json/json.c @@ -16,6 +16,9 @@ int init_json_instance(struct instance *instance) instance->config.connector_specific_config = (void *)connector_specific_config; connector_specific_config->default_port = 5448; + struct simple_connector_data *connector_specific_data = callocz(1, sizeof(struct simple_connector_data)); + instance->connector_specific_data = connector_specific_data; + instance->start_batch_formatting = NULL; instance->start_host_formatting = format_host_labels_json_plaintext; instance->start_chart_formatting = NULL; @@ -27,9 +30,10 @@ int init_json_instance(struct instance *instance) instance->end_chart_formatting = NULL; instance->end_host_formatting = flush_host_labels; - instance->end_batch_formatting = simple_connector_update_buffered_bytes; + instance->end_batch_formatting = simple_connector_end_batch; + + instance->prepare_header = NULL; - instance->send_header = NULL; instance->check_response = exporting_discard_response; instance->buffer = (void *)buffer_create(0); @@ -37,6 +41,63 @@ int init_json_instance(struct instance *instance) error("EXPORTING: cannot create buffer for json exporting connector instance %s", instance->config.name); return 1; } + + simple_connector_init(instance); + + if (uv_mutex_init(&instance->mutex)) + return 1; + if (uv_cond_init(&instance->cond_var)) + return 1; + + return 0; +} + +/** + * Initialize JSON connector instance for HTTP protocol + * + * @param instance an instance data structure. + * @return Returns 0 on success, 1 on failure. + */ +int init_json_http_instance(struct instance *instance) +{ + instance->worker = simple_connector_worker; + + struct simple_connector_config *connector_specific_config = callocz(1, sizeof(struct simple_connector_config)); + instance->config.connector_specific_config = (void *)connector_specific_config; + connector_specific_config->default_port = 5448; + + struct simple_connector_data *connector_specific_data = callocz(1, sizeof(struct simple_connector_data)); + instance->connector_specific_data = connector_specific_data; + +#ifdef ENABLE_HTTPS + connector_specific_data->flags = NETDATA_SSL_START; + connector_specific_data->conn = NULL; + if (instance->config.options & EXPORTING_OPTION_USE_TLS) { + security_start_ssl(NETDATA_SSL_CONTEXT_EXPORTING); + } +#endif + + instance->start_batch_formatting = open_batch_json_http; + instance->start_host_formatting = format_host_labels_json_plaintext; + instance->start_chart_formatting = NULL; + + if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AS_COLLECTED) + instance->metric_formatting = format_dimension_collected_json_plaintext; + else + instance->metric_formatting = format_dimension_stored_json_plaintext; + + instance->end_chart_formatting = NULL; + instance->end_host_formatting = flush_host_labels; + instance->end_batch_formatting = close_batch_json_http; + + instance->prepare_header = json_http_prepare_header; + + instance->check_response = exporting_discard_response; + + instance->buffer = (void *)buffer_create(0); + + simple_connector_init(instance); + if (uv_mutex_init(&instance->mutex)) return 1; if (uv_cond_init(&instance->cond_var)) @@ -111,6 +172,11 @@ int format_dimension_collected_json_plaintext(struct instance *instance, RRDDIM } } + if (instance->config.type == EXPORTING_CONNECTOR_TYPE_JSON_HTTP) { + if (buffer_strlen((BUFFER *)instance->buffer) > 2) + buffer_strcat(instance->buffer, ",\n"); + } + buffer_sprintf( instance->buffer, @@ -131,7 +197,7 @@ int format_dimension_collected_json_plaintext(struct instance *instance, RRDDIM "\"name\":\"%s\"," "\"value\":" COLLECTED_NUMBER_FORMAT "," - "\"timestamp\":%llu}\n", + "\"timestamp\":%llu}", instance->config.prefix, (host == localhost) ? engine->config.hostname : host->hostname, @@ -153,6 +219,10 @@ int format_dimension_collected_json_plaintext(struct instance *instance, RRDDIM (unsigned long long)rd->last_collected_time.tv_sec); + if (instance->config.type != EXPORTING_CONNECTOR_TYPE_JSON_HTTP) { + buffer_strcat(instance->buffer, "\n"); + } + return 0; } @@ -189,6 +259,11 @@ int format_dimension_stored_json_plaintext(struct instance *instance, RRDDIM *rd } } + if (instance->config.type == EXPORTING_CONNECTOR_TYPE_JSON_HTTP) { + if (buffer_strlen((BUFFER *)instance->buffer) > 2) + buffer_strcat(instance->buffer, ",\n"); + } + buffer_sprintf( instance->buffer, "{" @@ -208,7 +283,7 @@ int format_dimension_stored_json_plaintext(struct instance *instance, RRDDIM *rd "\"name\":\"%s\"," "\"value\":" CALCULATED_NUMBER_FORMAT "," - "\"timestamp\": %llu}\n", + "\"timestamp\": %llu}", instance->config.prefix, (host == localhost) ? engine->config.hostname : host->hostname, @@ -230,5 +305,60 @@ int format_dimension_stored_json_plaintext(struct instance *instance, RRDDIM *rd (unsigned long long)last_t); + if (instance->config.type != EXPORTING_CONNECTOR_TYPE_JSON_HTTP) { + buffer_strcat(instance->buffer, "\n"); + } + return 0; } + +/** + * Open a JSON list for a bach + * + * @param instance an instance data structure. + * @return Always returns 0. + */ +int open_batch_json_http(struct instance *instance) +{ + buffer_strcat(instance->buffer, "[\n"); + + return 0; +} + +/** + * Close a JSON list for a bach and update buffered bytes counter + * + * @param instance an instance data structure. + * @return Always returns 0. + */ +int close_batch_json_http(struct instance *instance) +{ + buffer_strcat(instance->buffer, "\n]\n"); + + simple_connector_end_batch(instance); + + return 0; +} + +/** + * Prepare HTTP header + * + * @param instance an instance data structure. + * @return Returns 0 on success, 1 on failure. + */ +void json_http_prepare_header(struct instance *instance) +{ + struct simple_connector_data *simple_connector_data = instance->connector_specific_data; + + buffer_sprintf( + simple_connector_data->last_buffer->header, + "POST /api/put HTTP/1.1\r\n" + "Host: %s\r\n" + "Content-Type: application/json\r\n" + "Content-Length: %lu\r\n" + "\r\n", + instance->config.destination, + buffer_strlen(simple_connector_data->last_buffer->buffer)); + + return; +} diff --git a/exporting/json/json.h b/exporting/json/json.h index f4d3756631..d916263a9b 100644 --- a/exporting/json/json.h +++ b/exporting/json/json.h @@ -6,10 +6,16 @@ #include "exporting/exporting_engine.h" int init_json_instance(struct instance *instance); +int init_json_http_instance(struct instance *instance); int format_host_labels_json_plaintext(struct instance *instance, RRDHOST *host); int format_dimension_collected_json_plaintext(struct instance *instance, RRDDIM *rd); int format_dimension_stored_json_plaintext(struct instance *instance, RRDDIM *rd); +int open_batch_json_http(struct instance *instance); +int close_batch_json_http(struct instance *instance); + +void json_http_prepare_header(struct instance *instance); + #endif //NETDATA_EXPORTING_JSON_H diff --git a/exporting/mongodb/mongodb.c b/exporting/mongodb/mongodb.c index c2cb72f9e3..44922a2424 100644 --- a/exporting/mongodb/mongodb.c +++ b/exporting/mongodb/mongodb.c @@ -102,7 +102,7 @@ int init_mongodb_instance(struct instance *instance) instance->end_host_formatting = flush_host_labels; instance->end_batch_formatting = format_batch_mongodb; - instance->send_header = NULL; + instance->prepare_header = NULL; instance->check_response = NULL; instance->buffer = (void *)buffer_create(0); @@ -284,9 +284,12 @@ void mongodb_connector_worker(void *instance_p) struct stats *stats = &instance->stats; uv_mutex_lock(&instance->mutex); - while (!instance->data_is_ready) - uv_cond_wait(&instance->cond_var, &instance->mutex); - instance->data_is_ready = 0; + if (!connector_specific_data->first_buffer->insert || + !connector_specific_data->first_buffer->documents_inserted) { + while (!instance->data_is_ready) + uv_cond_wait(&instance->cond_var, &instance->mutex); + instance->data_is_ready = 0; + } if (unlikely(instance->engine->exit)) { uv_mutex_unlock(&instance->mutex); diff --git a/exporting/mongodb/mongodb.h b/exporting/mongodb/mongodb.h index 5116e66fab..f1867b2888 100644 --- a/exporting/mongodb/mongodb.h +++ b/exporting/mongodb/mongodb.h @@ -21,7 +21,6 @@ struct mongodb_specific_data { size_t total_documents_inserted; - bson_t **current_insert; struct bson_buffer *first_buffer; struct bson_buffer *last_buffer; }; diff --git a/exporting/opentsdb/README.md b/exporting/opentsdb/README.md index 6b5f198b0b..3765ad2712 100644 --- a/exporting/opentsdb/README.md +++ b/exporting/opentsdb/README.md @@ -1,40 +1,30 @@ <!-- -title: "Export metrics to OpenTSDB with HTTP" -description: "Archive your Agent's metrics to a OpenTSDB database for long-term storage and further analysis." +title: "Export metrics to OpenTSDB" +description: "Archive your Agent's metrics to an OpenTSDB database for long-term storage and further analysis." custom_edit_url: https://github.com/netdata/netdata/edit/master/exporting/opentsdb/README.md -sidebar_label: OpenTSDB with HTTP +sidebar_label: OpenTSDB --> -# Export metrics to OpenTSDB with HTTP +# Export metrics to OpenTSDB -Netdata can easily communicate with OpenTSDB using HTTP API. To enable this channel, run `./edit-config exporting.conf` -in the Netdata configuration directory and set the following options: +You can use the OpenTSDB connector for the [exporting engine](/exporting/README.md) to archive your agent's metrics to OpenTSDB +databases for long-term storage, further analysis, or correlation with data from other sources. + +## Configuration + +To enable data exporting to an OpenTSDB database, run `./edit-config exporting.conf` in the Netdata configuration +directory and set the following options: ```conf -[opentsdb:http:my_instance] +[opentsdb:my_opentsdb_instance] enabled = yes destination = localhost:4242 ``` -In this example, OpenTSDB is running with its default port, which is `4242`. If you run OpenTSDB on a different port, -change the `destination = localhost:4242` line accordingly. +Add `:http` or `:https` modifiers to the connector type if you need to use other than a plaintext protocol. For example: `opentsdb:http:my_opentsdb_instance`, +`opentsdb:https:my_opentsdb_instance`. -## HTTPS - -As of [v1.16.0](https://github.com/netdata/netdata/releases/tag/v1.16.0), Netdata can send metrics to OpenTSDB using -TLS/SSL. Unfortunately, OpenTDSB does not support encrypted connections, so you will have to configure a reverse proxy -to enable HTTPS communication between Netdata and OpenTSBD. You can set up a reverse proxy with -[Nginx](/docs/Running-behind-nginx.md). - -After your proxy is configured, make the following changes to `exporting.conf`: - -```conf -[opentsdb:https:my_instance] - enabled = yes - destination = localhost:8082 -``` - -In this example, we used the port `8082` for our reverse proxy. If your reverse proxy listens on a different port, -change the `destination = localhost:8082` line accordingly. +The OpenTSDB connector is further configurable using additional settings. See the [exporting reference +doc](/exporting/README.md#options) for details. [](<>) diff --git a/exporting/opentsdb/opentsdb.c b/exporting/opentsdb/opentsdb.c index 76ee0a7bf2..4ee1e3a6f3 100644 --- a/exporting/opentsdb/opentsdb.c +++ b/exporting/opentsdb/opentsdb.c @@ -1,6 +1,7 @@ // SPDX-License-Identifier: GPL-3.0-or-later #include "opentsdb.h" +#include "../json/json.h" /** * Initialize OpenTSDB telnet connector instance @@ -16,6 +17,17 @@ int init_opentsdb_telnet_instance(struct instance *instance) instance->config.connector_specific_config = (void *)connector_specific_config; connector_specific_config->default_port = 4242; + struct simple_connector_data *connector_specific_data = callocz(1, sizeof(struct simple_connector_data)); + instance->connector_specific_data = connector_specific_data; + +#ifdef ENABLE_HTTPS + connector_specific_data->flags = NETDATA_SSL_START; + connector_specific_data->conn = NULL; + if (instance->config.options & EXPORTING_OPTION_USE_TLS) { + security_start_ssl(NETDATA_SSL_CONTEXT_EXPORTING); + } +#endif + instance->start_batch_formatting = NULL; instance->start_host_formatting = format_host_labels_opentsdb_telnet; instance->start_chart_formatting = NULL; @@ -27,9 +39,9 @@ int init_opentsdb_telnet_instance(struct instance *instance) instance->end_chart_formatting = NULL; instance->end_host_formatting = flush_host_labels; - instance->end_batch_formatting = simple_connector_update_buffered_bytes; + instance->end_batch_formatting = simple_connector_end_batch; - instance->send_header = NULL; + instance->prepare_header = NULL; instance->check_response = exporting_discard_response; instance->buffer = (void *)buffer_create(0); @@ -37,6 +49,9 @@ int init_opentsdb_telnet_instance(struct instance *instance) error("EXPORTING: cannot create buffer for opentsdb telnet exporting connector instance %s", instance->config.name); return 1; } + + simple_connector_init(instance); + if (uv_mutex_init(&instance->mutex)) return 1; if (uv_cond_init(&instance->cond_var)) @@ -59,17 +74,17 @@ int init_opentsdb_http_instance(struct instance *instance) instance->config.connector_specific_config = (void *)connector_specific_config; connector_specific_config->default_port = 4242; + struct simple_connector_data *connector_specific_data = callocz(1, sizeof(struct simple_connector_data)); #ifdef ENABLE_HTTPS - struct opentsdb_specific_data *connector_specific_data = callocz(1, sizeof(struct opentsdb_specific_data)); connector_specific_data->flags = NETDATA_SSL_START; connector_specific_data->conn = NULL; if (instance->config.options & EXPORTING_OPTION_USE_TLS) { - security_start_ssl(NETDATA_SSL_CONTEXT_OPENTSDB); + security_start_ssl(NETDATA_SSL_CONTEXT_EXPORTING); } - instance->connector_specific_data = connector_specific_data; #endif + instance->connector_specific_data = connector_specific_data; - instance->start_batch_formatting = NULL; + instance->start_batch_formatting = open_batch_json_http; instance->start_host_formatting = format_host_labels_opentsdb_http; instance->start_chart_formatting = NULL; @@ -80,9 +95,9 @@ int init_opentsdb_http_instance(struct instance *instance) instance->end_chart_formatting = NULL; instance->end_host_formatting = flush_host_labels; - instance->end_batch_formatting = simple_connector_update_buffered_bytes; + instance->end_batch_formatting = close_batch_json_http; - instance->send_header = NULL; + instance->prepare_header = opentsdb_http_prepare_header; instance->check_response = exporting_discard_response; instance->buffer = (void *)buffer_create(0); @@ -90,6 +105,9 @@ int init_opentsdb_http_instance(struct instance *instance) error("EXPORTING: cannot create buffer for opentsdb HTTP exporting connector instance %s", instance->config.name); return 1; } + + simple_connector_init(instance); + if (uv_mutex_init(&instance->mutex)) return 1; if (uv_cond_init(&instance->cond_var)) @@ -240,26 +258,26 @@ int format_dimension_stored_opentsdb_telnet(struct instance *instance, RRDDIM *r } /** - * Prepare an HTTP message for OpenTSDB HTTP connector + * Ppepare HTTP header * - * @param buffer a buffer to write the message to. - * @param message the body of the message. - * @param hostname the name of the host that sends the message. - * @param length the length of the message body. + * @param instance an instance data structure. + * @return Returns 0 on success, 1 on failure. */ -static inline void opentsdb_build_message(BUFFER *buffer, char *message, const char *hostname, int length) +void opentsdb_http_prepare_header(struct instance *instance) { + struct simple_connector_data *simple_connector_data = instance->connector_specific_data; + buffer_sprintf( - buffer, + simple_connector_data->last_buffer->header, "POST /api/put HTTP/1.1\r\n" "Host: %s\r\n" "Content-Type: application/json\r\n" - "Content-Length: %d\r\n" - "\r\n" - "%s", - hostname, - length, - message); + "Content-Length: %lu\r\n" + "\r\n", + instance->config.destination, + buffer_strlen(simple_connector_data->last_buffer->buffer)); + + return; } /** @@ -324,17 +342,18 @@ int format_dimension_collected_opentsdb_http(struct instance *instance, RRDDIM * (instance->config.options & EXPORTING_OPTION_SEND_NAMES && rd->name) ? rd->name : rd->id, RRD_ID_LENGTH_MAX); - char message[1024]; - int length = snprintfz( - message, - sizeof(message), + if (buffer_strlen((BUFFER *)instance->buffer) > 2) + buffer_strcat(instance->buffer, ",\n"); + + buffer_sprintf( + instance->buffer, "{" - " \"metric\": \"%s.%s.%s\"," - " \"timestamp\": %llu," - " \"value\": " COLLECTED_NUMBER_FORMAT "," - " \"tags\": {" - " \"host\": \"%s%s%s\"%s" - " }" + "\"metric\":\"%s.%s.%s\"," + "\"timestamp\":%llu," + "\"value\":"COLLECTED_NUMBER_FORMAT"," + "\"tags\":{" + "\"host\":\"%s%s%s\"%s" + "}" "}", instance->config.prefix, chart_name, @@ -346,10 +365,6 @@ int format_dimension_collected_opentsdb_http(struct instance *instance, RRDDIM * (host->tags) ? host->tags : "", instance->labels ? buffer_tostring(instance->labels) : ""); - if (length > 0) { - opentsdb_build_message(instance->buffer, message, engine->config.hostname, length); - } - return 0; } @@ -384,17 +399,18 @@ int format_dimension_stored_opentsdb_http(struct instance *instance, RRDDIM *rd) if(isnan(value)) return 0; - char message[1024]; - int length = snprintfz( - message, - sizeof(message), + if (buffer_strlen((BUFFER *)instance->buffer) > 2) + buffer_strcat(instance->buffer, ",\n"); + + buffer_sprintf( + instance->buffer, "{" - " \"metric\": \"%s.%s.%s\"," - " \"timestamp\": %llu," - " \"value\": " CALCULATED_NUMBER_FORMAT "," - " \"tags\": {" - " \"host\": \"%s%s%s\"%s" - " }" + "\"metric\":\"%s.%s.%s\"," + "\"timestamp\":%llu," + "\"value\":"CALCULATED_NUMBER_FORMAT"," + "\"tags\":{" + "\"host\":\"%s%s%s\"%s" + "}" "}", instance->config.prefix, chart_name, @@ -406,9 +422,5 @@ int format_dimension_stored_opentsdb_http(struct instance *instance, RRDDIM *rd) (host->tags) ? host->tags : "", instance->labels ? buffer_tostring(instance->labels) : ""); - if (length > 0) { - opentsdb_build_message(instance->buffer, message, engine->config.hostname, length); - } - return 0; } diff --git a/exporting/opentsdb/opentsdb.h b/exporting/opentsdb/opentsdb.h index d7eeee0ac8..d53a5054f5 100644 --- a/exporting/opentsdb/opentsdb.h +++ b/exporting/opentsdb/opentsdb.h @@ -18,11 +18,9 @@ int format_dimension_stored_opentsdb_telnet(struct instance *instance, RRDDIM *r int format_dimension_collected_opentsdb_http(struct instance *instance, RRDDIM *rd); int format_dimension_stored_opentsdb_http(struct instance *instance, RRDDIM *rd); -#ifdef ENABLE_HTTPS -struct opentsdb_specific_data { - SSL *conn; //SSL connection - int flags; //The flags for SSL connection -}; -#endif +int open_batch_opentsdb_http(struct instance *instance); +int close_batch_opentsdb_http(struct instance *instance); + +void opentsdb_http_prepare_header(struct instance *instance); #endif //NETDATA_EXPORTING_OPENTSDB_H diff --git a/exporting/process_data.c b/exporting/process_data.c index 00087a22b3..5e11b39482 100644 --- a/exporting/process_data.c +++ b/exporting/process_data.c @@ -363,14 +363,64 @@ int flush_host_labels(struct instance *instance, RRDHOST *host) } /** - * Update stats for buffered bytes + * End a batch for a simple connector * * @param instance an instance data structure. - * @return Always returns 0. + * @return Returns 0 on success, 1 on failure. */ -int simple_connector_update_buffered_bytes(struct instance *instance) +int simple_connector_end_batch(struct instance *instance) { - instance->stats.buffered_bytes = (collected_number)buffer_strlen((BUFFER *)(instance->buffer)); + struct simple_connector_data *simple_connector_data = + (struct simple_connector_data *)instance->connector_specific_data; + struct stats *stats = &instance->stats; + + BUFFER *instance_buffer = (BUFFER *)instance->buffer; + struct simple_connector_buffer *last_buffer = simple_connector_data->last_buffer; + + if (!last_buffer->buffer) { + last_buffer->buffer = buffer_create(0); + } + + if (last_buffer->used) { + // ring buffer is full, reuse the oldest element + simple_connector_data->first_buffer = simple_connector_data->first_buffer->next; + + stats->data_lost_events++; + stats->lost_metrics += last_buffer->buffered_metrics; + stats->lost_bytes += last_buffer->buffered_bytes; + } + + // swap buffers + BUFFER *tmp_buffer = last_buffer->buffer; + last_buffer->buffer = instance_buffer; + instance->buffer = instance_buffer = tmp_buffer; + + buffer_flush(instance_buffer); + + if (last_buffer->header) + buffer_flush(last_buffer->header); + else + last_buffer->header = buffer_create(0); + + if (instance->prepare_header) + instance->prepare_header(instance); + + // The stats->buffered_metrics is used in the simple connector batch formatting as a variable for the number + // of metrics, added in the current iteration, so we are clearing it here. We will use the + // simple_connector_data->total_buffered_metrics in the worker to show the statistics. + size_t buffered_metrics = (size_t)stats->buffered_metrics; + stats->buffered_metrics = 0; + + size_t buffered_bytes = buffer_strlen(last_buffer->buffer); + + last_buffer->buffered_metrics = buffered_metrics; + last_buffer->buffered_bytes = buffered_bytes; + last_buffer->used++; + + simple_connector_data->total_buffered_metrics += buffered_metrics; + stats->buffered_bytes += buffered_bytes; + + simple_connector_data->last_buffer = simple_connector_data->last_buffer->next; return 0; } diff --git a/exporting/prometheus/remote_write/README.md b/exporting/prometheus/remote_write/README.md index b8dcb875c9..0e67e3459d 100644 --- a/exporting/prometheus/remote_write/README.md +++ b/exporting/prometheus/remote_write/README.md @@ -30,6 +30,9 @@ in the Netdata configuration directory and set the following options: remote write URL path = /receive ``` +You can also add `:https` modifier to the connector type if you need to use the TLS/SSL protocol. For example: +`remote_write:https:my_instance`. + `remote write URL path` is used to set an endpoint path for the remote write protocol. The default value is `/receive`. For example, if your endpoint is `http://example.domain:example_port/storage/read`: diff --git a/exporting/prometheus/remote_write/remote_write.c b/exporting/prometheus/remote_write/remote_write.c index 6106146164..7f905c116f 100644 --- a/exporting/prometheus/remote_write/remote_write.c +++ b/exporting/prometheus/remote_write/remote_write.c @@ -10,28 +10,18 @@ char family[PROMETHEUS_ELEMENT_MAX + 1]; char units[PROMETHEUS_ELEMENT_MAX + 1] = ""; /** - * Send header to a server + * Prepare HTTP header * - * @param sock a communication socket. * @param instance an instance data structure. - * @return Returns 0 on success, 1 on failure. */ -int prometheus_remote_write_send_header(int *sock, struct instance *instance) +void prometheus_remote_write_prepare_header(struct instance *instance) { - int flags = 0; -#ifdef MSG_NOSIGNAL - flags += MSG_NOSIGNAL; -#endif - struct prometheus_remote_write_specific_config *connector_specific_config = instance->config.connector_specific_config; - - static BUFFER *header; - if (!header) - header = buffer_create(0); + struct simple_connector_data *simple_connector_data = instance->connector_specific_data; buffer_sprintf( - header, + simple_connector_data->last_buffer->header, "POST %s HTTP/1.1\r\n" "Host: %s\r\n" "Accept: */*\r\n" @@ -40,17 +30,7 @@ int prometheus_remote_write_send_header(int *sock, struct instance *instance) "Content-Type: application/x-www-form-urlencoded\r\n\r\n", connector_specific_config->remote_write_path, instance->config.destination, - buffer_strlen((BUFFER *)instance->buffer)); - - size_t header_len = buffer_strlen(header); - ssize_t written = send(*sock, buffer_tostring(header), header_len, flags); - - buffer_flush(header); - - if (written != -1 && (size_t)written == header_len) - return 0; - else - return 1; + buffer_strlen(simple_connector_data->last_buffer->buffer)); } /** @@ -90,7 +70,8 @@ int process_prometheus_remote_write_response(BUFFER *buffer, struct instance *in */ void clean_prometheus_remote_write(struct instance *instance) { - freez(instance->connector_specific_data); + struct simple_connector_data *simple_connector_data = instance->connector_specific_data; + freez(simple_connector_data->connector_specific_data); struct prometheus_remote_write_specific_config *connector_specific_config = instance->config.connector_specific_config; @@ -115,22 +96,32 @@ int init_prometheus_remote_write_instance(struct instance *instance) instance->end_host_formatting = NULL; instance->end_batch_formatting = format_batch_prometheus_remote_write; - instance->send_header = prometheus_remote_write_send_header; + instance->prepare_header = prometheus_remote_write_prepare_header; instance->check_response = process_prometheus_remote_write_response; instance->buffer = (void *)buffer_create(0); - if (!instance->buffer) { - error("EXPORTING: cannot create buffer for AWS Kinesis exporting connector instance %s", instance->config.name); - return 1; - } + if (uv_mutex_init(&instance->mutex)) return 1; if (uv_cond_init(&instance->cond_var)) return 1; + struct simple_connector_data *simple_connector_data = callocz(1, sizeof(struct simple_connector_data)); + instance->connector_specific_data = simple_connector_data; + +#ifdef ENABLE_HTTPS + simple_connector_data->flags = NETDATA_SSL_START; + simple_connector_data->conn = NULL; + if (instance->config.options & EXPORTING_OPTION_USE_TLS) { + security_start_ssl(NETDATA_SSL_CONTEXT_EXPORTING); + } +#endif + struct prometheus_remote_write_specific_data *connector_specific_data = callocz(1, sizeof(struct prometheus_remote_write_specific_data)); - instance->connector_specific_data = (void *)connector_specific_data; + simple_connector_data->connector_specific_data = (void *)connector_specific_data; + + simple_connector_init(instance); connector_specific_data->write_request = init_write_request(); @@ -148,8 +139,10 @@ int init_prometheus_remote_write_instance(struct instance *instance) */ int format_host_prometheus_remote_write(struct instance *instance, RRDHOST *host) { + struct simple_connector_data *simple_connector_data = + (struct simple_connector_data *)instance->connector_specific_data; struct prometheus_remote_write_specific_data *connector_specific_data = - (struct prometheus_remote_write_specific_data *)instance->connector_specific_data; + (struct prometheus_remote_write_specific_data *)simple_connector_data->connector_specific_data; char hostname[PROMETHEUS_ELEMENT_MAX + 1]; prometheus_label_copy( @@ -225,8 +218,10 @@ int format_chart_prometheus_remote_write(struct instance *instance, RRDSET *st) */ int format_dimension_prometheus_remote_write(struct instance *instance, RRDDIM *rd) { + struct simple_connector_data *simple_connector_data = + (struct simple_connector_data *)instance->connector_specific_data; struct prometheus_remote_write_specific_data *connector_specific_data = - (struct prometheus_remote_write_specific_data *)instance->connector_specific_data; + (struct prometheus_remote_write_specific_data *)simple_connector_data->connector_specific_data; if (rd->collections_counter && !rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE)) { char name[PROMETHEUS_LABELS_MAX + 1]; @@ -322,8 +317,10 @@ int format_dimension_prometheus_remote_write(struct instance *instance, RRDDIM * */ int format_batch_prometheus_remote_write(struct instance *instance) { + struct simple_connector_data *simple_connector_data = + (struct simple_connector_data *)instance->connector_specific_data; struct prometheus_remote_write_specific_data *connector_specific_data = - (struct prometheus_remote_write_specific_data *)instance->connector_specific_data; + (struct prometheus_remote_write_specific_data *)simple_connector_data->connector_specific_data; size_t data_size = get_write_request_size(connector_specific_data->write_request); @@ -342,5 +339,7 @@ int format_batch_prometheus_remote_write(struct instance *instance) buffer->len = data_size; instance->stats.buffered_bytes = (collected_number)buffer_strlen(buffer); + simple_connector_end_batch(instance); + return 0; } diff --git a/exporting/prometheus/remote_write/remote_write.h b/exporting/prometheus/remote_write/remote_write.h index 3bd212414a..d738f5126f 100644 --- a/exporting/prometheus/remote_write/remote_write.h +++ b/exporting/prometheus/remote_write/remote_write.h @@ -7,6 +7,10 @@ #include "exporting/prometheus/prometheus.h" #include "remote_write_request.h" +struct prometheus_remote_write_specific_data { + void *write_request; +}; + int init_prometheus_remote_write_instance(struct instance *instance); extern void clean_prometheus_remote_write(struct instance *instance); @@ -15,7 +19,7 @@ int format_chart_prometheus_remote_write(struct instance *instance, RRDSET *st); int format_dimension_prometheus_remote_write(struct instance *instance, RRDDIM *rd); int format_batch_prometheus_remote_write(struct instance *instance); -int prometheus_remote_write_send_header(int *sock, struct instance *instance); +void prometheus_remote_write_prepare_header(struct instance *instance); int process_prometheus_remote_write_response(BUFFER *buffer, struct instance *instance); #endif //NETDATA_EXPORTING_PROMETHEUS_REMOTE_WRITE_H diff --git a/exporting/prometheus/remote_write/remote_write_request.h b/exporting/prometheus/remote_write/remote_write_request.h index ea8f0f6795..e1dfacaf86 100644 --- a/exporting/prometheus/remote_write/remote_write_request.h +++ b/exporting/prometheus/remote_write/remote_write_request.h @@ -7,10 +7,6 @@ extern "C" { #endif -struct prometheus_remote_write_specific_data { - void *write_request; -}; - void *init_write_request(); void add_host_info( diff --git a/exporting/pubsub/pubsub.c b/exporting/pubsub/pubsub.c index ead44ed2ac..336a096abf 100644 --- a/exporting/pubsub/pubsub.c +++ b/exporting/pubsub/pubsub.c @@ -26,7 +26,7 @@ int init_pubsub_instance(struct instance *instance) instance->end_host_formatting = flush_host_labels; instance->end_batch_formatting = NULL; - instance->send_header = NULL; + instance->prepare_header = NULL; instance->check_response = NULL; instance->buffer = (void *)buffer_create(0); diff --git a/exporting/read_config.c b/exporting/read_config.c index 1450d3036d..2b6cec6a9a 100644 --- a/exporting/read_config.c +++ b/exporting/read_config.c @@ -135,13 +135,20 @@ EXPORTING_CONNECTOR_TYPE exporting_select_type(const char *type) { if (!strcmp(type, "graphite") || !strcmp(type, "graphite:plaintext")) { return EXPORTING_CONNECTOR_TYPE_GRAPHITE; - } else if (!strcmp(type, "opentsdb") || !strcmp(type, "opentsdb:telnet")) { - return EXPORTING_CONNECTOR_TYPE_OPENTSDB_USING_TELNET; - } else if (!strcmp(type, "opentsdb:http") || !strcmp(type, "opentsdb:https")) { - return EXPORTING_CONNECTOR_TYPE_OPENTSDB_USING_HTTP; + } else if (!strcmp(type, "graphite:http") || !strcmp(type, "graphite:https")) { + return EXPORTING_CONNECTOR_TYPE_GRAPHITE_HTTP; } else if (!strcmp(type, "json") || !strcmp(type, "json:plaintext")) { return EXPORTING_CONNECTOR_TYPE_JSON; - } else if (!strcmp(type, "prometheus_remote_write")) { + } else if (!strcmp(type, "json:http") || !strcmp(type, "json:https")) { + return EXPORTING_CONNECTOR_TYPE_JSON_HTTP; + } else if (!strcmp(type, "opentsdb") || !strcmp(type, "opentsdb:telnet")) { + return EXPORTING_CONNECTOR_TYPE_OPENTSDB; + } else if (!strcmp(type, "opentsdb:http") || !strcmp(type, "opentsdb:https")) { + return EXPORTING_CONNECTOR_TYPE_OPENTSDB_HTTP; + } else if ( + !strcmp(type, "prometheus_remote_write") || + !strcmp(type, "prometheus_remote_write:http") || + !strcmp(type, "prometheus_remote_write:https")) { return EXPORTING_CONNECTOR_TYPE_PROMETHEUS_REMOTE_WRITE; } else if (!strcmp(type, "kinesis") || !strcmp(type, "kinesis:plaintext")) { return EXPORTING_CONNECTOR_TYPE_KINESIS; @@ -432,7 +439,22 @@ struct engine *read_exporting_config() tmp_instance->config.prefix = strdupz(exporter_get(instance_name, "prefix", "netdata")); #ifdef ENABLE_HTTPS - if (tmp_instance->config.type == EXPORTING_CONNECTOR_TYPE_OPENTSDB_USING_HTTP && !strncmp(tmp_ci_list->local_ci.connector_name, "opentsdb:https", 14)) { + +#define STR_GRAPHITE_HTTPS "graphite:https" +#define STR_JSON_HTTPS "json:https" +#define STR_OPENTSDB_HTTPS "opentsdb:https" +#define STR_PROMETHEUS_REMOTE_WRITE_HTTPS "prometheus_remote_write:https" + + if ((tmp_instance->config.type == EXPORTING_CONNECTOR_TYPE_GRAPHITE_HTTP && + !strncmp(tmp_ci_list->local_ci.connector_name, STR_GRAPHITE_HTTPS, strlen(STR_GRAPHITE_HTTPS))) || + (tmp_instance->config.type == EXPORTING_CONNECTOR_TYPE_JSON_HTTP && + !strncmp(tmp_ci_list->local_ci.connector_name, STR_JSON_HTTPS, strlen(STR_JSON_HTTPS))) || + (tmp_instance->config.type == EXPORTING_CONNECTOR_TYPE_OPENTSDB_HTTP && + !strncmp(tmp_ci_list->local_ci.connector_name, STR_OPENTSDB_HTTPS, strlen(STR_OPENTSDB_HTTPS))) || + (tmp_instance->config.type == EXPORTING_CONNECTOR_TYPE_PROMETHEUS_REMOTE_WRITE && + !strncmp( + tmp_ci_list->local_ci.connector_name, STR_PROMETHEUS_REMOTE_WRITE_HTTPS, + strlen(STR_PROMETHEUS_REMOTE_WRITE_HTTPS)))) { tmp_instance->config.options |= EXPORTING_OPTION_USE_TLS; } #endif diff --git a/exporting/send_data.c b/exporting/send_data.c index cc107ea2dd..1618c1bc09 100644 --- a/exporting/send_data.c +++ b/exporting/send_data.c @@ -2,6 +2,22 @@ #include "exporting_engine.h" +/** + * Check if TLS is enabled in the configuration + * + * @param type buffer with response data. + * @param options an instance data structure. + * @return Returns 1 if TLS should be enabled, 0 otherwise. + */ +static int exporting_tls_is_enabled(EXPORTING_CONNECTOR_TYPE type, EXPORTING_OPTIONS options) +{ + return (type == EXPORTING_CONNECTOR_TYPE_GRAPHITE_HTTP || + type == EXPORTING_CONNECTOR_TYPE_JSON_HTTP || + type == EXPORTING_CONNECTOR_TYPE_OPENTSDB_HTTP || + type == EXPORTING_CONNECTOR_TYPE_PROMETHEUS_REMOTE_WRITE) && + options & EXPORTING_OPTION_USE_TLS; +} + /** * Discard response * @@ -12,6 +28,7 @@ * @return Always returns 0. */ int exporting_discard_response(BUFFER *buffer, struct instance *instance) { +#if NETDATA_INTERNAL_CHECKS char sample[1024]; const char *s = buffer_tostring(buffer); char *d = sample, *e = &sample[sizeof(sample) - 1]; @@ -23,11 +40,16 @@ int exporting_discard_response(BUFFER *buffer, struct instance *instance) { } *d = '\0'; - info( + debug( + D_BACKEND, "EXPORTING: received %zu bytes from %s connector instance. Ignoring them. Sample: '%s'", buffer_strlen(buffer), instance->config.name, sample); +#else + UNUSED(instance); +#endif /* NETDATA_INTERNAL_CHECKS */ + buffer_flush(buffer); return 0; } @@ -47,7 +69,7 @@ void simple_connector_receive_response(int *sock, struct instance *instance) struct stats *stats = &instance->stats; #ifdef ENABLE_HTTPS uint32_t options = (uint32_t)instance->config.options; - struct opentsdb_specific_data *connector_specific_data = instance->connector_specific_data; + struct simple_connector_data *connector_specific_data = instance->connector_specific_data; if (options & EXPORTING_OPTION_USE_TLS) ERR_clear_error(); @@ -61,8 +83,7 @@ void simple_connector_receive_response(int *sock, struct instance *instance) ssize_t r; #ifdef ENABLE_HTTPS - if (instance->config.type == EXPORTING_CONNECTOR_TYPE_OPENTSDB_USING_HTTP && - options & EXPORTING_OPTION_USE_TLS && + if (exporting_tls_is_enabled(instance->config.type, options) && connector_specific_data->conn && connector_specific_data->flags == NETDATA_SSL_HANDSHAKE_COMPLETE) { r = (ssize_t)SSL_read(connector_specific_data->conn, @@ -132,11 +153,9 @@ endloop: * @param failures the number of communication failures. * @param instance an instance data structure. */ -void simple_connector_send_buffer(int *sock, int *failures, struct instance *instance) +void simple_connector_send_buffer( + int *sock, int *failures, struct instance *instance, BUFFER *header, BUFFER *buffer, size_t buffered_metrics) { - BUFFER *buffer = (BUFFER *)instance->buffer; - size_t len = buffer_strlen(buffer); - int flags = 0; #ifdef MSG_NOSIGNAL flags += MSG_NOSIGNAL; @@ -144,58 +163,61 @@ void simple_connector_send_buffer(int *sock, int *failures, struct instance *ins #ifdef ENABLE_HTTPS uint32_t options = (uint32_t)instance->config.options; - struct opentsdb_specific_data *connector_specific_data = instance->connector_specific_data; + struct simple_connector_data *connector_specific_data = instance->connector_specific_data; if (options & EXPORTING_OPTION_USE_TLS) ERR_clear_error(); #endif struct stats *stats = &instance->stats; + ssize_t header_sent_bytes = 0; + ssize_t buffer_sent_bytes = 0; + size_t header_len = buffer_strlen(header); + size_t buffer_len = buffer_strlen(buffer); - int ret = 0; - if (instance->send_header) - ret = instance->send_header(sock, instance); - - ssize_t written = -1; - - if (!ret) { #ifdef ENABLE_HTTPS - if (instance->config.type == EXPORTING_CONNECTOR_TYPE_OPENTSDB_USING_HTTP && - options & EXPORTING_OPTION_USE_TLS && - connector_specific_data->conn && - connector_specific_data->flags == NETDATA_SSL_HANDSHAKE_COMPLETE) { - written = (ssize_t)SSL_write(connector_specific_data->conn, buffer_tostring(buffer), len); - } else { - written = send(*sock, buffer_tostring(buffer), len, flags); - } -#else - written = send(*sock, buffer_tostring(buffer), len, flags); -#endif + if (exporting_tls_is_enabled(instance->config.type, options) && + connector_specific_data->conn && + connector_specific_data->flags == NETDATA_SSL_HANDSHAKE_COMPLETE) { + if (header_len) + header_sent_bytes = (ssize_t)SSL_write(connector_specific_data->conn, buffer_tostring(header), header_len); + if ((size_t)header_sent_bytes == header_len) + buffer_sent_bytes = (ssize_t)SSL_write(connector_specific_data->conn, buffer_tostring(buffer), buffer_len); + } else { + if (header_len) + header_sent_bytes = send(*sock, buffer_tostring(header), header_len, flags); + if ((size_t)header_sent_bytes == header_len) + buffer_sent_bytes = send(*sock, buffer_tostring(buffer), buffer_len, flags); } +#else + if (header_len) + header_sent_bytes = send(*sock, buffer_tostring(header), header_len, flags); + if ((size_t)header_sent_bytes == header_len) + buffer_sent_bytes = send(*sock, buffer_tostring(buffer), buffer_len, flags); +#endif - if(written != -1 && (size_t)written == len) { + if ((size_t)buffer_sent_bytes == buffer_len) { // we sent the data successfully stats->transmission_successes++; - stats->sent_bytes += written; - stats->sent_metrics = stats->buffered_metrics; + stats->sent_metrics += buffered_metrics; + stats->sent_bytes += buffer_sent_bytes; // reset the failures count *failures = 0; // empty the buffer buffer_flush(buffer); - } - else { + } else { // oops! we couldn't send (all or some of the) data error( "EXPORTING: failed to write data to '%s'. Willing to write %zu bytes, wrote %zd bytes. Will re-connect.", instance->config.destination, - len, - written); + buffer_len, + buffer_sent_bytes); stats->transmission_failures++; - if(written != -1) - stats->sent_bytes += written; + if(buffer_sent_bytes != -1) + stats->sent_bytes += buffer_sent_bytes; // increment the counter we check for data loss (*failures)++; @@ -206,22 +228,6 @@ void simple_connector_send_buffer(int *sock, int *failures, struct instance *ins } } -/** - * Clean up a simple connector instance on Netdata exit - * - * @param instance an instance data structure. - */ -void simple_connector_cleanup(struct instance *instance) -{ - info("EXPORTING: cleaning up instance %s ...", instance->config.name); - - buffer_free(instance->buffer); - freez(instance->config.connector_specific_config); - - info("EXPORTING: instance %s exited", instance->config.name); - instance->exited = 1; -} - /** * Simple connector worker * @@ -235,60 +241,97 @@ void simple_connector_worker(void *instance_p) #ifdef ENABLE_HTTPS uint32_t options = (uint32_t)instance->config.options; - struct opentsdb_specific_data *connector_specific_data = instance->connector_specific_data; + struct simple_connector_data *connector_specific_data = instance->connector_specific_data; if (options & EXPORTING_OPTION_USE_TLS) ERR_clear_error(); #endif struct simple_connector_config *connector_specific_config = instance->config.connector_specific_config; - struct stats *stats = &instance->stats; int sock = -1; - struct timeval timeout = {.tv_sec = (instance->config.timeoutms * 1000) / 1000000, - .tv_usec = (instance->config.timeoutms * 1000) % 1000000}; + struct timeval timeout = { .tv_sec = (instance->config.timeoutms * 1000) / 1000000, + .tv_usec = (instance->config.timeoutms * 1000) % 1000000 }; int failures = 0; - while(!instance->engine->exit) { + BUFFER *spare_header = buffer_create(0); + BUFFER *spare_buffer = buffer_create(0); - // reset the monitoring chart counters - stats->received_bytes = - stats->sent_bytes = - stats->sent_metrics = - stats->lost_metrics = - stats->receptions = - stats->transmission_successes = - stats->transmission_failures = - stats->data_lost_events = - stats->lost_bytes = - stats->reconnects = 0; + while (!instance->engine->exit) { + struct stats *stats = &instance->stats; + int send_stats = 0; + + if (instance->data_is_ready) + send_stats = 1; + + uv_mutex_lock(&instance->mutex); + if (!connector_specific_data->first_buffer->used || failures) { + while (!instance->data_is_ready) + uv_cond_wait(&instance->cond_var, &instance->mutex); + instance->data_is_ready = 0; + send_stats = 1; + } + + if (unlikely(instance->engine->exit)) { + uv_mutex_unlock(&instance->mutex); + break; + } + + // ------------------------------------------------------------------------ + // detach buffer + + BUFFER *header; + BUFFER *buffer; + size_t buffered_metrics; + + if (!connector_specific_data->previous_buffer || + (connector_specific_data->previous_buffer == connector_specific_data->first_buffer && + connector_specific_data->first_buffer->used == 1)) { + connector_specific_data->header = connector_specific_data->first_buffer->header; + connector_specific_data->buffer = connector_specific_data->first_buffer->buffer; + connector_specific_data->buffered_metrics = connector_specific_data->first_buffer->buffered_metrics; + connector_specific_data->buffered_bytes = connector_specific_data->first_buffer->buffered_bytes; + + header = connector_specific_data->header; + buffer = connector_specific_data->buffer; + buffered_metrics = connector_specific_data->buffered_metrics; + + buffer_flush(spare_header); + connector_specific_data->first_buffer->header = spare_header; + spare_header = header; + + buffer_flush(spare_buffer); + connector_specific_data->first_buffer->buffer = spare_buffer; + spare_buffer = buffer; + } else { + header = connector_specific_data->header; + buffer = connector_specific_data->buffer; + buffered_metrics = connector_specific_data->buffered_metrics; + } + + uv_mutex_unlock(&instance->mutex); // ------------------------------------------------------------------------ // if we are connected, receive a response, without blocking - if(likely(sock != -1)) + if (likely(sock != -1)) simple_connector_receive_response(&sock, instance); // ------------------------------------------------------------------------ // if we are not connected, connect to a data collecting server - if(unlikely(sock == -1)) { + if (unlikely(sock == -1)) { size_t reconnects = 0; sock = connect_to_one_of( - instance->config.destination, - connector_specific_config->default_port, - &timeout, - &reconnects, - NULL, - 0); + instance->config.destination, connector_specific_config->default_port, &timeout, &reconnects, NULL, 0); #ifdef ENABLE_HTTPS - if(instance->config.type == EXPORTING_CONNECTOR_TYPE_OPENTSDB_USING_HTTP && sock != -1) { - if (netdata_opentsdb_ctx) { - if ( sock_delnonblock(sock) < 0 ) + if (exporting_tls_is_enabled(instance->config.type, options) && sock != -1) { + if (netdata_exporting_ctx) { + if (sock_delnonblock(sock) < 0) error("Exporting cannot remove the non-blocking flag from socket %d", sock); if (connector_specific_data->conn == NULL) { - connector_specific_data->conn = SSL_new(netdata_opentsdb_ctx); + connector_specific_data->conn = SSL_new(netdata_exporting_ctx); if (connector_specific_data->conn == NULL) { error("Failed to allocate SSL structure to socket %d.", sock); connector_specific_data->flags = NETDATA_SSL_NO_HANDSHAKE; @@ -307,50 +350,42 @@ void simple_connector_worker(void *instance_p) int err = SSL_connect(connector_specific_data->conn); if (err != 1) { err = SSL_get_error(connector_specific_data->conn, err); - error("SSL cannot connect with the server: %s ", - ERR_error_string((long)SSL_get_error(connector_specific_data->conn, err), NULL)); + error( + "SSL cannot connect with the server: %s ", + ERR_error_string((long)SSL_get_error(connector_specific_data->conn, err), NULL)); connector_specific_data->flags = NETDATA_SSL_NO_HANDSHAKE; } else { info("Exporting established a SSL connection."); struct timeval tv; - tv.tv_sec = timeout.tv_sec /4; + tv.tv_sec = timeout.tv_sec / 4; tv.tv_usec = 0; if (!tv.tv_sec) tv.tv_sec = 2; - if (setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof(tv))) + if (setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (const char *)&tv, sizeof(tv))) error("Cannot set timeout to socket %d, this can block communication", sock); - } } } } - } #endif stats->reconnects += reconnects; } - if(unlikely(instance->engine->exit)) break; + if (unlikely(instance->engine->exit)) + break; // ------------------------------------------------------------------------ // if we are connected, send our buffer to the data collecting server - uv_mutex_lock(&instance->mutex); - while (!instance->data_is_ready) - uv_cond_wait(&instance->cond_var, &instance->mutex); - instance->data_is_ready = 0; - - if (unlikely(instance->engine->exit)) { - uv_mutex_unlock(&instance->mutex); - break; - } + failures = 0; if (likely(sock != -1)) { - simple_connector_send_buffer(&sock, &failures, instance); + simple_connector_send_buffer(&sock, &failures, instance, header, buffer, buffered_metrics); } else { error("EXPORTING: failed to update '%s'", instance->config.destination); stats->transmission_failures++; @@ -359,26 +394,40 @@ void simple_connector_worker(void *instance_p) failures++; } - BUFFER *buffer = instance->buffer; - - if (failures > instance->config.buffer_on_failures) { - stats->lost_bytes += buffer_strlen(buffer); - error( - "EXPORTING: connector instance %s reached %d exporting failures. " - "Flushing buffers to protect this host - this results in data loss on server '%s'", - instance->config.name, failures, instance->config.destination); - buffer_flush(buffer); - failures = 0; - stats->data_lost_events++; - stats->lost_metrics = stats->buffered_metrics; + if (!failures) { + connector_specific_data->first_buffer->buffered_metrics = + connector_specific_data->first_buffer->buffered_bytes = connector_specific_data->first_buffer->used = 0; + connector_specific_data->first_buffer = connector_specific_data->first_buffer->next; } - send_internal_metrics(instance); + if (unlikely(instance->engine->exit)) + break; + + if (send_stats) { + uv_mutex_lock(&instance->mutex); + + stats->buffered_metrics = connector_specific_data->total_buffered_metrics; + + send_internal_metrics(instance); - if(likely(buffer_strlen(buffer) == 0)) stats->buffered_metrics = 0; - uv_mutex_unlock(&instance->mutex); + // reset the internal monitoring chart counters + connector_specific_data->total_buffered_metrics = + stats->buffered_bytes = + stats->receptions = + stats->received_bytes = + stats->sent_metrics = + stats->sent_bytes = + stats->transmission_successes = + stats->transmission_failures = + stats->reconnects = + stats->data_lost_events = + stats->lost_metrics = + stats->lost_bytes = 0; + + uv_mutex_unlock(&instance->mutex); + } #ifdef UNIT_TESTING return; @@ -390,12 +439,5 @@ void simple_connector_worker(void *instance_p) clean_prometheus_remote_write(instance); #endif -#ifdef ENABLE_HTTPS - if (instance->config.type == EXPORTING_CONNECTOR_TYPE_OPENTSDB_USING_HTTP && options & EXPORTING_OPTION_USE_TLS) { - SSL_free(connector_specific_data->conn); - freez(instance->connector_specific_data); - } -#endif - simple_connector_cleanup(instance); } diff --git a/exporting/tests/exporting_doubles.c b/exporting/tests/exporting_doubles.c index d5b0e3bff5..7560069c0a 100644 --- a/exporting/tests/exporting_doubles.c +++ b/exporting/tests/exporting_doubles.c @@ -168,6 +168,13 @@ int __mock_end_batch_formatting(struct instance *instance) return mock_type(int); } +int __wrap_simple_connector_end_batch(struct instance *instance) +{ + function_called(); + check_expected_ptr(instance); + return mock_type(int); +} + #if ENABLE_PROMETHEUS_REMOTE_WRITE void *__wrap_init_write_request() { diff --git a/exporting/tests/test_exporting_engine.c b/exporting/tests/test_exporting_engine.c index eb429a02e3..befc2b882b 100644 --- a/exporting/tests/test_exporting_engine.c +++ b/exporting/tests/test_exporting_engine.c @@ -122,7 +122,6 @@ static void test_init_connectors(void **state) assert_ptr_equal(instance->metric_formatting, format_dimension_collected_graphite_plaintext); assert_ptr_equal(instance->end_chart_formatting, NULL); assert_ptr_equal(instance->end_host_formatting, flush_host_labels); - assert_ptr_equal(instance->end_batch_formatting, simple_connector_update_buffered_bytes); BUFFER *buffer = instance->buffer; assert_ptr_not_equal(buffer, NULL); @@ -500,14 +499,10 @@ static void test_format_dimension_collected_opentsdb_http(void **state) assert_int_equal(format_dimension_collected_opentsdb_http(engine->instance_root, rd), 0); assert_string_equal( buffer_tostring(engine->instance_root->buffer), - "POST /api/put HTTP/1.1\r\n" - "Host: test-host\r\n" - "Content-Type: application/json\r\n" - "Content-Length: 153\r\n\r\n" - "{ \"metric\": \"netdata.chart_name.dimension_name\", " - "\"timestamp\": 15051, " - "\"value\": 123000321, " - "\"tags\": { \"host\": \"test-host TAG1=VALUE1 TAG2=VALUE2\" }}"); + "{\"metric\":\"netdata.chart_name.dimension_name\"," + "\"timestamp\":15051," + "\"value\":123000321," + "\"tags\":{\"host\":\"test-host TAG1=VALUE1 TAG2=VALUE2\"}}"); } static void test_format_dimension_stored_opentsdb_http(void **state) @@ -521,14 +516,10 @@ static void test_format_dimension_stored_opentsdb_http(void **state) assert_int_equal(format_dimension_stored_opentsdb_http(engine->instance_root, rd), 0); assert_string_equal( buffer_tostring(engine->instance_root->buffer), - "POST /api/put HTTP/1.1\r\n" - "Host: test-host\r\n" - "Content-Type: application/json\r\n" - "Content-Length: 161\r\n\r\n" - "{ \"metric\": \"netdata.chart_name.dimension_name\", " - "\"timestamp\": 15052, " - "\"value\": 690565856.0000000, " - "\"tags\": { \"host\": \"test-host TAG1=VALUE1 TAG2=VALUE2\" }}"); + "{\"metric\":\"netdata.chart_name.dimension_name\"," + "\"timestamp\":15052," + "\"value\":690565856.0000000," + "\"tags\":{\"host\":\"test-host TAG1=VALUE1 TAG2=VALUE2\"}}"); } static void test_exporting_discard_response(void **state) @@ -538,14 +529,7 @@ static void test_exporting_discard_response(void **state) BUFFER *response = buffer_create(0); buffer_sprintf(response, "Test response"); - expect_function_call(__wrap_info_int); - assert_int_equal(exporting_discard_response(response, engine->instance_root), 0); - - assert_string_equal( - log_line, - "EXPORTING: received 13 bytes from instance_name connector instance. Ignoring them. Sample: 'Test response'"); - assert_int_equal(buffer_strlen(response), 0); buffer_free(response); @@ -565,14 +549,8 @@ static void test_simple_connector_receive_response(void **state) expect_value(__wrap_recv, len, 4096); expect_value(__wrap_recv, flags, MSG_DONTWAIT); - expect_function_call(__wrap_info_int); - simple_connector_receive_response(&sock, instance); - assert_string_equal( - log_line, - "EXPORTING: received 9 bytes from instance_name connector instance. Ignoring them. Sample: 'Test recv'"); - assert_int_equal(stats->received_bytes, 9); assert_int_equal(stats->receptions, 1); assert_int_equal(sock, 1); @@ -583,39 +561,34 @@ static void test_simple_connector_send_buffer(void **state) struct engine *engine = *state; struct instance *instance = engine->instance_root; struct stats *stats = &instance->stats; - BUFFER *buffer = instance->buffer; int sock = 1; int failures = 3; + size_t buffered_metrics = 1; + BUFFER *header = buffer_create(0); + BUFFER *buffer = buffer_create(0); + buffer_strcat(header, "test header\n"); + buffer_strcat(buffer, "test buffer\n"); - __real_mark_scheduled_instances(engine); - - expect_function_call(__wrap_rrdhost_is_exportable); - expect_value(__wrap_rrdhost_is_exportable, instance, instance); - expect_value(__wrap_rrdhost_is_exportable, host, localhost); - will_return(__wrap_rrdhost_is_exportable, 1); - - RRDSET *st = localhost->rrdset_root; - expect_function_call(__wrap_rrdset_is_exportable); - expect_value(__wrap_rrdset_is_exportable, instance, instance); - expect_value(__wrap_rrdset_is_exportable, st, st); - will_return(__wrap_rrdset_is_exportable, 1); - - __real_prepare_buffers(engine); + expect_function_call(__wrap_send); + expect_value(__wrap_send, sockfd, 1); + expect_value(__wrap_send, buf, buffer_tostring(header)); + expect_string(__wrap_send, buf, "test header\n"); + expect_value(__wrap_send, len, 12); + expect_value(__wrap_send, flags, MSG_NOSIGNAL); expect_function_call(__wrap_send); expect_value(__wrap_send, sockfd, 1); expect_value(__wrap_send, buf, buffer_tostring(buffer)); - expect_string( - __wrap_send, buf, "netdata.test-host.chart_name.dimension_name;TAG1=VALUE1 TAG2=VALUE2 123000321 15051\n"); - expect_value(__wrap_send, len, 84); + expect_string(__wrap_send, buf, "test buffer\n"); + expect_value(__wrap_send, len, 12); expect_value(__wrap_send, flags, MSG_NOSIGNAL); - simple_connector_send_buffer(&sock, &failures, instance); + simple_connector_send_buffer(&sock, &failures, instance, header, buffer, buffered_metrics); assert_int_equal(failures, 0); assert_int_equal(stats->transmission_successes, 1); - assert_int_equal(stats->sent_bytes, 84); + assert_int_equal(stats->sent_bytes, 12); assert_int_equal(stats->sent_metrics, 1); assert_int_equal(stats->transmission_failures, 0); @@ -629,22 +602,18 @@ static void test_simple_connector_worker(void **state) struct engine *engine = *state; struct instance *instance = engine->instance_root; struct stats *stats = &instance->stats; - BUFFER *buffer = instance->buffer; __real_mark_scheduled_instances(engine); - expect_function_call(__wrap_rrdhost_is_exportable); - expect_value(__wrap_rrdhost_is_exportable, instance, instance); - expect_value(__wrap_rrdhost_is_exportable, host, localhost); - will_return(__wrap_rrdhost_is_exportable, 1); + struct simple_connector_data *simple_connector_data = callocz(1, sizeof(struct simple_connector_data)); + instance->connector_specific_data = simple_connector_data; + simple_connector_data->last_buffer = callocz(1, sizeof(struct simple_connector_buffer)); + simple_connector_data->first_buffer = simple_connector_data->last_buffer; + simple_connector_data->last_buffer->header = buffer_create(0); + simple_connector_data->last_buffer->buffer = buffer_create(0); - RRDSET *st = localhost->rrdset_root; - expect_function_call(__wrap_rrdset_is_exportable); - expect_value(__wrap_rrdset_is_exportable, instance, instance); - expect_value(__wrap_rrdset_is_exportable, st, st); - will_return(__wrap_rrdset_is_exportable, 1); - - __real_prepare_buffers(engine); + buffer_sprintf(simple_connector_data->last_buffer->header, "test header"); + buffer_sprintf(simple_connector_data->last_buffer->buffer, "test buffer"); expect_function_call(__wrap_connect_to_one_of); expect_string(__wrap_connect_to_one_of, destination, "localhost"); @@ -656,10 +625,16 @@ static void test_simple_connector_worker(void **state) expect_function_call(__wrap_send); expect_value(__wrap_send, sockfd, 2); - expect_value(__wrap_send, buf, buffer_tostring(buffer)); - expect_string( - __wrap_send, buf, "netdata.test-host.chart_name.dimension_name;TAG1=VALUE1 TAG2=VALUE2 123000321 15051\n"); - expect_value(__wrap_send, len, 84); + expect_not_value(__wrap_send, buf, buffer_tostring(simple_connector_data->last_buffer->buffer)); + expect_string(__wrap_send, buf, "test header"); + expect_value(__wrap_send, len, 11); + expect_value(__wrap_send, flags, MSG_NOSIGNAL); + + expect_function_call(__wrap_send); + expect_value(__wrap_send, sockfd, 2); + expect_value(__wrap_send, buf, buffer_tostring(simple_connector_data->last_buffer->buffer)); + expect_string(__wrap_send, buf, "test buffer"); + expect_value(__wrap_send, len, 11); expect_value(__wrap_send, flags, MSG_NOSIGNAL); expect_function_call(__wrap_send_internal_metrics); @@ -669,13 +644,13 @@ static void test_simple_connector_worker(void **state) simple_connector_worker(instance); assert_int_equal(stats->buffered_metrics, 0); - assert_int_equal(stats->buffered_bytes, 84); + assert_int_equal(stats->buffered_bytes, 0); assert_int_equal(stats->received_bytes, 0); - assert_int_equal(stats->sent_bytes, 84); - assert_int_equal(stats->sent_metrics, 1); + assert_int_equal(stats->sent_bytes, 0); + assert_int_equal(stats->sent_metrics, 0); assert_int_equal(stats->lost_metrics, 0); assert_int_equal(stats->receptions, 0); - assert_int_equal(stats->transmission_successes, 1); + assert_int_equal(stats->transmission_successes, 0); assert_int_equal(stats->transmission_failures, 0); assert_int_equal(stats->data_lost_events, 0); assert_int_equal(stats->lost_bytes, 0); @@ -1159,7 +1134,7 @@ static void test_init_prometheus_remote_write_instance(void **state) assert_ptr_equal(instance->end_chart_formatting, NULL); assert_ptr_equal(instance->end_host_formatting, NULL); assert_ptr_equal(instance->end_batch_formatting, format_batch_prometheus_remote_write); - assert_ptr_equal(instance->send_header, prometheus_remote_write_send_header); + assert_ptr_equal(instance->prepare_header, prometheus_remote_write_prepare_header); assert_ptr_equal(instance->check_response, process_prometheus_remote_write_response); assert_ptr_not_equal(instance->buffer, NULL); @@ -1169,40 +1144,43 @@ static void test_init_prometheus_remote_write_instance(void **state) (struct prometheus_remote_write_specific_data *)instance->connector_specific_data; assert_ptr_not_equal(instance->connector_specific_data, NULL); - assert_ptr_equal(connector_specific_data->write_request, 0xff); + assert_ptr_not_equal(connector_specific_data->write_request, NULL); freez(instance->connector_specific_data); } -static void test_prometheus_remote_write_send_header(void **state) +static void test_prometheus_remote_write_prepare_header(void **state) { struct engine *engine = *state; struct instance *instance = engine->instance_root; - int sock = 1; struct prometheus_remote_write_specific_config *connector_specific_config = callocz(1, sizeof(struct prometheus_remote_write_specific_config)); instance->config.connector_specific_config = connector_specific_config; connector_specific_config->remote_write_path = strdupz("/receive"); - buffer_sprintf(instance->buffer, "test buffer"); + struct simple_connector_data *simple_connector_data = callocz(1, sizeof(struct simple_connector_data)); + instance->connector_specific_data = simple_connector_data; + simple_connector_data->last_buffer = callocz(1, sizeof(struct simple_connector_buffer)); + simple_connector_data->last_buffer->header = buffer_create(0); + simple_connector_data->last_buffer->buffer = buffer_create(0); - expect_function_call(__wrap_send); - expect_value(__wrap_send, sockfd, 1); - expect_not_value(__wrap_send, buf, NULL); - expect_string( - __wrap_send, buf, + buffer_sprintf(simple_connector_data->last_buffer->buffer, "test buffer"); + + prometheus_remote_write_prepare_header(instance); + + assert_string_equal( + buffer_tostring(simple_connector_data->last_buffer->header), "POST /receive HTTP/1.1\r\n" "Host: localhost\r\n" "Accept: */*\r\n" "X-Prometheus-Remote-Write-Version: 0.1.0\r\n" "Content-Length: 11\r\n" "Content-Type: application/x-www-form-urlencoded\r\n\r\n"); - expect_value(__wrap_send, len, 167); - expect_value(__wrap_send, flags, MSG_NOSIGNAL); - - assert_int_equal(prometheus_remote_write_send_header(&sock, instance),0); free(connector_specific_config->remote_write_path); + + buffer_free(simple_connector_data->last_buffer->header); + buffer_free(simple_connector_data->last_buffer->buffer); } static void test_process_prometheus_remote_write_response(void **state) @@ -1224,9 +1202,11 @@ static void test_format_host_prometheus_remote_write(void **state) instance->config.options |= EXPORTING_OPTION_SEND_CONFIGURED_LABELS; instance->config.options |= EXPORTING_OPTION_SEND_AUTOMATIC_LABELS; + struct simple_connector_data *simple_connector_data = mallocz(sizeof(struct simple_connector_data *)); + instance->connector_specific_data = simple_connector_data; struct prometheus_remote_write_specific_data *connector_specific_data = mallocz(sizeof(struct prometheus_remote_write_specific_data *)); - instance->connector_specific_data = (void *)connector_specific_data; + simple_connector_data->connector_specific_data = (void *)connector_specific_data; connector_specific_data->write_request = (void *)0xff; localhost->program_name = strdupz("test_program"); @@ -1254,6 +1234,7 @@ static void test_format_host_prometheus_remote_write(void **state) assert_int_equal(format_host_prometheus_remote_write(instance, localhost), 0); freez(connector_specific_data); + freez(simple_connector_data); free(localhost->program_name); free(localhost->program_version); } @@ -1263,9 +1244,11 @@ static void test_format_dimension_prometheus_remote_write(void **state) struct engine *engine = *state; struct instance *instance = engine->instance_root; + struct simple_connector_data *simple_connector_data = mallocz(sizeof(struct simple_connector_data *)); + instance->connector_specific_data = simple_connector_data; struct prometheus_remote_write_specific_data *connector_specific_data = mallocz(sizeof(struct prometheus_remote_write_specific_data *)); - instance->connector_specific_data = (void *)connector_specific_data; + simple_connector_data->connector_specific_data = (void *)connector_specific_data; connector_specific_data->write_request = (void *)0xff; RRDDIM *rd = localhost->rrdset_root->dimensions; @@ -1291,11 +1274,16 @@ static void test_format_batch_prometheus_remote_write(void **state) struct engine *engine = *state; struct instance *instance = engine->instance_root; + struct simple_connector_data *simple_connector_data = mallocz(sizeof(struct simple_connector_data *)); + instance->connector_specific_data = simple_connector_data; struct prometheus_remote_write_specific_data *connector_specific_data = mallocz(sizeof(struct prometheus_remote_write_specific_data *)); - instance->connector_specific_data = (void *)connector_specific_data; + simple_connector_data->connector_specific_data = (void *)connector_specific_data; connector_specific_data->write_request = __real_init_write_request(); + expect_function_call(__wrap_simple_connector_end_batch); + expect_value(__wrap_simple_connector_end_batch, instance, instance); + will_return(__wrap_simple_connector_end_batch, 0); __real_add_host_info( connector_specific_data->write_request, "test_name", "test_instance", "test_application", "test_version", 15051); @@ -1410,6 +1398,9 @@ static void test_aws_kinesis_connector_worker(void **state) expect_value(__wrap_rrdset_is_exportable, st, st); will_return(__wrap_rrdset_is_exportable, 1); + expect_function_call(__wrap_simple_connector_end_batch); + expect_value(__wrap_simple_connector_end_batch, instance, instance); + will_return(__wrap_simple_connector_end_batch, 0); __real_prepare_buffers(engine); struct aws_kinesis_specific_config *connector_specific_config = @@ -1542,6 +1533,9 @@ static void test_pubsub_connector_worker(void **state) expect_value(__wrap_rrdset_is_exportable, st, st); will_return(__wrap_rrdset_is_exportable, 1); + expect_function_call(__wrap_simple_connector_end_batch); + expect_value(__wrap_simple_connector_end_batch, instance, instance); + will_return(__wrap_simple_connector_end_batch, 0); __real_prepare_buffers(engine); struct pubsub_specific_config *connector_specific_config = @@ -1663,7 +1657,7 @@ static void test_init_mongodb_instance(void **state) assert_ptr_equal(instance->end_chart_formatting, NULL); assert_ptr_equal(instance->end_host_formatting, flush_host_labels); assert_ptr_equal(instance->end_batch_formatting, format_batch_mongodb); - assert_ptr_equal(instance->send_header, NULL); + assert_ptr_equal(instance->prepare_header, NULL); assert_ptr_equal(instance->check_response, NULL); assert_ptr_not_equal(instance->buffer, NULL); @@ -1884,7 +1878,7 @@ int main(void) cmocka_unit_test_setup_teardown( test_init_prometheus_remote_write_instance, setup_configured_engine, teardown_configured_engine), cmocka_unit_test_setup_teardown( - test_prometheus_remote_write_send_header, setup_initialized_engine, teardown_initialized_engine), + test_prometheus_remote_write_prepare_header, setup_initialized_engine, teardown_initialized_engine), cmocka_unit_test(test_process_prometheus_remote_write_response), cmocka_unit_test_setup_teardown( test_format_host_prometheus_remote_write, setup_initialized_engine, teardown_initialized_engine), diff --git a/exporting/tests/test_exporting_engine.h b/exporting/tests/test_exporting_engine.h index 0d57c7a280..800be1b996 100644 --- a/exporting/tests/test_exporting_engine.h +++ b/exporting/tests/test_exporting_engine.h @@ -116,6 +116,8 @@ int __mock_end_chart_formatting(struct instance *instance, RRDSET *st); int __mock_end_host_formatting(struct instance *instance, RRDHOST *host); int __mock_end_batch_formatting(struct instance *instance); +int __wrap_simple_connector_end_batch(struct instance *instance); + #if ENABLE_PROMETHEUS_REMOTE_WRITE void *__real_init_write_request(); void *__wrap_init_write_request(); diff --git a/libnetdata/config/appconfig.c b/libnetdata/config/appconfig.c index 268c900522..70f9e4cda5 100644 --- a/libnetdata/config/appconfig.c +++ b/libnetdata/config/appconfig.c @@ -69,14 +69,20 @@ int is_valid_connector(char *type, int check_reserved) if (!strcmp(type, "graphite") || !strcmp(type, "graphite:plaintext")) { return rc; + } else if (!strcmp(type, "graphite:http") || !strcmp(type, "graphite:https")) { + return rc; + } else if (!strcmp(type, "json") || !strcmp(type, "json:plaintext")) { + return rc; + } else if (!strcmp(type, "json:http") || !strcmp(type, "json:https")) { + return rc; } else if (!strcmp(type, "opentsdb") || !strcmp(type, "opentsdb:telnet")) { return rc; } else if (!strcmp(type, "opentsdb:http") || !strcmp(type, "opentsdb:https")) { return rc; - } else if (!strcmp(type, "json") || !strcmp(type, "json:plaintext")) { - return rc; } else if (!strcmp(type, "prometheus_remote_write")) { return rc; + } else if (!strcmp(type, "prometheus_remote_write:http") || !strcmp(type, "prometheus_remote_write:https")) { + return rc; } else if (!strcmp(type, "kinesis") || !strcmp(type, "kinesis:plaintext")) { return rc; } else if (!strcmp(type, "pubsub") || !strcmp(type, "pubsub:plaintext")) { diff --git a/libnetdata/socket/security.c b/libnetdata/socket/security.c index 2f340ad996..53366c4d8f 100644 --- a/libnetdata/socket/security.c +++ b/libnetdata/socket/security.c @@ -2,7 +2,7 @@ #ifdef ENABLE_HTTPS -SSL_CTX *netdata_opentsdb_ctx=NULL; +SSL_CTX *netdata_exporting_ctx=NULL; SSL_CTX *netdata_client_ctx=NULL; SSL_CTX *netdata_srv_ctx=NULL; const char *security_key=NULL; @@ -201,7 +201,7 @@ static SSL_CTX * security_initialize_openssl_server() { * @param selector informs the context that must be initialized, the following list has the valid values: * NETDATA_SSL_CONTEXT_SERVER - the server context * NETDATA_SSL_CONTEXT_STREAMING - Starts the streaming context. - * NETDATA_SSL_CONTEXT_OPENTSDB - Starts the OpenTSDB contextv + * NETDATA_SSL_CONTEXT_EXPORTING - Starts the OpenTSDB contextv */ void security_start_ssl(int selector) { switch (selector) { @@ -222,8 +222,8 @@ void security_start_ssl(int selector) { SSL_CTX_set_mode(netdata_client_ctx, SSL_MODE_ENABLE_PARTIAL_WRITE |SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER |SSL_MODE_AUTO_RETRY); break; } - case NETDATA_SSL_CONTEXT_OPENTSDB: { - netdata_opentsdb_ctx = security_initialize_openssl_client(); + case NETDATA_SSL_CONTEXT_EXPORTING: { + netdata_exporting_ctx = security_initialize_openssl_client(); break; } } @@ -234,20 +234,18 @@ void security_start_ssl(int selector) { * * Clean all the allocated contexts from netdata. */ -void security_clean_openssl() { - if (netdata_srv_ctx) - { - SSL_CTX_free(netdata_srv_ctx); - } +void security_clean_openssl() +{ + if (netdata_srv_ctx) { + SSL_CTX_free(netdata_srv_ctx); + } - if (netdata_client_ctx) - { + if (netdata_client_ctx) { SSL_CTX_free(netdata_client_ctx); } - if ( netdata_opentsdb_ctx ) - { - SSL_CTX_free(netdata_opentsdb_ctx); + if (netdata_exporting_ctx) { + SSL_CTX_free(netdata_exporting_ctx); } #if OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110 diff --git a/libnetdata/socket/security.h b/libnetdata/socket/security.h index 6eee80324f..17ecc6d05a 100644 --- a/libnetdata/socket/security.h +++ b/libnetdata/socket/security.h @@ -14,7 +14,7 @@ #define NETDATA_SSL_CONTEXT_SERVER 0 #define NETDATA_SSL_CONTEXT_STREAMING 1 -#define NETDATA_SSL_CONTEXT_OPENTSDB 2 +#define NETDATA_SSL_CONTEXT_EXPORTING 2 # ifdef ENABLE_HTTPS @@ -34,7 +34,7 @@ struct netdata_ssl{ uint32_t flags; //The flags for SSL connection }; -extern SSL_CTX *netdata_opentsdb_ctx; +extern SSL_CTX *netdata_exporting_ctx; extern SSL_CTX *netdata_client_ctx; extern SSL_CTX *netdata_srv_ctx; extern const char *security_key; diff --git a/web/api/exporters/allmetrics.c b/web/api/exporters/allmetrics.c index 6fcb4fb0cb..f3118ba5f3 100644 --- a/web/api/exporters/allmetrics.c +++ b/web/api/exporters/allmetrics.c @@ -4,24 +4,26 @@ struct prometheus_output_options { char *name; - BACKENDS_PROMETHEUS_OUTPUT_OPTIONS flag; + PROMETHEUS_OUTPUT_OPTIONS flag; } prometheus_output_flags_root[] = { - { "help", BACKENDS_PROMETHEUS_OUTPUT_HELP }, - { "types", BACKENDS_PROMETHEUS_OUTPUT_TYPES }, - { "names", BACKENDS_PROMETHEUS_OUTPUT_NAMES }, - { "timestamps", BACKENDS_PROMETHEUS_OUTPUT_TIMESTAMPS }, - { "variables", BACKENDS_PROMETHEUS_OUTPUT_VARIABLES }, - { "oldunits", BACKENDS_PROMETHEUS_OUTPUT_OLDUNITS }, - { "hideunits", BACKENDS_PROMETHEUS_OUTPUT_HIDEUNITS }, - // terminator - { NULL, BACKENDS_PROMETHEUS_OUTPUT_NONE }, + { "help", PROMETHEUS_OUTPUT_HELP }, + { "types", PROMETHEUS_OUTPUT_TYPES }, + { "names", PROMETHEUS_OUTPUT_NAMES }, + { "timestamps", PROMETHEUS_OUTPUT_TIMESTAMPS }, + { "variables", PROMETHEUS_OUTPUT_VARIABLES }, + { "oldunits", PROMETHEUS_OUTPUT_OLDUNITS }, + { "hideunits", PROMETHEUS_OUTPUT_HIDEUNITS }, + // terminator + { NULL, PROMETHEUS_OUTPUT_NONE }, }; inline int web_client_api_request_v1_allmetrics(RRDHOST *host, struct web_client *w, char *url) { int format = ALLMETRICS_SHELL; const char *prometheus_server = w->client_ip; uint32_t prometheus_backend_options = global_backend_options; - BACKENDS_PROMETHEUS_OUTPUT_OPTIONS prometheus_output_options = BACKENDS_PROMETHEUS_OUTPUT_TIMESTAMPS | ((global_backend_options & BACKEND_OPTION_SEND_NAMES)?BACKENDS_PROMETHEUS_OUTPUT_NAMES:0); + PROMETHEUS_OUTPUT_OPTIONS prometheus_output_options = + PROMETHEUS_OUTPUT_TIMESTAMPS | + ((global_backend_options & BACKEND_OPTION_SEND_NAMES) ? PROMETHEUS_OUTPUT_NAMES : 0); const char *prometheus_prefix = global_backend_prefix; while(url) { @@ -84,7 +86,7 @@ inline int web_client_api_request_v1_allmetrics(RRDHOST *host, struct web_client case ALLMETRICS_PROMETHEUS: w->response.data->contenttype = CT_PROMETHEUS; - backends_rrd_stats_api_v1_charts_allmetrics_prometheus_single_host( + rrd_stats_api_v1_charts_allmetrics_prometheus_single_host( host , w->response.data , prometheus_server @@ -96,7 +98,7 @@ inline int web_client_api_request_v1_allmetrics(RRDHOST *host, struct web_client case ALLMETRICS_PROMETHEUS_ALL_HOSTS: w->response.data->contenttype = CT_PROMETHEUS; - backends_rrd_stats_api_v1_charts_allmetrics_prometheus_all_hosts( + rrd_stats_api_v1_charts_allmetrics_prometheus_all_hosts( host , w->response.data , prometheus_server