0
0
Fork 0
mirror of https://github.com/netdata/netdata.git synced 2025-04-17 03:02:41 +00:00

Removes Legacy JSON Cloud Protocol Support In Agent ()

* removes old protocol support (cloud removed support already)
This commit is contained in:
Timotej S 2022-06-27 16:03:20 +02:00 committed by GitHub
parent b8bfe953fb
commit cb13f0787d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
38 changed files with 166 additions and 1380 deletions

View file

@ -787,11 +787,6 @@ set(ACLK_ALWAYS_BUILD
aclk/aclk_proxy.h
)
set(ACLK_COMMON_FILES
aclk/aclk_collector_list.c
aclk/aclk_collector_list.h
)
set(ACLK_FILES
aclk/aclk.c
aclk/aclk.h
@ -1172,7 +1167,6 @@ list(APPEND NETDATA_COMMON_CFLAGS ${PROTOBUF_CFLAGS_OTHER})
list(APPEND NETDATA_FILES ${ACLK_ALWAYS_BUILD})
list(APPEND NETDATA_FILES ${TIMEX_PLUGIN_FILES})
list(APPEND NETDATA_FILES ${ACLK_FILES} ${ACLK_PROTO_BUILT_SRCS} ${ACLK_PROTO_BUILT_HDRS})
list(APPEND NETDATA_FILES ${ACLK_COMMON_FILES})
include_directories(BEFORE ${CMAKE_SOURCE_DIR}/aclk/aclk-schemas)
include_directories(BEFORE ${CMAKE_SOURCE_DIR}/mqtt_websockets/MQTT-C/include)
include_directories(BEFORE ${CMAKE_SOURCE_DIR}/mqtt_websockets/src/include)

View file

@ -641,11 +641,7 @@ ACLK_FILES = \
mqtt_websockets/c-rbuf/include/ringbuffer.h \
mqtt_websockets/c-rbuf/src/ringbuffer_internal.h \
mqtt_websockets/MQTT-C/src/mqtt.c \
mqtt_websockets/MQTT-C/include/mqtt.h
$(NULL)
if ENABLE_NEW_CLOUD_PROTOCOL
ACLK_FILES += \
mqtt_websockets/MQTT-C/include/mqtt.h \
aclk/aclk_charts_api.c \
aclk/aclk_charts_api.h \
aclk/aclk_alarm_api.c \
@ -768,17 +764,8 @@ aclk/aclk-schemas/proto/nodeinstance/info/v1/info.pb.cc \
aclk/aclk-schemas/proto/nodeinstance/info/v1/info.pb.h: aclk/aclk-schemas/proto/nodeinstance/info/v1/info.proto
$(PROTOC) -I=aclk/aclk-schemas --cpp_out=$(builddir)/aclk/aclk-schemas $^
endif #ENABLE_NEW_CLOUD_PROTOCOL
endif #ENABLE_ACLK
if ENABLE_ACLK
ACLK_COMMON_FILES = \
aclk/aclk_collector_list.c \
aclk/aclk_collector_list.h \
$(NULL)
endif
ACLK_ALWAYS_BUILD_FILES = \
aclk/aclk_rrdhost_state.h \
aclk/aclk_api.c \
@ -885,7 +872,6 @@ NETDATA_FILES = \
$(CLAIM_FILES) \
$(PARSER_FILES) \
$(ACLK_ALWAYS_BUILD_FILES) \
$(ACLK_COMMON_FILES) \
$(ACLK_FILES) \
$(SPAWN_PLUGIN_FILES) \
$(TIMEX_PLUGIN_FILES) \

View file

@ -10,7 +10,6 @@
#include "aclk_query_queue.h"
#include "aclk_util.h"
#include "aclk_rx_msgs.h"
#include "aclk_collector_list.h"
#include "https_client.h"
#include "schema-wrappers/schema_wrappers.h"
@ -46,8 +45,6 @@ netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER;
#define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex)
struct aclk_shared_state aclk_shared_state = {
.agent_state = ACLK_HOST_INITIALIZING,
.last_popcorn_interrupt = 0,
.mqtt_shutdown_msg_id = -1,
.mqtt_shutdown_msg_rcvd = 0
};
@ -188,54 +185,10 @@ void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str)
//TODO prevent big buffer on stack
#define RX_MSGLEN_MAX 4096
static void msg_callback_old_protocol(const char *topic, const void *msg, size_t msglen, int qos)
{
UNUSED(qos);
char cmsg[RX_MSGLEN_MAX];
size_t len = (msglen < RX_MSGLEN_MAX - 1) ? msglen : (RX_MSGLEN_MAX - 1);
const char *cmd_topic = aclk_get_topic(ACLK_TOPICID_COMMAND);
if (!cmd_topic) {
error("Error retrieving command topic");
return;
}
if (msglen > RX_MSGLEN_MAX - 1)
error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX);
memcpy(cmsg,
msg,
len);
cmsg[len] = 0;
#ifdef ACLK_LOG_CONVERSATION_DIR
#define FN_MAX_LEN 512
char filename[FN_MAX_LEN];
int logfd;
snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-rx.json", ACLK_GET_CONV_LOG_NEXT());
logfd = open(filename, O_CREAT | O_TRUNC | O_WRONLY, S_IRUSR | S_IWUSR );
if(logfd < 0)
error("Error opening ACLK Conversation logfile \"%s\" for RX message.", filename);
write(logfd, msg, msglen);
close(logfd);
#endif
debug(D_ACLK, "Got Message From Broker Topic \"%s\" QoS %d MSG: \"%s\"", topic, qos, cmsg);
if (strcmp(cmd_topic, topic))
error("Received message on unexpected topic %s", topic);
if (aclk_shared_state.mqtt_shutdown_msg_id > 0) {
error("Link is shutting down. Ignoring incoming message.");
return;
}
aclk_handle_cloud_cmd_message(cmsg);
}
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
static void msg_callback_new_protocol(const char *topic, const void *msg, size_t msglen, int qos)
static void msg_callback(const char *topic, const void *msg, size_t msglen, int qos)
{
UNUSED(qos);
aclk_rcvd_cloud_msgs++;
if (msglen > RX_MSGLEN_MAX)
error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX);
@ -272,15 +225,6 @@ static void msg_callback_new_protocol(const char *topic, const void *msg, size_t
aclk_handle_new_cloud_msg(msgtype, msg, msglen);
}
static inline void msg_callback(const char *topic, const void *msg, size_t msglen, int qos) {
aclk_rcvd_cloud_msgs++;
if (aclk_use_new_cloud_arch)
msg_callback_new_protocol(topic, msg, msglen, qos);
else
msg_callback_old_protocol(topic, msg, msglen, qos);
}
#endif /* ENABLE_NEW_CLOUD_PROTOCOL */
static void puback_callback(uint16_t packet_id)
{
if (++aclk_pubacks_per_conn == ACLK_PUBACKS_CONN_STABLE) {
@ -356,40 +300,6 @@ static int handle_connection(mqtt_wss_client client)
return 0;
}
inline static int aclk_popcorn_check()
{
ACLK_SHARED_STATE_LOCK;
if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) {
ACLK_SHARED_STATE_UNLOCK;
return 1;
}
ACLK_SHARED_STATE_UNLOCK;
return 0;
}
inline static int aclk_popcorn_check_bump()
{
ACLK_SHARED_STATE_LOCK;
if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) {
aclk_shared_state.last_popcorn_interrupt = now_realtime_sec();
ACLK_SHARED_STATE_UNLOCK;
return 1;
}
ACLK_SHARED_STATE_UNLOCK;
return 0;
}
static inline void queue_connect_payloads(void)
{
aclk_query_t query = aclk_query_new(METADATA_INFO);
query->data.metadata_info.host = localhost;
query->data.metadata_info.initial_on_connect = 1;
aclk_queue_query(query);
query = aclk_query_new(METADATA_ALARMS);
query->data.metadata_alarms.initial_on_connect = 1;
aclk_queue_query(query);
}
static inline void mqtt_connected_actions(mqtt_wss_client client)
{
char *topic = (char*)aclk_get_topic(ACLK_TOPICID_COMMAND);
@ -399,15 +309,11 @@ static inline void mqtt_connected_actions(mqtt_wss_client client)
else
mqtt_wss_subscribe(client, topic, 1);
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
if (aclk_use_new_cloud_arch) {
topic = (char*)aclk_get_topic(ACLK_TOPICID_CMD_NG_V1);
if (!topic)
error("Unable to fetch topic for protobuf COMMAND (to subscribe)");
else
mqtt_wss_subscribe(client, topic, 1);
}
#endif
topic = (char*)aclk_get_topic(ACLK_TOPICID_CMD_NG_V1);
if (!topic)
error("Unable to fetch topic for protobuf COMMAND (to subscribe)");
else
mqtt_wss_subscribe(client, topic, 1);
aclk_stats_upd_online(1);
aclk_connected = 1;
@ -415,55 +321,7 @@ static inline void mqtt_connected_actions(mqtt_wss_client client)
aclk_rcvd_cloud_msgs = 0;
aclk_connection_counter++;
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
if (!aclk_use_new_cloud_arch) {
#endif
ACLK_SHARED_STATE_LOCK;
if (aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING) {
error("Sending `connect` payload immediately as popcorning was finished already.");
queue_connect_payloads();
}
ACLK_SHARED_STATE_UNLOCK;
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
} else {
aclk_send_agent_connection_update(client, 1);
}
#endif
}
/* Waits until agent is ready or needs to exit
* @param client instance of mqtt_wss_client
* @param query_threads pointer to aclk_query_threads
* structure where to store data about started query threads
* @return 0 - Popcorning Finished - Agent STABLE,
* !0 - netdata_exit
*/
static int wait_popcorning_finishes()
{
time_t elapsed;
int need_wait;
if (aclk_use_new_cloud_arch)
return 0;
while (!netdata_exit) {
ACLK_SHARED_STATE_LOCK;
if (likely(aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING)) {
ACLK_SHARED_STATE_UNLOCK;
return 0;
}
elapsed = now_realtime_sec() - aclk_shared_state.last_popcorn_interrupt;
if (elapsed >= ACLK_STABLE_TIMEOUT) {
aclk_shared_state.agent_state = ACLK_HOST_STABLE;
ACLK_SHARED_STATE_UNLOCK;
error("ACLK localhost popcorn timer finished");
return 0;
}
ACLK_SHARED_STATE_UNLOCK;
need_wait = ACLK_STABLE_TIMEOUT - elapsed;
error("ACLK localhost popcorn timer - wait %d seconds longer", need_wait);
sleep(need_wait);
}
return 1;
aclk_send_agent_connection_update(client, 1);
}
void aclk_graceful_disconnect(mqtt_wss_client client)
@ -471,12 +329,8 @@ void aclk_graceful_disconnect(mqtt_wss_client client)
info("Preparing to gracefully shutdown ACLK connection");
aclk_queue_lock();
aclk_queue_flush();
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
if (aclk_use_new_cloud_arch)
aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_agent_connection_update(client, 0);
else
#endif
aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_app_layer_disconnect(client, "graceful");
aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_agent_connection_update(client, 0);
time_t t = now_monotonic_sec();
while (!mqtt_wss_service(client, 100)) {
@ -594,8 +448,6 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
url_t mqtt_url;
#endif
json_object *lwt = NULL;
while (!netdata_exit) {
char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
if (cloud_base_url == NULL) {
@ -629,8 +481,6 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
.drop_on_publish_fail = 1
};
aclk_use_new_cloud_arch = 0;
#ifndef ACLK_DISABLE_CHALLENGE
if (aclk_env) {
aclk_env_t_destroy(aclk_env);
@ -649,20 +499,17 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
if (netdata_exit)
return 1;
if (aclk_env->encoding == ACLK_ENC_PROTO) {
#ifndef ENABLE_NEW_CLOUD_PROTOCOL
error("Cloud requested New Cloud Protocol to be used but this agent cannot support it!");
if (aclk_env->encoding != ACLK_ENC_PROTO) {
error_report("This agent can only use the new cloud protocol but cloud requested old one.");
continue;
#else
if (!aclk_env_has_capa("proto")) {
error ("Can't encoding=proto without at least \"proto\" capability.");
continue;
}
info("Switching ACLK to new protobuf protocol. Due to /env response.");
aclk_use_new_cloud_arch = 1;
#endif
}
if (!aclk_env_has_capa("proto")) {
error ("Can't use encoding=proto without at least \"proto\" capability.");
continue;
}
info("New ACLK protobuf protocol negotiated successfully (/env response).");
memset(&auth_url, 0, sizeof(url_t));
if (url_parse(aclk_env->auth_endpoint, &auth_url)) {
error("Parsing URL returned by env endpoint for authentication failed. \"%s\"", aclk_env->auth_endpoint);
@ -679,10 +526,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
// aclk_get_topic moved here as during OTP we
// generate the topic cache
if (aclk_use_new_cloud_arch)
mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN);
else
mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_METADATA);
mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN);
if (!mqtt_conn_params.will_topic) {
error("Couldn't get LWT topic. Will not send LWT.");
@ -708,17 +552,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
aclk_session_sec = aclk_session_newarch / USEC_PER_SEC;
aclk_session_us = aclk_session_newarch % USEC_PER_SEC;
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
if (aclk_use_new_cloud_arch) {
mqtt_conn_params.will_msg = aclk_generate_lwt(&mqtt_conn_params.will_msg_len);
} else {
#endif
lwt = aclk_generate_disconnect(NULL);
mqtt_conn_params.will_msg = json_object_to_json_string_ext(lwt, JSON_C_TO_STRING_PLAIN);
mqtt_conn_params.will_msg_len = strlen(mqtt_conn_params.will_msg);
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
}
#endif
mqtt_conn_params.will_msg = aclk_generate_lwt(&mqtt_conn_params.will_msg_len);
#ifdef ACLK_DISABLE_CHALLENGE
ret = mqtt_wss_connect(client, base_url.host, base_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf);
@ -732,10 +566,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
freez((char*)mqtt_conn_params.username);
#endif
if (aclk_use_new_cloud_arch)
freez((char *)mqtt_conn_params.will_msg);
else
json_object_put(lwt);
freez((char *)mqtt_conn_params.will_msg);
if (!ret) {
last_conn_time_mqtt = now_realtime_sec();
@ -778,10 +609,7 @@ void *aclk_main(void *ptr)
return NULL;
}
unsigned int proto_hdl_cnt;
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
proto_hdl_cnt = aclk_init_rx_msg_handlers();
#endif
unsigned int proto_hdl_cnt = aclk_init_rx_msg_handlers();
// This thread is unusual in that it cannot be cancelled by cancel_main_threads()
// as it must notify the far end that it shutdown gracefully and avoid the LWT.
@ -792,7 +620,6 @@ void *aclk_main(void *ptr)
static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
return NULL;
#endif
aclk_popcorn_check_bump(); // start localhost popcorn timer
query_threads.count = read_query_thread_count();
if (wait_till_cloud_enabled())
@ -803,11 +630,7 @@ void *aclk_main(void *ptr)
use_mqtt_5 = config_get_boolean(CONFIG_SECTION_CLOUD, "mqtt5", CONFIG_BOOLEAN_YES);
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback, use_mqtt_5))) {
#else
if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback_old_protocol, puback_callback, use_mqtt_5))) {
#endif
error("Couldn't initialize MQTT_WSS network library");
goto exit;
}
@ -835,28 +658,9 @@ void *aclk_main(void *ptr)
if (aclk_attempt_to_connect(mqttwss_client))
goto exit_full;
#if defined(ENABLE_ACLK) && !defined(ENABLE_NEW_CLOUD_PROTOCOL)
error_report("############################ WARNING ###############################");
error_report("# Your agent is configured to connect to cloud but has #");
error_report("# no protobuf protocol support (uses legacy JSON protocol) #");
error_report("# Legacy protocol will be deprecated soon (planned 1st March 2022) #");
error_report("# Visit following link for more info and instructions how to solve #");
error_report("# https://www.netdata.cloud/blog/netdata-clouds-new-architecture #");
error_report("######################################################################");
#endif
// warning this assumes the popcorning is relative short (3s)
// if that changes call mqtt_wss_service from within
// to keep OpenSSL, WSS and MQTT connection alive
if (wait_popcorning_finishes())
goto exit_full;
if (unlikely(!query_threads.thread_list))
aclk_query_threads_start(&query_threads, mqttwss_client);
if (!aclk_use_new_cloud_arch)
queue_connect_payloads();
if (handle_connection(mqttwss_client)) {
aclk_stats_upd_online(0);
last_disconnect_time = now_realtime_sec();
@ -890,168 +694,12 @@ exit:
return NULL;
}
// TODO this is taken over as workaround from old ACLK
// fix this in both old and new ACLK
extern void health_alarm_entry2json_nolock(BUFFER *wb, ALARM_ENTRY *ae, RRDHOST *host);
void aclk_alarm_reload(void)
{
ACLK_SHARED_STATE_LOCK;
if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) {
ACLK_SHARED_STATE_UNLOCK;
return;
}
ACLK_SHARED_STATE_UNLOCK;
aclk_queue_query(aclk_query_new(METADATA_ALARMS));
}
int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
{
BUFFER *local_buffer;
json_object *msg;
if (host != localhost)
return 0;
ACLK_SHARED_STATE_LOCK;
if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) {
ACLK_SHARED_STATE_UNLOCK;
return 0;
}
ACLK_SHARED_STATE_UNLOCK;
local_buffer = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock);
health_alarm_entry2json_nolock(local_buffer, ae, host);
netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock);
msg = json_tokener_parse(local_buffer->buffer);
struct aclk_query *query = aclk_query_new(ALARM_STATE_UPDATE);
query->data.alarm_update = msg;
aclk_queue_query(query);
buffer_free(local_buffer);
return 0;
}
int aclk_update_chart(RRDHOST *host, char *chart_name, int create)
{
struct aclk_query *query;
if (host == localhost ? aclk_popcorn_check_bump() : aclk_popcorn_check())
return 0;
query = aclk_query_new(create ? CHART_NEW : CHART_DEL);
if(create) {
query->data.chart_add_del.host = host;
query->data.chart_add_del.chart_name = strdupz(chart_name);
} else {
query->data.metadata_info.host = host;
query->data.metadata_info.initial_on_connect = 0;
}
aclk_queue_query(query);
return 0;
}
/*
* Add a new collector to the list
* If it exists, update the chart count
*/
void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
{
struct aclk_query *query;
struct _collector *tmp_collector;
if (unlikely(!netdata_ready || aclk_use_new_cloud_arch)) {
return;
}
COLLECTOR_LOCK;
tmp_collector = _add_collector(host->machine_guid, plugin_name, module_name);
if (unlikely(tmp_collector->count != 1)) {
COLLECTOR_UNLOCK;
return;
}
COLLECTOR_UNLOCK;
if (aclk_popcorn_check_bump())
return;
if (host != localhost)
return;
query = aclk_query_new(METADATA_INFO);
query->data.metadata_info.host = localhost; //TODO
query->data.metadata_info.initial_on_connect = 0;
aclk_queue_query(query);
query = aclk_query_new(METADATA_ALARMS);
query->data.metadata_alarms.initial_on_connect = 0;
aclk_queue_query(query);
}
/*
* Delete a collector from the list
* If the chart count reaches zero the collector will be removed
* from the list by calling del_collector.
*
* This function will release the memory used and schedule
* a cloud update
*/
void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
{
struct aclk_query *query;
struct _collector *tmp_collector;
if (unlikely(!netdata_ready || aclk_use_new_cloud_arch)) {
return;
}
COLLECTOR_LOCK;
tmp_collector = _del_collector(host->machine_guid, plugin_name, module_name);
if (unlikely(!tmp_collector || tmp_collector->count)) {
COLLECTOR_UNLOCK;
return;
}
debug(
D_ACLK, "DEL COLLECTOR [%s:%s] -- charts %u", plugin_name ? plugin_name : "*", module_name ? module_name : "*",
tmp_collector->count);
COLLECTOR_UNLOCK;
_free_collector(tmp_collector);
if (aclk_popcorn_check_bump())
return;
if (host != localhost)
return;
query = aclk_query_new(METADATA_INFO);
query->data.metadata_info.host = localhost; //TODO
query->data.metadata_info.initial_on_connect = 0;
aclk_queue_query(query);
query = aclk_query_new(METADATA_ALARMS);
query->data.metadata_alarms.initial_on_connect = 0;
aclk_queue_query(query);
}
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
void aclk_host_state_update(RRDHOST *host, int cmd)
{
uuid_t node_id;
int ret;
if (!aclk_connected || !aclk_use_new_cloud_arch)
if (!aclk_connected)
return;
ret = get_node_id(&host->host_uuid, &node_id);
@ -1158,14 +806,12 @@ void aclk_send_node_instances()
}
freez(list_head);
}
#endif
void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname)
{
aclk_send_bin_message_subtopic_pid(mqttwss_client, msg, msg_len, subtopic, msgname);
}
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
static void fill_alert_status_for_host(BUFFER *wb, RRDHOST *host)
{
struct proto_alert_status status;
@ -1221,7 +867,6 @@ static void fill_chart_status_for_host(BUFFER *wb, RRDHOST *host)
);
freez(stats);
}
#endif
char *ng_aclk_state(void)
{
@ -1232,13 +877,9 @@ char *ng_aclk_state(void)
buffer_strcat(wb,
"ACLK Available: Yes\n"
"ACLK Version: 2\n"
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
"Protocols Supported: Legacy, Protobuf\n"
#else
"Protocols Supported: Legacy\n"
#endif
"Protocols Supported: Protobuf\n"
);
buffer_sprintf(wb, "Protocol Used: %s\nMQTT Version: %d\nClaimed: ", aclk_use_new_cloud_arch ? "Protobuf" : "Legacy", use_mqtt_5 ? 5 : 3);
buffer_sprintf(wb, "Protocol Used: Protobuf\nMQTT Version: %d\nClaimed: ", use_mqtt_5 ? 5 : 3);
char *agent_id = is_agent_claimed();
if (agent_id == NULL)
@ -1274,7 +915,6 @@ char *ng_aclk_state(void)
if (aclk_connected) {
buffer_sprintf(wb, "Received Cloud MQTT Messages: %d\nMQTT Messages Confirmed by Remote Broker (PUBACKs): %d", aclk_rcvd_cloud_msgs, aclk_pubacks_per_conn);
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
RRDHOST *host;
rrd_rdlock();
rrdhost_foreach_read(host) {
@ -1309,7 +949,6 @@ char *ng_aclk_state(void)
fill_chart_status_for_host(wb, host);
}
rrd_unlock();
#endif
}
ret = strdupz(buffer_tostring(wb));
@ -1317,7 +956,6 @@ char *ng_aclk_state(void)
return ret;
}
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
static void fill_alert_status_for_host_json(json_object *obj, RRDHOST *host)
{
struct proto_alert_status status;
@ -1382,7 +1020,6 @@ static void fill_chart_status_for_host_json(json_object *obj, RRDHOST *host)
freez(stats);
}
#endif
static json_object *timestamp_to_json(const time_t *t)
{
@ -1406,15 +1043,8 @@ char *ng_aclk_state_json(void)
json_object_object_add(msg, "aclk-version", tmp);
grp = json_object_new_array();
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
tmp = json_object_new_string("Legacy");
json_object_array_add(grp, tmp);
tmp = json_object_new_string("Protobuf");
json_object_array_add(grp, tmp);
#else
tmp = json_object_new_string("Legacy");
json_object_array_add(grp, tmp);
#endif
json_object_object_add(msg, "protocols-supported", grp);
char *agent_id = is_agent_claimed();
@ -1435,7 +1065,7 @@ char *ng_aclk_state_json(void)
tmp = json_object_new_boolean(aclk_connected);
json_object_object_add(msg, "online", tmp);
tmp = json_object_new_string(aclk_use_new_cloud_arch ? "Protobuf" : "Legacy");
tmp = json_object_new_string("Protobuf");
json_object_object_add(msg, "used-cloud-protocol", tmp);
tmp = json_object_new_int(use_mqtt_5 ? 5 : 3);
@ -1462,7 +1092,6 @@ char *ng_aclk_state_json(void)
tmp = json_object_new_boolean(aclk_disable_runtime);
json_object_object_add(msg, "banned-by-cloud", tmp);
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
grp = json_object_new_array();
RRDHOST *host;
@ -1514,7 +1143,6 @@ char *ng_aclk_state_json(void)
}
rrd_unlock();
json_object_object_add(msg, "node-instances", grp);
#endif
char *str = strdupz(json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN));
json_object_put(msg);

