0
0
Fork 0
mirror of https://github.com/netdata/netdata.git synced 2025-04-14 17:48:37 +00:00
netdata_netdata/aclk/agent_cloud_link.c
Andrew Moss fe722cb2a4
Improve the behavior of claiming ()
The default cloud url has been updated to app.netdata.cloud ready for the release. The claiming process now checks the current user executing claiming and refuses to perform the claim for the wrong user. If the current UID is 0 then claiming proceeds but the file ownership is adjusted to be the correct netdata user. The default expected user is `netdata` unless the script can identify the user from the current configuration. After the claiming script is executed the CLI is used to reload the claiming state.
2020-03-31 13:07:24 +02:00

1877 lines
55 KiB
C

// SPDX-License-Identifier: GPL-3.0-or-later
#include "libnetdata/libnetdata.h"
#include "agent_cloud_link.h"
#include "aclk_lws_https_client.h"
#include "aclk_common.h"
// Set to 1 by aclk_main_cleanup() right before the link is shut down.
int aclk_shutting_down = 0;
// State-machine for the on-connect metadata transmission.
// TODO: The AGENT_STATE should be centralized as it would be useful to control error-logging during the initial
// agent startup phase.
// The query thread moves this to ACLK_METADATA_CMD_QUEUED when it queues the on_connect
// command and aclk_process_query() moves it to ACLK_METADATA_SENT once the metadata went out.
static ACLK_METADATA_STATE aclk_metadata_submitted = ACLK_METADATA_REQUIRED;
// Starts as AGENT_INITIALIZING; the query thread switches it to AGENT_STABLE once no collector
// activity has been recorded for more than ACLK_STABLE_TIMEOUT seconds.
static AGENT_STATE agent_state = AGENT_INITIALIZING;
// Other global state
static int aclk_subscribed = 0;
static int aclk_disable_single_updates = 0;
// Time of the last collector addition/removal seen while the agent was still initializing
static time_t last_init_sequence = 0;
// While non-zero, aclk_queue_query() refuses every request
static int waiting_init = 1;
// Credentials filled in by the challenge/response sequence in aclk_get_challenge()
static char *aclk_username = NULL;
static char *aclk_password = NULL;
// Base topic, computed once and cached by create_publish_base_topic()
static char *global_base_topic = NULL;
static int aclk_connecting = 0;
int aclk_connected = 0; // Exposed in the web-api
// aclk_mutex is held while global_base_topic is created,
// query_mutex protects aclk_queue and collector_mutex protects collector_list.
static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER;
static netdata_mutex_t query_mutex = NETDATA_MUTEX_INITIALIZER;
static netdata_mutex_t collector_mutex = NETDATA_MUTEX_INITIALIZER;
// Shorthands for taking/releasing the three mutexes above
#define ACLK_LOCK netdata_mutex_lock(&aclk_mutex)
#define ACLK_UNLOCK netdata_mutex_unlock(&aclk_mutex)
#define COLLECTOR_LOCK netdata_mutex_lock(&collector_mutex)
#define COLLECTOR_UNLOCK netdata_mutex_unlock(&collector_mutex)
#define QUERY_LOCK netdata_mutex_lock(&query_mutex)
#define QUERY_UNLOCK netdata_mutex_unlock(&query_mutex)
// Condition variable (and its companion mutex) on which the query processing
// thread sleeps; producers signal it through QUERY_THREAD_WAKEUP after queueing work.
pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER;
pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER;
// All three macros expand to a plain expression (no trailing semicolon) so that
// they can be used like ordinary function calls: inside an un-braced if/else,
// or with their return value checked.
#define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait)
#define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait)
#define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait)
// Defined outside this file; aclk_main_cleanup() polls it to learn how many
// entries are still waiting in the link's write queue.
void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len);
/*
 * Maintain a list of collectors and their chart counts.
 * If all the charts of a collector are deleted
 * then a new metadata dataset must be sent to the cloud.
 *
 * The list is singly linked. Its first element (collector_list itself) is a
 * dummy entry allocated on demand by _find_collector(); real collectors start
 * at collector_list->next.
 */
struct _collector {
time_t created;
uint32_t count; //chart count
// Hashes (simple_hash) of the three strings below, compared before the strings themselves.
// A missing plugin or module name is represented by the hash value 1.
uint32_t hostname_hash;
uint32_t plugin_hash;
uint32_t module_hash;
// Heap copies owned by the entry and released by _free_collector();
// plugin_name and module_name may be NULL.
char *hostname;
char *plugin_name;
char *module_name;
struct _collector *next;
};
// Head (dummy entry) of the collector list; access is serialized with COLLECTOR_LOCK
struct _collector *collector_list = NULL;
// One pending command for the query thread. All string members are owned by
// the entry and released by aclk_query_free().
struct aclk_query {
time_t created;
time_t run_after; // Delay run until after this time
ACLK_CMD cmd; // What command is this
char *topic; // Topic to respond to
char *data; // Internal data (NULL if request from the cloud)
char *msg_id; // msg_id generated by the cloud (NULL if internal)
char *query; // The actual query
u_char deleted; // Mark deleted for garbage collect
struct aclk_query *next;
};
// The pending-command queue: a singly linked list that aclk_queue_query()
// keeps ordered by run_after. Access is serialized with QUERY_LOCK.
struct aclk_query_queue {
struct aclk_query *aclk_query_head;
struct aclk_query *aclk_query_tail;
uint64_t count; // Number of entries currently linked in the list
} aclk_queue = { .aclk_query_head = NULL, .aclk_query_tail = NULL, .count = 0 };
/*
 * Generate a fresh UUID and return it in textual form.
 *
 * The returned buffer holds the 36 character representation plus the
 * terminating NUL. It is allocated with mallocz(); the callers visible in this
 * file release it with freez().
 */
char *create_uuid()
{
    uuid_t raw_uuid;

    uuid_generate(raw_uuid);

    char *text = mallocz(36 + 1);
    uuid_unparse(raw_uuid, text);

    return text;
}
/*
 * JSON parser callback that collects the fields of a cloud-to-agent message
 * into the struct aclk_request passed as callback data.
 *
 * String members "msg-id", "type" and "callback-topic" are duplicated as-is,
 * "payload" is stored URL-decoded (or copied verbatim when decoding fails) and
 * the numeric member "version" is converted with atoi(). Everything else is
 * ignored. Always returns 0.
 */
int cloud_to_agent_parse(JSON_ENTRY *e)
{
    struct aclk_request *request = e->callback_data;

    if (e->type == JSON_NUMBER) {
        if (strcmp(e->name, "version") == 0)
            request->version = atoi(e->original_string);
        return 0;
    }

    // Only strings carry anything else of interest
    if (e->type != JSON_STRING)
        return 0;

    if (strcmp(e->name, "msg-id") == 0) {
        request->msg_id = strdupz(e->data.string);
    } else if (strcmp(e->name, "type") == 0) {
        request->type_id = strdupz(e->data.string);
    } else if (strcmp(e->name, "callback-topic") == 0) {
        request->callback_topic = strdupz(e->data.string);
    } else if (strcmp(e->name, "payload") == 0) {
        if (likely(e->data.string)) {
            size_t raw_len = strlen(e->data.string);
            request->payload = mallocz(raw_len + 1);
            // Fall back to the raw text if it cannot be URL-decoded
            if (!url_decode_r(request->payload, e->data.string, raw_len + 1))
                strcpy(request->payload, e->data.string);
        }
    }

    return 0;
}
static RSA *aclk_private_key = NULL;
static int create_private_key()
{
char filename[FILENAME_MAX + 1]; struct stat statbuf;
snprintfz(filename, FILENAME_MAX, "%s/claim.d/private.pem", netdata_configured_user_config_dir);
if (lstat(filename, &statbuf) != 0) {
error("Claimed agent cannot establish ACLK - private key not found '%s' failed.", filename);
return 1;
}
if (unlikely(statbuf.st_size == 0)) {
info("Claimed agent cannot establish ACLK - private key '%s' is empty.", filename);
return 1;
}
FILE *f = fopen(filename, "rt");
if (unlikely(f == NULL)) {
error("Claimed agent cannot establish ACLK - unable to open private key '%s'.", filename);
return 1;
}
char *private_key = callocz(1, statbuf.st_size + 1);
size_t bytes_read = fread(private_key, 1, statbuf.st_size, f);
private_key[bytes_read] = 0;
debug(D_ACLK, "Claimed agent loaded private key len=%zu bytes", bytes_read);
fclose(f);
BIO *key_bio = BIO_new_mem_buf(private_key, -1);
if (key_bio==NULL) {
error("Claimed agent cannot establish ACLK - failed to create BIO for key");
goto biofailed;
}
aclk_private_key = PEM_read_bio_RSAPrivateKey(key_bio, NULL, NULL, NULL);
BIO_free(key_bio);
if (aclk_private_key!=NULL)
{
freez(private_key);
return 0;
}
char err[512];
ERR_error_string_n(ERR_get_error(), err, sizeof(err));
error("Claimed agent cannot establish ACLK - cannot create private key: %s", err);
biofailed:
freez(private_key);
return 1;
}
/*
 * Exponential backoff helper used after connection failures.
 *
 * mode 0 resets the backoff state and returns 0.
 * mode 1 returns the number of milliseconds to wait before the next attempt:
 *        0 on the first call after a reset, then 2^n seconds plus up to one
 *        second of random jitter, capped at ACLK_MAX_BACKOFF_DELAY * 1000.
 *
 * The function only computes the delay; it does not sleep.
 */
