0
0
Fork 0
mirror of https://github.com/netdata/netdata.git synced 2025-05-04 01:10:03 +00:00

Add HTTP and HTTPS support to the simple exporting connector ()

This commit is contained in:
Vladimir Kobal 2020-11-05 19:08:17 +02:00 committed by GitHub
parent edd6d02dec
commit 943ee2482b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
35 changed files with 847 additions and 399 deletions

2
.gitignore vendored
View file

@ -146,6 +146,8 @@ cmake-build-release/
CMakeCache.txt CMakeCache.txt
CMakeFiles/ CMakeFiles/
cmake_install.cmake cmake_install.cmake
.cmake
compile_commands.json
# jetbrains IDE # jetbrains IDE
.jetbrains* .jetbrains*

View file

@ -1270,6 +1270,7 @@ endif()
-Wl,--wrap=connect_to_one_of -Wl,--wrap=connect_to_one_of
-Wl,--wrap=create_main_rusage_chart -Wl,--wrap=create_main_rusage_chart
-Wl,--wrap=send_main_rusage -Wl,--wrap=send_main_rusage
-Wl,--wrap=simple_connector_end_batch
${PROMETHEUS_REMOTE_WRITE_LINK_OPTIONS} ${PROMETHEUS_REMOTE_WRITE_LINK_OPTIONS}
${KINESIS_LINK_OPTIONS} ${KINESIS_LINK_OPTIONS}
${PUBSUB_LINK_OPTIONS} ${PUBSUB_LINK_OPTIONS}

View file

@ -955,6 +955,7 @@ if ENABLE_UNITTESTS
-Wl,--wrap=connect_to_one_of \ -Wl,--wrap=connect_to_one_of \
-Wl,--wrap=create_main_rusage_chart \ -Wl,--wrap=create_main_rusage_chart \
-Wl,--wrap=send_main_rusage \ -Wl,--wrap=send_main_rusage \
-Wl,--wrap=simple_connector_end_batch \
$(TEST_LDFLAGS) \ $(TEST_LDFLAGS) \
$(NULL) $(NULL)
exporting_tests_exporting_engine_testdriver_LDADD = $(NETDATA_COMMON_LIBS) $(TEST_LIBS) exporting_tests_exporting_engine_testdriver_LDADD = $(NETDATA_COMMON_LIBS) $(TEST_LIBS)

View file