View file

@ -21,9 +21,6 @@ extern netdata_mutex_t aclk_shared_state_mutex;
#define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex)
extern struct aclk_shared_state {
ACLK_AGENT_STATE agent_state;
time_t last_popcorn_interrupt;
// To wait for `disconnect` message PUBACK
// when shutting down
// at the same time if > 0 we know link is
@ -32,21 +29,8 @@ extern struct aclk_shared_state {
int mqtt_shutdown_msg_rcvd;
} aclk_shared_state;
void aclk_alarm_reload(void);
int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae);
/* Informs ACLK about created/deleted chart
* @param create 0 - if chart was deleted, other if chart created
*/
int aclk_update_chart(RRDHOST *host, char *chart_name, int create);
void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
void aclk_host_state_update(RRDHOST *host, int cmd);
void aclk_send_node_instances(void);
#endif
void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname);

View file

@ -13,7 +13,6 @@ usec_t aclk_session_us = 0;
time_t aclk_session_sec = 0;
int aclk_disable_runtime = 0;
int aclk_disable_single_updates = 0;
int aclk_stats_enabled;
int use_mqtt_5 = 0;
@ -33,16 +32,6 @@ void *aclk_starter(void *ptr) {
}
return aclk_main(ptr);
}
void aclk_single_update_disable()
{
aclk_disable_single_updates = 1;
}
void aclk_single_update_enable()
{
aclk_disable_single_updates = 0;
}
#endif /* ENABLE_ACLK */
void add_aclk_host_labels(void) {
@ -71,16 +60,13 @@ void add_aclk_host_labels(void) {
break;
}
int mqtt5 = config_get_boolean(CONFIG_SECTION_CLOUD, "mqtt5", CONFIG_BOOLEAN_YES);
rrdlabels_add(labels, "_mqtt_version", mqtt5 ? "5" : "3", RRDLABEL_SRC_AUTO);
rrdlabels_add(labels, "_aclk_impl", "Next Generation", RRDLABEL_SRC_AUTO);
rrdlabels_add(labels, "_aclk_proxy", proxy_str, RRDLABEL_SRC_AUTO);
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
rrdlabels_add(labels, "_aclk_ng_new_cloud_protocol", "true", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK);
#else
rrdlabels_add(labels, "_aclk_ng_new_cloud_protocol", "false", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK);
#endif
#endif
}