unsigned long int aclk_reconnect_delay(int mode)
{
    static int failures = -1;

    if (mode == 0 || failures == -1) {
        srandom(time(NULL));
        failures = mode - 1;
        return 0;
    }

    unsigned long int backoff = (1 << failures);

    // Once the cap is reached stop growing the exponent
    if (backoff >= ACLK_MAX_BACKOFF_DELAY)
        return ACLK_MAX_BACKOFF_DELAY * 1000;

    failures++;
    return (backoff * 1000) + (random() % 1000);
}
/*
 * Release a query entry together with every string it owns.
 * Passing NULL is allowed and does nothing.
 */
void aclk_query_free(struct aclk_query *this_query)
{
    if (likely(this_query)) {
        freez(this_query->topic);

        if (likely(this_query->query != NULL))
            freez(this_query->query);

        if (likely(this_query->data != NULL))
            freez(this_query->data);

        if (likely(this_query->msg_id != NULL))
            freez(this_query->msg_id);

        freez(this_query);
    }
}
// Locate the queue entry behind which a new entry due at time_to_run has to be linked
// so that the queue stays ordered by run_after.
// A NULL result means the new entry belongs at the HEAD of the queue.
// The caller must hold the QUERY lock.
struct aclk_query *aclk_query_find_position(time_t time_to_run)
{
    // Fast path: the new entry is due no earlier than the current tail
    struct aclk_query *tail = aclk_queue.aclk_query_tail;
    if (likely(tail) && tail->run_after <= time_to_run)
        return tail;

    // Otherwise remember the last entry that is due no later than time_to_run
    struct aclk_query *previous = NULL;
    for (struct aclk_query *cursor = aclk_queue.aclk_query_head; cursor; cursor = cursor->next) {
        if (cursor->run_after > time_to_run)
            break;
        previous = cursor;
    }

    return previous;
}
// NULL-tolerant string equality: two NULLs are equal, NULL never equals a string
static inline int aclk_str_equal(const char *a, const char *b)
{
    if (a == b)
        return 1;
    if (!a || !b)
        return 0;
    return strcmp(a, b) == 0;
}

// Search the queue for a live (not deleted) entry matching the given criteria.
//
// topic   must equal the entry's topic
// data    optional filter, NULL matches anything
// msg_id  optional filter, NULL matches anything
// query   optional filter, NULL matches anything
// cmd     currently ignored
//
// On success the entry is returned and, if last_query is not NULL, *last_query
// receives the entry preceding it (NULL when the match is the queue head).
// Returns NULL when nothing matches; *last_query is left untouched in that case.
//
// Entries queued internally may carry NULL query/data/msg_id and cloud requests
// may lack a topic, so every comparison is NULL-safe: a non-NULL filter never
// matches a NULL member.
//
// Need to have a QUERY lock before calling this
struct aclk_query *
aclk_query_find(char *topic, char *data, char *msg_id, char *query, ACLK_CMD cmd, struct aclk_query **last_query)
{
    UNUSED(cmd);

    struct aclk_query *prev_query = NULL;
    for (struct aclk_query *tmp_query = aclk_queue.aclk_query_head; tmp_query;
         prev_query = tmp_query, tmp_query = tmp_query->next) {
        if (unlikely(tmp_query->deleted))
            continue;

        if (!aclk_str_equal(tmp_query->topic, topic))
            continue;
        if (query && !aclk_str_equal(tmp_query->query, query))
            continue;
        if (data && !aclk_str_equal(tmp_query->data, data))
            continue;
        if (msg_id && !aclk_str_equal(tmp_query->msg_id, msg_id))
            continue;

        if (likely(last_query))
            *last_query = prev_query;
        return tmp_query;
    }

    return NULL;
}
/*
 * Add a query to execute, the result will be sent to the specified topic.
 *
 * topic      topic the response is published to
 * data       optional internal data, always duplicated
 * msg_id     message id assigned by the cloud (stored only when internal == 0)
 * query      the query text, may be NULL
 * run_after  delay in seconds, relative to now, before the query may run
 * internal   non-zero: topic and query are duplicated and the caller keeps its
 *            copies; zero: the topic, query and msg_id pointers themselves are
 *            stored in the queue entry and later released by aclk_query_free()
 * aclk_cmd   command to execute
 *
 * An already queued entry matching the same criteria is replaced, unless it is
 * scheduled for exactly the same time, in which case nothing is queued.
 *
 * Returns 0 when the query was queued and 1 when it was not (agent still
 * waiting for initialization, or an identical entry already exists).
 * NOTE(review): when 1 is returned with internal == 0 the pointers are not
 * stored here - confirm that callers release them.
 */
int aclk_queue_query(char *topic, char *data, char *msg_id, char *query, int run_after, int internal, ACLK_CMD aclk_cmd)
{
    struct aclk_query *new_query, *tmp_query;

    // Ignore all commands while we wait for the agent to initialize
    if (unlikely(waiting_init))
        return 1;

    // Keep the absolute time in a time_t; squeezing it back into the int
    // parameter would truncate it on platforms with a 64-bit time_t
    time_t run_at = now_realtime_sec() + run_after;

    QUERY_LOCK;

    struct aclk_query *last_query = NULL;
    tmp_query = aclk_query_find(topic, data, msg_id, query, aclk_cmd, &last_query);
    if (unlikely(tmp_query)) {
        if (tmp_query->run_after == run_at) {
            QUERY_UNLOCK;
            QUERY_THREAD_WAKEUP;
            return 1;
        }

        // Unlink the stale duplicate
        if (last_query)
            last_query->next = tmp_query->next;
        else
            aclk_queue.aclk_query_head = tmp_query->next;

        // Never leave the tail pointing at an entry that is about to be freed
        if (tmp_query == aclk_queue.aclk_query_tail)
            aclk_queue.aclk_query_tail = last_query;

        debug(D_ACLK, "Removing double entry");
        aclk_query_free(tmp_query);
        aclk_queue.count--;
    }

    new_query = callocz(1, sizeof(struct aclk_query));
    new_query->cmd = aclk_cmd;
    if (internal) {
        new_query->topic = strdupz(topic);
        if (likely(query))
            new_query->query = strdupz(query);
    } else {
        // Ownership of these pointers moves to the queue entry
        new_query->topic = topic;
        new_query->query = query;
        new_query->msg_id = msg_id;
    }

    if (data)
        new_query->data = strdupz(data);

    new_query->next = NULL;
    new_query->created = now_realtime_sec();
    new_query->run_after = run_at;

    debug(D_ACLK, "Added query (%s) (%s)", topic ? topic : "", query ? query : "");

    // Link the entry at its position in run_after order (NULL means: at the head)
    tmp_query = aclk_query_find_position(run_at);
    if (tmp_query) {
        new_query->next = tmp_query->next;
        tmp_query->next = new_query;
    } else {
        new_query->next = aclk_queue.aclk_query_head;
        aclk_queue.aclk_query_head = new_query;
    }

    // Whatever ended up last in the list is the tail. This also covers the
    // first insertion into an empty queue.
    if (!new_query->next)
        aclk_queue.aclk_query_tail = new_query;

    aclk_queue.count++;

    QUERY_UNLOCK;
    QUERY_THREAD_WAKEUP;
    return 0;
}
// Queue a request received from the cloud for immediate execution.
// The request's callback topic, message id and payload pointers are handed to
// aclk_queue_query() in non-internal mode. Returns what aclk_queue_query() returns.
inline int aclk_submit_request(struct aclk_request *request)
{
    char *reply_topic = request->callback_topic;
    char *message_id = request->msg_id;
    char *api_query = request->payload;

    return aclk_queue_query(reply_topic, NULL, message_id, api_query, 0, 0, ACLK_CMD_CLOUD);
}
/*
 * Get the next query to process.
 *
 * Entries flagged as deleted at the head of the queue are discarded first.
 * Returns NULL if the queue is empty or if the entry at the head is not yet
 * due (its run_after lies in the future).
 *
 * The returned entry is unlinked from the queue; the caller owns it and must
 * release it (topic, query and the structure itself) with aclk_query_free().
 */
struct aclk_query *aclk_queue_pop()
{
    struct aclk_query *this_query;

    QUERY_LOCK;

    // Get rid of the deleted entries at the head
    while ((this_query = aclk_queue.aclk_query_head) != NULL && this_query->deleted) {
        aclk_queue.count--;
        aclk_queue.aclk_query_head = this_query->next;
        if (likely(!aclk_queue.aclk_query_head))
            aclk_queue.aclk_query_tail = NULL;
        aclk_query_free(this_query);
    }

    if (likely(!this_query)) {
        QUERY_UNLOCK;
        return NULL;
    }

    time_t now = now_realtime_sec();
    if (this_query->run_after > now) {
        // Internal commands have no query text; never hand NULL to %s
        info(
            "Query %s will run in %ld seconds", this_query->query ? this_query->query : "",
            (long)(this_query->run_after - now));
        QUERY_UNLOCK;
        return NULL;
    }

    aclk_queue.count--;
    aclk_queue.aclk_query_head = this_query->next;
    if (likely(!aclk_queue.aclk_query_head))
        aclk_queue.aclk_query_tail = NULL;

