diff --git a/CMakeLists.txt b/CMakeLists.txt index d97ae1d11f..56f3a2416e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1152,7 +1152,6 @@ endif() -Wl,--wrap=rrdset_is_exportable -Wl,--wrap=exporting_calculate_value_from_stored_data -Wl,--wrap=prepare_buffers - -Wl,--wrap=notify_workers -Wl,--wrap=send_internal_metrics -Wl,--wrap=now_realtime_sec -Wl,--wrap=uv_thread_set_name_np diff --git a/Makefile.am b/Makefile.am index c26b93ae86..8a6de4e0e8 100644 --- a/Makefile.am +++ b/Makefile.am @@ -878,7 +878,6 @@ if ENABLE_UNITTESTS -Wl,--wrap=rrdset_is_exportable \ -Wl,--wrap=exporting_calculate_value_from_stored_data \ -Wl,--wrap=prepare_buffers \ - -Wl,--wrap=notify_workers \ -Wl,--wrap=send_internal_metrics \ -Wl,--wrap=now_realtime_sec \ -Wl,--wrap=uv_thread_set_name_np \ diff --git a/exporting/exporting_engine.c b/exporting/exporting_engine.c index 93347328cd..06dc8156fe 100644 --- a/exporting/exporting_engine.c +++ b/exporting/exporting_engine.c @@ -48,17 +48,8 @@ void *exporting_main(void *ptr) heartbeat_next(&hb, step_ut); engine->now = now_realtime_sec(); - if (mark_scheduled_instances(engine)) { - if (prepare_buffers(engine) != 0) { - error("EXPORTING: cannot prepare data to send"); - break; - } - } - - if (notify_workers(engine) != 0) { - error("EXPORTING: cannot communicate with exporting connector instance working threads"); - break; - } + if (mark_scheduled_instances(engine)) + prepare_buffers(engine); send_main_rusage(st_main_rusage, rd_main_user, rd_main_system); diff --git a/exporting/exporting_engine.h b/exporting/exporting_engine.h index 9673e823d6..7dc79e3956 100644 --- a/exporting/exporting_engine.h +++ b/exporting/exporting_engine.h @@ -151,6 +151,7 @@ struct instance { struct stats stats; int scheduled; + int disabled; int skip_host; int skip_chart; @@ -203,8 +204,7 @@ EXPORTING_CONNECTOR_TYPE exporting_select_type(const char *type); int init_connectors(struct engine *engine); int mark_scheduled_instances(struct engine *engine); -int prepare_buffers(struct engine *engine); -int notify_workers(struct engine *engine); +void prepare_buffers(struct engine *engine); size_t exporting_name_copy(char *dst, const char *src, size_t max_len); @@ -216,13 +216,13 @@ calculated_number exporting_calculate_value_from_stored_data( RRDDIM *rd, time_t *last_timestamp); -int start_batch_formatting(struct engine *engine); -int start_host_formatting(struct engine *engine, RRDHOST *host); -int start_chart_formatting(struct engine *engine, RRDSET *st); -int metric_formatting(struct engine *engine, RRDDIM *rd); -int end_chart_formatting(struct engine *engine, RRDSET *st); -int end_host_formatting(struct engine *engine, RRDHOST *host); -int end_batch_formatting(struct engine *engine); +void start_batch_formatting(struct engine *engine); +void start_host_formatting(struct engine *engine, RRDHOST *host); +void start_chart_formatting(struct engine *engine, RRDSET *st); +void metric_formatting(struct engine *engine, RRDDIM *rd); +void end_chart_formatting(struct engine *engine, RRDSET *st); +void end_host_formatting(struct engine *engine, RRDHOST *host); +void end_batch_formatting(struct engine *engine); int flush_host_labels(struct instance *instance, RRDHOST *host); int simple_connector_update_buffered_bytes(struct instance *instance); @@ -235,6 +235,14 @@ void create_main_rusage_chart(RRDSET **st_rusage, RRDDIM **rd_user, RRDDIM **rd_ void send_main_rusage(RRDSET *st_rusage, RRDDIM *rd_user, RRDDIM *rd_system); void send_internal_metrics(struct instance *instance); +static inline void disable_instance(struct instance *instance) +{ + instance->disabled = 1; + instance->scheduled = 0; + uv_mutex_unlock(&instance->mutex); + error("EXPORTING: Instance %s disabled", instance->config.name); +} + #include "exporting/prometheus/prometheus.h" #endif /* NETDATA_EXPORTING_ENGINE_H */ diff --git a/exporting/process_data.c b/exporting/process_data.c index f2442e701c..b3d0fdef2d 100644 --- a/exporting/process_data.c +++ b/exporting/process_data.c @@ -43,7 +43,7 @@ int mark_scheduled_instances(struct engine *engine) int instances_were_scheduled = 0; for (struct instance *instance = engine->instance_root; instance; instance = instance->next) { - if (engine->now % instance->config.update_every < localhost->rrd_update_every) { + if (!instance->disabled && (engine->now % instance->config.update_every < localhost->rrd_update_every)) { instance->scheduled = 1; instances_were_scheduled = 1; instance->before = engine->now; @@ -160,21 +160,18 @@ calculated_number exporting_calculate_value_from_stored_data( * Start batch formatting for every connector instance's buffer * * @param engine an engine data structure. - * @return Returns 0 on success, 1 on failure. */ -int start_batch_formatting(struct engine *engine) +void start_batch_formatting(struct engine *engine) { for (struct instance *instance = engine->instance_root; instance; instance = instance->next) { if (instance->scheduled) { uv_mutex_lock(&instance->mutex); if (instance->start_batch_formatting && instance->start_batch_formatting(instance) != 0) { error("EXPORTING: cannot start batch formatting for %s", instance->config.name); - return 1; + disable_instance(instance); } } } - - return 0; } /** @@ -182,24 +179,21 @@ int start_batch_formatting(struct engine *engine) * * @param engine an engine data structure. * @param host a data collecting host. - * @return Returns 0 on success, 1 on failure. */ -int start_host_formatting(struct engine *engine, RRDHOST *host) +void start_host_formatting(struct engine *engine, RRDHOST *host) { for (struct instance *instance = engine->instance_root; instance; instance = instance->next) { if (instance->scheduled) { if (rrdhost_is_exportable(instance, host)) { if (instance->start_host_formatting && instance->start_host_formatting(instance, host) != 0) { error("EXPORTING: cannot start host formatting for %s", instance->config.name); - return 1; + disable_instance(instance); } } else { instance->skip_host = 1; } } } - - return 0; } /** @@ -207,24 +201,21 @@ int start_host_formatting(struct engine *engine, RRDHOST *host) * * @param engine an engine data structure. * @param st a chart. - * @return Returns 0 on success, 1 on failure. */ -int start_chart_formatting(struct engine *engine, RRDSET *st) +void start_chart_formatting(struct engine *engine, RRDSET *st) { for (struct instance *instance = engine->instance_root; instance; instance = instance->next) { if (instance->scheduled && !instance->skip_host) { if (rrdset_is_exportable(instance, st)) { if (instance->start_chart_formatting && instance->start_chart_formatting(instance, st) != 0) { error("EXPORTING: cannot start chart formatting for %s", instance->config.name); - return 1; + disable_instance(instance); } } else { instance->skip_chart = 1; } } } - - return 0; } /** @@ -232,21 +223,19 @@ int start_chart_formatting(struct engine *engine, RRDSET *st) * * @param engine an engine data structure. * @param rd a dimension(metric) in the Netdata database. - * @return Returns 0 on success, 1 on failure. */ -int metric_formatting(struct engine *engine, RRDDIM *rd) +void metric_formatting(struct engine *engine, RRDDIM *rd) { for (struct instance *instance = engine->instance_root; instance; instance = instance->next) { if (instance->scheduled && !instance->skip_host && !instance->skip_chart) { if (instance->metric_formatting && instance->metric_formatting(instance, rd) != 0) { error("EXPORTING: cannot format metric for %s", instance->config.name); - return 1; + disable_instance(instance); + continue; } instance->stats.buffered_metrics++; } } - - return 0; } /** @@ -254,21 +243,19 @@ int metric_formatting(struct engine *engine, RRDDIM *rd) * * @param engine an engine data structure. * @param a chart. - * @return Returns 0 on success, 1 on failure. */ -int end_chart_formatting(struct engine *engine, RRDSET *st) +void end_chart_formatting(struct engine *engine, RRDSET *st) { for (struct instance *instance = engine->instance_root; instance; instance = instance->next) { if (instance->scheduled && !instance->skip_host && !instance->skip_chart) { if (instance->end_chart_formatting && instance->end_chart_formatting(instance, st) != 0) { error("EXPORTING: cannot end chart formatting for %s", instance->config.name); - return 1; + disable_instance(instance); + continue; } } instance->skip_chart = 0; } - - return 0; } /** @@ -276,36 +263,34 @@ int end_chart_formatting(struct engine *engine, RRDSET *st) * * @param engine an engine data structure. * @param host a data collecting host. - * @return Returns 0 on success, 1 on failure. */ -int end_host_formatting(struct engine *engine, RRDHOST *host) +void end_host_formatting(struct engine *engine, RRDHOST *host) { for (struct instance *instance = engine->instance_root; instance; instance = instance->next) { if (instance->scheduled && !instance->skip_host) { if (instance->end_host_formatting && instance->end_host_formatting(instance, host) != 0) { error("EXPORTING: cannot end host formatting for %s", instance->config.name); - return 1; + disable_instance(instance); + continue; } } instance->skip_host = 0; } - - return 0; } /** * End batch formatting for every connector instance's buffer * * @param engine an engine data structure. - * @return Returns 0 on success, 1 on failure. */ -int end_batch_formatting(struct engine *engine) +void end_batch_formatting(struct engine *engine) { for (struct instance *instance = engine->instance_root; instance; instance = instance->next) { if (instance->scheduled) { if (instance->end_batch_formatting && instance->end_batch_formatting(instance) != 0) { error("EXPORTING: cannot end batch formatting for %s", instance->config.name); - return 1; + disable_instance(instance); + continue; } uv_mutex_unlock(&instance->mutex); uv_cond_signal(&instance->cond_var); @@ -314,8 +299,6 @@ int end_batch_formatting(struct engine *engine) instance->after = instance->before; } } - - return 0; } /** @@ -325,51 +308,39 @@ int end_batch_formatting(struct engine *engine) * configured rules. * * @param engine an engine data structure. - * @return Returns 0 on success, 1 on failure. */ -int prepare_buffers(struct engine *engine) +void prepare_buffers(struct engine *engine) { - if (start_batch_formatting(engine) != 0) - return 1; - netdata_thread_disable_cancelability(); + start_batch_formatting(engine); + rrd_rdlock(); RRDHOST *host; rrdhost_foreach_read(host) { rrdhost_rdlock(host); - if (start_host_formatting(engine, host) != 0) - return 1; + start_host_formatting(engine, host); RRDSET *st; rrdset_foreach_read(st, host) { rrdset_rdlock(st); - if (start_chart_formatting(engine, st) != 0) - return 1; + start_chart_formatting(engine, st); RRDDIM *rd; rrddim_foreach_read(rd, st) - { - if (metric_formatting(engine, rd) != 0) - return 1; - } + metric_formatting(engine, rd); - if (end_chart_formatting(engine, st) != 0) - return 1; + end_chart_formatting(engine, st); rrdset_unlock(st); } - if (end_host_formatting(engine, host) != 0) - return 1; + end_host_formatting(engine, host); rrdhost_unlock(host); } rrd_unlock(); netdata_thread_enable_cancelability(); - if (end_batch_formatting(engine) != 0) - return 1; - - return 0; + end_batch_formatting(engine); } /** @@ -401,18 +372,3 @@ int simple_connector_update_buffered_bytes(struct instance *instance) return 0; } - -/** - * Notify workers - * - * Notify exporting connector instance working threads that data is ready to send. - * - * @param engine an engine data structure. - * @return Returns 0 on success, 1 on failure. - */ -int notify_workers(struct engine *engine) -{ - (void)engine; - - return 0; -} diff --git a/exporting/tests/exporting_doubles.c b/exporting/tests/exporting_doubles.c index 8fafada929..2d966e51ac 100644 --- a/exporting/tests/exporting_doubles.c +++ b/exporting/tests/exporting_doubles.c @@ -75,13 +75,6 @@ int __wrap_prepare_buffers(struct engine *engine) return mock_type(int); } -int __wrap_notify_workers(struct engine *engine) -{ - function_called(); - check_expected_ptr(engine); - return mock_type(int); -} - void __wrap_create_main_rusage_chart(RRDSET **st_rusage, RRDDIM **rd_user, RRDDIM **rd_system) { function_called(); diff --git a/exporting/tests/test_exporting_engine.c b/exporting/tests/test_exporting_engine.c index 3835369356..6cd29132fc 100644 --- a/exporting/tests/test_exporting_engine.c +++ b/exporting/tests/test_exporting_engine.c @@ -60,10 +60,6 @@ static void test_exporting_engine(void **state) expect_memory(__wrap_prepare_buffers, engine, engine, sizeof(struct engine)); will_return(__wrap_prepare_buffers, 0); - expect_function_call(__wrap_notify_workers); - expect_memory(__wrap_notify_workers, engine, engine, sizeof(struct engine)); - will_return(__wrap_notify_workers, 0); - expect_function_call(__wrap_send_main_rusage); expect_value(__wrap_send_main_rusage, st_rusage, NULL); expect_value(__wrap_send_main_rusage, rd_user, NULL); diff --git a/exporting/tests/test_exporting_engine.h b/exporting/tests/test_exporting_engine.h index 7df3c39c12..237bc5f9b9 100644 --- a/exporting/tests/test_exporting_engine.h +++ b/exporting/tests/test_exporting_engine.h @@ -93,8 +93,6 @@ calculated_number __wrap_exporting_calculate_value_from_stored_data( int __real_prepare_buffers(struct engine *engine); int __wrap_prepare_buffers(struct engine *engine); -int __wrap_notify_workers(struct engine *engine); - void __real_create_main_rusage_chart(RRDSET **st_rusage, RRDDIM **rd_user, RRDDIM **rd_system); void __wrap_create_main_rusage_chart(RRDSET **st_rusage, RRDDIM **rd_user, RRDDIM **rd_system);