View file

@ -15,31 +15,16 @@ extern usec_t aclk_session_us;
extern time_t aclk_session_sec;
extern int aclk_disable_runtime;
extern int aclk_disable_single_updates;
extern int aclk_stats_enabled;
extern int aclk_alert_reloaded;
extern int aclk_ng;
extern int use_mqtt_5;
#ifdef ENABLE_ACLK
void *aclk_starter(void *ptr);
void aclk_single_update_disable();
void aclk_single_update_enable();
void aclk_alarm_reload(void);
int aclk_update_chart(RRDHOST *host, char *chart_name, int create);
int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae);
void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
void aclk_host_state_update(RRDHOST *host, int connect);
#endif
#define NETDATA_ACLK_HOOK \
{ .name = "ACLK_Main", \

View file

@ -1,193 +0,0 @@
// SPDX-License-Identifier: GPL-3.0-or-later
// This is copied from Legacy ACLK, Original Author: amoss
// TODO unmess this
#include "aclk_collector_list.h"
netdata_mutex_t collector_mutex = NETDATA_MUTEX_INITIALIZER;
struct _collector *collector_list = NULL;
/*
* Free a collector structure
*/
void _free_collector(struct _collector *collector)
{
if (likely(collector->plugin_name))
freez(collector->plugin_name);
if (likely(collector->module_name))
freez(collector->module_name);
if (likely(collector->hostname))
freez(collector->hostname);
freez(collector);
}
/*
* This will report the collector list
*
*/
#ifdef ACLK_DEBUG
static void _dump_collector_list()
{
struct _collector *tmp_collector;
COLLECTOR_LOCK;
info("DUMPING ALL COLLECTORS");
if (unlikely(!collector_list || !collector_list->next)) {
COLLECTOR_UNLOCK;
info("DUMPING ALL COLLECTORS -- nothing found");
return;
}
// Note that the first entry is "dummy"
tmp_collector = collector_list->next;
while (tmp_collector) {
info(
"COLLECTOR %s : [%s:%s] count = %u", tmp_collector->hostname,
tmp_collector->plugin_name ? tmp_collector->plugin_name : "",
tmp_collector->module_name ? tmp_collector->module_name : "", tmp_collector->count);
tmp_collector = tmp_collector->next;
}
info("DUMPING ALL COLLECTORS DONE");
COLLECTOR_UNLOCK;
}
#endif
/*
* This will cleanup the collector list
*
*/
void _reset_collector_list()
{
struct _collector *tmp_collector, *next_collector;
COLLECTOR_LOCK;
if (unlikely(!collector_list || !collector_list->next)) {
COLLECTOR_UNLOCK;
return;
}
// Note that the first entry is "dummy"
tmp_collector = collector_list->next;
collector_list->count = 0;
collector_list->next = NULL;
// We broke the link; we can unlock
COLLECTOR_UNLOCK;
while (tmp_collector) {
next_collector = tmp_collector->next;
_free_collector(tmp_collector);
tmp_collector = next_collector;
}
}
/*
* Find a collector (if it exists)
* Must lock before calling this
* If last_collector is not null, it will return the previous collector in the linked
* list (used in collector delete)
*/
static struct _collector *_find_collector(
const char *hostname, const char *plugin_name, const char *module_name, struct _collector **last_collector)
{
struct _collector *tmp_collector, *prev_collector;
uint32_t plugin_hash;
uint32_t module_hash;
uint32_t hostname_hash;
if (unlikely(!collector_list)) {
collector_list = callocz(1, sizeof(struct _collector));
return NULL;
}
if (unlikely(!collector_list->next))
return NULL;
plugin_hash = plugin_name ? simple_hash(plugin_name) : 1;
module_hash = module_name ? simple_hash(module_name) : 1;
hostname_hash = simple_hash(hostname);
// Note that the first entry is "dummy"
tmp_collector = collector_list->next;
prev_collector = collector_list;
while (tmp_collector) {
if (plugin_hash == tmp_collector->plugin_hash && module_hash == tmp_collector->module_hash &&
hostname_hash == tmp_collector->hostname_hash && (!strcmp(hostname, tmp_collector->hostname)) &&
(!plugin_name || !tmp_collector->plugin_name || !strcmp(plugin_name, tmp_collector->plugin_name)) &&
(!module_name || !tmp_collector->module_name || !strcmp(module_name, tmp_collector->module_name))) {
if (unlikely(last_collector))
*last_collector = prev_collector;
return tmp_collector;
}
prev_collector = tmp_collector;
tmp_collector = tmp_collector->next;
}
return tmp_collector;
}
/*
* Called to delete a collector
* It will reduce the count (chart_count) and will remove it
* from the linked list if the count reaches zero
* The structure will be returned to the caller to free
* the resources
*
*/
struct _collector *_del_collector(const char *hostname, const char *plugin_name, const char *module_name)
{
struct _collector *tmp_collector, *prev_collector = NULL;
tmp_collector = _find_collector(hostname, plugin_name, module_name, &prev_collector);
if (likely(tmp_collector)) {
--tmp_collector->count;
if (unlikely(!tmp_collector->count))
prev_collector->next = tmp_collector->next;
}
return tmp_collector;
}
/*
* Add a new collector (plugin / module) to the list
* If it already exists just update the chart count
*
* Lock before calling
*/
struct _collector *_add_collector(const char *hostname, const char *plugin_name, const char *module_name)
{
struct _collector *tmp_collector;
tmp_collector = _find_collector(hostname, plugin_name, module_name, NULL);
if (unlikely(!tmp_collector)) {
tmp_collector = callocz(1, sizeof(struct _collector));
tmp_collector->hostname_hash = simple_hash(hostname);
tmp_collector->plugin_hash = plugin_name ? simple_hash(plugin_name) : 1;
tmp_collector->module_hash = module_name ? simple_hash(module_name) : 1;
tmp_collector->hostname = strdupz(hostname);
tmp_collector->plugin_name = plugin_name ? strdupz(plugin_name) : NULL;
tmp_collector->module_name = module_name ? strdupz(module_name) : NULL;
tmp_collector->next = collector_list->next;
collector_list->next = tmp_collector;
}
tmp_collector->count++;
debug(
D_ACLK, "ADD COLLECTOR %s [%s:%s] -- chart %u", hostname, plugin_name ? plugin_name : "*",
module_name ? module_name : "*", tmp_collector->count);
return tmp_collector;
}

View file

@ -1,41 +0,0 @@
// SPDX-License-Identifier: GPL-3.0-or-later
// This is copied from Legacy ACLK, Original Author: amoss
// TODO unmess this
#ifndef ACLK_COLLECTOR_LIST_H
#define ACLK_COLLECTOR_LIST_H
#include "libnetdata/libnetdata.h"
extern netdata_mutex_t collector_mutex;
#define COLLECTOR_LOCK netdata_mutex_lock(&collector_mutex)
#define COLLECTOR_UNLOCK netdata_mutex_unlock(&collector_mutex)
/*
* Maintain a list of collectors and chart count
* If all the charts of a collector are deleted
* then a new metadata dataset must be send to the cloud
*
*/
struct _collector {
time_t created;
uint32_t count; //chart count
uint32_t hostname_hash;
uint32_t plugin_hash;
uint32_t module_hash;
char *hostname;
char *plugin_name;
char *module_name;
struct _collector *next;
};
extern struct _collector *collector_list;
struct _collector *_add_collector(const char *hostname, const char *plugin_name, const char *module_name);
struct _collector *_del_collector(const char *hostname, const char *plugin_name, const char *module_name);
void _reset_collector_list();
void _free_collector(struct _collector *collector);
#endif /* ACLK_COLLECTOR_LIST_H */

View file

@ -814,11 +814,8 @@ int aclk_get_env(aclk_env_t *env, const char* aclk_hostname, int aclk_port) {
return 1;
}
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
buffer_sprintf(buf, "/api/v1/env?v=%s&cap=json,proto&claim_id=%s", &(VERSION[1]) /* skip 'v' at beginning */, agent_id);
#else
buffer_sprintf(buf, "/api/v1/env?v=%s&cap=json&claim_id=%s", &(VERSION[1]) /* skip 'v' at beginning */, agent_id);
#endif
buffer_sprintf(buf, "/api/v1/env?v=%s&cap=proto&claim_id=%s", &(VERSION[1]) /* skip 'v' at beginning */, agent_id);
freez(agent_id);
req.host = (char*)aclk_hostname;

View file

@ -13,27 +13,6 @@ pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER;
#define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait)
#define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait)
typedef struct aclk_query_handler {
aclk_query_type_t type;
char *name; // for logging purposes
int(*fnc)(struct aclk_query_thread *query_thr, aclk_query_t query);
} aclk_query_handler;
static int info_metadata(struct aclk_query_thread *query_thr, aclk_query_t query)
{
aclk_send_info_metadata(query_thr->client,
!query->data.metadata_info.initial_on_connect,
query->data.metadata_info.host);
return 0;
}
static int alarms_metadata(struct aclk_query_thread *query_thr, aclk_query_t query)
{
aclk_send_alarm_metadata(query_thr->client,
!query->data.metadata_info.initial_on_connect);
return 0;
}
static usec_t aclk_web_api_v1_request(RRDHOST *host, struct web_client *w, char *url)
{
usec_t t;
@ -277,84 +256,61 @@ cleanup:
return retval;
}
static int chart_query(struct aclk_query_thread *query_thr, aclk_query_t query)
{
aclk_chart_msg(query_thr->client, query->data.chart_add_del.host, query->data.chart_add_del.chart_name);
return 0;
}
static int alarm_state_update_query(struct aclk_query_thread *query_thr, aclk_query_t query)
{
aclk_alarm_state_msg(query_thr->client, query->data.alarm_update);
// aclk_alarm_state_msg frees the json object including the header it generates
query->data.alarm_update = NULL;
return 0;
}
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
static int send_bin_msg(struct aclk_query_thread *query_thr, aclk_query_t query)
{
// this will be simplified when legacy support is removed
aclk_send_bin_message_subtopic_pid(query_thr->client, query->data.bin_payload.payload, query->data.bin_payload.size, query->data.bin_payload.topic, query->data.bin_payload.msg_name);
return 0;
}
#endif
aclk_query_handler aclk_query_handlers[] = {
{ .type = HTTP_API_V2, .name = "http_api_request_v2", .fnc = http_api_v2 },
{ .type = ALARM_STATE_UPDATE, .name = "alarm_state_update", .fnc = alarm_state_update_query },
{ .type = METADATA_INFO, .name = "info_metadata", .fnc = info_metadata },
{ .type = METADATA_ALARMS, .name = "alarms_metadata", .fnc = alarms_metadata },
{ .type = CHART_NEW, .name = "chart_new", .fnc = chart_query },
{ .type = CHART_DEL, .name = "chart_delete", .fnc = info_metadata },
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
{ .type = REGISTER_NODE, .name = "register_node", .fnc = send_bin_msg },
{ .type = NODE_STATE_UPDATE, .name = "node_state_update", .fnc = send_bin_msg },
{ .type = CHART_DIMS_UPDATE, .name = "chart_and_dim_update", .fnc = send_bin_msg },
{ .type = CHART_CONFIG_UPDATED, .name = "chart_config_updated", .fnc = send_bin_msg },
{ .type = CHART_RESET, .name = "reset_chart_messages", .fnc = send_bin_msg },
{ .type = RETENTION_UPDATED, .name = "update_retention_info", .fnc = send_bin_msg },
{ .type = UPDATE_NODE_INFO, .name = "update_node_info", .fnc = send_bin_msg },
{ .type = ALARM_LOG_HEALTH, .name = "alarm_log_health", .fnc = send_bin_msg },
{ .type = ALARM_PROVIDE_CFG, .name = "provide_alarm_config", .fnc = send_bin_msg },
{ .type = ALARM_SNAPSHOT, .name = "alarm_snapshot", .fnc = send_bin_msg },
#endif
{ .type = UNKNOWN, .name = NULL, .fnc = NULL }
};
const char *aclk_query_get_name(aclk_query_type_t qt)
{
aclk_query_handler *ptr = aclk_query_handlers;
while (ptr->type != UNKNOWN) {
if (ptr->type == qt)
return ptr->name;
ptr++;
switch (qt) {
case HTTP_API_V2: return "http_api_request_v2";
case REGISTER_NODE: return "register_node";
case NODE_STATE_UPDATE: return "node_state_update";
case CHART_DIMS_UPDATE: return "chart_and_dim_update";
case CHART_CONFIG_UPDATED: return "chart_config_updated";
case CHART_RESET: return "reset_chart_messages";
case RETENTION_UPDATED: return "update_retention_info";
case UPDATE_NODE_INFO: return "update_node_info";
case ALARM_LOG_HEALTH: return "alarm_log_health";
case ALARM_PROVIDE_CFG: return "provide_alarm_config";
case ALARM_SNAPSHOT: return "alarm_snapshot";
default:
error_report("Unknown query type used %d", (int) qt);
return "unknown";
}
return "unknown";
}
static void aclk_query_process_msg(struct aclk_query_thread *query_thr, aclk_query_t query)
{
for (int i = 0; aclk_query_handlers[i].type != UNKNOWN; i++) {
if (aclk_query_handlers[i].type == query->type) {
worker_is_busy(i);
debug(D_ACLK, "Processing Queued Message of type: \"%s\"", aclk_query_handlers[i].name);
aclk_query_handlers[i].fnc(query_thr, query);
if (aclk_stats_enabled) {
ACLK_STATS_LOCK;
aclk_metrics_per_sample.queries_dispatched++;
aclk_queries_per_thread[query_thr->idx]++;
aclk_metrics_per_sample.queries_per_type[query->type]++;
ACLK_STATS_UNLOCK;
}
aclk_query_free(query);
worker_is_idle();
return;
}
{
if (query->type == UNKNOWN || query->type >= ACLK_QUERY_TYPE_COUNT) {
error_report("Unknown query in query queue. %u", query->type);
aclk_query_free(query);
return;
}
fatal("Unknown query in query queue. %u", query->type);
worker_is_busy(query->type);
if (query->type == HTTP_API_V2) {
debug(D_ACLK, "Processing Queued Message of type: \"http_api_request_v2\"");
http_api_v2(query_thr, query);
} else {
debug(D_ACLK, "Processing Queued Message of type: \"%s\"", query->data.bin_payload.msg_name);
send_bin_msg(query_thr, query);
}
if (aclk_stats_enabled) {
ACLK_STATS_LOCK;
aclk_metrics_per_sample.queries_dispatched++;
aclk_queries_per_thread[query_thr->idx]++;
aclk_metrics_per_sample.queries_per_type[query->type]++;
ACLK_STATS_UNLOCK;
}
aclk_query_free(query);
worker_is_idle();
}
/* Processes messages from queue. Compete for work with other threads
@ -370,8 +326,8 @@ int aclk_query_process_msgs(struct aclk_query_thread *query_thr)
static void worker_aclk_register(void) {
worker_register("ACLKQUERY");
for (int i = 0; aclk_query_handlers[i].type != UNKNOWN; i++) {
worker_register_job_name(i, aclk_query_handlers[i].name);
for (int i = 1; i < ACLK_QUERY_TYPE_COUNT; i++) {
worker_register_job_name(i, aclk_query_get_name(i));
}
}

View file

@ -111,15 +111,6 @@ void aclk_query_free(aclk_query_t query)
freez(query->data.http_api_v2.query);
break;
case CHART_NEW:
freez(query->data.chart_add_del.chart_name);
break;
case ALARM_STATE_UPDATE:
if (query->data.alarm_update)
json_object_put(query->data.alarm_update);
break;
case NODE_STATE_UPDATE:
case REGISTER_NODE:
case CHART_DIMS_UPDATE:

View file

@ -11,12 +11,7 @@
typedef enum {
UNKNOWN = 0,
METADATA_INFO,
METADATA_ALARMS,
HTTP_API_V2,
CHART_NEW,
CHART_DEL,
ALARM_STATE_UPDATE,
REGISTER_NODE,
NODE_STATE_UPDATE,
CHART_DIMS_UPDATE,
@ -30,16 +25,6 @@ typedef enum {
ACLK_QUERY_TYPE_COUNT // always keep this as last
} aclk_query_type_t;
struct aclk_query_metadata {
RRDHOST *host;
int initial_on_connect;
};
struct aclk_query_chart_add_del {
RRDHOST *host;
char* chart_name;
};
struct aclk_query_http_api_v2 {
char *payload;
char *query;
@ -73,12 +58,8 @@ struct aclk_query {
// TODO maybe remove?
int version;
union {
struct aclk_query_metadata metadata_info;
struct aclk_query_metadata metadata_alarms;
struct aclk_query_http_api_v2 http_api_v2;
struct aclk_query_chart_add_del chart_add_del;
struct aclk_bin_payload bin_payload;
json_object *alarm_update;
} data;
};

View file

@ -3,43 +3,9 @@
#include "libnetdata/libnetdata.h"
#ifdef ACLK_LEGACY
typedef enum aclk_cmd {
ACLK_CMD_CLOUD,
ACLK_CMD_ONCONNECT,
ACLK_CMD_INFO,
ACLK_CMD_CHART,
ACLK_CMD_CHARTDEL,
ACLK_CMD_ALARM,
ACLK_CMD_CLOUD_QUERY_2,
ACLK_CMD_CHILD_CONNECT,
ACLK_CMD_CHILD_DISCONNECT
} ACLK_CMD;
typedef enum aclk_metadata_state {
ACLK_METADATA_REQUIRED,
ACLK_METADATA_CMD_QUEUED,
ACLK_METADATA_SENT
} ACLK_METADATA_STATE;
#endif
typedef enum aclk_agent_state {
ACLK_HOST_INITIALIZING,
ACLK_HOST_STABLE
} ACLK_AGENT_STATE;
typedef struct aclk_rrdhost_state {
char *claimed_id; // Claimed ID if host has one otherwise NULL
char *prev_claimed_id; // Claimed ID if changed (reclaimed) during runtime
#ifdef ACLK_LEGACY
// per child popcorning
ACLK_AGENT_STATE state;
ACLK_METADATA_STATE metadata;
time_t timestamp_created;
time_t t_last_popcorn_update;
#endif /* ACLK_LEGACY */
} aclk_rrdhost_state;
#endif /* ACLK_RRDHOST_STATE_H */

View file

@ -116,20 +116,8 @@ static inline int aclk_v2_payload_get_query(const char *payload, char **query_ur
return 0;
}
#define HTTP_CHECK_AGENT_INITIALIZED() ACLK_SHARED_STATE_LOCK;\
if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) {\
debug(D_ACLK, "Ignoring \"http\" cloud request; agent not in stable state");\
ACLK_SHARED_STATE_UNLOCK;\
return 1;\
}\
ACLK_SHARED_STATE_UNLOCK;
static int aclk_handle_cloud_http_request_v2(struct aclk_request *cloud_to_agent, char *raw_payload)
{
if (!aclk_use_new_cloud_arch) {
HTTP_CHECK_AGENT_INITIALIZED();
}
aclk_query_t query;
errno = 0;
@ -229,7 +217,6 @@ err_cleanup:
return 1;
}
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
typedef uint32_t simple_hash_t;
typedef int(*rx_msg_handler)(const char *msg, size_t msg_len);
@ -524,4 +511,3 @@ void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t
return;
}
}
#endif