    QUERY_UNLOCK;
    return this_query;
}
// Compute (once) and return the base topic under which the agent publishes.
// Subtopics are sent below it, e.g. base_topic/subtopic.
// The topic is formatted from ACLK_TOPIC_STRUCTURE and the value returned by
// is_agent_claimed(), cut at the first newline, and cached in global_base_topic.
// Returns NULL when the agent is not claimed.
// Need to check if additional logic should be added to make sure that there
// is enough information to determine the base topic at init time
char *create_publish_base_topic()
{
    if (unlikely(!is_agent_claimed()))
        return NULL;

    ACLK_LOCK;
    if (unlikely(global_base_topic == NULL)) {
        char topic_buf[ACLK_MAX_TOPIC + 1];

        snprintf(topic_buf, ACLK_MAX_TOPIC, ACLK_TOPIC_STRUCTURE, is_agent_claimed());

        char *newline = strchr(topic_buf, '\n');
        if (unlikely(newline != NULL))
            *newline = '\0';

        global_base_topic = strdupz(topic_buf);
    }
    ACLK_UNLOCK;

    return global_base_topic;
}
/*
 * Build the full topic for sub_topic.
 *
 * A sub topic starting with '/' is treated as absolute and returned unchanged.
 * The sub topic is also returned unchanged while no base topic is known.
 * Otherwise "<base topic>/<sub topic>" is written into final_topic (at most
 * max_size bytes, truncation is only reported in the debug log) and
 * final_topic is returned.
 */
char *get_topic(char *sub_topic, char *final_topic, int max_size)
{
    int is_absolute = sub_topic && sub_topic[0] == '/';

    if (likely(is_absolute) || unlikely(!global_base_topic))
        return sub_topic;

    int written = snprintf(final_topic, max_size, "%s/%s", global_base_topic, sub_topic);
    if (unlikely(written >= max_size))
        debug(D_ACLK, "Topic has been truncated to [%s] instead of [%s/%s]", final_topic, global_base_topic, sub_topic);

    return final_topic;
}
/*
 * Release a collector entry and the strings it owns.
 * The entry must already be unlinked from the collector list.
 */
static void _free_collector(struct _collector *collector)
{
    if (likely(collector->hostname != NULL))
        freez(collector->hostname);

    if (likely(collector->plugin_name != NULL))
        freez(collector->plugin_name);

    if (likely(collector->module_name != NULL))
        freez(collector->module_name);

    freez(collector);
}
/*
 * Debug helper (ACLK_DEBUG builds only): write every known collector and its
 * chart count to the log. Takes COLLECTOR_LOCK itself.
 */
#ifdef ACLK_DEBUG
static void _dump_collector_list()
{
    COLLECTOR_LOCK;

    info("DUMPING ALL COLLECTORS");

    if (unlikely(collector_list == NULL || collector_list->next == NULL)) {
        COLLECTOR_UNLOCK;
        info("DUMPING ALL COLLECTORS -- nothing found");
        return;
    }

    // The first entry is the dummy head, start behind it
    for (struct _collector *entry = collector_list->next; entry != NULL; entry = entry->next) {
        const char *plugin = entry->plugin_name ? entry->plugin_name : "";
        const char *module = entry->module_name ? entry->module_name : "";

        info("COLLECTOR %s : [%s:%s] count = %u", entry->hostname, plugin, module, entry->count);
    }

    info("DUMPING ALL COLLECTORS DONE");

    COLLECTOR_UNLOCK;
}
#endif
/*
 * Remove and free every collector entry, keeping only the dummy head.
 *
 * COLLECTOR_LOCK is acquired here, so the caller must not already hold it.
 * The entries are detached under the lock and freed after it is released.
 */
static void _reset_collector_list()
{
    struct _collector *detached = NULL;

    COLLECTOR_LOCK;
    if (likely(collector_list && collector_list->next)) {
        // Cut the chain off the dummy head; from here on nobody else can reach it
        detached = collector_list->next;
        collector_list->count = 0;
        collector_list->next = NULL;
    }
    COLLECTOR_UNLOCK;

    while (detached) {
        struct _collector *following = detached->next;
        _free_collector(detached);
        detached = following;
    }
}
/*
 * Look up a collector by hostname, plugin name and module name.
 *
 * Returns the matching entry or NULL. When a match is found and
 * last_collector is not NULL, *last_collector receives the entry preceding
 * the match in the list (used when unlinking a collector).
 *
 * As a side effect the dummy list head is allocated if it does not exist yet.
 * A NULL plugin or module name, on either side, is not compared as a string;
 * only the hashes have to agree.
 *
 * The caller must hold COLLECTOR_LOCK.
 */
static struct _collector *_find_collector(
    const char *hostname, const char *plugin_name, const char *module_name, struct _collector **last_collector)
{
    if (unlikely(!collector_list)) {
        collector_list = callocz(1, sizeof(struct _collector));
        return NULL;
    }

    if (unlikely(!collector_list->next))
        return NULL;

    const uint32_t wanted_plugin = plugin_name ? simple_hash(plugin_name) : 1;
    const uint32_t wanted_module = module_name ? simple_hash(module_name) : 1;
    const uint32_t wanted_host = simple_hash(hostname);

    // Walk the real entries, remembering the predecessor (initially the dummy head)
    struct _collector *before = collector_list;
    for (struct _collector *entry = collector_list->next; entry; before = entry, entry = entry->next) {
        if (wanted_plugin != entry->plugin_hash || wanted_module != entry->module_hash ||
            wanted_host != entry->hostname_hash)
            continue;

        if (strcmp(hostname, entry->hostname) != 0)
            continue;

        if (plugin_name && entry->plugin_name && strcmp(plugin_name, entry->plugin_name) != 0)
            continue;

        if (module_name && entry->module_name && strcmp(module_name, entry->module_name) != 0)
            continue;

        if (unlikely(last_collector))
            *last_collector = before;
        return entry;
    }

    return NULL;
}
/*
 * Account for one deleted chart of a collector.
 *
 * The collector's chart count is decremented; when it reaches zero the entry
 * is unlinked from the list. The entry (or NULL if the collector is unknown)
 * is returned so that the caller can inspect the count and free the entry
 * once it has been unlinked.
 */
static struct _collector *_del_collector(const char *hostname, const char *plugin_name, const char *module_name)
{
    struct _collector *previous = NULL;
    struct _collector *found = _find_collector(hostname, plugin_name, module_name, &previous);

    if (unlikely(!found))
        return NULL;

    found->count--;
    if (unlikely(found->count == 0))
        previous->next = found->next;

    return found;
}
/*
 * Account for one new chart of a collector (plugin / module).
 *
 * An unknown collector is created and linked right behind the dummy list
 * head; in every case the chart count is incremented and the entry returned.
 *
 * Lock before calling
 */
static struct _collector *_add_collector(const char *hostname, const char *plugin_name, const char *module_name)
{
    struct _collector *entry = _find_collector(hostname, plugin_name, module_name, NULL);

    if (unlikely(entry == NULL)) {
        entry = callocz(1, sizeof(struct _collector));

        entry->hostname = strdupz(hostname);
        entry->hostname_hash = simple_hash(hostname);

        entry->plugin_name = plugin_name ? strdupz(plugin_name) : NULL;
        entry->plugin_hash = plugin_name ? simple_hash(plugin_name) : 1;

        entry->module_name = module_name ? strdupz(module_name) : NULL;
        entry->module_hash = module_name ? simple_hash(module_name) : 1;

        entry->next = collector_list->next;
        collector_list->next = entry;
    }

    entry->count++;

    debug(
        D_ACLK, "ADD COLLECTOR %s [%s:%s] -- chart %u", hostname, plugin_name ? plugin_name : "*",
        module_name ? module_name : "*", entry->count);

    return entry;
}
/*
 * Register a chart of the given collector.
 *
 * If this is the collector's first chart, either record the activity (while
 * the agent is still initializing) or queue an on-connect metadata update.
 */