@ -551,7 +551,7 @@ void *backends_main(void *ptr) {
case BACKEND_TYPE_OPENTSDB_USING_HTTP: { case BACKEND_TYPE_OPENTSDB_USING_HTTP: {
#ifdef ENABLE_HTTPS #ifdef ENABLE_HTTPS
if (!strcmp(type, "opentsdb:https")) { if (!strcmp(type, "opentsdb:https")) {
security_start_ssl(NETDATA_SSL_CONTEXT_OPENTSDB); security_start_ssl(NETDATA_SSL_CONTEXT_EXPORTING);
} }
#endif #endif
backend_set_opentsdb_http_variables(&default_port,&backend_response_checker,&backend_request_formatter); backend_set_opentsdb_http_variables(&default_port,&backend_response_checker,&backend_request_formatter);
@ -1001,9 +1001,9 @@ void *backends_main(void *ptr) {
sock = connect_to_one_of(destination, default_port, &timeout, &reconnects, NULL, 0); sock = connect_to_one_of(destination, default_port, &timeout, &reconnects, NULL, 0);
#ifdef ENABLE_HTTPS #ifdef ENABLE_HTTPS
if(sock != -1) { if(sock != -1) {
if(netdata_opentsdb_ctx) { if(netdata_exporting_ctx) {
if(!opentsdb_ssl.conn) { if(!opentsdb_ssl.conn) {
opentsdb_ssl.conn = SSL_new(netdata_opentsdb_ctx); opentsdb_ssl.conn = SSL_new(netdata_exporting_ctx);
if(!opentsdb_ssl.conn) { if(!opentsdb_ssl.conn) {
error("Failed to allocate SSL structure %d.", sock); error("Failed to allocate SSL structure %d.", sock);
opentsdb_ssl.flags = NETDATA_SSL_NO_HANDSHAKE; opentsdb_ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
@ -1229,7 +1229,7 @@ cleanup:
buffer_free(response); buffer_free(response);
#ifdef ENABLE_HTTPS #ifdef ENABLE_HTTPS
if(netdata_opentsdb_ctx) { if(netdata_exporting_ctx) {
if(opentsdb_ssl.conn) { if(opentsdb_ssl.conn) {
SSL_free(opentsdb_ssl.conn); SSL_free(opentsdb_ssl.conn);
} }

View file

@ -44,7 +44,7 @@ X seconds (though, it can send them per second if you need it to).
also be configured). Learn more in our guide to [export and visualize Netdata metrics in also be configured). Learn more in our guide to [export and visualize Netdata metrics in
Graphite](/docs/guides/export/export-netdata-metrics-graphite.md). Graphite](/docs/guides/export/export-netdata-metrics-graphite.md).
- [**JSON** document databases](/exporting/json/README.md) - [**JSON** document databases](/exporting/json/README.md)
- [**OpenTSDB**](/exporting/opentsdb/README.md): Use a plaintext, HTTP, or HTTPS interfaces. Metrics are sent to - [**OpenTSDB**](/exporting/opentsdb/README.md): Use a plaintext or HTTP interfaces. Metrics are sent to
OpenTSDB as `prefix.chart.dimension` with tag `host=hostname`. OpenTSDB as `prefix.chart.dimension` with tag `host=hostname`.
- [**MongoDB**](/exporting/mongodb/README.md): Metrics are sent to the database in `JSON` format. - [**MongoDB**](/exporting/mongodb/README.md): Metrics are sent to the database in `JSON` format.
- [**Prometheus**](/exporting/prometheus/README.md): Use an existing Prometheus installation to scrape metrics - [**Prometheus**](/exporting/prometheus/README.md): Use an existing Prometheus installation to scrape metrics
@ -173,8 +173,10 @@ You can configure each connector individually using the available [options](#opt
- `[prometheus:exporter]` defines settings for Prometheus exporter API queries (e.g.: - `[prometheus:exporter]` defines settings for Prometheus exporter API queries (e.g.:
`http://NODE:19999/api/v1/allmetrics?format=prometheus&help=yes&source=as-collected`). `http://NODE:19999/api/v1/allmetrics?format=prometheus&help=yes&source=as-collected`).
- `[<type>:<name>]` keeps settings for a particular exporting connector instance, where: - `[<type>:<name>]` keeps settings for a particular exporting connector instance, where:
- `type` selects the exporting connector type: graphite | opentsdb:telnet | opentsdb:http | opentsdb:https | - `type` selects the exporting connector type: graphite | opentsdb:telnet | opentsdb:http |
prometheus_remote_write | json | kinesis | pubsub | mongodb prometheus_remote_write | json | kinesis | pubsub | mongodb. For graphite, opentsdb,
json, and prometheus_remote_write connectors you can also use `:http` or `:https` modifiers
(e.g.: `opentsdb:https`).
- `name` can be arbitrary instance name you chose. - `name` can be arbitrary instance name you chose.
### Options ### Options
@ -270,6 +272,13 @@ Configure individual connectors and override any global settings with the follow
> You can check how the host tags were parsed using the /api/v1/info API call. But, keep in mind that backends subsystem > You can check how the host tags were parsed using the /api/v1/info API call. But, keep in mind that backends subsystem
> is deprecated and will be deleted soon. Please move your existing tags to the `[host labels]` section. > is deprecated and will be deleted soon. Please move your existing tags to the `[host labels]` section.
## HTTPS
Netdata can send metrics to external databases using the TLS/SSL protocol. Unfortunately, some of
them does not support encrypted connections, so you will have to configure a reverse proxy to enable
HTTPS communication between Netdata and an external database. You can set up a reverse proxy with
[Nginx](/docs/Running-behind-nginx.md).
## Exporting engine monitoring ## Exporting engine monitoring
Netdata creates five charts in the dashboard, under the **Netdata Monitoring** section, to help you monitor the health Netdata creates five charts in the dashboard, under the **Netdata Monitoring** section, to help you monitor the health

View file

@ -48,7 +48,7 @@ int init_aws_kinesis_instance(struct instance *instance)
instance->end_host_formatting = flush_host_labels; instance->end_host_formatting = flush_host_labels;
instance->end_batch_formatting = NULL; instance->end_batch_formatting = NULL;
instance->send_header = NULL; instance->prepare_header = NULL;
instance->check_response = NULL; instance->check_response = NULL;
instance->buffer = (void *)buffer_create(0); instance->buffer = (void *)buffer_create(0);

View file

@ -35,3 +35,46 @@ void clean_instance(struct instance *instance)
uv_cond_destroy(&instance->cond_var); uv_cond_destroy(&instance->cond_var);
// uv_mutex_destroy(&instance->mutex); // uv_mutex_destroy(&instance->mutex);
} }
/**
* Clean up a simple connector instance on Netdata exit
*
* @param instance an instance data structure.
*/
void simple_connector_cleanup(struct instance *instance)
{
info("EXPORTING: cleaning up instance %s ...", instance->config.name);
struct simple_connector_data *simple_connector_data =
(struct simple_connector_data *)instance->connector_specific_data;
buffer_free(instance->buffer);
buffer_free(simple_connector_data->buffer);
buffer_free(simple_connector_data->header);
struct simple_connector_buffer *next_buffer = simple_connector_data->first_buffer;
for (int i = 0; i < instance->config.buffer_on_failures; i++) {
struct simple_connector_buffer *current_buffer = next_buffer;
next_buffer = next_buffer->next;
buffer_free(current_buffer->header);
buffer_free(current_buffer->buffer);
freez(current_buffer);
}
#ifdef ENABLE_HTTPS
if (simple_connector_data->conn)
SSL_free(simple_connector_data->conn);
#endif
freez(simple_connector_data);
struct simple_connector_config *simple_connector_config =
(struct simple_connector_config *)instance->config.connector_specific_config;
freez(simple_connector_config);
info("EXPORTING: instance %s exited", instance->config.name);
instance->exited = 1;
return;
}

View file

@ -46,10 +46,12 @@ typedef enum exporting_options {
typedef enum exporting_connector_types { typedef enum exporting_connector_types {
EXPORTING_CONNECTOR_TYPE_UNKNOWN, // Invalid type EXPORTING_CONNECTOR_TYPE_UNKNOWN, // Invalid type
EXPORTING_CONNECTOR_TYPE_GRAPHITE, // Send plain text to Graphite EXPORTING_CONNECTOR_TYPE_GRAPHITE, // Send plain text to Graphite
EXPORTING_CONNECTOR_TYPE_OPENTSDB_USING_TELNET, // Send data to OpenTSDB using telnet API EXPORTING_CONNECTOR_TYPE_GRAPHITE_HTTP, // Send data to Graphite using HTTP API
EXPORTING_CONNECTOR_TYPE_OPENTSDB_USING_HTTP, // Send data to OpenTSDB using HTTP API EXPORTING_CONNECTOR_TYPE_JSON, // Send data in JSON format
EXPORTING_CONNECTOR_TYPE_JSON, // Stores the data using JSON. EXPORTING_CONNECTOR_TYPE_JSON_HTTP, // Send data in JSON format using HTTP API
EXPORTING_CONNECTOR_TYPE_PROMETHEUS_REMOTE_WRITE, // The user selected to use Prometheus backend EXPORTING_CONNECTOR_TYPE_OPENTSDB, // Send data to OpenTSDB using telnet API
EXPORTING_CONNECTOR_TYPE_OPENTSDB_HTTP, // Send data to OpenTSDB using HTTP API
EXPORTING_CONNECTOR_TYPE_PROMETHEUS_REMOTE_WRITE, // User selected to use Prometheus backend
EXPORTING_CONNECTOR_TYPE_KINESIS, // Send message to AWS Kinesis EXPORTING_CONNECTOR_TYPE_KINESIS, // Send message to AWS Kinesis
EXPORTING_CONNECTOR_TYPE_PUBSUB, // Send message to Google Cloud Pub/Sub EXPORTING_CONNECTOR_TYPE_PUBSUB, // Send message to Google Cloud Pub/Sub
EXPORTING_CONNECTOR_TYPE_MONGODB, // Send data to MongoDB collection EXPORTING_CONNECTOR_TYPE_MONGODB, // Send data to MongoDB collection
@ -81,6 +83,38 @@ struct simple_connector_config {
int default_port; int default_port;
}; };
struct simple_connector_buffer {
BUFFER *header;
BUFFER *buffer;
size_t buffered_metrics;
size_t buffered_bytes;
int used;
struct simple_connector_buffer *next;
};
struct simple_connector_data {
void *connector_specific_data;
size_t total_buffered_metrics;
BUFFER *header;
BUFFER *buffer;
size_t buffered_metrics;
size_t buffered_bytes;
struct simple_connector_buffer *previous_buffer;
struct simple_connector_buffer *first_buffer;
struct simple_connector_buffer *last_buffer;
#ifdef ENABLE_HTTPS
SSL *conn; //SSL connection
int flags; //The flags for SSL connection
#endif
};
struct prometheus_remote_write_specific_config { struct prometheus_remote_write_specific_config {
char *remote_write_path; char *remote_write_path;
}; };
@ -175,7 +209,7 @@ struct instance {
int (*end_host_formatting)(struct instance *instance, RRDHOST *host); int (*end_host_formatting)(struct instance *instance, RRDHOST *host);
int (*end_batch_formatting)(struct instance *instance); int (*end_batch_formatting)(struct instance *instance);
int (*send_header)(int *sock, struct instance *instance); void (*prepare_header)(struct instance *instance);
int (*check_response)(BUFFER *buffer, struct instance *instance); int (*check_response)(BUFFER *buffer, struct instance *instance);
void *connector_specific_data; void *connector_specific_data;
@ -210,6 +244,7 @@ struct engine *read_exporting_config();
EXPORTING_CONNECTOR_TYPE exporting_select_type(const char *type); EXPORTING_CONNECTOR_TYPE exporting_select_type(const char *type);
int init_connectors(struct engine *engine); int init_connectors(struct engine *engine);
void simple_connector_init(struct instance *instance);
int mark_scheduled_instances(struct engine *engine); int mark_scheduled_instances(struct engine *engine);
void prepare_buffers(struct engine *engine); void prepare_buffers(struct engine *engine);
@ -232,11 +267,12 @@ void end_chart_formatting(struct engine *engine, RRDSET *st);
void end_host_formatting(struct engine *engine, RRDHOST *host); void end_host_formatting(struct engine *engine, RRDHOST *host);
void end_batch_formatting(struct engine *engine); void end_batch_formatting(struct engine *engine);
int flush_host_labels(struct instance *instance, RRDHOST *host); int flush_host_labels(struct instance *instance, RRDHOST *host);
int simple_connector_update_buffered_bytes(struct instance *instance); int simple_connector_end_batch(struct instance *instance);
int exporting_discard_response(BUFFER *buffer, struct instance *instance); int exporting_discard_response(BUFFER *buffer, struct instance *instance);
void simple_connector_receive_response(int *sock, struct instance *instance); void simple_connector_receive_response(int *sock, struct instance *instance);
void simple_connector_send_buffer(int *sock, int *failures, struct instance *instance); void simple_connector_send_buffer(
int *sock, int *failures, struct instance *instance, BUFFER *header, BUFFER *buffer, size_t buffered_metrics);
void simple_connector_worker(void *instance_p); void simple_connector_worker(void *instance_p);
void create_main_rusage_chart(RRDSET **st_rusage, RRDDIM **rd_user, RRDDIM **rd_system); void create_main_rusage_chart(RRDSET **st_rusage, RRDDIM **rd_user, RRDDIM **rd_system);
@ -244,6 +280,7 @@ void send_main_rusage(RRDSET *st_rusage, RRDDIM *rd_user, RRDDIM *rd_system);
void send_internal_metrics(struct instance *instance); void send_internal_metrics(struct instance *instance);
extern void clean_instance(struct instance *ptr); extern void clean_instance(struct instance *ptr);
void simple_connector_cleanup(struct instance *instance);
static inline void disable_instance(struct instance *instance) static inline void disable_instance(struct instance *instance)
{ {

View file

@ -3,6 +3,7 @@ title: "Export metrics to Graphite providers"
sidebar_label: Graphite sidebar_label: Graphite
description: "Archive your Agent's metrics to a any Graphite database provider for long-term storage, further analysis, or correlation with data from other sources." description: "Archive your Agent's metrics to a any Graphite database provider for long-term storage, further analysis, or correlation with data from other sources."
custom_edit_url: https://github.com/netdata/netdata/edit/master/exporting/graphite/README.md custom_edit_url: https://github.com/netdata/netdata/edit/master/exporting/graphite/README.md
sidebar_label: Graphite
--> -->
# Export metrics to Graphite providers # Export metrics to Graphite providers
@ -21,6 +22,9 @@ directory and set the following options:
destination = localhost:2003 destination = localhost:2003
``` ```
Add `:http` or `:https` modifiers to the connector type if you need to use other than a plaintext protocol. For example: `graphite:http:my_graphite_instance`,
`graphite:https:my_graphite_instance`.
The Graphite connector is further configurable using additional settings. See the [exporting reference The Graphite connector is further configurable using additional settings. See the [exporting reference
doc](/exporting/README.md#options) for details. doc](/exporting/README.md#options) for details.

View file

@ -16,6 +16,17 @@ int init_graphite_instance(struct instance *instance)
instance->config.connector_specific_config = (void *)connector_specific_config; instance->config.connector_specific_config = (void *)connector_specific_config;
connector_specific_config->default_port = 2003; connector_specific_config->default_port = 2003;
struct simple_connector_data *connector_specific_data = callocz(1, sizeof(struct simple_connector_data));
instance->connector_specific_data = connector_specific_data;
#ifdef ENABLE_HTTPS
connector_specific_data->flags = NETDATA_SSL_START;
connector_specific_data->conn = NULL;
if (instance->config.options & EXPORTING_OPTION_USE_TLS) {
security_start_ssl(NETDATA_SSL_CONTEXT_EXPORTING);
}
#endif
instance->start_batch_formatting = NULL; instance->start_batch_formatting = NULL;
instance->start_host_formatting = format_host_labels_graphite_plaintext; instance->start_host_formatting = format_host_labels_graphite_plaintext;
instance->start_chart_formatting = NULL; instance->start_chart_formatting = NULL;
@ -27,9 +38,13 @@ int init_graphite_instance(struct instance *instance)
instance->end_chart_formatting = NULL; instance->end_chart_formatting = NULL;
instance->end_host_formatting = flush_host_labels; instance->end_host_formatting = flush_host_labels;
instance->end_batch_formatting = simple_connector_update_buffered_bytes; instance->end_batch_formatting = simple_connector_end_batch;
if (instance->config.type == EXPORTING_CONNECTOR_TYPE_GRAPHITE_HTTP)
instance->prepare_header = graphite_http_prepare_header;
else
instance->prepare_header = NULL;
instance->send_header = NULL;
instance->check_response = exporting_discard_response; instance->check_response = exporting_discard_response;
instance->buffer = (void *)buffer_create(0); instance->buffer = (void *)buffer_create(0);
@ -37,6 +52,9 @@ int init_graphite_instance(struct instance *instance)
error("EXPORTING: cannot create buffer for graphite exporting connector instance %s", instance->config.name); error("EXPORTING: cannot create buffer for graphite exporting connector instance %s", instance->config.name);
return 1; return 1;
} }
simple_connector_init(instance);
if (uv_mutex_init(&instance->mutex)) if (uv_mutex_init(&instance->mutex))
return 1; return 1;
if (uv_cond_init(&instance->cond_var)) if (uv_cond_init(&instance->cond_var))
@ -187,3 +205,26 @@ int format_dimension_stored_graphite_plaintext(struct instance *instance, RRDDIM
return 0; return 0;
} }
/**
* Ppepare HTTP header
*
* @param instance an instance data structure.
* @return Returns 0 on success, 1 on failure.
*/
void graphite_http_prepare_header(struct instance *instance)
{
struct simple_connector_data *simple_connector_data = instance->connector_specific_data;
buffer_sprintf(
simple_connector_data->last_buffer->header,
"POST /api/put HTTP/1.1\r\n"
"Host: %s\r\n"
"Content-Type: application/graphite\r\n"
"Content-Length: %lu\r\n"
"\r\n",
instance->config.destination,
buffer_strlen(simple_connector_data->last_buffer->buffer));
return;
}

View file

@ -13,4 +13,6 @@ int format_host_labels_graphite_plaintext(struct instance *instance, RRDHOST *ho
int format_dimension_collected_graphite_plaintext(struct instance *instance, RRDDIM *rd); int format_dimension_collected_graphite_plaintext(struct instance *instance, RRDDIM *rd);
int format_dimension_stored_graphite_plaintext(struct instance *instance, RRDDIM *rd); int format_dimension_stored_graphite_plaintext(struct instance *instance, RRDDIM *rd);
void graphite_http_prepare_header(struct instance *instance);
#endif //NETDATA_EXPORTING_GRAPHITE_H #endif //NETDATA_EXPORTING_GRAPHITE_H

View file

@ -40,15 +40,23 @@ int init_connectors(struct engine *engine)
if (init_graphite_instance(instance) != 0) if (init_graphite_instance(instance) != 0)
return 1; return 1;
break; break;
case EXPORTING_CONNECTOR_TYPE_GRAPHITE_HTTP:
if (init_graphite_instance(instance) != 0)
return 1;
break;
case EXPORTING_CONNECTOR_TYPE_JSON: case EXPORTING_CONNECTOR_TYPE_JSON:
if (init_json_instance(instance) != 0) if (init_json_instance(instance) != 0)
return 1; return 1;
break; break;
case EXPORTING_CONNECTOR_TYPE_OPENTSDB_USING_TELNET: case EXPORTING_CONNECTOR_TYPE_JSON_HTTP:
if (init_json_http_instance(instance) != 0)
return 1;
break;
case EXPORTING_CONNECTOR_TYPE_OPENTSDB:
if (init_opentsdb_telnet_instance(instance) != 0) if (init_opentsdb_telnet_instance(instance) != 0)
return 1; return 1;
break; break;
case EXPORTING_CONNECTOR_TYPE_OPENTSDB_USING_HTTP: case EXPORTING_CONNECTOR_TYPE_OPENTSDB_HTTP:
if (init_opentsdb_http_instance(instance) != 0) if (init_opentsdb_http_instance(instance) != 0)
return 1; return 1;
break; break;
@ -96,3 +104,36 @@ int init_connectors(struct engine *engine)
return 0; return 0;
} }
/**
* Initialize a ring buffer for a simple connector
*
* @param instance an instance data structure.
*/
void simple_connector_init(struct instance *instance)
{
struct simple_connector_data *connector_specific_data =
(struct simple_connector_data *)instance->connector_specific_data;
// create a ring buffer
struct simple_connector_buffer *first_buffer = NULL;
if (instance->config.buffer_on_failures < 1)
instance->config.buffer_on_failures = 1;
for (int i = 0; i < instance->config.buffer_on_failures; i++) {
struct simple_connector_buffer *current_buffer = callocz(1, sizeof(struct simple_connector_buffer));
if (!connector_specific_data->first_buffer)
first_buffer = current_buffer;
else
current_buffer->next = connector_specific_data->first_buffer;
connector_specific_data->first_buffer = current_buffer;
}
first_buffer->next = connector_specific_data->first_buffer;
connector_specific_data->last_buffer = connector_specific_data->first_buffer;
return;
}

View file

@ -3,6 +3,7 @@ title: "Export metrics to JSON document databases"
sidebar_label: JSON sidebar_label: JSON
description: "Archive your Agent's metrics to a JSON document database for long-term storage, further analysis, or correlation with data from other sources." description: "Archive your Agent's metrics to a JSON document database for long-term storage, further analysis, or correlation with data from other sources."
custom_edit_url: https://github.com/netdata/netdata/edit/master/exporting/json/README.md custom_edit_url: https://github.com/netdata/netdata/edit/master/exporting/json/README.md
sidebar_label: JSON Document Databases
--> -->
# Export metrics to JSON document databases # Export metrics to JSON document databases
@ -21,6 +22,9 @@ directory and set the following options:
destination = localhost:5448 destination = localhost:5448
``` ```
Add `:http` or `:https` modifiers to the connector type if you need to use other than a plaintext protocol. For example: `json:http:my_json_instance`,
`json:https:my_json_instance`.
The JSON connector is further configurable using additional settings. See the [exporting reference The JSON connector is further configurable using additional settings. See the [exporting reference
doc](/exporting/README.md#options) for details. doc](/exporting/README.md#options) for details.

View file

@ -16,6 +16,9 @@ int init_json_instance(struct instance *instance)
instance->config.connector_specific_config = (void *)connector_specific_config; instance->config.connector_specific_config = (void *)connector_specific_config;
connector_specific_config->default_port = 5448; connector_specific_config->default_port = 5448;
struct simple_connector_data *connector_specific_data = callocz(1, sizeof(struct simple_connector_data));
instance->connector_specific_data = connector_specific_data;
instance->start_batch_formatting = NULL; instance->start_batch_formatting = NULL;
instance->start_host_formatting = format_host_labels_json_plaintext; instance->start_host_formatting = format_host_labels_json_plaintext;
instance->start_chart_formatting = NULL; instance->start_chart_formatting = NULL;
@ -27,9 +30,10 @@ int init_json_instance(struct instance *instance)
instance->end_chart_formatting = NULL; instance->end_chart_formatting = NULL;
instance->end_host_formatting = flush_host_labels; instance->end_host_formatting = flush_host_labels;
instance->end_batch_formatting = simple_connector_update_buffered_bytes; instance->end_batch_formatting = simple_connector_end_batch;
instance->prepare_header = NULL;
instance->send_header = NULL;
instance->check_response = exporting_discard_response; instance->check_response = exporting_discard_response;
instance->buffer = (void *)buffer_create(0); instance->buffer = (void *)buffer_create(0);
@ -37,6 +41,63 @@ int init_json_instance(struct instance *instance)
error("EXPORTING: cannot create buffer for json exporting connector instance %s", instance->config.name); error("EXPORTING: cannot create buffer for json exporting connector instance %s", instance->config.name);
return 1; return 1;
} }
simple_connector_init(instance);
if (uv_mutex_init(&instance->mutex))
return 1;
if (uv_cond_init(&instance->cond_var))
return 1;
return 0;
}
/**
* Initialize JSON connector instance for HTTP protocol
*
* @param instance an instance data structure.
* @return Returns 0 on success, 1 on failure.
*/
int init_json_http_instance(struct instance *instance)
{
instance->worker = simple_connector_worker;
struct simple_connector_config *connector_specific_config = callocz(1, sizeof(struct simple_connector_config));
instance->config.connector_specific_config = (void *)connector_specific_config;
connector_specific_config->default_port = 5448;
struct simple_connector_data *connector_specific_data = callocz(1, sizeof(struct simple_connector_data));
instance->connector_specific_data = connector_specific_data;
#ifdef ENABLE_HTTPS
connector_specific_data->flags = NETDATA_SSL_START;
connector_specific_data->conn = NULL;
if (instance->config.options & EXPORTING_OPTION_USE_TLS) {
security_start_ssl(NETDATA_SSL_CONTEXT_EXPORTING);
}
#endif
instance->start_batch_formatting = open_batch_json_http;
instance->start_host_formatting = format_host_labels_json_plaintext;
instance->start_chart_formatting = NULL;
if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AS_COLLECTED)
instance->metric_formatting = format_dimension_collected_json_plaintext;
else
instance->metric_formatting = format_dimension_stored_json_plaintext;
instance->end_chart_formatting = NULL;
instance->end_host_formatting = flush_host_labels;
instance->end_batch_formatting = close_batch_json_http;
instance->prepare_header = json_http_prepare_header;
instance->check_response = exporting_discard_response;
instance->buffer = (void *)buffer_create(0);
simple_connector_init(instance);
if (uv_mutex_init(&instance->mutex)) if (uv_mutex_init(&instance->mutex))
return 1; return 1;
if (uv_cond_init(&instance->cond_var)) if (uv_cond_init(&instance->cond_var))
@ -111,6 +172,11 @@ int format_dimension_collected_json_plaintext(struct instance *instance, RRDDIM
} }
} }
if (instance->config.type == EXPORTING_CONNECTOR_TYPE_JSON_HTTP) {
if (buffer_strlen((BUFFER *)instance->buffer) > 2)
buffer_strcat(instance->buffer, ",\n");
}
buffer_sprintf( buffer_sprintf(
instance->buffer, instance->buffer,
@ -131,7 +197,7 @@ int format_dimension_collected_json_plaintext(struct instance *instance, RRDDIM
"\"name\":\"%s\"," "\"name\":\"%s\","
"\"value\":" COLLECTED_NUMBER_FORMAT "," "\"value\":" COLLECTED_NUMBER_FORMAT ","
"\"timestamp\":%llu}\n", "\"timestamp\":%llu}",
instance->config.prefix, instance->config.prefix,
(host == localhost) ? engine->config.hostname : host->hostname, (host == localhost) ? engine->config.hostname : host->hostname,
@ -153,6 +219,10 @@ int format_dimension_collected_json_plaintext(struct instance *instance, RRDDIM
(unsigned long long)rd->last_collected_time.tv_sec); (unsigned long long)rd->last_collected_time.tv_sec);
if (instance->config.type != EXPORTING_CONNECTOR_TYPE_JSON_HTTP) {
buffer_strcat(instance->buffer, "\n");
}
return 0; return 0;
} }
@ -189,6 +259,11 @@ int format_dimension_stored_json_plaintext(struct instance *instance, RRDDIM *rd
} }
} }
if (instance->config.type == EXPORTING_CONNECTOR_TYPE_JSON_HTTP) {
if (buffer_strlen((BUFFER *)instance->buffer) > 2)
buffer_strcat(instance->buffer, ",\n");
}
buffer_sprintf( buffer_sprintf(
instance->buffer, instance->buffer,
"{" "{"
@ -208,7 +283,7 @@ int format_dimension_stored_json_plaintext(struct instance *instance, RRDDIM *rd
"\"name\":\"%s\"," "\"name\":\"%s\","
"\"value\":" CALCULATED_NUMBER_FORMAT "," "\"value\":" CALCULATED_NUMBER_FORMAT ","
"\"timestamp\": %llu}\n", "\"timestamp\": %llu}",
instance->config.prefix, instance->config.prefix,
(host == localhost) ? engine->config.hostname : host->hostname, (host == localhost) ? engine->config.hostname : host->hostname,
@ -230,5 +305,60 @@ int format_dimension_stored_json_plaintext(struct instance *instance, RRDDIM *rd
(unsigned long long)last_t); (unsigned long long)last_t);
if (instance->config.type != EXPORTING_CONNECTOR_TYPE_JSON_HTTP) {
buffer_strcat(instance->buffer, "\n");
}
return 0; return 0;
} }
/**
* Open a JSON list for a bach
*
* @param instance an instance data structure.
* @return Always returns 0.
*/
int open_batch_json_http(struct instance *instance)
{
buffer_strcat(instance->buffer, "[\n");
return 0;
}
/**
* Close a JSON list for a bach and update buffered bytes counter
*
* @param instance an instance data structure.
* @return Always returns 0.
*/
int close_batch_json_http(struct instance *instance)
{
buffer_strcat(instance->buffer, "\n]\n");
simple_connector_end_batch(instance);
return 0;
}
/**
* Prepare HTTP header
*
* @param instance an instance data structure.
* @return Returns 0 on success, 1 on failure.
*/
void json_http_prepare_header(struct instance *instance)
{
struct simple_connector_data *simple_connector_data = instance->connector_specific_data;
buffer_sprintf(
simple_connector_data->last_buffer->header,
"POST /api/put HTTP/1.1\r\n"
"Host: %s\r\n"
"Content-Type: application/json\r\n"
"Content-Length: %lu\r\n"
"\r\n",
instance->config.destination,
buffer_strlen(simple_connector_data->last_buffer->buffer));
return;
}

View file

@ -6,10 +6,16 @@
#include "exporting/exporting_engine.h" #include "exporting/exporting_engine.h"
int init_json_instance(struct instance *instance); int init_json_instance(struct instance *instance);
int init_json_http_instance(struct instance *instance);
int format_host_labels_json_plaintext(struct instance *instance, RRDHOST *host); int format_host_labels_json_plaintext(struct instance *instance, RRDHOST *host);
int format_dimension_collected_json_plaintext(struct instance *instance, RRDDIM *rd); int format_dimension_collected_json_plaintext(struct instance *instance, RRDDIM *rd);
int format_dimension_stored_json_plaintext(struct instance *instance, RRDDIM *rd); int format_dimension_stored_json_plaintext(struct instance *instance, RRDDIM *rd);
int open_batch_json_http(struct instance *instance);
int close_batch_json_http(struct instance *instance);
void json_http_prepare_header(struct instance *instance);
#endif //NETDATA_EXPORTING_JSON_H #endif //NETDATA_EXPORTING_JSON_H

View file

@ -102,7 +102,7 @@ int init_mongodb_instance(struct instance *instance)
instance->end_host_formatting = flush_host_labels; instance->end_host_formatting = flush_host_labels;
instance->end_batch_formatting = format_batch_mongodb; instance->end_batch_formatting = format_batch_mongodb;
instance->send_header = NULL; instance->prepare_header = NULL;
instance->check_response = NULL; instance->check_response = NULL;
instance->buffer = (void *)buffer_create(0); instance->buffer = (void *)buffer_create(0);
@ -284,9 +284,12 @@ void mongodb_connector_worker(void *instance_p)
struct stats *stats = &instance->stats; struct stats *stats = &instance->stats;
uv_mutex_lock(&instance->mutex); uv_mutex_lock(&instance->mutex);
while (!instance->data_is_ready) if (!connector_specific_data->first_buffer->insert ||
uv_cond_wait(&instance->cond_var, &instance->mutex); !connector_specific_data->first_buffer->documents_inserted) {
instance->data_is_ready = 0; while (!instance->data_is_ready)
uv_cond_wait(&instance->cond_var, &instance->mutex);
instance->data_is_ready = 0;
}
if (unlikely(instance->engine->exit)) { if (unlikely(instance->engine->exit)) {
uv_mutex_unlock(&instance->mutex); uv_mutex_unlock(&instance->mutex);

View file

@ -21,7 +21,6 @@ struct mongodb_specific_data {
size_t total_documents_inserted; size_t total_documents_inserted;
bson_t **current_insert;
struct bson_buffer *first_buffer; struct bson_buffer *first_buffer;
struct bson_buffer *last_buffer; struct bson_buffer *last_buffer;
}; };

View file

@ -1,40 +1,30 @@
<!-- <!--
title: "Export metrics to OpenTSDB with HTTP" title: "Export metrics to OpenTSDB"
description: "Archive your Agent's metrics to a OpenTSDB database for long-term storage and further analysis." description: "Archive your Agent's metrics to an OpenTSDB database for long-term storage and further analysis."
custom_edit_url: https://github.com/netdata/netdata/edit/master/exporting/opentsdb/README.md custom_edit_url: https://github.com/netdata/netdata/edit/master/exporting/opentsdb/README.md
sidebar_label: OpenTSDB with HTTP sidebar_label: OpenTSDB
--> -->
# Export metrics to OpenTSDB with HTTP # Export metrics to OpenTSDB
Netdata can easily communicate with OpenTSDB using HTTP API. To enable this channel, run `./edit-config exporting.conf` You can use the OpenTSDB connector for the [exporting engine](/exporting/README.md) to archive your agent's metrics to OpenTSDB
in the Netdata configuration directory and set the following options: databases for long-term storage, further analysis, or correlation with data from other sources.
## Configuration
To enable data exporting to an OpenTSDB database, run `./edit-config exporting.conf` in the Netdata configuration
directory and set the following options:
```conf ```conf
[opentsdb:http:my_instance] [opentsdb:my_opentsdb_instance]
enabled = yes enabled = yes
destination = localhost:4242 destination = localhost:4242
``` ```
In this example, OpenTSDB is running with its default port, which is `4242`. If you run OpenTSDB on a different port, Add `:http` or `:https` modifiers to the connector type if you need to use other than a plaintext protocol. For example: `opentsdb:http:my_opentsdb_instance`,
change the `destination = localhost:4242` line accordingly. `opentsdb:https:my_opentsdb_instance`.
## HTTPS The OpenTSDB connector is further configurable using additional settings. See the [exporting reference
doc](/exporting/README.md#options) for details.
As of [v1.16.0](https://github.com/netdata/netdata/releases/tag/v1.16.0), Netdata can send metrics to OpenTSDB using
TLS/SSL. Unfortunately, OpenTDSB does not support encrypted connections, so you will have to configure a reverse proxy
to enable HTTPS communication between Netdata and OpenTSBD. You can set up a reverse proxy with
[Nginx](/docs/Running-behind-nginx.md).
After your proxy is configured, make the following changes to `exporting.conf`:
```conf
[opentsdb:https:my_instance]
enabled = yes
destination = localhost:8082
```
In this example, we used the port `8082` for our reverse proxy. If your reverse proxy listens on a different port,
change the `destination = localhost:8082` line accordingly.
[![analytics](https://www.google-analytics.com/collect?v=1&aip=1&t=pageview&_s=1&ds=github&dr=https%3A%2F%2Fgithub.com%2Fnetdata%2Fnetdata&dl=https%3A%2F%2Fmy-netdata.io%2Fgithub%2Fexporting%2Fopentsdb%2FREADME&_u=MAC~&cid=5792dfd7-8dc4-476b-af31-da2fdb9f93d2&tid=UA-64295674-3)](<>) [![analytics](https://www.google-analytics.com/collect?v=1&aip=1&t=pageview&_s=1&ds=github&dr=https%3A%2F%2Fgithub.com%2Fnetdata%2Fnetdata&dl=https%3A%2F%2Fmy-netdata.io%2Fgithub%2Fexporting%2Fopentsdb%2FREADME&_u=MAC~&cid=5792dfd7-8dc4-476b-af31-da2fdb9f93d2&tid=UA-64295674-3)](<>)

View file

@ -1,6 +1,7 @@
// SPDX-License-Identifier: GPL-3.0-or-later // SPDX-License-Identifier: GPL-3.0-or-later
#include "opentsdb.h" #include "opentsdb.h"
#include "../json/json.h"
/** /**
* Initialize OpenTSDB telnet connector instance * Initialize OpenTSDB telnet connector instance
@ -16,6 +17,17 @@ int init_opentsdb_telnet_instance(struct instance *instance)
instance->config.connector_specific_config = (void *)connector_specific_config; instance->config.connector_specific_config = (void *)connector_specific_config;
connector_specific_config->default_port = 4242; connector_specific_config->default_port = 4242;
struct simple_connector_data *connector_specific_data = callocz(1, sizeof(struct simple_connector_data));
instance->connector_specific_data = connector_specific_data;
#ifdef ENABLE_HTTPS
connector_specific_data->flags = NETDATA_SSL_START;
connector_specific_data->conn = NULL;
if (instance->config.options & EXPORTING_OPTION_USE_TLS) {
security_start_ssl(NETDATA_SSL_CONTEXT_EXPORTING);
}
#endif
instance->start_batch_formatting = NULL; instance->start_batch_formatting = NULL;
instance->start_host_formatting = format_host_labels_opentsdb_telnet; instance->start_host_formatting = format_host_labels_opentsdb_telnet;
instance->start_chart_formatting = NULL; instance->start_chart_formatting = NULL;
@ -27,9 +39,9 @@ int init_opentsdb_telnet_instance(struct instance *instance)
instance->end_chart_formatting = NULL; instance->end_chart_formatting = NULL;
instance->end_host_formatting = flush_host_labels; instance->end_host_formatting = flush_host_labels;
instance->end_batch_formatting = simple_connector_update_buffered_bytes; instance->end_batch_formatting = simple_connector_end_batch;
instance->send_header = NULL; instance->prepare_header = NULL;
instance->check_response = exporting_discard_response; instance->check_response = exporting_discard_response;
instance->buffer = (void *)buffer_create(0); instance->buffer = (void *)buffer_create(0);
@ -37,6 +49,9 @@ int init_opentsdb_telnet_instance(struct instance *instance)
error("EXPORTING: cannot create buffer for opentsdb telnet exporting connector instance %s", instance->config.name); error("EXPORTING: cannot create buffer for opentsdb telnet exporting connector instance %s", instance->config.name);
return 1; return 1;
} }
simple_connector_init(instance);
if (uv_mutex_init(&instance->mutex)) if (uv_mutex_init(&instance->mutex))
return 1; return 1;
if (uv_cond_init(&instance->cond_var)) if (uv_cond_init(&instance->cond_var))
@ -59,17 +74,17 @@ int init_opentsdb_http_instance(struct instance *instance)
instance->config.connector_specific_config = (void *)connector_specific_config; instance->config.connector_specific_config = (void *)connector_specific_config;
connector_specific_config->default_port = 4242; connector_specific_config->default_port = 4242;
struct simple_connector_data *connector_specific_data = callocz(1, sizeof(struct simple_connector_data));
#ifdef ENABLE_HTTPS #ifdef ENABLE_HTTPS
struct opentsdb_specific_data *connector_specific_data = callocz(1, sizeof(struct opentsdb_specific_data));
connector_specific_data->flags = NETDATA_SSL_START; connector_specific_data->flags = NETDATA_SSL_START;
connector_specific_data->conn = NULL; connector_specific_data->conn = NULL;
if (instance->config.options & EXPORTING_OPTION_USE_TLS) { if (instance->config.options & EXPORTING_OPTION_USE_TLS) {
security_start_ssl(NETDATA_SSL_CONTEXT_OPENTSDB); security_start_ssl(NETDATA_SSL_CONTEXT_EXPORTING);
} }
instance->connector_specific_data = connector_specific_data;
#endif #endif
instance->connector_specific_data = connector_specific_data;
instance->start_batch_formatting = NULL; instance->start_batch_formatting = open_batch_json_http;
instance->start_host_formatting = format_host_labels_opentsdb_http; instance->start_host_formatting = format_host_labels_opentsdb_http;
instance->start_chart_formatting = NULL; instance->start_chart_formatting = NULL;
@ -80,9 +95,9 @@ int init_opentsdb_http_instance(struct instance *instance)
instance->end_chart_formatting = NULL; instance->end_chart_formatting = NULL;
instance->end_host_formatting = flush_host_labels; instance->end_host_formatting = flush_host_labels;
instance->end_batch_formatting = simple_connector_update_buffered_bytes; instance->end_batch_formatting = close_batch_json_http;
instance->send_header = NULL; instance->prepare_header = opentsdb_http_prepare_header;
instance->check_response = exporting_discard_response; instance->check_response = exporting_discard_response;
instance->buffer = (void *)buffer_create(0); instance->buffer = (void *)buffer_create(0);
@ -90,6 +105,9 @@ int init_opentsdb_http_instance(struct instance *instance)
error("EXPORTING: cannot create buffer for opentsdb HTTP exporting connector instance %s", instance->config.name); error("EXPORTING: cannot create buffer for opentsdb HTTP exporting connector instance %s", instance->config.name);
return 1; return 1;
} }
simple_connector_init(instance);
if (uv_mutex_init(&instance->mutex)) if (uv_mutex_init(&instance->mutex))
return 1; return 1;
if (uv_cond_init(&instance->cond_var)) if (uv_cond_init(&instance->cond_var))
@ -240,26 +258,26 @@ int format_dimension_stored_opentsdb_telnet(struct instance *instance, RRDDIM *r
} }
/** /**
* Prepare an HTTP message for OpenTSDB HTTP connector * Ppepare HTTP header
* *
* @param buffer a buffer to write the message to. * @param instance an instance data structure.
* @param message the body of the message. * @return Returns 0 on success, 1 on failure.
* @param hostname the name of the host that sends the message.
* @param length the length of the message body.
*/ */
static inline void opentsdb_build_message(BUFFER *buffer, char *message, const char *hostname, int length) void opentsdb_http_prepare_header(struct instance *instance)
{ {
struct simple_connector_data *simple_connector_data = instance->connector_specific_data;
buffer_sprintf( buffer_sprintf(
buffer, simple_connector_data->last_buffer->header,
"POST /api/put HTTP/1.1\r\n" "POST /api/put HTTP/1.1\r\n"
"Host: %s\r\n" "Host: %s\r\n"
"Content-Type: application/json\r\n" "Content-Type: application/json\r\n"
"Content-Length: %d\r\n" "Content-Length: %lu\r\n"
"\r\n" "\r\n",
"%s", instance->config.destination,
hostname, buffer_strlen(simple_connector_data->last_buffer->buffer));
length,
message); return;
} }
/** /**
@ -324,17 +342,18 @@ int format_dimension_collected_opentsdb_http(struct instance *instance, RRDDIM *
(instance->config.options & EXPORTING_OPTION_SEND_NAMES && rd->name) ? rd->name : rd->id, (instance->config.options & EXPORTING_OPTION_SEND_NAMES && rd->name) ? rd->name : rd->id,
RRD_ID_LENGTH_MAX); RRD_ID_LENGTH_MAX);
char message[1024]; if (buffer_strlen((BUFFER *)instance->buffer) > 2)
int length = snprintfz( buffer_strcat(instance->buffer, ",\n");
message,
sizeof(message), buffer_sprintf(
instance->buffer,
"{" "{"
" \"metric\": \"%s.%s.%s\"," "\"metric\":\"%s.%s.%s\","
" \"timestamp\": %llu," "\"timestamp\":%llu,"
" \"value\": " COLLECTED_NUMBER_FORMAT "," "\"value\":"COLLECTED_NUMBER_FORMAT","
" \"tags\": {" "\"tags\":{"
" \"host\": \"%s%s%s\"%s" "\"host\":\"%s%s%s\"%s"
" }" "}"
"}", "}",
instance->config.prefix, instance->config.prefix,
chart_name, chart_name,
@ -346,10 +365,6 @@ int format_dimension_collected_opentsdb_http(struct instance *instance, RRDDIM *
(host->tags) ? host->tags : "", (host->tags) ? host->tags : "",
instance->labels ? buffer_tostring(instance->labels) : ""); instance->labels ? buffer_tostring(instance->labels) : "");
if (length > 0) {
opentsdb_build_message(instance->buffer, message, engine->config.hostname, length);
}
return 0; return 0;
} }
@ -384,17 +399,18 @@ int format_dimension_stored_opentsdb_http(struct instance *instance, RRDDIM *rd)
if(isnan(value)) if(isnan(value))
return 0; return 0;
char message[1024]; if (buffer_strlen((BUFFER *)instance->buffer) > 2)
int length = snprintfz( buffer_strcat(instance->buffer, ",\n");
message,
sizeof(message), buffer_sprintf(
instance->buffer,
"{" "{"
" \"metric\": \"%s.%s.%s\"," "\"metric\":\"%s.%s.%s\","
" \"timestamp\": %llu," "\"timestamp\":%llu,"
" \"value\": " CALCULATED_NUMBER_FORMAT "," "\"value\":"CALCULATED_NUMBER_FORMAT","
" \"tags\": {" "\"tags\":{"
" \"host\": \"%s%s%s\"%s" "\"host\":\"%s%s%s\"%s"
" }" "}"
"}", "}",
instance->config.prefix, instance->config.prefix,
chart_name, chart_name,
@ -406,9 +422,5 @@ int format_dimension_stored_opentsdb_http(struct instance *instance, RRDDIM *rd)
(host->tags) ? host->tags : "", (host->tags) ? host->tags : "",
instance->labels ? buffer_tostring(instance->labels) : ""); instance->labels ? buffer_tostring(instance->labels) : "");
if (length > 0) {
opentsdb_build_message(instance->buffer, message, engine->config.hostname, length);
}
return 0; return 0;
} }

View file

@ -18,11 +18,9 @@ int format_dimension_stored_opentsdb_telnet(struct instance *instance, RRDDIM *r
int format_dimension_collected_opentsdb_http(struct instance *instance, RRDDIM *rd); int format_dimension_collected_opentsdb_http(struct instance *instance, RRDDIM *rd);
int format_dimension_stored_opentsdb_http(struct instance *instance, RRDDIM *rd); int format_dimension_stored_opentsdb_http(struct instance *instance, RRDDIM *rd);
#ifdef ENABLE_HTTPS int open_batch_opentsdb_http(struct instance *instance);
struct opentsdb_specific_data { int close_batch_opentsdb_http(struct instance *instance);
SSL *conn; //SSL connection
int flags; //The flags for SSL connection void opentsdb_http_prepare_header(struct instance *instance);
};
#endif
#endif //NETDATA_EXPORTING_OPENTSDB_H #endif //NETDATA_EXPORTING_OPENTSDB_H

View file

@ -363,14 +363,64 @@ int flush_host_labels(struct instance *instance, RRDHOST *host)
} }
/** /**
* Update stats for buffered bytes * End a batch for a simple connector
* *
* @param instance an instance data structure. * @param instance an instance data structure.
* @return Always returns 0. * @return Returns 0 on success, 1 on failure.
*/ */
int simple_connector_update_buffered_bytes(struct instance *instance) int simple_connector_end_batch(struct instance *instance)
{ {
instance->stats.buffered_bytes = (collected_number)buffer_strlen((BUFFER *)(instance->buffer)); struct simple_connector_data *simple_connector_data =
(struct simple_connector_data *)instance->connector_specific_data;
struct stats *stats = &instance->stats;
BUFFER *instance_buffer = (BUFFER *)instance->buffer;
struct simple_connector_buffer *last_buffer = simple_connector_data->last_buffer;
if (!last_buffer->buffer) {
last_buffer->buffer = buffer_create(0);
}
if (last_buffer->used) {
// ring buffer is full, reuse the oldest element
simple_connector_data->first_buffer = simple_connector_data->first_buffer->next;
stats->data_lost_events++;
stats->lost_metrics += last_buffer->buffered_metrics;
stats->lost_bytes += last_buffer->buffered_bytes;
}
// swap buffers
BUFFER *tmp_buffer = last_buffer->buffer;
last_buffer->buffer = instance_buffer;
instance->buffer = instance_buffer = tmp_buffer;
buffer_flush(instance_buffer);
if (last_buffer->header)
buffer_flush(last_buffer->header);
else
last_buffer->header = buffer_create(0);
if (instance->prepare_header)
instance->prepare_header(instance);
// The stats->buffered_metrics is used in the simple connector batch formatting as a variable for the number
// of metrics, added in the current iteration, so we are clearing it here. We will use the
// simple_connector_data->total_buffered_metrics in the worker to show the statistics.
size_t buffered_metrics = (size_t)stats->buffered_metrics;
stats->buffered_metrics = 0;
size_t buffered_bytes = buffer_strlen(last_buffer->buffer);
last_buffer->buffered_metrics = buffered_metrics;
last_buffer->buffered_bytes = buffered_bytes;
last_buffer->used++;
simple_connector_data->total_buffered_metrics += buffered_metrics;
stats->buffered_bytes += buffered_bytes;
simple_connector_data->last_buffer = simple_connector_data->last_buffer->next;
return 0; return 0;
} }

View file

@ -30,6 +30,9 @@ in the Netdata configuration directory and set the following options:
remote write URL path = /receive remote write URL path = /receive
``` ```
You can also add `:https` modifier to the connector type if you need to use the TLS/SSL protocol. For example:
`remote_write:https:my_instance`.
`remote write URL path` is used to set an endpoint path for the remote write protocol. The default value is `/receive`. `remote write URL path` is used to set an endpoint path for the remote write protocol. The default value is `/receive`.
For example, if your endpoint is `http://example.domain:example_port/storage/read`: For example, if your endpoint is `http://example.domain:example_port/storage/read`:

View file

@ -10,28 +10,18 @@ char family[PROMETHEUS_ELEMENT_MAX + 1];
char units[PROMETHEUS_ELEMENT_MAX + 1] = ""; char units[PROMETHEUS_ELEMENT_MAX + 1] = "";
/** /**
* Send header to a server * Prepare HTTP header
* *
* @param sock a communication socket.
* @param instance an instance data structure. * @param instance an instance data structure.
* @return Returns 0 on success, 1 on failure.
*/ */
int prometheus_remote_write_send_header(int *sock, struct instance *instance) void prometheus_remote_write_prepare_header(struct instance *instance)
{ {
int flags = 0;
#ifdef MSG_NOSIGNAL
flags += MSG_NOSIGNAL;
#endif
struct prometheus_remote_write_specific_config *connector_specific_config = struct prometheus_remote_write_specific_config *connector_specific_config =
instance->config.connector_specific_config; instance->config.connector_specific_config;
struct simple_connector_data *simple_connector_data = instance->connector_specific_data;
static BUFFER *header;
if (!header)
header = buffer_create(0);
buffer_sprintf( buffer_sprintf(
header, simple_connector_data->last_buffer->header,
"POST %s HTTP/1.1\r\n" "POST %s HTTP/1.1\r\n"
"Host: %s\r\n" "Host: %s\r\n"
"Accept: */*\r\n" "Accept: */*\r\n"
@ -40,17 +30,7 @@ int prometheus_remote_write_send_header(int *sock, struct instance *instance)
"Content-Type: application/x-www-form-urlencoded\r\n\r\n", "Content-Type: application/x-www-form-urlencoded\r\n\r\n",
connector_specific_config->remote_write_path, connector_specific_config->remote_write_path,
instance->config.destination, instance->config.destination,
buffer_strlen((BUFFER *)instance->buffer)); buffer_strlen(simple_connector_data->last_buffer->buffer));
size_t header_len = buffer_strlen(header);
ssize_t written = send(*sock, buffer_tostring(header), header_len, flags);
buffer_flush(header);
if (written != -1 && (size_t)written == header_len)
return 0;
else
return 1;
} }
/** /**
@ -90,7 +70,8 @@ int process_prometheus_remote_write_response(BUFFER *buffer, struct instance *in
*/ */
void clean_prometheus_remote_write(struct instance *instance) void clean_prometheus_remote_write(struct instance *instance)
{ {
freez(instance->connector_specific_data); struct simple_connector_data *simple_connector_data = instance->connector_specific_data;
freez(simple_connector_data->connector_specific_data);
struct prometheus_remote_write_specific_config *connector_specific_config = struct prometheus_remote_write_specific_config *connector_specific_config =
instance->config.connector_specific_config; instance->config.connector_specific_config;
@ -115,22 +96,32 @@ int init_prometheus_remote_write_instance(struct instance *instance)
instance->end_host_formatting = NULL; instance->end_host_formatting = NULL;
instance->end_batch_formatting = format_batch_prometheus_remote_write; instance->end_batch_formatting = format_batch_prometheus_remote_write;
instance->send_header = prometheus_remote_write_send_header; instance->prepare_header = prometheus_remote_write_prepare_header;
instance->check_response = process_prometheus_remote_write_response; instance->check_response = process_prometheus_remote_write_response;
instance->buffer = (void *)buffer_create(0); instance->buffer = (void *)buffer_create(0);
if (!instance->buffer) {
error("EXPORTING: cannot create buffer for AWS Kinesis exporting connector instance %s", instance->config.name);
return 1;
}
if (uv_mutex_init(&instance->mutex)) if (uv_mutex_init(&instance->mutex))
return 1; return 1;
if (uv_cond_init(&instance->cond_var)) if (uv_cond_init(&instance->cond_var))
return 1; return 1;
struct simple_connector_data *simple_connector_data = callocz(1, sizeof(struct simple_connector_data));
instance->connector_specific_data = simple_connector_data;
#ifdef ENABLE_HTTPS
simple_connector_data->flags = NETDATA_SSL_START;
simple_connector_data->conn = NULL;
if (instance->config.options & EXPORTING_OPTION_USE_TLS) {
security_start_ssl(NETDATA_SSL_CONTEXT_EXPORTING);
}
#endif
struct prometheus_remote_write_specific_data *connector_specific_data = struct prometheus_remote_write_specific_data *connector_specific_data =
callocz(1, sizeof(struct prometheus_remote_write_specific_data)); callocz(1, sizeof(struct prometheus_remote_write_specific_data));
instance->connector_specific_data = (void *)connector_specific_data; simple_connector_data->connector_specific_data = (void *)connector_specific_data;
simple_connector_init(instance);
connector_specific_data->write_request = init_write_request(); connector_specific_data->write_request = init_write_request();
@ -148,8 +139,10 @@ int init_prometheus_remote_write_instance(struct instance *instance)
*/ */
int format_host_prometheus_remote_write(struct instance *instance, RRDHOST *host) int format_host_prometheus_remote_write(struct instance *instance, RRDHOST *host)
{ {
struct simple_connector_data *simple_connector_data =
(struct simple_connector_data *)instance->connector_specific_data;
struct prometheus_remote_write_specific_data *connector_specific_data = struct prometheus_remote_write_specific_data *connector_specific_data =
(struct prometheus_remote_write_specific_data *)instance->connector_specific_data; (struct prometheus_remote_write_specific_data *)simple_connector_data->connector_specific_data;
char hostname[PROMETHEUS_ELEMENT_MAX + 1]; char hostname[PROMETHEUS_ELEMENT_MAX + 1];
prometheus_label_copy( prometheus_label_copy(
@ -225,8 +218,10 @@ int format_chart_prometheus_remote_write(struct instance *instance, RRDSET *st)
*/ */
int format_dimension_prometheus_remote_write(struct instance *instance, RRDDIM *rd) int format_dimension_prometheus_remote_write(struct instance *instance, RRDDIM *rd)
{ {
struct simple_connector_data *simple_connector_data =
(struct simple_connector_data *)instance->connector_specific_data;
struct prometheus_remote_write_specific_data *connector_specific_data = struct prometheus_remote_write_specific_data *connector_specific_data =
(struct prometheus_remote_write_specific_data *)instance->connector_specific_data; (struct prometheus_remote_write_specific_data *)simple_connector_data->connector_specific_data;
if (rd->collections_counter && !rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE)) { if (rd->collections_counter && !rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE)) {
char name[PROMETHEUS_LABELS_MAX + 1]; char name[PROMETHEUS_LABELS_MAX + 1];
@ -322,8 +317,10 @@ int format_dimension_prometheus_remote_write(struct instance *instance, RRDDIM *
*/ */
int format_batch_prometheus_remote_write(struct instance *instance) int format_batch_prometheus_remote_write(struct instance *instance)
{ {
struct simple_connector_data *simple_connector_data =
(struct simple_connector_data *)instance->connector_specific_data;
struct prometheus_remote_write_specific_data *connector_specific_data = struct prometheus_remote_write_specific_data *connector_specific_data =
(struct prometheus_remote_write_specific_data *)instance->connector_specific_data; (struct prometheus_remote_write_specific_data *)simple_connector_data->connector_specific_data;
size_t data_size = get_write_request_size(connector_specific_data->write_request); size_t data_size = get_write_request_size(connector_specific_data->write_request);
@ -342,5 +339,7 @@ int format_batch_prometheus_remote_write(struct instance *instance)
buffer->len = data_size; buffer->len = data_size;
instance->stats.buffered_bytes = (collected_number)buffer_strlen(buffer); instance->stats.buffered_bytes = (collected_number)buffer_strlen(buffer);
simple_connector_end_batch(instance);
return 0; return 0;
} }

View file

@ -7,6 +7,10 @@
#include "exporting/prometheus/prometheus.h" #include "exporting/prometheus/prometheus.h"
#include "remote_write_request.h" #include "remote_write_request.h"
struct prometheus_remote_write_specific_data {
void *write_request;
};
int init_prometheus_remote_write_instance(struct instance *instance); int init_prometheus_remote_write_instance(struct instance *instance);
extern void clean_prometheus_remote_write(struct instance *instance); extern void clean_prometheus_remote_write(struct instance *instance);
@ -15,7 +19,7 @@ int format_chart_prometheus_remote_write(struct instance *instance, RRDSET *st);
int format_dimension_prometheus_remote_write(struct instance *instance, RRDDIM *rd); int format_dimension_prometheus_remote_write(struct instance *instance, RRDDIM *rd);
int format_batch_prometheus_remote_write(struct instance *instance); int format_batch_prometheus_remote_write(struct instance *instance);
int prometheus_remote_write_send_header(int *sock, struct instance *instance); void prometheus_remote_write_prepare_header(struct instance *instance);
int process_prometheus_remote_write_response(BUFFER *buffer, struct instance *instance); int process_prometheus_remote_write_response(BUFFER *buffer, struct instance *instance);
#endif //NETDATA_EXPORTING_PROMETHEUS_REMOTE_WRITE_H #endif //NETDATA_EXPORTING_PROMETHEUS_REMOTE_WRITE_H

View file

@ -7,10 +7,6 @@
extern "C" { extern "C" {
#endif #endif
struct prometheus_remote_write_specific_data {
void *write_request;
};
void *init_write_request(); void *init_write_request();
void add_host_info( void add_host_info(

View file

@ -26,7 +26,7 @@ int init_pubsub_instance(struct instance *instance)
instance->end_host_formatting = flush_host_labels; instance->end_host_formatting = flush_host_labels;
instance->end_batch_formatting = NULL; instance->end_batch_formatting = NULL;
instance->send_header = NULL; instance->prepare_header = NULL;
instance->check_response = NULL; instance->check_response = NULL;
instance->buffer = (void *)buffer_create(0); instance->buffer = (void *)buffer_create(0);

View file

@ -135,13 +135,20 @@ EXPORTING_CONNECTOR_TYPE exporting_select_type(const char *type)
{ {
if (!strcmp(type, "graphite") || !strcmp(type, "graphite:plaintext")) { if (!strcmp(type, "graphite") || !strcmp(type, "graphite:plaintext")) {
return EXPORTING_CONNECTOR_TYPE_GRAPHITE; return EXPORTING_CONNECTOR_TYPE_GRAPHITE;
} else if (!strcmp(type, "opentsdb") || !strcmp(type, "opentsdb:telnet")) { } else if (!strcmp(type, "graphite:http") || !strcmp(type, "graphite:https")) {
return EXPORTING_CONNECTOR_TYPE_OPENTSDB_USING_TELNET; return EXPORTING_CONNECTOR_TYPE_GRAPHITE_HTTP;
} else if (!strcmp(type, "opentsdb:http") || !strcmp(type, "opentsdb:https")) {
return EXPORTING_CONNECTOR_TYPE_OPENTSDB_USING_HTTP;
} else if (!strcmp(type, "json") || !strcmp(type, "json:plaintext")) { } else if (!strcmp(type, "json") || !strcmp(type, "json:plaintext")) {
return EXPORTING_CONNECTOR_TYPE_JSON; return EXPORTING_CONNECTOR_TYPE_JSON;
} else if (!strcmp(type, "prometheus_remote_write")) { } else if (!strcmp(type, "json:http") || !strcmp(type, "json:https")) {
return EXPORTING_CONNECTOR_TYPE_JSON_HTTP;
} else if (!strcmp(type, "opentsdb") || !strcmp(type, "opentsdb:telnet")) {
return EXPORTING_CONNECTOR_TYPE_OPENTSDB;
} else if (!strcmp(type, "opentsdb:http") || !strcmp(type, "opentsdb:https")) {
return EXPORTING_CONNECTOR_TYPE_OPENTSDB_HTTP;
} else if (
!strcmp(type, "prometheus_remote_write") ||
!strcmp(type, "prometheus_remote_write:http") ||
!strcmp(type, "prometheus_remote_write:https")) {
return EXPORTING_CONNECTOR_TYPE_PROMETHEUS_REMOTE_WRITE; return EXPORTING_CONNECTOR_TYPE_PROMETHEUS_REMOTE_WRITE;
} else if (!strcmp(type, "kinesis") || !strcmp(type, "kinesis:plaintext")) { } else if (!strcmp(type, "kinesis") || !strcmp(type, "kinesis:plaintext")) {
return EXPORTING_CONNECTOR_TYPE_KINESIS; return EXPORTING_CONNECTOR_TYPE_KINESIS;
@ -432,7 +439,22 @@ struct engine *read_exporting_config()
tmp_instance->config.prefix = strdupz(exporter_get(instance_name, "prefix", "netdata")); tmp_instance->config.prefix = strdupz(exporter_get(instance_name, "prefix", "netdata"));
#ifdef ENABLE_HTTPS #ifdef ENABLE_HTTPS
if (tmp_instance->config.type == EXPORTING_CONNECTOR_TYPE_OPENTSDB_USING_HTTP && !strncmp(tmp_ci_list->local_ci.connector_name, "opentsdb:https", 14)) {
#define STR_GRAPHITE_HTTPS "graphite:https"
#define STR_JSON_HTTPS "json:https"
#define STR_OPENTSDB_HTTPS "opentsdb:https"
#define STR_PROMETHEUS_REMOTE_WRITE_HTTPS "prometheus_remote_write:https"
if ((tmp_instance->config.type == EXPORTING_CONNECTOR_TYPE_GRAPHITE_HTTP &&
!strncmp(tmp_ci_list->local_ci.connector_name, STR_GRAPHITE_HTTPS, strlen(STR_GRAPHITE_HTTPS))) ||
(tmp_instance->config.type == EXPORTING_CONNECTOR_TYPE_JSON_HTTP &&
!strncmp(tmp_ci_list->local_ci.connector_name, STR_JSON_HTTPS, strlen(STR_JSON_HTTPS))) ||
(tmp_instance->config.type == EXPORTING_CONNECTOR_TYPE_OPENTSDB_HTTP &&
!strncmp(tmp_ci_list->local_ci.connector_name, STR_OPENTSDB_HTTPS, strlen(STR_OPENTSDB_HTTPS))) ||
(tmp_instance->config.type == EXPORTING_CONNECTOR_TYPE_PROMETHEUS_REMOTE_WRITE &&
!strncmp(
tmp_ci_list->local_ci.connector_name, STR_PROMETHEUS_REMOTE_WRITE_HTTPS,
strlen(STR_PROMETHEUS_REMOTE_WRITE_HTTPS)))) {
tmp_instance->config.options |= EXPORTING_OPTION_USE_TLS; tmp_instance->config.options |= EXPORTING_OPTION_USE_TLS;
} }
#endif #endif

View file

@ -2,6 +2,22 @@
#include "exporting_engine.h" #include "exporting_engine.h"
/**
* Check if TLS is enabled in the configuration
*
* @param type buffer with response data.
* @param options an instance data structure.
* @return Returns 1 if TLS should be enabled, 0 otherwise.
*/
static int exporting_tls_is_enabled(EXPORTING_CONNECTOR_TYPE type, EXPORTING_OPTIONS options)
{
return (type == EXPORTING_CONNECTOR_TYPE_GRAPHITE_HTTP ||
type == EXPORTING_CONNECTOR_TYPE_JSON_HTTP ||
type == EXPORTING_CONNECTOR_TYPE_OPENTSDB_HTTP ||
type == EXPORTING_CONNECTOR_TYPE_PROMETHEUS_REMOTE_WRITE) &&
options & EXPORTING_OPTION_USE_TLS;
}
/** /**
* Discard response * Discard response
* *
@ -12,6 +28,7 @@
* @return Always returns 0. * @return Always returns 0.
*/ */
int exporting_discard_response(BUFFER *buffer, struct instance *instance) { int exporting_discard_response(BUFFER *buffer, struct instance *instance) {
#if NETDATA_INTERNAL_CHECKS
char sample[1024]; char sample[1024];
const char *s = buffer_tostring(buffer); const char *s = buffer_tostring(buffer);
char *d = sample, *e = &sample[sizeof(sample) - 1]; char *d = sample, *e = &sample[sizeof(sample) - 1];
@ -23,11 +40,16 @@ int exporting_discard_response(BUFFER *buffer, struct instance *instance) {
} }
*d = '\0'; *d = '\0';
info( debug(
D_BACKEND,
"EXPORTING: received %zu bytes from %s connector instance. Ignoring them. Sample: '%s'", "EXPORTING: received %zu bytes from %s connector instance. Ignoring them. Sample: '%s'",
buffer_strlen(buffer), buffer_strlen(buffer),
instance->config.name, instance->config.name,
sample); sample);
#else
UNUSED(instance);
#endif /* NETDATA_INTERNAL_CHECKS */
buffer_flush(buffer); buffer_flush(buffer);
return 0; return 0;
} }
@ -47,7 +69,7 @@ void simple_connector_receive_response(int *sock, struct instance *instance)
struct stats *stats = &instance->stats; struct stats *stats = &instance->stats;
#ifdef ENABLE_HTTPS #ifdef ENABLE_HTTPS
uint32_t options = (uint32_t)instance->config.options; uint32_t options = (uint32_t)instance->config.options;
struct opentsdb_specific_data *connector_specific_data = instance->connector_specific_data; struct simple_connector_data *connector_specific_data = instance->connector_specific_data;
if (options & EXPORTING_OPTION_USE_TLS) if (options & EXPORTING_OPTION_USE_TLS)
ERR_clear_error(); ERR_clear_error();
@ -61,8 +83,7 @@ void simple_connector_receive_response(int *sock, struct instance *instance)
ssize_t r; ssize_t r;
#ifdef ENABLE_HTTPS #ifdef ENABLE_HTTPS
if (instance->config.type == EXPORTING_CONNECTOR_TYPE_OPENTSDB_USING_HTTP && if (exporting_tls_is_enabled(instance->config.type, options) &&
options & EXPORTING_OPTION_USE_TLS &&
connector_specific_data->conn && connector_specific_data->conn &&
connector_specific_data->flags == NETDATA_SSL_HANDSHAKE_COMPLETE) { connector_specific_data->flags == NETDATA_SSL_HANDSHAKE_COMPLETE) {
r = (ssize_t)SSL_read(connector_specific_data->conn, r = (ssize_t)SSL_read(connector_specific_data->conn,
@ -132,11 +153,9 @@ endloop:
* @param failures the number of communication failures. * @param failures the number of communication failures.
* @param instance an instance data structure. * @param instance an instance data structure.
*/ */
void simple_connector_send_buffer(int *sock, int *failures, struct instance *instance) void simple_connector_send_buffer(
int *sock, int *failures, struct instance *instance, BUFFER *header, BUFFER *buffer, size_t buffered_metrics)
{ {
BUFFER *buffer = (BUFFER *)instance->buffer;
size_t len = buffer_strlen(buffer);
int flags = 0; int flags = 0;
#ifdef MSG_NOSIGNAL #ifdef MSG_NOSIGNAL
flags += MSG_NOSIGNAL; flags += MSG_NOSIGNAL;
@ -144,58 +163,61 @@ void simple_connector_send_buffer(int *sock, int *failures, struct instance *ins
#ifdef ENABLE_HTTPS #ifdef ENABLE_HTTPS
uint32_t options = (uint32_t)instance->config.options; uint32_t options = (uint32_t)instance->config.options;
struct opentsdb_specific_data *connector_specific_data = instance->connector_specific_data; struct simple_connector_data *connector_specific_data = instance->connector_specific_data;
if (options & EXPORTING_OPTION_USE_TLS) if (options & EXPORTING_OPTION_USE_TLS)
ERR_clear_error(); ERR_clear_error();
#endif #endif
struct stats *stats = &instance->stats; struct stats *stats = &instance->stats;
ssize_t header_sent_bytes = 0;
ssize_t buffer_sent_bytes = 0;
size_t header_len = buffer_strlen(header);
size_t buffer_len = buffer_strlen(buffer);
int ret = 0;
if (instance->send_header)
ret = instance->send_header(sock, instance);
ssize_t written = -1;
if (!ret) {
#ifdef ENABLE_HTTPS #ifdef ENABLE_HTTPS
if (instance->config.type == EXPORTING_CONNECTOR_TYPE_OPENTSDB_USING_HTTP && if (exporting_tls_is_enabled(instance->config.type, options) &&
options & EXPORTING_OPTION_USE_TLS && connector_specific_data->conn &&
connector_specific_data->conn && connector_specific_data->flags == NETDATA_SSL_HANDSHAKE_COMPLETE) {
connector_specific_data->flags == NETDATA_SSL_HANDSHAKE_COMPLETE) { if (header_len)
written = (ssize_t)SSL_write(connector_specific_data->conn, buffer_tostring(buffer), len); header_sent_bytes = (ssize_t)SSL_write(connector_specific_data->conn, buffer_tostring(header), header_len);
} else { if ((size_t)header_sent_bytes == header_len)
written = send(*sock, buffer_tostring(buffer), len, flags); buffer_sent_bytes = (ssize_t)SSL_write(connector_specific_data->conn, buffer_tostring(buffer), buffer_len);
} } else {
#else if (header_len)
written = send(*sock, buffer_tostring(buffer), len, flags); header_sent_bytes = send(*sock, buffer_tostring(header), header_len, flags);
#endif if ((size_t)header_sent_bytes == header_len)
buffer_sent_bytes = send(*sock, buffer_tostring(buffer), buffer_len, flags);
} }
#else
if (header_len)
header_sent_bytes = send(*sock, buffer_tostring(header), header_len, flags);
if ((size_t)header_sent_bytes == header_len)
buffer_sent_bytes = send(*sock, buffer_tostring(buffer), buffer_len, flags);
#endif
if(written != -1 && (size_t)written == len) { if ((size_t)buffer_sent_bytes == buffer_len) {
// we sent the data successfully // we sent the data successfully
stats->transmission_successes++; stats->transmission_successes++;
stats->sent_bytes += written; stats->sent_metrics += buffered_metrics;
stats->sent_metrics = stats->buffered_metrics; stats->sent_bytes += buffer_sent_bytes;
// reset the failures count // reset the failures count
*failures = 0; *failures = 0;
// empty the buffer // empty the buffer
buffer_flush(buffer); buffer_flush(buffer);
} } else {
else {
// oops! we couldn't send (all or some of the) data // oops! we couldn't send (all or some of the) data
error( error(
"EXPORTING: failed to write data to '%s'. Willing to write %zu bytes, wrote %zd bytes. Will re-connect.", "EXPORTING: failed to write data to '%s'. Willing to write %zu bytes, wrote %zd bytes. Will re-connect.",
instance->config.destination, instance->config.destination,
len, buffer_len,
written); buffer_sent_bytes);
stats->transmission_failures++; stats->transmission_failures++;
if(written != -1) if(buffer_sent_bytes != -1)
stats->sent_bytes += written; stats->sent_bytes += buffer_sent_bytes;
// increment the counter we check for data loss // increment the counter we check for data loss
(*failures)++; (*failures)++;
@ -206,22 +228,6 @@ void simple_connector_send_buffer(int *sock, int *failures, struct instance *ins
} }
} }
/**
* Clean up a simple connector instance on Netdata exit
*
* @param instance an instance data structure.
*/
void simple_connector_cleanup(struct instance *instance)
{
info("EXPORTING: cleaning up instance %s ...", instance->config.name);
buffer_free(instance->buffer);
freez(instance->config.connector_specific_config);
info("EXPORTING: instance %s exited", instance->config.name);
instance->exited = 1;
}
/** /**
* Simple connector worker * Simple connector worker
* *
@ -235,60 +241,97 @@ void simple_connector_worker(void *instance_p)
#ifdef ENABLE_HTTPS #ifdef ENABLE_HTTPS
uint32_t options = (uint32_t)instance->config.options; uint32_t options = (uint32_t)instance->config.options;
struct opentsdb_specific_data *connector_specific_data = instance->connector_specific_data; struct simple_connector_data *connector_specific_data = instance->connector_specific_data;
if (options & EXPORTING_OPTION_USE_TLS) if (options & EXPORTING_OPTION_USE_TLS)
ERR_clear_error(); ERR_clear_error();
#endif #endif
struct simple_connector_config *connector_specific_config = instance->config.connector_specific_config; struct simple_connector_config *connector_specific_config = instance->config.connector_specific_config;
struct stats *stats = &instance->stats;
int sock = -1; int sock = -1;
struct timeval timeout = {.tv_sec = (instance->config.timeoutms * 1000) / 1000000, struct timeval timeout = { .tv_sec = (instance->config.timeoutms * 1000) / 1000000,
.tv_usec = (instance->config.timeoutms * 1000) % 1000000}; .tv_usec = (instance->config.timeoutms * 1000) % 1000000 };
int failures = 0; int failures = 0;
while(!instance->engine->exit) { BUFFER *spare_header = buffer_create(0);
BUFFER *spare_buffer = buffer_create(0);
// reset the monitoring chart counters while (!instance->engine->exit) {
stats->received_bytes = struct stats *stats = &instance->stats;
stats->sent_bytes = int send_stats = 0;
stats->sent_metrics =
stats->lost_metrics = if (instance->data_is_ready)
stats->receptions = send_stats = 1;
stats->transmission_successes =
stats->transmission_failures = uv_mutex_lock(&instance->mutex);
stats->data_lost_events = if (!connector_specific_data->first_buffer->used || failures) {
stats->lost_bytes = while (!instance->data_is_ready)
stats->reconnects = 0; uv_cond_wait(&instance->cond_var, &instance->mutex);
instance->data_is_ready = 0;
send_stats = 1;
}
if (unlikely(instance->engine->exit)) {
uv_mutex_unlock(&instance->mutex);
break;
}
// ------------------------------------------------------------------------
// detach buffer
BUFFER *header;
BUFFER *buffer;
size_t buffered_metrics;
if (!connector_specific_data->previous_buffer ||
(connector_specific_data->previous_buffer == connector_specific_data->first_buffer &&
connector_specific_data->first_buffer->used == 1)) {
connector_specific_data->header = connector_specific_data->first_buffer->header;
connector_specific_data->buffer = connector_specific_data->first_buffer->buffer;
connector_specific_data->buffered_metrics = connector_specific_data->first_buffer->buffered_metrics;
connector_specific_data->buffered_bytes = connector_specific_data->first_buffer->buffered_bytes;
header = connector_specific_data->header;
buffer = connector_specific_data->buffer;
buffered_metrics = connector_specific_data->buffered_metrics;
buffer_flush(spare_header);
connector_specific_data->first_buffer->header = spare_header;
spare_header = header;
buffer_flush(spare_buffer);
connector_specific_data->first_buffer->buffer = spare_buffer;
spare_buffer = buffer;
} else {
header = connector_specific_data->header;
buffer = connector_specific_data->buffer;
buffered_metrics = connector_specific_data->buffered_metrics;
}
uv_mutex_unlock(&instance->mutex);
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
// if we are connected, receive a response, without blocking // if we are connected, receive a response, without blocking
if(likely(sock != -1)) if (likely(sock != -1))
simple_connector_receive_response(&sock, instance); simple_connector_receive_response(&sock, instance);
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
// if we are not connected, connect to a data collecting server // if we are not connected, connect to a data collecting server
if(unlikely(sock == -1)) { if (unlikely(sock == -1)) {
size_t reconnects = 0; size_t reconnects = 0;
sock = connect_to_one_of( sock = connect_to_one_of(
instance->config.destination, instance->config.destination, connector_specific_config->default_port, &timeout, &reconnects, NULL, 0);
connector_specific_config->default_port,
&timeout,
&reconnects,
NULL,
0);
#ifdef ENABLE_HTTPS #ifdef ENABLE_HTTPS
if(instance->config.type == EXPORTING_CONNECTOR_TYPE_OPENTSDB_USING_HTTP && sock != -1) { if (exporting_tls_is_enabled(instance->config.type, options) && sock != -1) {
if (netdata_opentsdb_ctx) { if (netdata_exporting_ctx) {
if ( sock_delnonblock(sock) < 0 ) if (sock_delnonblock(sock) < 0)
error("Exporting cannot remove the non-blocking flag from socket %d", sock); error("Exporting cannot remove the non-blocking flag from socket %d", sock);
if (connector_specific_data->conn == NULL) { if (connector_specific_data->conn == NULL) {
connector_specific_data->conn = SSL_new(netdata_opentsdb_ctx); connector_specific_data->conn = SSL_new(netdata_exporting_ctx);
if (connector_specific_data->conn == NULL) { if (connector_specific_data->conn == NULL) {
error("Failed to allocate SSL structure to socket %d.", sock); error("Failed to allocate SSL structure to socket %d.", sock);
connector_specific_data->flags = NETDATA_SSL_NO_HANDSHAKE; connector_specific_data->flags = NETDATA_SSL_NO_HANDSHAKE;
@ -307,50 +350,42 @@ void simple_connector_worker(void *instance_p)
int err = SSL_connect(connector_specific_data->conn); int err = SSL_connect(connector_specific_data->conn);
if (err != 1) { if (err != 1) {
err = SSL_get_error(connector_specific_data->conn, err); err = SSL_get_error(connector_specific_data->conn, err);
error("SSL cannot connect with the server: %s ", error(
ERR_error_string((long)SSL_get_error(connector_specific_data->conn, err), NULL)); "SSL cannot connect with the server: %s ",
ERR_error_string((long)SSL_get_error(connector_specific_data->conn, err), NULL));
connector_specific_data->flags = NETDATA_SSL_NO_HANDSHAKE; connector_specific_data->flags = NETDATA_SSL_NO_HANDSHAKE;
} else { } else {
info("Exporting established a SSL connection."); info("Exporting established a SSL connection.");
struct timeval tv; struct timeval tv;
tv.tv_sec = timeout.tv_sec /4; tv.tv_sec = timeout.tv_sec / 4;
tv.tv_usec = 0; tv.tv_usec = 0;
if (!tv.tv_sec) if (!tv.tv_sec)
tv.tv_sec = 2; tv.tv_sec = 2;
if (setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof(tv))) if (setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (const char *)&tv, sizeof(tv)))
error("Cannot set timeout to socket %d, this can block communication", sock); error("Cannot set timeout to socket %d, this can block communication", sock);
} }
} }
} }
} }
} }
#endif #endif
stats->reconnects += reconnects; stats->reconnects += reconnects;
} }
if(unlikely(instance->engine->exit)) break; if (unlikely(instance->engine->exit))
break;
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
// if we are connected, send our buffer to the data collecting server // if we are connected, send our buffer to the data collecting server
uv_mutex_lock(&instance->mutex); failures = 0;
while (!instance->data_is_ready)
uv_cond_wait(&instance->cond_var, &instance->mutex);
instance->data_is_ready = 0;
if (unlikely(instance->engine->exit)) {
uv_mutex_unlock(&instance->mutex);
break;
}
if (likely(sock != -1)) { if (likely(sock != -1)) {
simple_connector_send_buffer(&sock, &failures, instance); simple_connector_send_buffer(&sock, &failures, instance, header, buffer, buffered_metrics);
} else { } else {
error("EXPORTING: failed to update '%s'", instance->config.destination); error("EXPORTING: failed to update '%s'", instance->config.destination);
stats->transmission_failures++; stats->transmission_failures++;
@ -359,26 +394,40 @@ void simple_connector_worker(void *instance_p)
failures++; failures++;
} }
BUFFER *buffer = instance->buffer; if (!failures) {
connector_specific_data->first_buffer->buffered_metrics =
if (failures > instance->config.buffer_on_failures) { connector_specific_data->first_buffer->buffered_bytes = connector_specific_data->first_buffer->used = 0;
stats->lost_bytes += buffer_strlen(buffer); connector_specific_data->first_buffer = connector_specific_data->first_buffer->next;
error(
"EXPORTING: connector instance %s reached %d exporting failures. "
"Flushing buffers to protect this host - this results in data loss on server '%s'",
instance->config.name, failures, instance->config.destination);
buffer_flush(buffer);
failures = 0;
stats->data_lost_events++;
stats->lost_metrics = stats->buffered_metrics;
} }
send_internal_metrics(instance); if (unlikely(instance->engine->exit))
break;
if (send_stats) {
uv_mutex_lock(&instance->mutex);
stats->buffered_metrics = connector_specific_data->total_buffered_metrics;
send_internal_metrics(instance);
if(likely(buffer_strlen(buffer) == 0))
stats->buffered_metrics = 0; stats->buffered_metrics = 0;
uv_mutex_unlock(&instance->mutex); // reset the internal monitoring chart counters
connector_specific_data->total_buffered_metrics =
stats->buffered_bytes =
stats->receptions =
stats->received_bytes =
stats->sent_metrics =
stats->sent_bytes =
stats->transmission_successes =
stats->transmission_failures =
stats->reconnects =
stats->data_lost_events =
stats->lost_metrics =
stats->lost_bytes = 0;
uv_mutex_unlock(&instance->mutex);
}
#ifdef UNIT_TESTING #ifdef UNIT_TESTING
return; return;
@ -390,12 +439,5 @@ void simple_connector_worker(void *instance_p)
clean_prometheus_remote_write(instance); clean_prometheus_remote_write(instance);
#endif #endif
#ifdef ENABLE_HTTPS
if (instance->config.type == EXPORTING_CONNECTOR_TYPE_OPENTSDB_USING_HTTP && options & EXPORTING_OPTION_USE_TLS) {
SSL_free(connector_specific_data->conn);
freez(instance->connector_specific_data);
}
#endif
simple_connector_cleanup(instance); simple_connector_cleanup(instance);
} }

View file

@ -168,6 +168,13 @@ int __mock_end_batch_formatting(struct instance *instance)
return mock_type(int); return mock_type(int);
} }
int __wrap_simple_connector_end_batch(struct instance *instance)
{
function_called();
check_expected_ptr(instance);
return mock_type(int);
}
#if ENABLE_PROMETHEUS_REMOTE_WRITE #if ENABLE_PROMETHEUS_REMOTE_WRITE
void *__wrap_init_write_request() void *__wrap_init_write_request()
{ {

View file

@ -122,7 +122,6 @@ static void test_init_connectors(void **state)
assert_ptr_equal(instance->metric_formatting, format_dimension_collected_graphite_plaintext); assert_ptr_equal(instance->metric_formatting, format_dimension_collected_graphite_plaintext);
assert_ptr_equal(instance->end_chart_formatting, NULL); assert_ptr_equal(instance->end_chart_formatting, NULL);
assert_ptr_equal(instance->end_host_formatting, flush_host_labels); assert_ptr_equal(instance->end_host_formatting, flush_host_labels);
assert_ptr_equal(instance->end_batch_formatting, simple_connector_update_buffered_bytes);
BUFFER *buffer = instance->buffer; BUFFER *buffer = instance->buffer;
assert_ptr_not_equal(buffer, NULL); assert_ptr_not_equal(buffer, NULL);
@ -500,14 +499,10 @@ static void test_format_dimension_collected_opentsdb_http(void **state)
assert_int_equal(format_dimension_collected_opentsdb_http(engine->instance_root, rd), 0); assert_int_equal(format_dimension_collected_opentsdb_http(engine->instance_root, rd), 0);
assert_string_equal( assert_string_equal(
buffer_tostring(engine->instance_root->buffer), buffer_tostring(engine->instance_root->buffer),
"POST /api/put HTTP/1.1\r\n" "{\"metric\":\"netdata.chart_name.dimension_name\","
"Host: test-host\r\n" "\"timestamp\":15051,"
"Content-Type: application/json\r\n" "\"value\":123000321,"
"Content-Length: 153\r\n\r\n" "\"tags\":{\"host\":\"test-host TAG1=VALUE1 TAG2=VALUE2\"}}");
"{ \"metric\": \"netdata.chart_name.dimension_name\", "
"\"timestamp\": 15051, "
"\"value\": 123000321, "
"\"tags\": { \"host\": \"test-host TAG1=VALUE1 TAG2=VALUE2\" }}");
} }
static void test_format_dimension_stored_opentsdb_http(void **state) static void test_format_dimension_stored_opentsdb_http(void **state)
@ -521,14 +516,10 @@ static void test_format_dimension_stored_opentsdb_http(void **state)
assert_int_equal(format_dimension_stored_opentsdb_http(engine->instance_root, rd), 0); assert_int_equal(format_dimension_stored_opentsdb_http(engine->instance_root, rd), 0);
assert_string_equal( assert_string_equal(
buffer_tostring(engine->instance_root->buffer), buffer_tostring(engine->instance_root->buffer),
"POST /api/put HTTP/1.1\r\n" "{\"metric\":\"netdata.chart_name.dimension_name\","
"Host: test-host\r\n" "\"timestamp\":15052,"
"Content-Type: application/json\r\n" "\"value\":690565856.0000000,"
"Content-Length: 161\r\n\r\n" "\"tags\":{\"host\":\"test-host TAG1=VALUE1 TAG2=VALUE2\"}}");
"{ \"metric\": \"netdata.chart_name.dimension_name\", "
"\"timestamp\": 15052, "
"\"value\": 690565856.0000000, "
"\"tags\": { \"host\": \"test-host TAG1=VALUE1 TAG2=VALUE2\" }}");
} }
static void test_exporting_discard_response(void **state) static void test_exporting_discard_response(void **state)
@ -538,14 +529,7 @@ static void test_exporting_discard_response(void **state)
BUFFER *response = buffer_create(0); BUFFER *response = buffer_create(0);
buffer_sprintf(response, "Test response"); buffer_sprintf(response, "Test response");
expect_function_call(__wrap_info_int);
assert_int_equal(exporting_discard_response(response, engine->instance_root), 0); assert_int_equal(exporting_discard_response(response, engine->instance_root), 0);
assert_string_equal(
log_line,
"EXPORTING: received 13 bytes from instance_name connector instance. Ignoring them. Sample: 'Test response'");
assert_int_equal(buffer_strlen(response), 0); assert_int_equal(buffer_strlen(response), 0);
buffer_free(response); buffer_free(response);
@ -565,14 +549,8 @@ static void test_simple_connector_receive_response(void **state)
expect_value(__wrap_recv, len, 4096); expect_value(__wrap_recv, len, 4096);
expect_value(__wrap_recv, flags, MSG_DONTWAIT); expect_value(__wrap_recv, flags, MSG_DONTWAIT);
expect_function_call(__wrap_info_int);
simple_connector_receive_response(&sock, instance); simple_connector_receive_response(&sock, instance);
assert_string_equal(
log_line,
"EXPORTING: received 9 bytes from instance_name connector instance. Ignoring them. Sample: 'Test recv'");
assert_int_equal(stats->received_bytes, 9); assert_int_equal(stats->received_bytes, 9);
assert_int_equal(stats->receptions, 1); assert_int_equal(stats->receptions, 1);
assert_int_equal(sock, 1); assert_int_equal(sock, 1);
@ -583,39 +561,34 @@ static void test_simple_connector_send_buffer(void **state)
struct engine *engine = *state; struct engine *engine = *state;
struct instance *instance = engine->instance_root; struct instance *instance = engine->instance_root;
struct stats *stats = &instance->stats; struct stats *stats = &instance->stats;
BUFFER *buffer = instance->buffer;
int sock = 1; int sock = 1;
int failures = 3; int failures = 3;
size_t buffered_metrics = 1;
BUFFER *header = buffer_create(0);
BUFFER *buffer = buffer_create(0);
buffer_strcat(header, "test header\n");
buffer_strcat(buffer, "test buffer\n");
__real_mark_scheduled_instances(engine); expect_function_call(__wrap_send);
expect_value(__wrap_send, sockfd, 1);
expect_function_call(__wrap_rrdhost_is_exportable); expect_value(__wrap_send, buf, buffer_tostring(header));
expect_value(__wrap_rrdhost_is_exportable, instance, instance); expect_string(__wrap_send, buf, "test header\n");
expect_value(__wrap_rrdhost_is_exportable, host, localhost); expect_value(__wrap_send, len, 12);
will_return(__wrap_rrdhost_is_exportable, 1); expect_value(__wrap_send, flags, MSG_NOSIGNAL);
RRDSET *st = localhost->rrdset_root;
expect_function_call(__wrap_rrdset_is_exportable);
expect_value(__wrap_rrdset_is_exportable, instance, instance);
expect_value(__wrap_rrdset_is_exportable, st, st);
will_return(__wrap_rrdset_is_exportable, 1);
__real_prepare_buffers(engine);
expect_function_call(__wrap_send); expect_function_call(__wrap_send);
expect_value(__wrap_send, sockfd, 1); expect_value(__wrap_send, sockfd, 1);
expect_value(__wrap_send, buf, buffer_tostring(buffer)); expect_value(__wrap_send, buf, buffer_tostring(buffer));
expect_string( expect_string(__wrap_send, buf, "test buffer\n");
__wrap_send, buf, "netdata.test-host.chart_name.dimension_name;TAG1=VALUE1 TAG2=VALUE2 123000321 15051\n"); expect_value(__wrap_send, len, 12);
expect_value(__wrap_send, len, 84);
expect_value(__wrap_send, flags, MSG_NOSIGNAL); expect_value(__wrap_send, flags, MSG_NOSIGNAL);
simple_connector_send_buffer(&sock, &failures, instance); simple_connector_send_buffer(&sock, &failures, instance, header, buffer, buffered_metrics);
assert_int_equal(failures, 0); assert_int_equal(failures, 0);
assert_int_equal(stats->transmission_successes, 1); assert_int_equal(stats->transmission_successes, 1);
assert_int_equal(stats->sent_bytes, 84); assert_int_equal(stats->sent_bytes, 12);
assert_int_equal(stats->sent_metrics, 1); assert_int_equal(stats->sent_metrics, 1);
assert_int_equal(stats->transmission_failures, 0); assert_int_equal(stats->transmission_failures, 0);
@ -629,22 +602,18 @@ static void test_simple_connector_worker(void **state)
struct engine *engine = *state; struct engine *engine = *state;
struct instance *instance = engine->instance_root; struct instance *instance = engine->instance_root;
struct stats *stats = &instance->stats; struct stats *stats = &instance->stats;
BUFFER *buffer = instance->buffer;
__real_mark_scheduled_instances(engine); __real_mark_scheduled_instances(engine);
expect_function_call(__wrap_rrdhost_is_exportable); struct simple_connector_data *simple_connector_data = callocz(1, sizeof(struct simple_connector_data));
expect_value(__wrap_rrdhost_is_exportable, instance, instance); instance->connector_specific_data = simple_connector_data;
expect_value(__wrap_rrdhost_is_exportable, host, localhost); simple_connector_data->last_buffer = callocz(1, sizeof(struct simple_connector_buffer));
will_return(__wrap_rrdhost_is_exportable, 1); simple_connector_data->first_buffer = simple_connector_data->last_buffer;
simple_connector_data->last_buffer->header = buffer_create(0);
simple_connector_data->last_buffer->buffer = buffer_create(0);
RRDSET *st = localhost->rrdset_root; buffer_sprintf(simple_connector_data->last_buffer->header, "test header");
expect_function_call(__wrap_rrdset_is_exportable); buffer_sprintf(simple_connector_data->last_buffer->buffer, "test buffer");
expect_value(__wrap_rrdset_is_exportable, instance, instance);
expect_value(__wrap_rrdset_is_exportable, st, st);
will_return(__wrap_rrdset_is_exportable, 1);
__real_prepare_buffers(engine);
expect_function_call(__wrap_connect_to_one_of); expect_function_call(__wrap_connect_to_one_of);
expect_string(__wrap_connect_to_one_of, destination, "localhost"); expect_string(__wrap_connect_to_one_of, destination, "localhost");
@ -656,10 +625,16 @@ static void test_simple_connector_worker(void **state)
expect_function_call(__wrap_send); expect_function_call(__wrap_send);
expect_value(__wrap_send, sockfd, 2); expect_value(__wrap_send, sockfd, 2);
expect_value(__wrap_send, buf, buffer_tostring(buffer)); expect_not_value(__wrap_send, buf, buffer_tostring(simple_connector_data->last_buffer->buffer));
expect_string( expect_string(__wrap_send, buf, "test header");
__wrap_send, buf, "netdata.test-host.chart_name.dimension_name;TAG1=VALUE1 TAG2=VALUE2 123000321 15051\n"); expect_value(__wrap_send, len, 11);
expect_value(__wrap_send, len, 84); expect_value(__wrap_send, flags, MSG_NOSIGNAL);
expect_function_call(__wrap_send);
expect_value(__wrap_send, sockfd, 2);
expect_value(__wrap_send, buf, buffer_tostring(simple_connector_data->last_buffer->buffer));
expect_string(__wrap_send, buf, "test buffer");
expect_value(__wrap_send, len, 11);
expect_value(__wrap_send, flags, MSG_NOSIGNAL); expect_value(__wrap_send, flags, MSG_NOSIGNAL);
expect_function_call(__wrap_send_internal_metrics); expect_function_call(__wrap_send_internal_metrics);
@ -669,13 +644,13 @@ static void test_simple_connector_worker(void **state)
simple_connector_worker(instance); simple_connector_worker(instance);
assert_int_equal(stats->buffered_metrics, 0); assert_int_equal(stats->buffered_metrics, 0);
assert_int_equal(stats->buffered_bytes, 84); assert_int_equal(stats->buffered_bytes, 0);
assert_int_equal(stats->received_bytes, 0); assert_int_equal(stats->received_bytes, 0);
assert_int_equal(stats->sent_bytes, 84); assert_int_equal(stats->sent_bytes, 0);
assert_int_equal(stats->sent_metrics, 1); assert_int_equal(stats->sent_metrics, 0);
assert_int_equal(stats->lost_metrics, 0); assert_int_equal(stats->lost_metrics, 0);
assert_int_equal(stats->receptions, 0); assert_int_equal(stats->receptions, 0);
assert_int_equal(stats->transmission_successes, 1); assert_int_equal(stats->transmission_successes, 0);
assert_int_equal(stats->transmission_failures, 0); assert_int_equal(stats->transmission_failures, 0);
assert_int_equal(stats->data_lost_events, 0); assert_int_equal(stats->data_lost_events, 0);
assert_int_equal(stats->lost_bytes, 0); assert_int_equal(stats->lost_bytes, 0);
@ -1159,7 +1134,7 @@ static void test_init_prometheus_remote_write_instance(void **state)
assert_ptr_equal(instance->end_chart_formatting, NULL); assert_ptr_equal(instance->end_chart_formatting, NULL);
assert_ptr_equal(instance->end_host_formatting, NULL); assert_ptr_equal(instance->end_host_formatting, NULL);
assert_ptr_equal(instance->end_batch_formatting, format_batch_prometheus_remote_write); assert_ptr_equal(instance->end_batch_formatting, format_batch_prometheus_remote_write);
assert_ptr_equal(instance->send_header, prometheus_remote_write_send_header); assert_ptr_equal(instance->prepare_header, prometheus_remote_write_prepare_header);
assert_ptr_equal(instance->check_response, process_prometheus_remote_write_response); assert_ptr_equal(instance->check_response, process_prometheus_remote_write_response);
assert_ptr_not_equal(instance->buffer, NULL); assert_ptr_not_equal(instance->buffer, NULL);
@ -1169,40 +1144,43 @@ static void test_init_prometheus_remote_write_instance(void **state)
(struct prometheus_remote_write_specific_data *)instance->connector_specific_data; (struct prometheus_remote_write_specific_data *)instance->connector_specific_data;
assert_ptr_not_equal(instance->connector_specific_data, NULL); assert_ptr_not_equal(instance->connector_specific_data, NULL);
assert_ptr_equal(connector_specific_data->write_request, 0xff); assert_ptr_not_equal(connector_specific_data->write_request, NULL);
freez(instance->connector_specific_data); freez(instance->connector_specific_data);
} }
static void test_prometheus_remote_write_send_header(void **state) static void test_prometheus_remote_write_prepare_header(void **state)
{ {
struct engine *engine = *state; struct engine *engine = *state;
struct instance *instance = engine->instance_root; struct instance *instance = engine->instance_root;
int sock = 1;
struct prometheus_remote_write_specific_config *connector_specific_config = struct prometheus_remote_write_specific_config *connector_specific_config =
callocz(1, sizeof(struct prometheus_remote_write_specific_config)); callocz(1, sizeof(struct prometheus_remote_write_specific_config));
instance->config.connector_specific_config = connector_specific_config; instance->config.connector_specific_config = connector_specific_config;
connector_specific_config->remote_write_path = strdupz("/receive"); connector_specific_config->remote_write_path = strdupz("/receive");
buffer_sprintf(instance->buffer, "test buffer"); struct simple_connector_data *simple_connector_data = callocz(1, sizeof(struct simple_connector_data));
instance->connector_specific_data = simple_connector_data;
simple_connector_data->last_buffer = callocz(1, sizeof(struct simple_connector_buffer));
simple_connector_data->last_buffer->header = buffer_create(0);
simple_connector_data->last_buffer->buffer = buffer_create(0);
expect_function_call(__wrap_send); buffer_sprintf(simple_connector_data->last_buffer->buffer, "test buffer");
expect_value(__wrap_send, sockfd, 1);
expect_not_value(__wrap_send, buf, NULL); prometheus_remote_write_prepare_header(instance);
expect_string(
__wrap_send, buf, assert_string_equal(
buffer_tostring(simple_connector_data->last_buffer->header),
"POST /receive HTTP/1.1\r\n" "POST /receive HTTP/1.1\r\n"
"Host: localhost\r\n" "Host: localhost\r\n"
"Accept: */*\r\n" "Accept: */*\r\n"
"X-Prometheus-Remote-Write-Version: 0.1.0\r\n" "X-Prometheus-Remote-Write-Version: 0.1.0\r\n"
"Content-Length: 11\r\n" "Content-Length: 11\r\n"
"Content-Type: application/x-www-form-urlencoded\r\n\r\n"); "Content-Type: application/x-www-form-urlencoded\r\n\r\n");
expect_value(__wrap_send, len, 167);
expect_value(__wrap_send, flags, MSG_NOSIGNAL);
assert_int_equal(prometheus_remote_write_send_header(&sock, instance),0);
free(connector_specific_config->remote_write_path); free(connector_specific_config->remote_write_path);
buffer_free(simple_connector_data->last_buffer->header);
buffer_free(simple_connector_data->last_buffer->buffer);
} }
static void test_process_prometheus_remote_write_response(void **state) static void test_process_prometheus_remote_write_response(void **state)
@ -1224,9 +1202,11 @@ static void test_format_host_prometheus_remote_write(void **state)
instance->config.options |= EXPORTING_OPTION_SEND_CONFIGURED_LABELS; instance->config.options |= EXPORTING_OPTION_SEND_CONFIGURED_LABELS;
instance->config.options |= EXPORTING_OPTION_SEND_AUTOMATIC_LABELS; instance->config.options |= EXPORTING_OPTION_SEND_AUTOMATIC_LABELS;
struct simple_connector_data *simple_connector_data = mallocz(sizeof(struct simple_connector_data *));
instance->connector_specific_data = simple_connector_data;
struct prometheus_remote_write_specific_data *connector_specific_data = struct prometheus_remote_write_specific_data *connector_specific_data =
mallocz(sizeof(struct prometheus_remote_write_specific_data *)); mallocz(sizeof(struct prometheus_remote_write_specific_data *));
instance->connector_specific_data = (void *)connector_specific_data; simple_connector_data->connector_specific_data = (void *)connector_specific_data;
connector_specific_data->write_request = (void *)0xff; connector_specific_data->write_request = (void *)0xff;
localhost->program_name = strdupz("test_program"); localhost->program_name = strdupz("test_program");
@ -1254,6 +1234,7 @@ static void test_format_host_prometheus_remote_write(void **state)
assert_int_equal(format_host_prometheus_remote_write(instance, localhost), 0); assert_int_equal(format_host_prometheus_remote_write(instance, localhost), 0);
freez(connector_specific_data); freez(connector_specific_data);
freez(simple_connector_data);
free(localhost->program_name); free(localhost->program_name);
free(localhost->program_version); free(localhost->program_version);
} }
@ -1263,9 +1244,11 @@ static void test_format_dimension_prometheus_remote_write(void **state)
struct engine *engine = *state; struct engine *engine = *state;
struct instance *instance = engine->instance_root; struct instance *instance = engine->instance_root;
struct simple_connector_data *simple_connector_data = mallocz(sizeof(struct simple_connector_data *));
instance->connector_specific_data = simple_connector_data;
struct prometheus_remote_write_specific_data *connector_specific_data = struct prometheus_remote_write_specific_data *connector_specific_data =
mallocz(sizeof(struct prometheus_remote_write_specific_data *)); mallocz(sizeof(struct prometheus_remote_write_specific_data *));
instance->connector_specific_data = (void *)connector_specific_data; simple_connector_data->connector_specific_data = (void *)connector_specific_data;
connector_specific_data->write_request = (void *)0xff; connector_specific_data->write_request = (void *)0xff;
RRDDIM *rd = localhost->rrdset_root->dimensions; RRDDIM *rd = localhost->rrdset_root->dimensions;
@ -1291,11 +1274,16 @@ static void test_format_batch_prometheus_remote_write(void **state)
struct engine *engine = *state; struct engine *engine = *state;
struct instance *instance = engine->instance_root; struct instance *instance = engine->instance_root;
struct simple_connector_data *simple_connector_data = mallocz(sizeof(struct simple_connector_data *));
instance->connector_specific_data = simple_connector_data;
struct prometheus_remote_write_specific_data *connector_specific_data = struct prometheus_remote_write_specific_data *connector_specific_data =
mallocz(sizeof(struct prometheus_remote_write_specific_data *)); mallocz(sizeof(struct prometheus_remote_write_specific_data *));
instance->connector_specific_data = (void *)connector_specific_data; simple_connector_data->connector_specific_data = (void *)connector_specific_data;
connector_specific_data->write_request = __real_init_write_request(); connector_specific_data->write_request = __real_init_write_request();
expect_function_call(__wrap_simple_connector_end_batch);
expect_value(__wrap_simple_connector_end_batch, instance, instance);
will_return(__wrap_simple_connector_end_batch, 0);
__real_add_host_info( __real_add_host_info(
connector_specific_data->write_request, connector_specific_data->write_request,
"test_name", "test_instance", "test_application", "test_version", 15051); "test_name", "test_instance", "test_application", "test_version", 15051);
@ -1410,6 +1398,9 @@ static void test_aws_kinesis_connector_worker(void **state)
expect_value(__wrap_rrdset_is_exportable, st, st); expect_value(__wrap_rrdset_is_exportable, st, st);
will_return(__wrap_rrdset_is_exportable, 1); will_return(__wrap_rrdset_is_exportable, 1);
expect_function_call(__wrap_simple_connector_end_batch);
expect_value(__wrap_simple_connector_end_batch, instance, instance);
will_return(__wrap_simple_connector_end_batch, 0);
__real_prepare_buffers(engine); __real_prepare_buffers(engine);
struct aws_kinesis_specific_config *connector_specific_config = struct aws_kinesis_specific_config *connector_specific_config =
@ -1542,6 +1533,9 @@ static void test_pubsub_connector_worker(void **state)
expect_value(__wrap_rrdset_is_exportable, st, st); expect_value(__wrap_rrdset_is_exportable, st, st);
will_return(__wrap_rrdset_is_exportable, 1); will_return(__wrap_rrdset_is_exportable, 1);
expect_function_call(__wrap_simple_connector_end_batch);
expect_value(__wrap_simple_connector_end_batch, instance, instance);
will_return(__wrap_simple_connector_end_batch, 0);
__real_prepare_buffers(engine); __real_prepare_buffers(engine);
struct pubsub_specific_config *connector_specific_config = struct pubsub_specific_config *connector_specific_config =
@ -1663,7 +1657,7 @@ static void test_init_mongodb_instance(void **state)
assert_ptr_equal(instance->end_chart_formatting, NULL); assert_ptr_equal(instance->end_chart_formatting, NULL);
assert_ptr_equal(instance->end_host_formatting, flush_host_labels); assert_ptr_equal(instance->end_host_formatting, flush_host_labels);
assert_ptr_equal(instance->end_batch_formatting, format_batch_mongodb); assert_ptr_equal(instance->end_batch_formatting, format_batch_mongodb);
assert_ptr_equal(instance->send_header, NULL); assert_ptr_equal(instance->prepare_header, NULL);
assert_ptr_equal(instance->check_response, NULL); assert_ptr_equal(instance->check_response, NULL);
assert_ptr_not_equal(instance->buffer, NULL); assert_ptr_not_equal(instance->buffer, NULL);
@ -1884,7 +1878,7 @@ int main(void)
cmocka_unit_test_setup_teardown( cmocka_unit_test_setup_teardown(
test_init_prometheus_remote_write_instance, setup_configured_engine, teardown_configured_engine), test_init_prometheus_remote_write_instance, setup_configured_engine, teardown_configured_engine),
cmocka_unit_test_setup_teardown( cmocka_unit_test_setup_teardown(
test_prometheus_remote_write_send_header, setup_initialized_engine, teardown_initialized_engine), test_prometheus_remote_write_prepare_header, setup_initialized_engine, teardown_initialized_engine),
cmocka_unit_test(test_process_prometheus_remote_write_response), cmocka_unit_test(test_process_prometheus_remote_write_response),
cmocka_unit_test_setup_teardown( cmocka_unit_test_setup_teardown(
test_format_host_prometheus_remote_write, setup_initialized_engine, teardown_initialized_engine), test_format_host_prometheus_remote_write, setup_initialized_engine, teardown_initialized_engine),

View file

@ -116,6 +116,8 @@ int __mock_end_chart_formatting(struct instance *instance, RRDSET *st);
int __mock_end_host_formatting(struct instance *instance, RRDHOST *host); int __mock_end_host_formatting(struct instance *instance, RRDHOST *host);
int __mock_end_batch_formatting(struct instance *instance); int __mock_end_batch_formatting(struct instance *instance);
int __wrap_simple_connector_end_batch(struct instance *instance);
#if ENABLE_PROMETHEUS_REMOTE_WRITE #if ENABLE_PROMETHEUS_REMOTE_WRITE
void *__real_init_write_request(); void *__real_init_write_request();
void *__wrap_init_write_request(); void *__wrap_init_write_request();

View file

@ -69,14 +69,20 @@ int is_valid_connector(char *type, int check_reserved)
if (!strcmp(type, "graphite") || !strcmp(type, "graphite:plaintext")) { if (!strcmp(type, "graphite") || !strcmp(type, "graphite:plaintext")) {
return rc; return rc;
} else if (!strcmp(type, "graphite:http") || !strcmp(type, "graphite:https")) {
return rc;
} else if (!strcmp(type, "json") || !strcmp(type, "json:plaintext")) {
return rc;
} else if (!strcmp(type, "json:http") || !strcmp(type, "json:https")) {
return rc;
} else if (!strcmp(type, "opentsdb") || !strcmp(type, "opentsdb:telnet")) { } else if (!strcmp(type, "opentsdb") || !strcmp(type, "opentsdb:telnet")) {
return rc; return rc;
} else if (!strcmp(type, "opentsdb:http") || !strcmp(type, "opentsdb:https")) { } else if (!strcmp(type, "opentsdb:http") || !strcmp(type, "opentsdb:https")) {
return rc; return rc;
} else if (!strcmp(type, "json") || !strcmp(type, "json:plaintext")) {
return rc;
} else if (!strcmp(type, "prometheus_remote_write")) { } else if (!strcmp(type, "prometheus_remote_write")) {
return rc; return rc;
} else if (!strcmp(type, "prometheus_remote_write:http") || !strcmp(type, "prometheus_remote_write:https")) {
return rc;
} else if (!strcmp(type, "kinesis") || !strcmp(type, "kinesis:plaintext")) { } else if (!strcmp(type, "kinesis") || !strcmp(type, "kinesis:plaintext")) {
return rc; return rc;
} else if (!strcmp(type, "pubsub") || !strcmp(type, "pubsub:plaintext")) { } else if (!strcmp(type, "pubsub") || !strcmp(type, "pubsub:plaintext")) {

View file

@ -2,7 +2,7 @@
#ifdef ENABLE_HTTPS #ifdef ENABLE_HTTPS
SSL_CTX *netdata_opentsdb_ctx=NULL; SSL_CTX *netdata_exporting_ctx=NULL;
SSL_CTX *netdata_client_ctx=NULL; SSL_CTX *netdata_client_ctx=NULL;
SSL_CTX *netdata_srv_ctx=NULL; SSL_CTX *netdata_srv_ctx=NULL;
const char *security_key=NULL; const char *security_key=NULL;
@ -201,7 +201,7 @@ static SSL_CTX * security_initialize_openssl_server() {
* @param selector informs the context that must be initialized, the following list has the valid values: * @param selector informs the context that must be initialized, the following list has the valid values:
* NETDATA_SSL_CONTEXT_SERVER - the server context * NETDATA_SSL_CONTEXT_SERVER - the server context
* NETDATA_SSL_CONTEXT_STREAMING - Starts the streaming context. * NETDATA_SSL_CONTEXT_STREAMING - Starts the streaming context.
* NETDATA_SSL_CONTEXT_OPENTSDB - Starts the OpenTSDB contextv * NETDATA_SSL_CONTEXT_EXPORTING - Starts the OpenTSDB contextv
*/ */
void security_start_ssl(int selector) { void security_start_ssl(int selector) {
switch (selector) { switch (selector) {
@ -222,8 +222,8 @@ void security_start_ssl(int selector) {
SSL_CTX_set_mode(netdata_client_ctx, SSL_MODE_ENABLE_PARTIAL_WRITE |SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER |SSL_MODE_AUTO_RETRY); SSL_CTX_set_mode(netdata_client_ctx, SSL_MODE_ENABLE_PARTIAL_WRITE |SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER |SSL_MODE_AUTO_RETRY);
break; break;
} }
case NETDATA_SSL_CONTEXT_OPENTSDB: { case NETDATA_SSL_CONTEXT_EXPORTING: {
netdata_opentsdb_ctx = security_initialize_openssl_client(); netdata_exporting_ctx = security_initialize_openssl_client();
break; break;
} }
} }
@ -234,20 +234,18 @@ void security_start_ssl(int selector) {
* *
* Clean all the allocated contexts from netdata. * Clean all the allocated contexts from netdata.
*/ */
void security_clean_openssl() { void security_clean_openssl()
if (netdata_srv_ctx) {
{ if (netdata_srv_ctx) {
SSL_CTX_free(netdata_srv_ctx); SSL_CTX_free(netdata_srv_ctx);
} }
if (netdata_client_ctx) if (netdata_client_ctx) {
{
SSL_CTX_free(netdata_client_ctx); SSL_CTX_free(netdata_client_ctx);
} }
if ( netdata_opentsdb_ctx ) if (netdata_exporting_ctx) {
{ SSL_CTX_free(netdata_exporting_ctx);
SSL_CTX_free(netdata_opentsdb_ctx);
} }
#if OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110 #if OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110

View file

@ -14,7 +14,7 @@
#define NETDATA_SSL_CONTEXT_SERVER 0 #define NETDATA_SSL_CONTEXT_SERVER 0
#define NETDATA_SSL_CONTEXT_STREAMING 1 #define NETDATA_SSL_CONTEXT_STREAMING 1
#define NETDATA_SSL_CONTEXT_OPENTSDB 2 #define NETDATA_SSL_CONTEXT_EXPORTING 2
# ifdef ENABLE_HTTPS # ifdef ENABLE_HTTPS
@ -34,7 +34,7 @@ struct netdata_ssl{
uint32_t flags; //The flags for SSL connection uint32_t flags; //The flags for SSL connection
}; };
extern SSL_CTX *netdata_opentsdb_ctx; extern SSL_CTX *netdata_exporting_ctx;
extern SSL_CTX *netdata_client_ctx; extern SSL_CTX *netdata_client_ctx;
extern SSL_CTX *netdata_srv_ctx; extern SSL_CTX *netdata_srv_ctx;
extern const char *security_key; extern const char *security_key;

View file

@ -4,24 +4,26 @@
struct prometheus_output_options { struct prometheus_output_options {
char *name; char *name;
BACKENDS_PROMETHEUS_OUTPUT_OPTIONS flag; PROMETHEUS_OUTPUT_OPTIONS flag;
} prometheus_output_flags_root[] = { } prometheus_output_flags_root[] = {
{ "help", BACKENDS_PROMETHEUS_OUTPUT_HELP }, { "help", PROMETHEUS_OUTPUT_HELP },
{ "types", BACKENDS_PROMETHEUS_OUTPUT_TYPES }, { "types", PROMETHEUS_OUTPUT_TYPES },
{ "names", BACKENDS_PROMETHEUS_OUTPUT_NAMES }, { "names", PROMETHEUS_OUTPUT_NAMES },
{ "timestamps", BACKENDS_PROMETHEUS_OUTPUT_TIMESTAMPS }, { "timestamps", PROMETHEUS_OUTPUT_TIMESTAMPS },
{ "variables", BACKENDS_PROMETHEUS_OUTPUT_VARIABLES }, { "variables", PROMETHEUS_OUTPUT_VARIABLES },
{ "oldunits", BACKENDS_PROMETHEUS_OUTPUT_OLDUNITS }, { "oldunits", PROMETHEUS_OUTPUT_OLDUNITS },
{ "hideunits", BACKENDS_PROMETHEUS_OUTPUT_HIDEUNITS }, { "hideunits", PROMETHEUS_OUTPUT_HIDEUNITS },
// terminator // terminator
{ NULL, BACKENDS_PROMETHEUS_OUTPUT_NONE }, { NULL, PROMETHEUS_OUTPUT_NONE },
}; };
inline int web_client_api_request_v1_allmetrics(RRDHOST *host, struct web_client *w, char *url) { inline int web_client_api_request_v1_allmetrics(RRDHOST *host, struct web_client *w, char *url) {
int format = ALLMETRICS_SHELL; int format = ALLMETRICS_SHELL;
const char *prometheus_server = w->client_ip; const char *prometheus_server = w->client_ip;
uint32_t prometheus_backend_options = global_backend_options; uint32_t prometheus_backend_options = global_backend_options;
BACKENDS_PROMETHEUS_OUTPUT_OPTIONS prometheus_output_options = BACKENDS_PROMETHEUS_OUTPUT_TIMESTAMPS | ((global_backend_options & BACKEND_OPTION_SEND_NAMES)?BACKENDS_PROMETHEUS_OUTPUT_NAMES:0); PROMETHEUS_OUTPUT_OPTIONS prometheus_output_options =
PROMETHEUS_OUTPUT_TIMESTAMPS |
((global_backend_options & BACKEND_OPTION_SEND_NAMES) ? PROMETHEUS_OUTPUT_NAMES : 0);
const char *prometheus_prefix = global_backend_prefix; const char *prometheus_prefix = global_backend_prefix;
while(url) { while(url) {
@ -84,7 +86,7 @@ inline int web_client_api_request_v1_allmetrics(RRDHOST *host, struct web_client
case ALLMETRICS_PROMETHEUS: case ALLMETRICS_PROMETHEUS:
w->response.data->contenttype = CT_PROMETHEUS; w->response.data->contenttype = CT_PROMETHEUS;
backends_rrd_stats_api_v1_charts_allmetrics_prometheus_single_host( rrd_stats_api_v1_charts_allmetrics_prometheus_single_host(
host host
, w->response.data , w->response.data
, prometheus_server , prometheus_server
@ -96,7 +98,7 @@ inline int web_client_api_request_v1_allmetrics(RRDHOST *host, struct web_client
case ALLMETRICS_PROMETHEUS_ALL_HOSTS: case ALLMETRICS_PROMETHEUS_ALL_HOSTS:
w->response.data->contenttype = CT_PROMETHEUS; w->response.data->contenttype = CT_PROMETHEUS;
backends_rrd_stats_api_v1_charts_allmetrics_prometheus_all_hosts( rrd_stats_api_v1_charts_allmetrics_prometheus_all_hosts(
host host
, w->response.data , w->response.data
, prometheus_server , prometheus_server