View file

@ -10,10 +10,8 @@
int aclk_handle_cloud_cmd_message(char *payload);
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
const char *rx_handler_get_name(size_t i);
unsigned int aclk_init_rx_msg_handlers(void);
void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len);
#endif
#endif /* ACLK_RX_MSGS_H */

View file

@ -8,11 +8,9 @@ netdata_mutex_t aclk_stats_mutex = NETDATA_MUTEX_INITIALIZER;
struct {
int query_thread_count;
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
unsigned int proto_hdl_cnt;
uint32_t *aclk_proto_rx_msgs_sample;
RRDDIM **rx_msg_dims;
#endif
} aclk_stats_cfg; // there is only 1 stats thread at a time
// data ACLK stats need per query thread
@ -237,7 +235,6 @@ static void aclk_stats_query_time(struct aclk_metrics_per_sample *per_sample)
rrdset_done(st);
}
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
const char *rx_handler_get_name(size_t i);
static void aclk_stats_newproto_rx(uint32_t *rx_msgs_sample)
{
@ -259,7 +256,6 @@ static void aclk_stats_newproto_rx(uint32_t *rx_msgs_sample)
rrdset_done(st);
}
#endif
static void aclk_stats_mqtt_wss(struct mqtt_wss_stats *stats)
{
@ -290,31 +286,23 @@ static void aclk_stats_mqtt_wss(struct mqtt_wss_stats *stats)
void aclk_stats_thread_prepare(int query_thread_count, unsigned int proto_hdl_cnt)
{
#ifndef ENABLE_NEW_CLOUD_PROTOCOL
UNUSED(proto_hdl_cnt);
#endif
aclk_qt_data = callocz(query_thread_count, sizeof(struct aclk_qt_data));
aclk_queries_per_thread = callocz(query_thread_count, sizeof(uint32_t));
aclk_queries_per_thread_sample = callocz(query_thread_count, sizeof(uint32_t));
memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample));
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
aclk_stats_cfg.proto_hdl_cnt = proto_hdl_cnt;
aclk_stats_cfg.aclk_proto_rx_msgs_sample = callocz(proto_hdl_cnt, sizeof(*aclk_proto_rx_msgs_sample));
aclk_proto_rx_msgs_sample = callocz(proto_hdl_cnt, sizeof(*aclk_proto_rx_msgs_sample));
aclk_stats_cfg.rx_msg_dims = callocz(proto_hdl_cnt, sizeof(RRDDIM*));
#endif
}
void aclk_stats_thread_cleanup()
{
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
freez(aclk_stats_cfg.rx_msg_dims);
freez(aclk_proto_rx_msgs_sample);
freez(aclk_stats_cfg.aclk_proto_rx_msgs_sample);
#endif
freez(aclk_qt_data);
freez(aclk_queries_per_thread);
freez(aclk_queries_per_thread_sample);
@ -345,10 +333,10 @@ void *aclk_stats_main_thread(void *ptr)
// to not hold lock longer than necessary, especially not to hold it
// during database rrd* operations
memcpy(&per_sample, &aclk_metrics_per_sample, sizeof(struct aclk_metrics_per_sample));
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
memcpy(aclk_stats_cfg.aclk_proto_rx_msgs_sample, aclk_proto_rx_msgs_sample, sizeof(*aclk_proto_rx_msgs_sample) * aclk_stats_cfg.proto_hdl_cnt);
memset(aclk_proto_rx_msgs_sample, 0, sizeof(*aclk_proto_rx_msgs_sample) * aclk_stats_cfg.proto_hdl_cnt);
#endif
memcpy(&permanent, &aclk_metrics, sizeof(struct aclk_metrics));
memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample));
@ -373,9 +361,7 @@ void *aclk_stats_main_thread(void *ptr)
struct mqtt_wss_stats mqtt_wss_stats = mqtt_wss_get_stats(args->client);
aclk_stats_mqtt_wss(&mqtt_wss_stats);
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
aclk_stats_newproto_rx(aclk_stats_cfg.aclk_proto_rx_msgs_sample);
#endif
}
return 0;