void aclk_add_collector(const char *hostname, const char *plugin_name, const char *module_name)
{
    COLLECTOR_LOCK;

    struct _collector *entry = _add_collector(hostname, plugin_name, module_name);

    // Only the first chart of a collector changes what the cloud has to know
    if (likely(entry->count == 1)) {
        if (unlikely(agent_state == AGENT_INITIALIZING))
            last_init_sequence = now_realtime_sec();
        else if (unlikely(aclk_queue_query("collector", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
            debug(D_ACLK, "ACLK failed to queue on_connect command on collector addition");
    }

    COLLECTOR_UNLOCK;
}
/*
 * Unregister a chart of the given collector.
 *
 * When the collector's last chart is gone, the collector has been unlinked
 * from the list by _del_collector(); this function then either records the
 * activity (while the agent is still initializing) or queues an on-connect
 * metadata update, and finally frees the entry.
 */
void aclk_del_collector(const char *hostname, const char *plugin_name, const char *module_name)
{
    COLLECTOR_LOCK;

    struct _collector *removed = _del_collector(hostname, plugin_name, module_name);
    int fully_removed = removed && removed->count == 0;

    if (likely(fully_removed))
        debug(
            D_ACLK, "DEL COLLECTOR [%s:%s] -- charts %u", plugin_name ? plugin_name : "*",
            module_name ? module_name : "*", removed->count);

    COLLECTOR_UNLOCK;

    // Unknown collector, or it still has charts left: nothing more to do
    if (!fully_removed)
        return;

    if (unlikely(agent_state == AGENT_INITIALIZING))
        last_init_sequence = now_realtime_sec();
    else if (unlikely(aclk_queue_query("collector", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
        debug(D_ACLK, "ACLK failed to queue on_connect command on collector deletion");

    _free_collector(removed);
}
/*
 * Execute a cloud request against the local web API and publish the result.
 *
 * Only queries starting with "/api/v1/" are served: the request is run through
 * web_client_api_request_v1() with a simulated web client, and the response
 * code plus the encoded body are sent to the query's topic.
 *
 * Note that the query text is modified in place (it is cut at the '?').
 *
 * Returns 0 when the request was executed and answered, 1 when the query is
 * missing or is not an API v1 request (nothing is sent in that case).
 */
int aclk_execute_query(struct aclk_query *this_query)
{
    // A cloud message without a payload reaches us with a NULL query
    if (unlikely(!this_query->query) || strncmp(this_query->query, "/api/v1/", 8) != 0)
        return 1;

    struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client));
    w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
    strcpy(w->origin, "*"); // Simulate web_client_create_on_fd()
    w->cookie1[0] = 0;      // Simulate web_client_create_on_fd()
    w->cookie2[0] = 0;      // Simulate web_client_create_on_fd()
    w->acl = 0x1f;

    // Separate the path from the query string
    char *mysep = strchr(this_query->query, '?');
    if (mysep) {
        strncpyz(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE);
        *mysep = '\0';
    } else
        strncpyz(w->decoded_query_string, this_query->query, NETDATA_WEB_REQUEST_URL_SIZE);

    // The last path component selects the API call
    mysep = strrchr(this_query->query, '/');

    // TODO: handle bad response perhaps in a different way. For now it does to the payload
    int rc = web_client_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop");

    BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
    buffer_flush(local_buffer);
    local_buffer->contenttype = CT_APPLICATION_JSON;

    aclk_create_header(local_buffer, "http", this_query->msg_id);

    char *encoded_response = aclk_encode_response(w->response.data);

    buffer_sprintf(
        local_buffer, "{\n\"code\": %d,\n\"body\": \"%s\"\n}", rc, encoded_response);

    // NOTE(review): presumably closes the JSON object opened by aclk_create_header() - confirm
    buffer_sprintf(local_buffer, "\n}");

    aclk_send_message(this_query->topic, local_buffer->buffer, this_query->msg_id);

    buffer_free(w->response.data);
    freez(w);
    buffer_free(local_buffer);
    freez(encoded_response);

    return 0;
}
/*
 * Fetch the next pending command from the queue and execute it.
 *
 * Returns 1 when an entry was taken off the queue (executed or discarded)
 * and 0 when there was nothing to do or the link is not connected.
 */
int aclk_process_query()
{
    static long int query_count = 0;

    if (!aclk_connected)
        return 0;

    struct aclk_query *job = aclk_queue_pop();
    if (likely(job == NULL))
        return 0;

    if (unlikely(job->deleted)) {
        debug(D_ACLK, "Garbage collect query %s:%s", job->topic, job->query);
        aclk_query_free(job);
        return 1;
    }

    query_count++;

    debug(
        D_ACLK, "Query #%ld (%s) size=%zu in queue %d seconds", query_count, job->topic,
        job->query ? strlen(job->query) : 0, (int)(now_realtime_sec() - job->created));

    switch (job->cmd) {
        case ACLK_CMD_ONCONNECT:
            debug(D_ACLK, "EXECUTING on connect metadata command");
            aclk_send_metadata();
            aclk_metadata_submitted = ACLK_METADATA_SENT;
            break;

        case ACLK_CMD_CHART:
            debug(D_ACLK, "EXECUTING a chart update command");
            aclk_send_single_chart(job->data, job->query);
            break;

        case ACLK_CMD_CHARTDEL:
            debug(D_ACLK, "EXECUTING a chart delete command");
            //TODO: This send the info metadata for now
            aclk_send_info_metadata();
            break;

        case ACLK_CMD_ALARM:
            debug(D_ACLK, "EXECUTING an alarm update command");
            aclk_send_message(job->topic, job->query, job->msg_id);
            break;

        case ACLK_CMD_ALARMS:
            debug(D_ACLK, "EXECUTING an alarms update command");
            aclk_send_alarm_metadata();
            break;

        case ACLK_CMD_CLOUD:
            debug(D_ACLK, "EXECUTING a cloud command");
            aclk_execute_query(job);
            break;

        default:
            break;
    }

    debug(D_ACLK, "Query #%ld (%s) done", query_count, job->topic);

    aclk_query_free(job);
    return 1;
}
/*
 * Process all pending queries.
 *
 * Returns 0 when nothing was attempted (agent exiting, link not connected or
 * queue empty) and 1 otherwise.
 */
int aclk_process_queries()
{
    if (unlikely(netdata_exit || !aclk_connected) || likely(!aclk_queue.count))
        return 0;

    debug(D_ACLK, "Processing %d queries", (int)aclk_queue.count);

    //TODO: may consider possible throttling here
    int processed;
    do {
        // Keep going until aclk_process_query() reports nothing left to run
        processed = aclk_process_query();
    } while (processed);

    return 1;
}
/*
 * Cleanup handler of the query processing thread: releases the collector
 * list and marks the thread as exited.
 *
 * ptr is the thread's struct netdata_static_thread.
 */
static void aclk_query_thread_cleanup(void *ptr)
{
    struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;

    static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;

    info("cleaning up...");

    // _reset_collector_list() takes COLLECTOR_LOCK on its own. Calling it with
    // the lock already held would make this thread lock the same mutex twice.
    _reset_collector_list();

    // Release the dummy head and clear the pointer, so that a late
    // _find_collector() allocates a new head instead of touching freed memory
    COLLECTOR_LOCK;
    freez(collector_list);
    collector_list = NULL;
    COLLECTOR_UNLOCK;

    static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
}
/**
 * Main query processing thread
 *
 * Phase 1: wait for the agent collectors to initialize. The agent is marked
 * stable (agent_state = AGENT_STABLE) once more than ACLK_STABLE_TIMEOUT
 * seconds have passed without collector activity.
 *
 * Phase 2: until the agent exits, make sure the on-connect metadata command
 * is queued when required, process all pending queries and then sleep on the
 * query condition variable until woken up.
 */
void *aclk_query_main_thread(void *ptr)
{
    netdata_thread_cleanup_push(aclk_query_thread_cleanup, ptr);

    while (!netdata_exit && agent_state == AGENT_INITIALIZING) {
        time_t idle_for = now_realtime_sec() - last_init_sequence;

        if (idle_for > ACLK_STABLE_TIMEOUT) {
            agent_state = AGENT_STABLE;
            info("AGENT stable, last collector initialization activity was %ld seconds ago", idle_for);
#ifdef ACLK_DEBUG
            _dump_collector_list();
#endif
            break;
        }

        info("Waiting for agent collectors to initialize. Last activity was %ld seconds ago" , idle_for);
        sleep_usec(USEC_PER_SEC * 1);
    }

    while (!netdata_exit) {
        if (unlikely(!aclk_metadata_submitted)) {
            aclk_metadata_submitted = ACLK_METADATA_CMD_QUEUED;

            int queue_failed = aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT);
            if (unlikely(queue_failed)) {
                errno = 0;
                error("ACLK failed to queue on_connect command");
                // Fall back so that queueing is retried on the next iteration
                aclk_metadata_submitted = 0;
            }
        }

        aclk_process_queries();

        QUERY_THREAD_LOCK;

        // TODO: Need to check if there are queries awaiting already
        int wait_failed = pthread_cond_wait(&query_cond_wait, &query_lock_wait);
        if (unlikely(wait_failed))
            sleep_usec(USEC_PER_SEC * 1);

        QUERY_THREAD_UNLOCK;
    } // forever

    info("Shutting down query processing thread");

    netdata_thread_cleanup_pop(1);
    return NULL;
}
// Run the link event loop until lws_wss_check_queues() reports an empty write
// queue, giving up after about five seconds.
static void aclk_cleanup_drain_queues(void)
{
    size_t pending_writes = 1, pending_write_bytes, pending_reads;
    time_t deadline = now_realtime_sec() + 5;

    while (pending_writes && deadline > now_realtime_sec()) {
        _link_event_loop();
        lws_wss_check_queues(&pending_writes, &pending_write_bytes, &pending_reads);
    }
}

// Thread cleanup
// If the agent is claimed and connected: wake the query thread, publish a
// graceful "disconnect" message, let it drain, then shut the link down and
// drain once more. ptr is the thread's struct netdata_static_thread.
static void aclk_main_cleanup(void *ptr)
{
    struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;

    static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;

    info("cleaning up...");

    if (is_agent_claimed() && aclk_connected) {
        char payload[512];

        // Wakeup thread to cleanup
        QUERY_THREAD_WAKEUP;

        // Send a graceful disconnect message
        char *msg_id = create_uuid();

        usec_t now_usec = now_realtime_usec();
        time_t time_created = now_usec / USEC_PER_SEC;
        usec_t time_created_offset_usec = now_usec % USEC_PER_SEC;

        snprintfz(
            payload, 511,
            "{ \"type\": \"disconnect\","
            " \"msg-id\": \"%s\","
            " \"timestamp\": %ld,"
            " \"timestamp-offset-usec\": %llu,"
            " \"version\": %d,"
            " \"payload\": \"graceful\" }",
            msg_id, time_created, time_created_offset_usec, ACLK_VERSION);

        aclk_send_message(ACLK_METADATA_TOPIC, payload, msg_id);
        freez(msg_id);

        // Give the disconnect message a chance to leave the write queue
        aclk_cleanup_drain_queues();

        aclk_shutting_down = 1;
        _link_shutdown();
        aclk_lws_wss_mqtt_layer_disconect_notif();

        // And let the shutdown itself be processed
        aclk_cleanup_drain_queues();
    }

    info("Disconnected");

    static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
}
// Callback data for json_extract_singleton(): names the one string member to
// pull out of a JSON document and receives a heap copy of its value.
struct dictionary_singleton {
char *key; // Name of the JSON member to look for
char *result; // strdupz() copy of the member's string value; stays as initialized if not found
};
// JSON parser callback: when a string member named like the requested key is
// seen, store a heap copy of its value in the struct dictionary_singleton
// passed as callback data. Members of any other type are ignored.
// Always returns 0.
int json_extract_singleton(JSON_ENTRY *e)
{
    struct dictionary_singleton *wanted = e->callback_data;

    if (e->type == JSON_STRING && strcmp(e->name, wanted->key) == 0)
        wanted->result = strdupz(e->data.string);

    return 0;
}
// Map one base-64 character to its 6-bit value.
// Characters outside the base-64 alphabet are not rejected, they decode as 63.
static inline uint32_t base64_sextet(unsigned char c)
{
    if (c >= 'A' && c <= 'Z')
        return (uint32_t)(c - 'A');
    if (c >= 'a' && c <= 'z')
        return (uint32_t)(c - 'a') + 26;
    if (c >= '0' && c <= '9')
        return (uint32_t)(c - '0') + 52;
    if (c == '+')
        return 62;
    if (c == '/')
        return 63;
    return 63;
}

// Base-64 decoder.
// Note: This is non-validating, invalid input will be decoded without an error.
// Challenges are packed into json strings so we don't skip newlines.
// Size errors (i.e. invalid input size or insufficient output space) are caught.
//
// input        base-64 text, input_size bytes long (no terminator needed)
// output       receives the decoded bytes
// output_size  capacity of output; must be at least (input_size / 4) * 3
//
// Returns the number of bytes written to output, or 0 when the input is empty,
// its length is not a multiple of four, or the output buffer is too small.
size_t base64_decode(unsigned char *input, size_t input_size, unsigned char *output, size_t output_size)
{
    // Nothing to decode (and input_size - 4 below must not wrap around)
    if (input_size == 0)
        return 0;

    if ((input_size & 3) != 0)
    {
        error("Can't decode base-64 input length %zu", input_size);
        return 0;
    }

    size_t unpadded_size = (input_size / 4) * 3;
    if (unpadded_size > output_size)
    {
        error("Output buffer size %zu is too small to decode %zu into", output_size, input_size);
        return 0;
    }

    uint32_t value;

    // Don't check padding within full quantums
    for (size_t i = 0; i + 4 < input_size; i += 4)
    {
        value = (base64_sextet(input[0]) << 18) | (base64_sextet(input[1]) << 12) |
                (base64_sextet(input[2]) << 6) | base64_sextet(input[3]);
        output[0] = (unsigned char)(value >> 16);
        output[1] = (unsigned char)(value >> 8);
        output[2] = (unsigned char)value;
        output += 3;
        input += 4;
    }

    // Handle padding only in last quantum
    if (input[2] == '=')
    {
        // Two characters carry 12 bits: one byte of data
        value = (base64_sextet(input[0]) << 6) | base64_sextet(input[1]);
        output[0] = (unsigned char)(value >> 4);
        return unpadded_size - 2;
    }

    if (input[3] == '=')
    {
        // Three characters carry 18 bits: two bytes of data
        value = (base64_sextet(input[0]) << 12) | (base64_sextet(input[1]) << 6) | base64_sextet(input[2]);
        output[0] = (unsigned char)(value >> 10);
        output[1] = (unsigned char)(value >> 2);
        return unpadded_size - 1;
    }

    // No padding: a regular quantum, decoded through the alphabet like all the others
    value = (base64_sextet(input[0]) << 18) | (base64_sextet(input[1]) << 12) |
            (base64_sextet(input[2]) << 6) | base64_sextet(input[3]);
    output[0] = (unsigned char)(value >> 16);
    output[1] = (unsigned char)(value >> 8);
    output[2] = (unsigned char)value;
    return unpadded_size;
}
// Base-64 encoder.
//
// input        bytes to encode, input_size bytes long
// output       receives the base-64 text; it is NOT NUL terminated here
// output_size  capacity of output; must be larger than (input_size / 3 + 1) * 4,
//              which leaves room for the caller to append a terminator
//
// Returns the number of characters written to output, or 0 when the input is
// empty or the output buffer is not large enough.
size_t base64_encode(unsigned char *input, size_t input_size, char *output, size_t output_size)
{
    uint32_t value;
    static const char lookup[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
                                 "abcdefghijklmnopqrstuvwxyz"
                                 "0123456789+/";

    // Same condition as (input_size / 3 + 1) * 4 >= output_size, written so
    // that it cannot overflow for very large values of input_size
    if (output_size == 0 || input_size / 3 + 1 > (output_size - 1) / 4)
    {
        error("Output buffer for encoding size=%zu is not large enough for %zu-bytes input", output_size, input_size);
        return 0;
    }

    size_t count = 0;

    // Every complete group of three bytes becomes four characters
    while (input_size >= 3)
    {
        value = (((uint32_t)input[0] << 16) + ((uint32_t)input[1] << 8) + input[2]) & 0xffffff;
        output[0] = lookup[value >> 18];
        output[1] = lookup[(value >> 12) & 0x3f];
        output[2] = lookup[(value >> 6) & 0x3f];
        output[3] = lookup[value & 0x3f];
        output += 4;
        input += 3;
        input_size -= 3;
        count += 4;
    }

    // The remaining one or two bytes are padded with '='
    switch (input_size)
    {
        case 2:
            value = ((uint32_t)input[0] << 10) + ((uint32_t)input[1] << 2);
            output[0] = lookup[(value >> 12) & 0x3f];
            output[1] = lookup[(value >> 6) & 0x3f];
            output[2] = lookup[value & 0x3f];
            output[3] = '=';
            count += 4;
            break;
        case 1:
            value = (uint32_t)input[0] << 4;
            output[0] = lookup[(value >> 6) & 0x3f];
            output[1] = lookup[value & 0x3f];
            output[2] = '=';
            output[3] = '=';
            count += 4;
            break;
        default:
            break;
    }

    return count;
}
// Decrypt data_len bytes of enc_data with the agent's private key
// (aclk_private_key, RSA with OAEP padding) into decrypted.
// Returns whatever RSA_private_decrypt() returns; a result of -1 is reported
// in the error log together with the OpenSSL error text.
int private_decrypt(unsigned char * enc_data, int data_len, unsigned char *decrypted)
{
    int plaintext_len = RSA_private_decrypt(data_len, enc_data, decrypted, aclk_private_key, RSA_PKCS1_OAEP_PADDING);

    if (plaintext_len != -1)
        return plaintext_len;

    char err[512];
    ERR_error_string_n(ERR_get_error(), err, sizeof(err));
    error("Decryption of the challenge failed: %s", err);

    return plaintext_len;
}
// Find the payload that follows the first empty line in the buffer.
// A line is empty when it has no characters other than '\r' before its '\n'.
// Returns a pointer to the character right after that line's '\n', or NULL
// when a NUL byte or the end of the buffer is reached first.
char *extract_payload(BUFFER *b)
{
    char *text = b->buffer;
    unsigned int chars_on_line = 0;

    for (size_t pos = 0; pos < b->len; pos++) {
        switch (text[pos]) {
            case 0:
                return NULL;

            case '\n':
                if (chars_on_line == 0)
                    return &text[pos + 1];
                chars_on_line = 0;
                break;

            case '\r':
                /* don't count */
                break;

            default:
                chars_on_line++;
                break;
        }
    }

    return NULL;
}
/*
 * Run the challenge/response exchange with the cloud and store the resulting
 * credentials in aclk_username / aclk_password.
 *
 * Sequence:
 *   1. GET  /api/v1/auth/node/<claim-id>/challenge  -> base64 encoded, encrypted challenge
 *   2. decrypt the challenge with private_decrypt() and base64 encode the plaintext
 *   3. POST /api/v1/auth/node/<claim-id>/password   -> password for the link
 *
 * Any previously stored password is released first, so aclk_password is NULL
 * on every failure path; the caller tests it to detect failure.
 *
 * @param aclk_hostname host name handed to aclk_send_https_request()
 * @param aclk_port     port (as a string) handed to aclk_send_https_request()
 */
void aclk_get_challenge(char *aclk_hostname, char *aclk_port)
{
    char *data_buffer = mallocz(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
    debug(D_ACLK, "Performing challenge-response sequence");

    if (aclk_password != NULL) {
        freez(aclk_password);
        aclk_password = NULL;
    }

    // curl http://cloud-iam-agent-service:8080/api/v1/auth/node/00000000-0000-0000-0000-000000000000/challenge
    // TODO - target host?
    char *agent_id = is_agent_claimed();
    if (agent_id == NULL) {
        error("Agent was not claimed - cannot perform challenge/response");
        goto CLEANUP;
    }

    char url[1024];
    int url_len = snprintf(url, sizeof(url), "/api/v1/auth/node/%s/challenge", agent_id);
    if (url_len < 0 || (size_t)url_len >= sizeof(url)) {
        error("Claim id is too long to build the challenge URL");
        goto CLEANUP;
    }

    info("Retrieving challenge from cloud: %s %s %s", aclk_hostname, aclk_port, url);
    if (aclk_send_https_request("GET", aclk_hostname, aclk_port, url, data_buffer, NETDATA_WEB_RESPONSE_INITIAL_SIZE, NULL)) {
        error("Challenge failed: %s", data_buffer);
        goto CLEANUP;
    }

    struct dictionary_singleton challenge = { .key = "challenge", .result = NULL };
    debug(D_ACLK, "Challenge response from cloud: %s", data_buffer);
    if (json_parse(data_buffer, &challenge, json_extract_singleton) != JSON_OK) {
        freez(challenge.result);
        error("Could not parse the json response with the challenge: %s", data_buffer);
        goto CLEANUP;
    }
    if (challenge.result == NULL) {
        error("Could not retrieve challenge from auth response: %s", data_buffer);
        goto CLEANUP;
    }

    size_t challenge_len = strlen(challenge.result);
    unsigned char decoded[512];
    size_t decoded_len = base64_decode((unsigned char*)challenge.result, challenge_len, decoded, sizeof(decoded));

    unsigned char plaintext[4096] = {0};
    int decrypted_length = private_decrypt(decoded, decoded_len, plaintext);
    freez(challenge.result);

    // A failed decryption yields a negative length. Passing that on as a size_t
    // would make the encoder read far beyond plaintext, so stop here.
    // private_decrypt() has already logged the reason.
    if (decrypted_length < 0)
        goto CLEANUP;

    char encoded[512];
    size_t encoded_len = base64_encode(plaintext, decrypted_length, encoded, sizeof(encoded));
    if (encoded_len >= sizeof(encoded)) {
        // No room left for the terminator written below.
        error("Encoded challenge response does not fit in %zu bytes", sizeof(encoded));
        goto CLEANUP;
    }
    encoded[encoded_len] = 0;
    debug(D_ACLK, "Encoded len=%zu Decryption len=%d: '%s'", encoded_len, decrypted_length, encoded);

    char response_json[4096];
    snprintf(response_json, sizeof(response_json), "{\"response\":\"%s\"}", encoded);
    debug(D_ACLK, "Password phase: %s", response_json);

    // TODO - host
    // This path is shorter than the challenge path built above, so it is known to fit.
    snprintf(url, sizeof(url), "/api/v1/auth/node/%s/password", agent_id);
    if (aclk_send_https_request("POST", aclk_hostname, aclk_port, url, data_buffer, NETDATA_WEB_RESPONSE_INITIAL_SIZE, response_json)) {
        error("Challenge-response failed: %s", data_buffer);
        goto CLEANUP;
    }
    debug(D_ACLK, "Password response from cloud: %s", data_buffer);

    struct dictionary_singleton password = { .key = "password", .result = NULL };
    if (json_parse(data_buffer, &password, json_extract_singleton) != JSON_OK) {
        freez(password.result);
        error("Could not parse the json response with the password: %s", data_buffer);
        goto CLEANUP;
    }
    if (password.result == NULL) {
        error("Could not retrieve password from auth response");
        goto CLEANUP;
    }

    if (aclk_password != NULL)
        freez(aclk_password);
    if (aclk_username == NULL)
        aclk_username = strdupz(agent_id);
    // Ownership of the parsed string moves to the global.
    aclk_password = password.result;

CLEANUP:
    freez(data_buffer);
    return;
}
/*
 * Obtain fresh credentials through the challenge/response exchange and, when
 * a password was obtained, start an MQTT connection attempt.
 *
 * hostname / port : cloud endpoint (port as a string) used for the challenge
 * port_num        : numeric port handed to mqtt_attempt_connection()
 *
 * NOTE(review): aclk_connecting is raised even when mqtt_attempt_connection()
 * reports a failure - confirm this is intended.
 */
static void aclk_try_to_connect(char *hostname, char *port, int port_num)
{
    info("Attempting to establish the agent cloud link");

    aclk_get_challenge(hostname, port);

    if (aclk_password != NULL) {
        int status = mqtt_attempt_connection(hostname, port_num, aclk_username, aclk_password);
        if (unlikely(status)) {
            error("Failed to initialize the agent cloud link library");
        }
        aclk_connecting = 1;
    }
}
/**
 * Main agent cloud link thread
 *
 * Waits until netdata is ready and the agent has been claimed, prepares the
 * private key and the MQTT library, and then runs the event loop that handles
 * pending requests - both inbound and outbound - until netdata exits.
 * While the link is down it (re)attempts the connection, spacing retries by
 * the delay returned from aclk_reconnect_delay().
 *
 * Every way out of this function goes through the `exited` label so that the
 * cleanup handler registered below is always popped and executed.
 *
 * @param ptr is a pointer to the netdata_static_thread structure.
 *
 * @return It always returns NULL
 */
void *aclk_main(void *ptr)
{
    struct netdata_static_thread *query_thread;

    netdata_thread_cleanup_push(aclk_main_cleanup, ptr);

    info("Waiting for netdata to be ready");
    while (!netdata_ready) {
        sleep_usec(USEC_PER_MS * 300);
    }

    last_init_sequence = now_realtime_sec();
    query_thread = NULL;

    char *aclk_hostname = NULL; // Initializers are over-written but prevent gcc complaining about clobbering.
    char *aclk_port = NULL;
    uint32_t port_num = 0;
    char *cloud_base_url = config_get(CONFIG_SECTION_CLOUD, "cloud base url", DEFAULT_CLOUD_BASE_URL);
    if (aclk_decode_base_url(cloud_base_url, &aclk_hostname, &aclk_port)) {
        error("Configuration error - cannot use agent cloud link");
        // Do not return from inside the cleanup push/pop pair (assuming the
        // netdata macros wrap pthread_cleanup_push/pop, leaving the block early
        // is undefined and skips the handler); use the common exit path.
        goto exited;
    }
    port_num = atoi(aclk_port); // SSL library uses the string, MQTT uses the numeric value

    info("Waiting for netdata to be claimed");
    while (1) {
        while (likely(!is_agent_claimed())) {
            sleep_usec(USEC_PER_SEC * 5);
            if (netdata_exit)
                goto exited;
        }

        if (!create_private_key() && !_mqtt_lib_init())
            break;

        if (netdata_exit)
            goto exited;

        sleep_usec(USEC_PER_SEC * 60);
    }

    create_publish_base_topic();
    // NOTE(review): create_private_key() already succeeded in the loop above;
    // this second call looks redundant - confirm before removing.
    create_private_key();

    usec_t reconnect_expiry = 0; // In usecs

    while (!netdata_exit) {
        static int first_init = 0;
        size_t write_q, write_q_bytes, read_q;
        lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
        //info("loop state first_init_%d connected=%d connecting=%d wq=%zu (%zu-bytes) rq=%zu",
        //    first_init, aclk_connected, aclk_connecting, write_q, write_q_bytes, read_q);

        if (unlikely(!netdata_exit && !aclk_connected)) {
            if (unlikely(!first_init)) {
                aclk_try_to_connect(aclk_hostname, aclk_port, port_num);
                first_init = 1;
            } else {
                if (aclk_connecting == 0) {
                    // Arm the retry timer once, then wait for it to expire.
                    if (reconnect_expiry == 0) {
                        unsigned long int delay = aclk_reconnect_delay(1);
                        reconnect_expiry = now_realtime_usec() + delay * 1000;
                        info("Retrying to establish the ACLK connection in %.3f seconds", delay / 1000.0);
                    }
                    if (now_realtime_usec() >= reconnect_expiry) {
                        reconnect_expiry = 0;
                        aclk_try_to_connect(aclk_hostname, aclk_port, port_num);
                    }
                    sleep_usec(USEC_PER_MS * 100);
                }
            }
            if (aclk_connecting) {
                _link_event_loop();
                sleep_usec(USEC_PER_MS * 100);
            }
            continue;
        }

        _link_event_loop();

        /*static int stress_counter = 0;
        if (write_q_bytes==0 && stress_counter ++ >5)
        {
            aclk_send_stress_test(8000000);
            stress_counter = 0;
        }*/

        // TODO: Move to on-connect
        if (unlikely(!aclk_subscribed)) {
            aclk_subscribed = !aclk_subscribe(ACLK_COMMAND_TOPIC, 1);
        }

        // The query thread is started once, the first time the link is up.
        if (unlikely(!query_thread)) {
            query_thread = callocz(1, sizeof(struct netdata_static_thread));
            query_thread->thread = mallocz(sizeof(netdata_thread_t));
            netdata_thread_create(
                query_thread->thread, ACLK_THREAD_NAME, NETDATA_THREAD_OPTION_DEFAULT, aclk_query_main_thread,
                query_thread);
        }
    } // forever

exited:
    aclk_shutdown();

    // Reset the globals after releasing them so no stale pointer is left behind.
    freez(aclk_username);
    aclk_username = NULL;
    freez(aclk_password);
    aclk_password = NULL;
    freez(aclk_hostname);
    freez(aclk_port);
    if (aclk_private_key != NULL) {
        RSA_free(aclk_private_key);
        aclk_private_key = NULL;
    }

    netdata_thread_cleanup_pop(1);
    return NULL;
}
/*
 * Publish a message to the cloud.
 *
 * The destination topic is resolved from sub_topic by get_topic(), using a
 * local scratch buffer of ACLK_MAX_TOPIC bytes. msg_id is currently unused.
 *
 * Returns
 *   0 when the link is down or message is NULL (nothing is sent),
 *   1 when the topic could not be built,
 *   otherwise the return code of _link_send_message() (logged when non-zero).
 */
int aclk_send_message(char *sub_topic, char *message, char *msg_id)
{
    char topic_buf[ACLK_MAX_TOPIC + 1];
    int mid;

    UNUSED(msg_id);

    if (!aclk_connected || unlikely(!message))
        return 0;

    char *resolved = get_topic(sub_topic, topic_buf, ACLK_MAX_TOPIC);
    if (unlikely(!resolved)) {
        errno = 0;
        error("Unable to build outgoing topic; truncated?");
        return 1;
    }

    ACLK_LOCK;
    int rc = _link_send_message(resolved, (unsigned char *)message, &mid);
    // TODO: link the msg_id with the mid so we can trace it
    ACLK_UNLOCK;

    if (unlikely(rc)) {
        errno = 0;
        error("Failed to send message, error code %d (%s)", rc, _link_strerror(rc));
    }

    return rc;
}
/*
 * Subscribe to a topic in the cloud.
 *
 * The final subscription topic is resolved from sub_topic by get_topic()
 * (per the original note it has the form /agent/claim_id/<sub_topic>).
 *
 * sub_topic : topic to subscribe to
 * qos       : quality-of-service level handed to _link_subscribe()
 *
 * Returns 1 when the topic cannot be built or the link is down, otherwise the
 * return code of _link_subscribe() (logged when non-zero).
 */
int aclk_subscribe(char *sub_topic, int qos)
{
    int rc;
    char topic[ACLK_MAX_TOPIC + 1];
    char *final_topic;

    final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC);
    if (unlikely(!final_topic)) {
        errno = 0;
        error("Unable to build outgoing topic; truncated?");
        return 1;
    }

    if (!aclk_connected) {
        // Log the resolved topic, not the scratch buffer: get_topic() hands back
        // the pointer to use, and nothing here guarantees it filled `topic`.
        error("Cannot subscribe to %s - not connected!", final_topic);
        return 1;
    }

    ACLK_LOCK;
    rc = _link_subscribe(final_topic, qos);
    ACLK_UNLOCK;

    // TODO: Add better handling -- error will flood the logfile here
    if (unlikely(rc)) {
        errno = 0;
        error("Failed subscribe to command topic %d (%s)", rc, _link_strerror(rc));
    }

    return rc;
}
/*
 * Link-up notification, invoked from a callback when the link goes up.
 * Flags the link as connected, clears the waiting-for-init flag, calls
 * aclk_reconnect_delay(0) and wakes the query thread.
 */
void aclk_connect()
{
    info("Connection detected (%"PRIu64" queued queries)", aclk_queue.count);

    aclk_connected = 1;
    waiting_init = 0;

    aclk_reconnect_delay(0);
    QUERY_THREAD_WAKEUP;
}
/*
 * Link-down notification, invoked from a callback when the link goes down.
 * Logs the event when the link had been up, then resets the per-connection
 * state: subscription, metadata state machine, init wait and the
 * connected / connecting flags.
 */
void aclk_disconnect()
{
    if (likely(aclk_connected)) {
        info("Disconnect detected (%"PRIu64" queued queries)", aclk_queue.count);
    }

    aclk_subscribed = 0;
    aclk_metadata_submitted = ACLK_METADATA_REQUIRED;
    waiting_init = 1;
    aclk_connected = 0;
    aclk_connecting = 0;
}
/*
 * Mark the link as disconnected and shut the link layer down through
 * _link_shutdown(), logging the start and the end of the operation.
 */
void aclk_shutdown()
{
    info("Shutdown initiated");

    aclk_connected = 0;
    _link_shutdown();

    info("Shutdown complete");
}
/*
 * Append the common message envelope to dest: type, msg-id, timestamp
 * (seconds), timestamp-offset-usec, version, and the opening of the
 * "payload" member. The caller appends the payload and the closing brace.
 *
 * dest   : buffer that receives the header
 * type   : value of the "type" member
 * msg_id : value of the "msg-id" member; when NULL a new UUID is generated
 */
inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id)
{
    char generated_id[36 + 1];

    if (unlikely(!msg_id)) {
        uuid_t fresh;
        uuid_generate(fresh);
        uuid_unparse(fresh, generated_id);
        msg_id = generated_id;
    }

    /* Split "now" into whole seconds and the microsecond remainder. */
    usec_t now_usec = now_realtime_usec();
    time_t time_created = now_usec / USEC_PER_SEC;
    usec_t offset_usec = now_usec % USEC_PER_SEC;

    buffer_sprintf(
        dest,
        "\t{\"type\": \"%s\",\n"
        "\t\"msg-id\": \"%s\",\n"
        "\t\"timestamp\": %ld,\n"
        "\t\"timestamp-offset-usec\": %llu,\n"
        "\t\"version\": %d,\n"
        "\t\"payload\": ",
        type, msg_id, time_created, offset_usec, ACLK_VERSION);

    debug(D_ACLK, "Sending v%d msgid [%s] type [%s] time [%ld]", ACLK_VERSION, msg_id, type, time_created);
}
/*
 * Produce an escaped copy of a buffer so it can be embedded in a JSON string.
 *
 * Encoding rules:
 *   - '\n' and '\t' are dropped
 *   - the remaining control characters 0x01..0x1F become \u00XX
 *   - '"' and '\' are prefixed with a backslash
 *   - every other byte is copied unchanged
 *
 * The output size is computed in a first pass, so the allocation always holds
 * the expanded text plus the terminator.
 *
 * Returns a newly allocated, NUL-terminated string; the caller releases it.
 */
char *aclk_encode_response(BUFFER *contents)
{
    const char *src = contents->buffer;
    size_t content_size = contents->len;
    size_t i;

    /* Pass 1: exact output size, starting at 1 for the terminator. */
    size_t needed = 1;
    for (i = 0; i < content_size; i++) {
        unsigned char c = (unsigned char)src[i];

        if (c == '\n' || c == '\t')
            continue;
        else if (c >= 0x01 && c <= 0x1F)
            needed += 6;
        else if (c == '\"' || c == '\\')
            needed += 2;
        else
            needed += 1;
    }

    char *tmp_buffer = mallocz(needed);
    char *dst = tmp_buffer;

    /* Pass 2: emit the encoded text. */
    for (i = 0; i < content_size; i++) {
        unsigned char c = (unsigned char)src[i];

        if (c == '\n' || c == '\t')
            continue;

        if (c >= 0x01 && c <= 0x1F) {
            *dst++ = '\\';
            *dst++ = 'u';
            *dst++ = '0';
            *dst++ = '0';
            *dst++ = (c < 0x10) ? '0' : '1'; /* high nibble: 0x0F still belongs to the "0" group */
            *dst++ = to_hex((char)c);        /* low nibble */
        } else if (c == '\"' || c == '\\') {
            *dst++ = '\\';
            *dst++ = (char)c;
        } else {
            *dst++ = (char)c;
        }
    }
    *dst = '\0';

    return tmp_buffer;
}
/*
 * Build and send the alarm metadata message ("connect_alarms") for localhost.
 * The payload is made of three members:
 *   configured-alarms, alarm-log and alarms-active
 * and is published on ACLK_ALARMS_TOPIC.
 */
void aclk_send_alarm_metadata()
{
    BUFFER *payload = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
    char *msg_id = create_uuid();

    buffer_flush(payload);
    payload->contenttype = CT_APPLICATION_JSON;

    debug(D_ACLK, "Metadata alarms start");

    aclk_create_header(payload, "connect_alarms", msg_id);

    /* configured alarms */
    buffer_sprintf(payload, "{\n\t \"configured-alarms\" : ");
    health_alarms2json(localhost, payload, 1);
    debug(D_ACLK, "Metadata %s with configured alarms has %zu bytes", msg_id, payload->len);

    /* alarm log */
    buffer_sprintf(payload, ",\n\t \"alarm-log\" : ");
    health_alarm_log2json(localhost, payload, 0);
    debug(D_ACLK, "Metadata %s with alarm_log has %zu bytes", msg_id, payload->len);

    /* active alarms */
    buffer_sprintf(payload, ",\n\t \"alarms-active\" : ");
    health_alarms_values2json(localhost, payload, 0);
    debug(D_ACLK, "Metadata %s with alarms_active has %zu bytes", msg_id, payload->len);

    /* close the payload object and the envelope */
    buffer_sprintf(payload, "\n}\n}");
    aclk_send_message(ACLK_ALARMS_TOPIC, payload->buffer, msg_id);

    freez(msg_id);
    buffer_free(payload);
}
/*
 * Build and send the agent metadata message ("connect") for localhost.
 * The payload carries two members:
 *   info   - filled by web_client_api_request_v1_info_fill_buffer()
 *   charts - filled by charts2json()
 * and is published on ACLK_METADATA_TOPIC.
 *
 * Always returns 0.
 */
int aclk_send_info_metadata()
{
    BUFFER *payload = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);

    debug(D_ACLK, "Metadata /info start");

    char *msg_id = create_uuid();
    buffer_flush(payload);
    payload->contenttype = CT_APPLICATION_JSON;

    aclk_create_header(payload, "connect", msg_id);

    /* agent info */
    buffer_sprintf(payload, "{\n\t \"info\" : ");
    web_client_api_request_v1_info_fill_buffer(localhost, payload);
    debug(D_ACLK, "Metadata %s with info has %zu bytes", msg_id, payload->len);

    /* chart definitions, then close the payload object and the envelope */
    buffer_sprintf(payload, ", \n\t \"charts\" : ");
    charts2json(localhost, payload, 1);
    buffer_sprintf(payload, "\n}\n}");
    debug(D_ACLK, "Metadata %s with chart has %zu bytes", msg_id, payload->len);

    aclk_send_message(ACLK_METADATA_TOPIC, payload->buffer, msg_id);

    freez(msg_id);
    buffer_free(payload);

    return 0;
}
void aclk_send_stress_test(size_t size)
{
char *buffer = mallocz(size);
if (buffer != NULL)
{
for(size_t i=0; i<size; i++)
buffer[i] = 'x';
buffer[size-1] = 0;
time_t time_created = now_realtime_sec();
sprintf(buffer,"{\"type\":\"stress\", \"timestamp\":%ld,\"payload\":", time_created);
buffer[strlen(buffer)] = '"';
buffer[size-2] = '}';
buffer[size-3] = '"';
aclk_send_message(ACLK_METADATA_TOPIC, buffer, NULL);
error("Sending stress of size %zu at time %ld", size, time_created);
}
free(buffer);
}
/*
 * Send the complete metadata set to the cloud: the agent info/charts message
 * followed by the alarm metadata message. Used when the link is established
 * or on request.
 *
 * Always returns 0; the result of the individual sends is not propagated.
 */
int aclk_send_metadata()
{
    (void)aclk_send_info_metadata();
    aclk_send_alarm_metadata();

    return 0;
}
/*
 * Suppress individual chart/alarm updates: while the flag is set,
 * aclk_update_chart() and aclk_update_alarm() return without queueing.
 */
void aclk_single_update_disable()
{
    aclk_disable_single_updates = 1;
}
/*
 * Re-enable individual chart/alarm updates by clearing the flag set by
 * aclk_single_update_disable().
 */
void aclk_single_update_enable()
{
    aclk_disable_single_updates = 0;
}
/*
 * Triggered by a health reload: queues an "on_connect" command
 * (ACLK_CMD_ONCONNECT). Nothing is queued while the agent is still
 * initializing. A queueing failure is logged only when the link is up.
 */
void aclk_alarm_reload()
{
    if (unlikely(agent_state == AGENT_INITIALIZING))
        return;

    if (unlikely(aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)) &&
        likely(aclk_connected)) {
        errno = 0;
        error("ACLK failed to queue on_connect command on alarm reload");
    }
}
/*
 * Send the JSON definition of a single chart on ACLK_CHART_TOPIC.
 *
 * hostname : host owning the chart, looked up with rrdhost_find_by_hostname()
 * chart    : chart id; when no chart has that id it is retried as a chart name
 *
 * Returns 1 when the host or the chart cannot be found, 0 otherwise (the
 * result of the send itself is not propagated).
 */
int aclk_send_single_chart(char *hostname, char *chart)
{
    RRDHOST *target_host = rrdhost_find_by_hostname(hostname, 0);
    if (!target_host)
        return 1;

    /* look up by id first, then fall back to the name */
    RRDSET *st = rrdset_find(target_host, chart);
    if (!st) {
        st = rrdset_find_byname(target_host, chart);
        if (!st) {
            info("FAILED to find chart %s", chart);
            return 1;
        }
    }

    BUFFER *payload = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
    char *msg_id = create_uuid();

    buffer_flush(payload);
    payload->contenttype = CT_APPLICATION_JSON;

    aclk_create_header(payload, "chart", msg_id);
    rrdset2json(st, payload, NULL, NULL, 1);
    buffer_sprintf(payload, "\t\n}");

    aclk_send_message(ACLK_CHART_TOPIC, payload->buffer, msg_id);

    freez(msg_id);
    buffer_free(payload);

    return 0;
}
/*
 * Queue a chart update command for the cloud.
 *
 * host       : host owning the chart; only localhost is processed
 * chart_name : chart to report
 * aclk_cmd   : command to queue for the chart
 *
 * Without ENABLE_ACLK this is a no-op. Otherwise nothing is queued for other
 * hosts or while single updates are disabled. While the agent is still
 * initializing only last_init_sequence is refreshed. A queueing failure is
 * logged only when the link is up.
 *
 * Always returns 0.
 */
int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd)
{
#ifndef ENABLE_ACLK
    UNUSED(host);
    UNUSED(chart_name);
    UNUSED(aclk_cmd); // every parameter is unused in this configuration
    return 0;
#else
    if (host != localhost)
        return 0;

    if (unlikely(aclk_disable_single_updates))
        return 0;

    if (unlikely(agent_state == AGENT_INITIALIZING))
        last_init_sequence = now_realtime_sec();
    else {
        if (unlikely(aclk_queue_query("_chart", host->hostname, NULL, chart_name, 0, 1, aclk_cmd))) {
            if (likely(aclk_connected)) {
                errno = 0;
                error("ACLK failed to queue chart_update command");
            }
        }
    }
    return 0;
#endif
}
/*
 * Queue a "status-change" message for a single alarm entry.
 *
 * host : host owning the alarm; only localhost is processed
 * ae   : alarm entry to serialize
 *
 * Nothing is queued while the agent is initializing or while single updates
 * are disabled (the case during a health reload, where all alarms are dropped
 * and recreated and the complete alarm metadata is sent at the end).
 * A queueing failure is logged only when the link is up.
 *
 * Always returns 0.
 */