View file

@ -62,9 +62,7 @@ extern struct aclk_metrics_per_sample {
volatile uint32_t cloud_q_process_max;
} aclk_metrics_per_sample;
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
extern uint32_t *aclk_proto_rx_msgs_sample;
#endif
extern uint32_t *aclk_queries_per_thread;

View file

@ -13,29 +13,6 @@
// version for aclk legacy (old cloud arch)
#define ACLK_VERSION 2
static void aclk_send_message_subtopic(mqtt_wss_client client, json_object *msg, enum aclk_topics subtopic)
{
uint16_t packet_id;
const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN);
const char *topic = aclk_get_topic(subtopic);
if (unlikely(!topic)) {
error("Couldn't get topic. Aborting message send");
return;
}
mqtt_wss_publish_pid(client, topic, str, strlen(str), MQTT_WSS_PUB_QOS1, &packet_id);
#ifdef NETDATA_INTERNAL_CHECKS
aclk_stats_msg_published(packet_id);
#endif
#ifdef ACLK_LOG_CONVERSATION_DIR
#define FN_MAX_LEN 1024
char filename[FN_MAX_LEN];
snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT());
json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY);
#endif
}
uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname)
{
#ifndef ACLK_LOG_CONVERSATION_DIR
@ -71,30 +48,6 @@ uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, s
return packet_id;
}
static uint16_t aclk_send_message_subtopic_pid(mqtt_wss_client client, json_object *msg, enum aclk_topics subtopic)
{
uint16_t packet_id;
const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN);
const char *topic = aclk_get_topic(subtopic);
if (unlikely(!topic)) {
error("Couldn't get topic. Aborting message send");
return 0;
}
mqtt_wss_publish_pid(client, topic, str, strlen(str), MQTT_WSS_PUB_QOS1, &packet_id);
#ifdef NETDATA_INTERNAL_CHECKS
aclk_stats_msg_published(packet_id);
#endif
#ifdef ACLK_LOG_CONVERSATION_DIR
#define FN_MAX_LEN 1024
char filename[FN_MAX_LEN];
snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT());
json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY);
#endif
return packet_id;
}
/* UNUSED now but can be used soon MVP1?
static void aclk_send_message_topic(mqtt_wss_client client, json_object *msg, const char *topic)
{
@ -231,17 +184,6 @@ static struct json_object *create_hdr(const char *type, const char *msg_id, time
return obj;
}
static char *create_uuid()
{
uuid_t uuid;
char *uuid_str = mallocz(36 + 1);
uuid_generate(uuid);
uuid_unparse(uuid, uuid_str);
return uuid_str;
}
#ifndef __GNUC__
#pragma endregion
#endif
@ -250,90 +192,6 @@ static char *create_uuid()
#pragma region aclk_tx_msgs message generators
#endif
/*
* This will send the /api/v1/info
*/
#define BUFFER_INITIAL_SIZE (1024 * 16)
void aclk_send_info_metadata(mqtt_wss_client client, int metadata_submitted, RRDHOST *host)
{
BUFFER *local_buffer = buffer_create(BUFFER_INITIAL_SIZE);
json_object *msg, *payload, *tmp;
char *msg_id = create_uuid();
buffer_flush(local_buffer);
local_buffer->contenttype = CT_APPLICATION_JSON;
// on_connect messages are sent on a health reload, if the on_connect message is real then we
// use the session time as the fake timestamp to indicate that it starts the session. If it is
// a fake on_connect message then use the real timestamp to indicate it is within the existing
// session.
if (metadata_submitted)
msg = create_hdr("update", msg_id, 0, 0, ACLK_VERSION);
else
msg = create_hdr("connect", msg_id, aclk_session_sec, aclk_session_us, ACLK_VERSION);
payload = json_object_new_object();
json_object_object_add(msg, "payload", payload);
web_client_api_request_v1_info_fill_buffer(host, local_buffer);
tmp = json_tokener_parse(local_buffer->buffer);
json_object_object_add(payload, "info", tmp);
buffer_flush(local_buffer);
charts2json(host, local_buffer, 1, 0);
tmp = json_tokener_parse(local_buffer->buffer);
json_object_object_add(payload, "charts", tmp);
aclk_send_message_subtopic(client, msg, ACLK_TOPICID_METADATA);
json_object_put(msg);
freez(msg_id);
buffer_free(local_buffer);
}
// TODO should include header instead
void health_active_log_alarms_2json(RRDHOST *host, BUFFER *wb);
void aclk_send_alarm_metadata(mqtt_wss_client client, int metadata_submitted)
{
BUFFER *local_buffer = buffer_create(BUFFER_INITIAL_SIZE);
json_object *msg, *payload, *tmp;
char *msg_id = create_uuid();
buffer_flush(local_buffer);
local_buffer->contenttype = CT_APPLICATION_JSON;
// on_connect messages are sent on a health reload, if the on_connect message is real then we
// use the session time as the fake timestamp to indicate that it starts the session. If it is
// a fake on_connect message then use the real timestamp to indicate it is within the existing
// session.
if (metadata_submitted)
msg = create_hdr("connect_alarms", msg_id, 0, 0, ACLK_VERSION);
else
msg = create_hdr("connect_alarms", msg_id, aclk_session_sec, aclk_session_us, ACLK_VERSION);
payload = json_object_new_object();
json_object_object_add(msg, "payload", payload);
health_alarms2json(localhost, local_buffer, 1);
tmp = json_tokener_parse(local_buffer->buffer);
json_object_object_add(payload, "configured-alarms", tmp);
buffer_flush(local_buffer);
health_active_log_alarms_2json(localhost, local_buffer);
tmp = json_tokener_parse(local_buffer->buffer);
json_object_object_add(payload, "alarms-active", tmp);
aclk_send_message_subtopic(client, msg, ACLK_TOPICID_ALARMS);
json_object_put(msg);
freez(msg_id);
buffer_free(local_buffer);
}
void aclk_http_msg_v2_err(mqtt_wss_client client, const char *topic, const char *msg_id, int http_code, int ec, const char* emsg, const char *payload, size_t payload_len)
{
json_object *tmp, *msg;
@ -384,80 +242,6 @@ void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg
}
}
void aclk_chart_msg(mqtt_wss_client client, RRDHOST *host, const char *chart)
{
json_object *msg, *payload;
BUFFER *tmp_buffer;
RRDSET *st;
st = rrdset_find(host, chart);
if (!st)
st = rrdset_find_byname(host, chart);
if (!st) {
info("FAILED to find chart %s", chart);
return;
}
tmp_buffer = buffer_create(BUFFER_INITIAL_SIZE);
rrdset2json(st, tmp_buffer, NULL, NULL, 1);
payload = json_tokener_parse(tmp_buffer->buffer);
if (!payload) {
error("Failed to parse JSON from rrdset2json");
buffer_free(tmp_buffer);
return;
}
msg = create_hdr("chart", NULL, 0, 0, ACLK_VERSION);
json_object_object_add(msg, "payload", payload);
aclk_send_message_subtopic(client, msg, ACLK_TOPICID_CHART);
buffer_free(tmp_buffer);
json_object_put(msg);
}
void aclk_alarm_state_msg(mqtt_wss_client client, json_object *msg)
{
// we create header here on purpose (and not send message with it already as `msg` param)
// timestamps etc. which in ACLK legacy would be wrong (because ACLK legacy
// send message with timestamps already to Query Queue they would be incorrect at time
// when query queue would get to send them)
json_object *obj = create_hdr("status-change", NULL, 0, 0, ACLK_VERSION);
json_object_object_add(obj, "payload", msg);
aclk_send_message_subtopic(client, obj, ACLK_TOPICID_ALARMS);
json_object_put(obj);
}
/*
* Will generate disconnect message.
* @param message if NULL it will generate LWT message (unexpected).
* Otherwise string pointed to by this parameter will be used as
* reason.
*/
json_object *aclk_generate_disconnect(const char *message)
{
json_object *tmp, *msg;
msg = create_hdr("disconnect", NULL, 0, 0, 2);
tmp = json_object_new_string(message ? message : "unexpected");
json_object_object_add(msg, "payload", tmp);
return msg;
}
int aclk_send_app_layer_disconnect(mqtt_wss_client client, const char *message)
{
int pid;
json_object *msg = aclk_generate_disconnect(message);
pid = aclk_send_message_subtopic_pid(client, msg, ACLK_TOPICID_METADATA);
json_object_put(msg);
return pid;
}
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
// new protobuf msgs
uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable) {
size_t len;
uint16_t pid;
@ -532,7 +316,6 @@ char *aclk_generate_lwt(size_t *size) {
return msg;
}
#endif /* ENABLE_NEW_CLOUD_PROTOCOL */
#ifndef __GNUC__
#pragma endregion

View file

@ -11,23 +11,10 @@
uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname);
void aclk_send_info_metadata(mqtt_wss_client client, int metadata_submitted, RRDHOST *host);
void aclk_send_alarm_metadata(mqtt_wss_client client, int metadata_submitted);
void aclk_http_msg_v2_err(mqtt_wss_client client, const char *topic, const char *msg_id, int http_code, int ec, const char* emsg, const char *payload, size_t payload_len);
void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len);
void aclk_chart_msg(mqtt_wss_client client, RRDHOST *host, const char *chart);
void aclk_alarm_state_msg(mqtt_wss_client client, json_object *msg);
json_object *aclk_generate_disconnect(const char *message);
int aclk_send_app_layer_disconnect(mqtt_wss_client client, const char *message);
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
// new protobuf msgs
uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable);
char *aclk_generate_lwt(size_t *size);
#endif
#endif

View file

@ -4,7 +4,6 @@
#include "daemon/common.h"
int aclk_use_new_cloud_arch = 0;
usec_t aclk_session_newarch = 0;
aclk_env_t *aclk_env = NULL;
@ -127,19 +126,11 @@ struct topic_name {
{ .id = ACLK_TOPICID_UNKNOWN, .name = NULL }
};
enum aclk_topics compulsory_topics_legacy[] = {
ACLK_TOPICID_CHART,
ACLK_TOPICID_ALARMS,
ACLK_TOPICID_METADATA,
ACLK_TOPICID_COMMAND,
ACLK_TOPICID_UNKNOWN
};
enum aclk_topics compulsory_topics_new_cloud_arch[] = {
enum aclk_topics compulsory_topics[] = {
// TODO remove old topics once not needed anymore
ACLK_TOPICID_CHART,
ACLK_TOPICID_ALARMS,
ACLK_TOPICID_METADATA,
ACLK_TOPICID_CHART, //TODO from legacy
ACLK_TOPICID_ALARMS, //TODO from legacy
ACLK_TOPICID_METADATA, //TODO from legacy
ACLK_TOPICID_COMMAND,
ACLK_TOPICID_AGENT_CONN,
ACLK_TOPICID_CMD_NG_V1,
@ -279,8 +270,6 @@ int aclk_generate_topic_cache(struct json_object *json)
}
}
enum aclk_topics *compulsory_topics = aclk_use_new_cloud_arch ? compulsory_topics_new_cloud_arch : compulsory_topics_legacy;
for (int i = 0; compulsory_topics[i] != ACLK_TOPICID_UNKNOWN; i++) {
if (!aclk_get_topic(compulsory_topics[i])) {
error("missing compulsory topic \"%s\" in password response from cloud", topic_id_to_name(compulsory_topics[i]));

View file

@ -20,7 +20,6 @@
// Helper stuff which should not have any further inside ACLK dependency
// and are supposed not to be needed outside of ACLK
extern int aclk_use_new_cloud_arch;
extern usec_t aclk_session_newarch;
extern int chart_batch_id;

View file

@ -218,14 +218,6 @@ AC_ARG_ENABLE(
[ enable_cloud="detect" ]
)
AC_ARG_WITH(
[new-cloud-protocol],
[AS_HELP_STRING([--with-new-cloud-protocol],
[Requires New Cloud Protocol support to be built])],
[new_cloud_protocol="$withval"],
[new_cloud_protocol="detect"]
)
if test "${enable_cloud}" = "no"; then
AC_DEFINE([DISABLE_CLOUD], [1], [disable netdata cloud functionality])
fi
@ -768,16 +760,16 @@ AC_MSG_CHECKING([if Cloud functionality should be enabled])
AC_MSG_RESULT([${enable_cloud}])
if test "$enable_cloud" != "no"; then
AC_MSG_NOTICE([Checking if ACLK Next Generation can be built])
AC_MSG_NOTICE([Checking if ACLK can be built])
can_enable_ng="yes"
AC_MSG_CHECKING([if git submodules present for ACLK Next Generation])
AC_MSG_CHECKING([if git submodules present for ACLK])
if test -f "mqtt_websockets/src/mqtt_wss_client.c"; then
AC_MSG_RESULT([yes])
else
AC_MSG_RESULT([no])
can_enable_ng="no"
fi
AC_MSG_CHECKING([if SSL available for ACLK Next Generation])
AC_MSG_CHECKING([if SSL available for ACLK])
if test -n "${SSL_LIBS}"; then
AC_MSG_RESULT([yes])
OPTIONAL_SSL_CFLAGS="${SSL_CFLAGS}"
@ -785,7 +777,7 @@ if test "$enable_cloud" != "no"; then
else
AC_MSG_RESULT([no])
fi
AC_MSG_CHECKING([if JSON-C available for ACLK Next Generation])
AC_MSG_CHECKING([if JSON-C available for ACLK])
if test "$enable_jsonc" != "yes"; then
AC_MSG_RESULT([no])
can_enable_ng="no"
@ -793,6 +785,28 @@ if test "$enable_cloud" != "no"; then
AC_MSG_RESULT([yes])
fi
AC_MSG_CHECKING([if protobuf available for ACLK New Cloud Protocol])
if test "${have_libprotobuf}" != "yes"; then
AC_MSG_RESULT([no])
can_enable_ng="no"
else
AC_MSG_RESULT([yes])
fi
AC_MSG_CHECKING([if protoc available for ACLK New Cloud Protocol])
if test "${have_protoc}" != "yes"; then
AC_MSG_RESULT([no])
can_enable_ng="no"
else
AC_MSG_RESULT([yes])
fi
AC_MSG_CHECKING([if C++ compiler available for ACLK New Cloud Protocol])
if test "${have_CXX_compiler}" != "yes"; then
AC_MSG_RESULT([no])
can_enable_ng="no"
else
AC_MSG_RESULT([yes])
fi
AC_MSG_CHECKING([ACLK Next Generation can be built])
AC_MSG_RESULT([${can_enable_ng}])
if test "$can_enable_ng" = "no" -a "$enable_cloud" = "yes"; then
@ -801,46 +815,10 @@ if test "$enable_cloud" != "no"; then
if test "$can_enable_ng" = "yes"; then
enable_aclk="yes"
AC_DEFINE([ENABLE_ACLK], [1], [netdata ACLK])
OPTIONAL_ACLK_CFLAGS="-I \$(abs_top_srcdir)/mqtt_websockets/src/include -I \$(abs_top_srcdir)/mqtt_websockets/c-rbuf/include -I \$(abs_top_srcdir)/mqtt_websockets/MQTT-C/include"
fi
if test "$new_cloud_protocol" != "no"; then
can_build_new_cloud_protocol="yes"
AC_MSG_CHECKING([if protobuf available for New Cloud Protocol])
if test "${have_libprotobuf}" != "yes"; then
AC_MSG_RESULT([no])
can_build_new_cloud_protocol="no"
else
AC_MSG_RESULT([yes])
fi
AC_MSG_CHECKING([if protoc available for New Cloud Protocol])
if test "${have_protoc}" != "yes"; then
AC_MSG_RESULT([no])
can_build_new_cloud_protocol="no"
else
AC_MSG_RESULT([yes])
fi
AC_MSG_CHECKING([if C++ compiler available for New Cloud Protocol])
if test "${have_CXX_compiler}" != "yes"; then
AC_MSG_RESULT([no])
can_build_new_cloud_protocol="no"
else
AC_MSG_RESULT([yes])
fi
AC_MSG_CHECKING([ACLK Next Generation can support New Cloud protocol])
AC_MSG_RESULT([${can_build_new_cloud_protocol}])
if test "$new_cloud_protocol" = "yes" -a "$can_build_new_cloud_protocol" != "yes"; then
AC_MSG_ERROR([Requested new cloud protocol support but it can't be build])
fi
if test "$can_build_new_cloud_protocol" = "yes"; then
new_cloud_protocol="yes"
AC_DEFINE([ENABLE_NEW_CLOUD_PROTOCOL], [1], [New protobuf based Netdata Cloud Protocol Support])
OPTIONAL_ACLK_CFLAGS="${OPTIONAL_ACLK_CFLAGS} -I \$(abs_top_srcdir)/aclk/aclk-schemas"
OPTIONAL_PROTOBUF_CFLAGS="${PROTOBUF_CFLAGS}"
CXX11FLAG="-std=c++11"
OPTIONAL_PROTOBUF_LIBS="${PROTOBUF_LIBS}"
fi
OPTIONAL_ACLK_CFLAGS="-I \$(abs_top_srcdir)/mqtt_websockets/src/include -I \$(abs_top_srcdir)/mqtt_websockets/c-rbuf/include -I \$(abs_top_srcdir)/mqtt_websockets/MQTT-C/include -I \$(abs_top_srcdir)/aclk/aclk-schemas"
OPTIONAL_PROTOBUF_CFLAGS="${PROTOBUF_CFLAGS}"
CXX11FLAG="-std=c++11"
OPTIONAL_PROTOBUF_LIBS="${PROTOBUF_LIBS}"
fi
fi
@ -851,7 +829,6 @@ fi
AC_SUBST([enable_cloud])
AC_SUBST([enable_aclk])
AM_CONDITIONAL([ENABLE_ACLK], [test "${enable_aclk}" = "yes"])
AM_CONDITIONAL([ENABLE_NEW_CLOUD_PROTOCOL], [test "${can_build_new_cloud_protocol}" = "yes"])
# -----------------------------------------------------------------------------
# apps.plugin

View file

@ -7,7 +7,7 @@ struct analytics_data analytics_data;
extern void analytics_exporting_connectors (BUFFER *b);
extern void analytics_exporting_connectors_ssl (BUFFER *b);
extern void analytics_build_info (BUFFER *b);
extern int aclk_connected, aclk_use_new_cloud_arch;
extern int aclk_connected;
struct collector {
char *plugin;
@ -499,12 +499,7 @@ void analytics_aclk(void)
#ifdef ENABLE_ACLK
if (aclk_connected) {
analytics_set_data(&analytics_data.netdata_host_aclk_available, "true");
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
if (aclk_use_new_cloud_arch)
analytics_set_data_str(&analytics_data.netdata_host_aclk_protocol, "New");
else
#endif
analytics_set_data_str(&analytics_data.netdata_host_aclk_protocol, "Legacy");
analytics_set_data_str(&analytics_data.netdata_host_aclk_protocol, "New");
}
else
#endif

View file

@ -20,12 +20,6 @@
#endif
#endif
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
#define NEW_CLOUD_PROTO 1
#else
#define NEW_CLOUD_PROTO 0
#endif
#ifdef ENABLE_DBENGINE
#define FEAT_DBENGINE 1
#else
@ -273,7 +267,7 @@ void print_build_info(void) {
printf(" Native HTTPS: %s\n", FEAT_YES_NO(FEAT_NATIVE_HTTPS));
printf(" Netdata Cloud: %s %s\n", FEAT_YES_NO(FEAT_CLOUD), FEAT_CLOUD_MSG);
printf(" ACLK Next Generation: %s\n", FEAT_YES_NO(FEAT_CLOUD));
printf(" ACLK-NG New Cloud Protocol: %s\n", FEAT_YES_NO(NEW_CLOUD_PROTO));
printf(" ACLK-NG New Cloud Protocol: %s\n", FEAT_YES_NO(1));
printf(" ACLK Legacy: %s\n", FEAT_YES_NO(0));
printf(" TLS Host Verification: %s\n", FEAT_YES_NO(FEAT_TLS_HOST_VERIFY));
printf(" Machine Learning: %s\n", FEAT_YES_NO(FEAT_ML));
@ -325,7 +319,7 @@ void print_build_info_json(void) {
printf(" \"cloud-disabled\": false,\n");
#endif
printf(" \"aclk-ng\": %s,\n", FEAT_JSON_BOOL(FEAT_CLOUD));
printf(" \"aclk-ng-new-cloud-proto\": %s,\n", FEAT_JSON_BOOL(NEW_CLOUD_PROTO));
printf(" \"aclk-ng-new-cloud-proto\": %s,\n", FEAT_JSON_BOOL(1));
printf(" \"aclk-legacy\": %s,\n", FEAT_JSON_BOOL(0));
printf(" \"tls-host-verify\": %s,\n", FEAT_JSON_BOOL(FEAT_TLS_HOST_VERIFY));
@ -383,10 +377,7 @@ void analytics_build_info(BUFFER *b) {
add_to_bi(b, "Native HTTPS");
#endif
#ifdef ENABLE_ACLK
add_to_bi(b, "Netdata Cloud|ACLK Next Generation");
#endif
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
add_to_bi(b, "New Cloud Protocol Support");
add_to_bi(b, "Netdata Cloud|ACLK Next Generation|New Cloud Protocol Support");
#endif
#if (FEAT_TLS_HOST_VERIFY!=0)
add_to_bi(b, "TLS Host Verification");

View file

@ -47,7 +47,7 @@ void netdata_cleanup_and_exit(int ret) {
// stop everything
info("EXIT: stopping static threads...");
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
#ifdef ENABLE_ACLK
aclk_sync_exit_all();
#endif
cancel_main_threads();

View file

@ -123,7 +123,7 @@ const struct netdata_static_thread static_threads_common[] = {
.start_routine = socket_listen_main_static_threaded
},
#if defined(ENABLE_ACLK) || defined(ACLK_NG)
#ifdef ENABLE_ACLK
{
.name = "ACLK_Main",
.config_section = NULL,

View file

@ -724,10 +724,6 @@ struct rrdhost {
const char *tags; // tags for this host
const char *timezone; // the timezone of the host
#ifdef ENABLE_ACLK
long deleted_charts_count;
#endif
const char *abbrev_timezone; // the abbriviated timezone of the host
int32_t utc_offset; // the offset in seconds from utc
@ -1263,7 +1259,7 @@ extern void rrddim_isnot_obsolete(RRDSET *st, RRDDIM *rd);
extern collected_number rrddim_set_by_pointer(RRDSET *st, RRDDIM *rd, collected_number value);
extern collected_number rrddim_set(RRDSET *st, const char *id, collected_number value);
#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL)
#ifdef ENABLE_ACLK
extern time_t calc_dimension_liveness(RRDDIM *rd, time_t now);
#endif
extern long align_entries_to_pagesize(RRD_MEMORY_MODE mode, long entries);
@ -1301,8 +1297,7 @@ extern RRDSET *rrdset_index_del_name(RRDHOST *host, RRDSET *st);
extern void rrdset_free(RRDSET *st);
extern void rrdset_reset(RRDSET *st);
extern void rrdset_save(RRDSET *st);
#define rrdset_delete(st) rrdset_delete_custom(st, 0)
extern void rrdset_delete_custom(RRDSET *st, int db_rotated);
extern void rrdset_delete(RRDSET *st);
extern void rrdset_delete_obsolete_dimensions(RRDSET *st);
extern RRDHOST *rrdhost_create(

View file

@ -139,7 +139,7 @@ void rrdcalc_link_to_rrddim(RRDDIM *rd, RRDSET *st, RRDHOST *host) {
// 0 : Dimension is live
// last collected time : Dimension is not live
#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL)
#ifdef ENABLE_ACLK
time_t calc_dimension_liveness(RRDDIM *rd, time_t now)
{
time_t last_updated = rd->last_collected_time.tv_sec;
@ -184,7 +184,7 @@ RRDDIM *rrddim_add_custom(RRDSET *st, const char *id, const char *name, collecte
debug(D_METADATALOG, "DIMENSION [%s] metadata updated", rd->id);
(void)sql_store_dimension(&rd->state->metric_uuid, rd->rrdset->chart_uuid, rd->id, rd->name, rd->multiplier, rd->divisor,
rd->algorithm);
#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL)
#ifdef ENABLE_ACLK
queue_dimension_to_aclk(rd, calc_dimension_liveness(rd, now_realtime_sec()));
#endif
rrdset_flag_set(st, RRDSET_FLAG_SYNC_CLOCK);
@ -427,7 +427,7 @@ void rrddim_free(RRDSET *st, RRDDIM *rd)
error("RRDDIM: INTERNAL ERROR: attempt to remove from index dimension '%s' on chart '%s', removed a different dimension.", rd->id, st->id);
// free(rd->annotations);
//#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL)
//#ifdef ENABLE_ACLK
// if (!netdata_exit)
// aclk_send_dimension_update(rd);
//#endif

View file

@ -886,7 +886,7 @@ void rrdhost_free(RRDHOST *host) {
rrdhost_wrlock(host); // lock this RRDHOST
#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL)
#ifdef ENABLE_ACLK
struct aclk_database_worker_config *wc = host->dbsync_worker;
if (wc && !netdata_exit) {
struct aclk_database_cmd cmd;
@ -999,7 +999,7 @@ void rrdhost_free(RRDHOST *host) {
freez(host->node_id);
freez(host);
#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL)
#ifdef ENABLE_ACLK
if (wc)
wc->is_orphan = 0;
#endif
@ -1330,7 +1330,7 @@ restart_after_removal:
}
continue;
}
#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL)
#ifdef ENABLE_ACLK
else
queue_dimension_to_aclk(rd, rd->last_collected_time.tv_sec);
#endif
@ -1363,7 +1363,7 @@ restart_after_removal:
rrdset_free(st);
goto restart_after_removal;
}
#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL)
#ifdef ENABLE_ACLK
else
sql_check_chart_liveness(st);
#endif
@ -1391,14 +1391,7 @@ void rrd_cleanup_obsolete_charts()
{
if (host->obsolete_charts_count) {
rrdhost_wrlock(host);
#ifdef ENABLE_ACLK
host->deleted_charts_count = 0;
#endif
rrdhost_cleanup_obsolete_charts(host);
#ifdef ENABLE_ACLK
if (host->deleted_charts_count)
aclk_update_chart(host, "dummy-chart", 0);
#endif
rrdhost_unlock(host);
}

View file

@ -441,11 +441,8 @@ void rrdset_save(RRDSET *st) {
}
}
void rrdset_delete_custom(RRDSET *st, int db_rotated) {
void rrdset_delete(RRDSET *st) {
RRDDIM *rd;
#ifndef ENABLE_ACLK
UNUSED(db_rotated);
#endif
rrdset_check_rdlock(st);
info("Deleting chart '%s' ('%s') from disk...", st->id, st->name);
@ -465,13 +462,6 @@ void rrdset_delete_custom(RRDSET *st, int db_rotated) {
}
recursively_delete_dir(st->cache_dir, "left-over chart");
#ifdef ENABLE_ACLK
if ((netdata_cloud_setting) && (db_rotated || RRD_MEMORY_MODE_DBENGINE != st->rrd_memory_mode)) {
aclk_del_collector(st->rrdhost, st->plugin_name, st->module_name);
st->rrdhost->deleted_charts_count++;
}
#endif
}
void rrdset_delete_obsolete_dimensions(RRDSET *st) {
@ -659,22 +649,7 @@ RRDSET *rrdset_create_custom(
}
if (mark_rebuild) {
#ifdef ENABLE_ACLK
if (netdata_cloud_setting) {
if (mark_rebuild & META_CHART_ACTIVATED) {
aclk_add_collector(host, st->plugin_name, st->module_name);
}
else {
if (mark_rebuild & (META_PLUGIN_UPDATED | META_MODULE_UPDATED)) {
aclk_del_collector(
host, mark_rebuild & META_PLUGIN_UPDATED ? old_plugin : st->plugin_name,
mark_rebuild & META_MODULE_UPDATED ? old_module : st->module_name);
aclk_add_collector(host, st->plugin_name, st->module_name);
}
}
rrdset_flag_clear(st, RRDSET_FLAG_ACLK);
}
#endif
rrdset_flag_clear(st, RRDSET_FLAG_ACLK);
freez(old_plugin);
freez(old_module);
freez(old_title);
@ -929,11 +904,6 @@ RRDSET *rrdset_create_custom(
compute_chart_hash(st);
rrdhost_unlock(host);
#ifdef ENABLE_ACLK
if (netdata_cloud_setting)
aclk_add_collector(host, plugin, module);
rrdset_flag_clear(st, RRDSET_FLAG_ACLK);
#endif
return(st);
}
@ -1769,14 +1739,14 @@ after_first_database_work:
after_second_database_work:
st->last_collected_total = st->collected_total;
#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL)
#ifdef ENABLE_ACLK
time_t mark = now_realtime_sec();
#endif
rrddim_foreach_read(rd, st) {
if (rrddim_flag_check(rd, RRDDIM_FLAG_ARCHIVED))
continue;
#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL)
#ifdef ENABLE_ACLK
if (likely(!st->state->is_ar_chart)) {
if (!rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN) && likely(rrdset_flag_check(st, RRDSET_FLAG_ACLK)))
queue_dimension_to_aclk(rd, calc_dimension_liveness(rd, mark));
@ -1881,7 +1851,7 @@ after_second_database_work:
delete_dimension_uuid(&rd->state->metric_uuid);
} else {
/* Do not delete this dimension */
#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL)
#ifdef ENABLE_ACLK
queue_dimension_to_aclk(rd, calc_dimension_liveness(rd, mark));
#endif
last = rd;

View file

@ -6,7 +6,7 @@
#include "sqlite_aclk_chart.h"
#include "sqlite_aclk_node.h"
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
#ifdef ENABLE_ACLK
#include "../../aclk/aclk.h"
#endif
@ -36,7 +36,7 @@ uv_mutex_t aclk_async_lock;
struct aclk_database_worker_config *aclk_thread_head = NULL;
int retention_running = 0;
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
#ifdef ENABLE_ACLK
static void stop_retention_run()
{
uv_mutex_lock(&aclk_async_lock);
@ -276,7 +276,7 @@ int aclk_start_sync_thread(void *data, int argc, char **argv, char **column)
void sql_aclk_sync_init(void)
{
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
#ifdef ENABLE_ACLK
char *err_msg = NULL;
int rc;
@ -323,7 +323,7 @@ static void timer_cb(uv_timer_t* handle)
uv_stop(handle->loop);
uv_update_time(handle->loop);
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
#ifdef ENABLE_ACLK
struct aclk_database_worker_config *wc = handle->data;
struct aclk_database_cmd cmd;
memset(&cmd, 0, sizeof(cmd));
@ -338,7 +338,7 @@ static void timer_cb(uv_timer_t* handle)
wc->cleanup_after += ACLK_DATABASE_CLEANUP_INTERVAL;
}
if (aclk_use_new_cloud_arch && aclk_connected) {
if (aclk_connected) {
if (wc->rotation_after && wc->rotation_after < now) {
cmd.opcode = ACLK_DATABASE_UPD_RETENTION;
if (!aclk_database_enq_cmd_noblock(wc, &cmd))
@ -373,7 +373,7 @@ static void timer_cb(uv_timer_t* handle)
}
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
#ifdef ENABLE_ACLK
void after_send_retention(uv_work_t *req, int status)
{
struct aclk_database_worker_config *wc = req->data;
@ -410,7 +410,6 @@ void aclk_database_worker(void *arg)
{
worker_register("ACLKSYNC");
worker_register_job_name(ACLK_DATABASE_NOOP, "noop");
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
worker_register_job_name(ACLK_DATABASE_ADD_CHART, "chart add");
worker_register_job_name(ACLK_DATABASE_ADD_DIMENSION, "dimension add");
worker_register_job_name(ACLK_DATABASE_PUSH_CHART, "chart push");
@ -420,7 +419,6 @@ void aclk_database_worker(void *arg)
worker_register_job_name(ACLK_DATABASE_UPD_RETENTION, "retention check");
worker_register_job_name(ACLK_DATABASE_DIM_DELETION, "dimension delete");
worker_register_job_name(ACLK_DATABASE_ORPHAN_HOST, "node orphan");
#endif
worker_register_job_name(ACLK_DATABASE_ALARM_HEALTH_LOG, "alert log");
worker_register_job_name(ACLK_DATABASE_CLEANUP, "cleanup");
worker_register_job_name(ACLK_DATABASE_DELETE_HOST, "node delete");
@ -479,7 +477,7 @@ void aclk_database_worker(void *arg)
info("Starting ACLK sync thread for host %s -- scratch area %lu bytes", wc->host_guid, sizeof(*wc));
memset(&cmd, 0, sizeof(cmd));
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
#ifdef ENABLE_ACLK
uv_work_t retention_work;
sql_get_last_chart_sequence(wc);
wc->chart_payload_count = sql_get_pending_count(wc);
@ -532,7 +530,7 @@ void aclk_database_worker(void *arg)
break;
// CHART / DIMENSION OPERATIONS
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
#ifdef ENABLE_ACLK
case ACLK_DATABASE_ADD_CHART:
debug(D_ACLK_SYNC, "Adding chart event for %s", wc->host_guid);
aclk_add_chart_event(wc, cmd);
@ -585,7 +583,7 @@ void aclk_database_worker(void *arg)
debug(D_ACLK_SYNC,"Sending node info for %s", wc->uuid_str);
sql_build_node_info(wc, cmd);
break;
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
#ifdef ENABLE_ACLK
case ACLK_DATABASE_DIM_DELETION:
debug(D_ACLK_SYNC,"Sending dimension deletion information %s", wc->uuid_str);
aclk_process_dimension_deletion(wc, cmd);
@ -918,9 +916,9 @@ void sql_check_aclk_table_list(struct aclk_database_worker_config *wc)
void aclk_data_rotated(void)
{
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
#ifdef ENABLE_ACLK
if (!aclk_use_new_cloud_arch || !aclk_connected)
if (!aclk_connected)
return;
time_t next_rotation_time = now_realtime_sec()+ACLK_DATABASE_ROTATION_DELAY;

View file

@ -115,7 +115,6 @@ static inline char *get_str_from_uuid(uuid_t *uuid)
enum aclk_database_opcode {
ACLK_DATABASE_NOOP = 0,
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
ACLK_DATABASE_ADD_CHART,
ACLK_DATABASE_ADD_DIMENSION,
ACLK_DATABASE_PUSH_CHART,
@ -125,7 +124,6 @@ enum aclk_database_opcode {
ACLK_DATABASE_UPD_RETENTION,
ACLK_DATABASE_DIM_DELETION,
ACLK_DATABASE_ORPHAN_HOST,
#endif
ACLK_DATABASE_ALARM_HEALTH_LOG,
ACLK_DATABASE_CLEANUP,
ACLK_DATABASE_DELETE_HOST,

View file

@ -3,7 +3,7 @@
#include "sqlite_functions.h"
#include "sqlite_aclk_alert.h"
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
#ifdef ENABLE_ACLK
#include "../../aclk/aclk_alarm_api.h"
#include "../../aclk/aclk.h"
#endif
@ -123,21 +123,6 @@ done:
// and handle both cases
int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, int skip_filter)
{
//check aclk architecture and handle old json alarm update to cloud
//include also the valid statuses for this case
#ifdef ENABLE_ACLK
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
if (!aclk_use_new_cloud_arch && aclk_connected) {
#endif
if ((ae->new_status == RRDCALC_STATUS_WARNING || ae->new_status == RRDCALC_STATUS_CRITICAL) ||
((ae->old_status == RRDCALC_STATUS_WARNING || ae->old_status == RRDCALC_STATUS_CRITICAL))) {
aclk_update_alarm(host, ae);
}
#endif
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
}
if (!claimed())
return 0;
@ -196,17 +181,11 @@ bind_fail:
buffer_free(sql);
return 0;
#else
UNUSED(host);
UNUSED(ae);
UNUSED(skip_filter);
#endif
return 0;
}
int rrdcalc_status_to_proto_enum(RRDCALC_STATUS status)
{
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
#ifdef ENABLE_ACLK
switch(status) {
case RRDCALC_STATUS_REMOVED:
return ALARM_STATUS_REMOVED;
@ -234,7 +213,7 @@ int rrdcalc_status_to_proto_enum(RRDCALC_STATUS status)
void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
{
#ifndef ENABLE_NEW_CLOUD_PROTOCOL
#ifndef ENABLE_ACLK
UNUSED(wc);
UNUSED(cmd);
#else
@ -470,7 +449,7 @@ void aclk_send_alarm_health_log(char *node_id)
void aclk_push_alarm_health_log(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
{
UNUSED(cmd);
#ifndef ENABLE_NEW_CLOUD_PROTOCOL
#ifndef ENABLE_ACLK
UNUSED(wc);
#else
int rc;
@ -595,7 +574,7 @@ void aclk_send_alarm_configuration(char *config_hash)
int aclk_push_alert_config_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
{
UNUSED(wc);
#ifndef ENABLE_NEW_CLOUD_PROTOCOL
#ifndef ENABLE_ACLK
UNUSED(cmd);
#else
int rc = 0;
@ -708,7 +687,6 @@ bind_fail:
// Start streaming alerts
void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start_seq_id)
{
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
if (unlikely(!node_id))
return;
@ -749,20 +727,13 @@ void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start
else
log_access("ACLK STA [%s (N/A)]: ACLK synchronization thread is not active.", node_id);
#else
UNUSED(node_id);
UNUSED(start_seq_id);
UNUSED(batch_id);
#endif
return;
}
void sql_process_queue_removed_alerts_to_aclk(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
{
UNUSED(cmd);
#ifndef ENABLE_NEW_CLOUD_PROTOCOL
UNUSED(wc);
#else
BUFFER *sql = buffer_create(1024);
buffer_sprintf(sql,"insert into aclk_alert_%s (alert_unique_id, date_created) " \
@ -778,13 +749,11 @@ void sql_process_queue_removed_alerts_to_aclk(struct aclk_database_worker_config
buffer_free(sql);
wc->pause_alert_updates = 0;
#endif
return;
}
void sql_queue_removed_alerts_to_aclk(RRDHOST *host)
{
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
if (unlikely(!host->dbsync_worker))
return;
@ -798,15 +767,11 @@ void sql_queue_removed_alerts_to_aclk(RRDHOST *host)
cmd.data_param = NULL;
cmd.completion = NULL;
aclk_database_enq_cmd((struct aclk_database_worker_config *) host->dbsync_worker, &cmd);
#else
UNUSED(host);
#endif
}
void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, uint64_t snapshot_id, uint64_t sequence_id)
{
UNUSED(claim_id);
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
if (unlikely(!node_id))
return;
@ -843,11 +808,7 @@ void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, uint64_t sn
aclk_database_enq_cmd(wc, &cmd);
} else
log_access("ACLK STA [%s (N/A)]: ACLK synchronization thread is not active.", node_id);
#else
UNUSED(node_id);
UNUSED(snapshot_id);
UNUSED(sequence_id);
#endif
return;
}
@ -867,7 +828,7 @@ void aclk_mark_alert_cloud_ack(char *uuid_str, uint64_t alerts_ack_sequence_id)
buffer_free(sql);
}
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
#ifdef ENABLE_ACLK
void health_alarm_entry2proto_nolock(struct alarm_log_entry *alarm_log, ALARM_ENTRY *ae, RRDHOST *host)
{
char *edit_command = ae->source ? health_edit_command_from_source(ae->source) : strdupz("UNKNOWN=0=UNKNOWN");
@ -917,7 +878,7 @@ void health_alarm_entry2proto_nolock(struct alarm_log_entry *alarm_log, ALARM_EN
}
#endif
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
#ifdef ENABLE_ACLK
static int have_recent_alarm(RRDHOST *host, uint32_t alarm_id, time_t mark)
{
ALARM_ENTRY *ae = host->health_log.alarms;
@ -936,7 +897,7 @@ static int have_recent_alarm(RRDHOST *host, uint32_t alarm_id, time_t mark)
#define ALARM_EVENTS_PER_CHUNK 10
void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
{
#ifndef ENABLE_NEW_CLOUD_PROTOCOL
#ifndef ENABLE_ACLK
UNUSED(wc);
UNUSED(cmd);
#else
@ -1055,7 +1016,6 @@ void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, stru
void sql_aclk_alert_clean_dead_entries(RRDHOST *host)
{
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
if (!claimed())
return;
@ -1075,9 +1035,6 @@ void sql_aclk_alert_clean_dead_entries(RRDHOST *host)
sqlite3_free(err_msg);
}
buffer_free(sql);
#else
UNUSED(host);
#endif
}
int get_proto_alert_status(RRDHOST *host, struct proto_alert_status *proto_alert_status)

View file

@ -3,7 +3,7 @@
#include "sqlite_functions.h"
#include "sqlite_aclk_chart.h"
#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL)
#ifdef ENABLE_ACLK
#include "../../aclk/aclk_charts_api.h"
#include "../../aclk/aclk.h"
@ -212,7 +212,7 @@ void aclk_process_dimension_deletion(struct aclk_database_worker_config *wc, str
int rc = 0;
sqlite3_stmt *res = NULL;
if (!aclk_use_new_cloud_arch || !aclk_connected)
if (!aclk_connected)
return;
if (unlikely(!db_meta))
@ -843,7 +843,7 @@ void aclk_update_retention(struct aclk_database_worker_config *wc)
{
int rc;
if (!aclk_use_new_cloud_arch || !aclk_connected)
if (!aclk_connected)
return;
char *claim_id = is_agent_claimed();
@ -1146,9 +1146,6 @@ void queue_dimension_to_aclk(RRDDIM *rd, time_t last_updated)
void aclk_send_dimension_update(RRDDIM *rd)
{
if (!aclk_use_new_cloud_arch)
return;
char *claim_id = is_agent_claimed();
if (unlikely(!claim_id))
return;
@ -1316,24 +1313,11 @@ void sql_check_chart_liveness(RRDSET *st) {
rrdset_unlock(st);
}
#endif //ENABLE_NEW_CLOUD_PROTOCOL
// ST is read locked
int queue_chart_to_aclk(RRDSET *st)
{
#ifndef ENABLE_NEW_CLOUD_PROTOCOL
#ifdef ENABLE_ACLK
aclk_update_chart(st->rrdhost, st->id, 1);
#else
UNUSED(st);
#endif
return 0;
#else
if (!aclk_use_new_cloud_arch && aclk_connected) {
aclk_update_chart(st->rrdhost, st->id, 1);
return 0;
}
return sql_queue_chart_payload((struct aclk_database_worker_config *) st->rrdhost->dbsync_worker,
st, ACLK_DATABASE_ADD_CHART);
#endif
}
#endif //ENABLE_ACLK

View file

@ -3,7 +3,7 @@
#include "sqlite_functions.h"
#include "sqlite_aclk_node.h"
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
#ifdef ENABLE_ACLK
#include "../../aclk/aclk_charts_api.h"
#endif
@ -11,7 +11,7 @@ void sql_build_node_info(struct aclk_database_worker_config *wc, struct aclk_dat
{
UNUSED(cmd);
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
#ifdef ENABLE_ACLK
struct update_node_info node_info;
if (!wc->host)

View file

@ -219,10 +219,6 @@ static void health_reload_host(RRDHOST *host) {
* Reload the host configuration for all hosts.
*/
void health_reload(void) {
#ifdef ENABLE_ACLK
if (netdata_cloud_setting)
aclk_single_update_disable();
#endif
sql_refresh_hashes();
rrd_rdlock();
@ -234,11 +230,7 @@ void health_reload(void) {
rrd_unlock();
#ifdef ENABLE_ACLK
if (netdata_cloud_setting) {
aclk_single_update_enable();
aclk_alarm_reload();
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
aclk_alert_reloaded = 1;
#endif
}
#endif
}
@ -736,7 +728,7 @@ void *health_main(void *ptr) {
rrdcalc_labels_unlink();
unsigned int loop = 0;
#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL)
#ifdef ENABLE_ACLK
unsigned int marked_aclk_reload_loop = 0;
#endif
while(!netdata_exit) {
@ -765,7 +757,7 @@ void *health_main(void *ptr) {
}
}
#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL)
#ifdef ENABLE_ACLK
if (aclk_alert_reloaded && !marked_aclk_reload_loop)
marked_aclk_reload_loop = loop;
#endif
@ -828,7 +820,7 @@ void *health_main(void *ptr) {
rc->last_status_change = now;
rc->last_updated = now;
rc->value = NAN;
#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL)
#ifdef ENABLE_ACLK
if (netdata_cloud_setting && likely(!aclk_alert_reloaded))
sql_queue_alarm_to_aclk(host, ae, 1);
#endif
@ -1180,7 +1172,7 @@ void *health_main(void *ptr) {
health_alarm_wait_for_execution(ae);
}
#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL)
#ifdef ENABLE_ACLK
if (netdata_cloud_setting && unlikely(aclk_alert_reloaded) && loop > (marked_aclk_reload_loop + 2)) {
rrdhost_foreach_read(host) {
if (unlikely(!host->health_enabled))

View file

@ -699,7 +699,7 @@ static int rrdpush_receive(struct receiver_state *rpt)
cd.version = rpt->stream_version;
#if defined(ENABLE_NEW_CLOUD_PROTOCOL)
#ifdef ENABLE_ACLK
// in case we have cloud connection we inform cloud
// new child connected
if (netdata_cloud_setting)
@ -713,7 +713,7 @@ static int rrdpush_receive(struct receiver_state *rpt)
error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).", rpt->hostname, rpt->client_ip,
rpt->client_port, count);
#if defined(ENABLE_NEW_CLOUD_PROTOCOL)
#ifdef ENABLE_ACLK
// in case we have cloud connection we inform cloud
// new child connected
if (netdata_cloud_setting)

View file

@ -3,7 +3,6 @@
#include "web_api_v1.h"
char *api_secret;
extern int aclk_use_new_cloud_arch;
static struct {
const char *name;
@ -1062,11 +1061,7 @@ inline int web_client_api_request_v1_info_fill_buffer(RRDHOST *host, BUFFER *wb)
#ifdef ENABLE_ACLK
buffer_strcat(wb, "\t\"cloud-available\": true,\n");
buffer_strcat(wb, "\t\"aclk-ng-available\": true,\n");
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
buffer_strcat(wb, "\t\"aclk-ng-new-cloud-protocol\": true,\n");
#else
buffer_strcat(wb, "\t\"aclk-ng-new-cloud-protocol\": false,\n");
#endif
buffer_strcat(wb, "\t\"aclk-legacy-available\": false,\n");
buffer_strcat(wb, "\t\"aclk-implementation\": \"Next Generation\",\n");
#else
@ -1084,12 +1079,7 @@ inline int web_client_api_request_v1_info_fill_buffer(RRDHOST *host, BUFFER *wb)
#ifdef ENABLE_ACLK
if (aclk_connected) {
buffer_strcat(wb, "\t\"aclk-available\": true,\n");
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
if (aclk_use_new_cloud_arch)
buffer_strcat(wb, "\t\"aclk-available-protocol\": \"New\",\n");
else
#endif
buffer_strcat(wb, "\t\"aclk-available-protocol\": \"Legacy\",\n");
buffer_strcat(wb, "\t\"aclk-available-protocol\": \"New\",\n");
}
else
#endif