int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
{
    if (host != localhost)
        return 0;

    if (unlikely(agent_state == AGENT_INITIALIZING))
        return 0;

    if (unlikely(aclk_disable_single_updates))
        return 0;

    BUFFER *payload = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
    char *msg_id = create_uuid();

    buffer_flush(payload);
    aclk_create_header(payload, "status-change", msg_id);

    /* serialize the entry while holding the alarm log read lock */
    netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock);
    health_alarm_entry2json_nolock(payload, ae, host);
    netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock);

    buffer_sprintf(payload, "\n}");

    if (unlikely(aclk_queue_query(ACLK_ALARMS_TOPIC, NULL, msg_id, payload->buffer, 0, 1, ACLK_CMD_ALARM)) &&
        likely(aclk_connected)) {
        errno = 0;
        error("ACLK failed to queue alarm_command on alarm_update");
    }

    freez(msg_id);
    buffer_free(payload);

    return 0;
}
/*
 * Parse an incoming cloud payload and submit it as a request when valid.
 *
 * A request is accepted when the JSON parses, the payload, callback topic,
 * msg id and type id are all present, the version is not newer than
 * ACLK_VERSION and the type id is "http".
 *
 * Returns 0 when the message was ignored (agent initializing, NULL payload)
 * or handed to aclk_submit_request(); 1 when it was rejected, in which case
 * the strings extracted by the parser are released here.
 *
 * NOTE(review): on the accepted path the parsed strings are not released in
 * this function - presumably aclk_submit_request() takes care of them; confirm.
 */
int aclk_handle_cloud_request(char *payload)
{
    struct aclk_request cloud_to_agent = {
        .type_id = NULL, .msg_id = NULL, .callback_topic = NULL, .payload = NULL, .version = 0
    };

    if (unlikely(agent_state == AGENT_INITIALIZING)) {
        debug(D_ACLK, "Ignoring cloud request; agent not in stable state");
        return 0;
    }

    if (unlikely(!payload)) {
        debug(D_ACLK, "ACLK incoming message is empty");
        return 0;
    }

    debug(D_ACLK, "ACLK incoming message (%s)", payload);

    int rc = json_parse(payload, &cloud_to_agent, cloud_to_agent_parse);

    int parsed_ok = (JSON_OK == rc);
    int complete = cloud_to_agent.payload && cloud_to_agent.callback_topic && cloud_to_agent.msg_id &&
                   cloud_to_agent.type_id;
    int version_ok = !(cloud_to_agent.version > ACLK_VERSION);

    /* strcmp() is reached only when type_id is known to be non-NULL */
    if (likely(parsed_ok && complete && version_ok && !strcmp(cloud_to_agent.type_id, "http"))) {
        if (unlikely(aclk_submit_request(&cloud_to_agent)))
            debug(D_ACLK, "ACLK failed to queue incoming message (%s)", payload);

        // Note: the payload comes from the callback and it will be automatically freed
        return 0;
    }

    /* Rejected: report why (when known) and release whatever was extracted. */
    if (!parsed_ok)
        error("Malformed json request (%s)", payload);

    if (!version_ok)
        error("Unsupported version in JSON request %d", cloud_to_agent.version);

    if (cloud_to_agent.payload)
        freez(cloud_to_agent.payload);

    if (cloud_to_agent.type_id)
        freez(cloud_to_agent.type_id);

    if (cloud_to_agent.msg_id)
        freez(cloud_to_agent.msg_id);

    if (cloud_to_agent.callback_topic)
        freez(cloud_to_agent.callback_topic);

    return 1;
}