mirror of
https://github.com/netdata/netdata.git
synced 2025-04-17 03:02:41 +00:00

* cleanup alerts * fix references * fix references * fix references * load alerts once and apply them to each node * simplify health_create_alarm_entry() * Compile without warnings with compiler flags: -Wall -Wextra -Wformat=2 -Wshadow -Wno-format-nonliteral -Winit-self * code re-organization and cleanup * generate patterns when applying prototypes; give unique dyncfg names to all alerts * eval expressions keep the source and the parsed_as as STRING pointers * renamed host to node in dyncfg ids * renamed host to node in dyncfg ids * add all cloud roles to the list of parsed X-Netdata-Role header and also default to member access level * working functionality * code re-organization: moved health event-loop to a new file, moved health globals to health.c * rrdcalctemplate is removed; alert_cfg is removed; foreach dimension is removed; RRDCALCs are now instanciated only when they are linked to RRDSETs * dyncfg alert prototypes initialization for alerts * health dyncfg split to separate file * cleanup not-needed code * normalize matches between parsing and json * also detect !* for disabled alerts * dyncfg capability disabled * Store alert config part1 * Add rrdlabels_common_count * wip health variables lookup without indexes * Improve rrdlabels_common_count by reusing rrdlabels_find_label_with_key_unsafe with an additional parameter * working variables with runtime lookup * working variables with runtime lookup * delete rrddimvar and rrdfamily index * remove rrdsetvar; now all variables are in RRDVARs inside hosts and charts * added /api/v1/variable that resolves a variable the same way alerts do * remove rrdcalc from eval * remove debug code * remove duplicate assignment * Fix memory leak * all alert variables are now handled by alert_variable_lookup() and EVAL is now independent of alerts * hide all internal structures of EVAL * Enable -Wformat flag Signed-off-by: Tasos Katsoulas <tasos@netdata.cloud> * Adjust binding for calculation, warning, critical * Remove unused macro * Update config hash id * use the right info and summary in alerts log * use synchronous queries for alerts * Handle cases when config_hash_id is missing from health_log * remove deadlock from health worker * parsing to json payload for health alert prototypes * cleaner parsing and avoiding memory leaks in case of duplicate members in json * fix left-over rename of function * Keep original lookup field to send to the cloud Cleanup / rename function to store config Remove unused DEFINEs, functions * Use ac->lookup * link jobs to the host when the template is registered; do not accept running a function without a host * full dyncfg support for health alerts, except action TEST * working dyncfg additions, updates, removals * fixed missing source, wrong status updates * add alerts by type, component, classification, recipient and module at the /api/v2/alerts endpoint * fix dyncfg unittest * rename functions * generalize the json-c parser macros and move them to libnetdata * report progress when enabling and disabling dyncfg templates * moved rrdcalc and rrdvar to health * update alarms * added schema for alerts; separated alert_action_options from rrdr_options; restructured the json payload for alerts * enable parsed json alerts; allow sending back accepted but disabled * added format_version for alerts payload; enables/disables status now is also inheritted by the status of the rules; fixed variable names in json output * remove the RRDHOST pointer from DYNCFG * Fix command field submitted to the cloud * do not send updates to creation requests, for DYNCFG jobs --------- Signed-off-by: Tasos Katsoulas <tasos@netdata.cloud> Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com> Co-authored-by: Tasos Katsoulas <tasos@netdata.cloud> Co-authored-by: ilyam8 <ilya@netdata.cloud>
1126 lines
35 KiB
C
1126 lines
35 KiB
C
// SPDX-License-Identifier: GPL-3.0-only
|
|
// Copyright (C) 2020 Timotej Šiškovič
|
|
|
|
#ifndef _GNU_SOURCE
|
|
#define _GNU_SOURCE
|
|
#endif
|
|
|
|
#include "mqtt_wss_client.h"
|
|
#include "mqtt_ng.h"
|
|
#include "ws_client.h"
|
|
#include "common_internal.h"
|
|
|
|
#include <stdlib.h>
|
|
#include <fcntl.h>
|
|
#include <unistd.h>
|
|
#include <poll.h>
|
|
#include <string.h>
|
|
#include <time.h>
|
|
|
|
#include <sys/socket.h>
|
|
#include <netinet/in.h>
|
|
#include <arpa/inet.h>
|
|
#include <netinet/tcp.h> //TCP_NODELAY
|
|
#include <netdb.h>
|
|
|
|
#include <openssl/err.h>
|
|
#include <openssl/ssl.h>
|
|
|
|
#define PIPE_READ_END 0
|
|
#define PIPE_WRITE_END 1
|
|
#define POLLFD_SOCKET 0
|
|
#define POLLFD_PIPE 1
|
|
|
|
#if (OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110) && (SSLEAY_VERSION_NUMBER >= OPENSSL_VERSION_097)
|
|
#include <openssl/conf.h>
|
|
#endif
|
|
|
|
//TODO MQTT_PUBLISH_RETAIN should not be needed anymore
|
|
#define MQTT_PUBLISH_RETAIN 0x01
|
|
#define MQTT_CONNECT_CLEAN_SESSION 0x02
|
|
#define MQTT_CONNECT_WILL_RETAIN 0x20
|
|
|
|
char *util_openssl_ret_err(int err)
|
|
{
|
|
switch(err){
|
|
case SSL_ERROR_WANT_READ:
|
|
return "SSL_ERROR_WANT_READ";
|
|
case SSL_ERROR_WANT_WRITE:
|
|
return "SSL_ERROR_WANT_WRITE";
|
|
case SSL_ERROR_NONE:
|
|
return "SSL_ERROR_NONE";
|
|
case SSL_ERROR_ZERO_RETURN:
|
|
return "SSL_ERROR_ZERO_RETURN";
|
|
case SSL_ERROR_WANT_CONNECT:
|
|
return "SSL_ERROR_WANT_CONNECT";
|
|
case SSL_ERROR_WANT_ACCEPT:
|
|
return "SSL_ERROR_WANT_ACCEPT";
|
|
case SSL_ERROR_WANT_X509_LOOKUP:
|
|
return "SSL_ERROR_WANT_X509_LOOKUP";
|
|
#ifdef SSL_ERROR_WANT_ASYNC
|
|
case SSL_ERROR_WANT_ASYNC:
|
|
return "SSL_ERROR_WANT_ASYNC";
|
|
#endif
|
|
#ifdef SSL_ERROR_WANT_ASYNC_JOB
|
|
case SSL_ERROR_WANT_ASYNC_JOB:
|
|
return "SSL_ERROR_WANT_ASYNC_JOB";
|
|
#endif
|
|
#ifdef SSL_ERROR_WANT_CLIENT_HELLO_CB
|
|
case SSL_ERROR_WANT_CLIENT_HELLO_CB:
|
|
return "SSL_ERROR_WANT_CLIENT_HELLO_CB";
|
|
#endif
|
|
case SSL_ERROR_SYSCALL:
|
|
return "SSL_ERROR_SYSCALL";
|
|
case SSL_ERROR_SSL:
|
|
return "SSL_ERROR_SSL";
|
|
}
|
|
return "UNKNOWN";
|
|
}
|
|
|
|
struct mqtt_wss_client_struct {
|
|
ws_client *ws_client;
|
|
|
|
mqtt_wss_log_ctx_t log;
|
|
|
|
// immediate connection (e.g. proxy server)
|
|
char *host;
|
|
int port;
|
|
|
|
// target of connection (e.g. where we want to connect to)
|
|
char *target_host;
|
|
int target_port;
|
|
|
|
enum mqtt_wss_proxy_type proxy_type;
|
|
char *proxy_uname;
|
|
char *proxy_passwd;
|
|
|
|
// nonblock IO related
|
|
int sockfd;
|
|
int write_notif_pipe[2];
|
|
struct pollfd poll_fds[2];
|
|
|
|
SSL_CTX *ssl_ctx;
|
|
SSL *ssl;
|
|
int ssl_flags;
|
|
|
|
struct mqtt_ng_client *mqtt;
|
|
|
|
int mqtt_keepalive;
|
|
|
|
pthread_mutex_t pub_lock;
|
|
|
|
// signifies that we didn't write all MQTT wanted
|
|
// us to write during last cycle (e.g. due to buffer
|
|
// size) and thus we should arm POLLOUT
|
|
unsigned int mqtt_didnt_finish_write:1;
|
|
|
|
unsigned int mqtt_connected:1;
|
|
unsigned int mqtt_disconnecting:1;
|
|
|
|
// Application layer callback pointers
|
|
void (*msg_callback)(const char *, const void *, size_t, int);
|
|
void (*puback_callback)(uint16_t packet_id);
|
|
|
|
pthread_mutex_t stat_lock;
|
|
struct mqtt_wss_stats stats;
|
|
|
|
#ifdef MQTT_WSS_DEBUG
|
|
void (*ssl_ctx_keylog_cb)(const SSL *ssl, const char *line);
|
|
#endif
|
|
};
|
|
|
|
static void mws_connack_callback_ng(void *user_ctx, int code)
|
|
{
|
|
mqtt_wss_client client = user_ctx;
|
|
switch(code) {
|
|
case 0:
|
|
client->mqtt_connected = 1;
|
|
return;
|
|
//TODO manual labor: all the CONNACK error codes with some nice error message
|
|
default:
|
|
mws_error(client->log, "MQTT CONNACK returned error %d", code);
|
|
return;
|
|
}
|
|
}
|
|
|
|
static ssize_t mqtt_send_cb(void *user_ctx, const void* buf, size_t len)
|
|
{
|
|
mqtt_wss_client client = user_ctx;
|
|
#ifdef DEBUG_ULTRA_VERBOSE
|
|
mws_debug(client->log, "mqtt_pal_sendall(len=%d)", len);
|
|
#endif
|
|
int ret = ws_client_send(client->ws_client, WS_OP_BINARY_FRAME, buf, len);
|
|
if (ret >= 0 && (size_t)ret != len) {
|
|
#ifdef DEBUG_ULTRA_VERBOSE
|
|
mws_debug(client->log, "Not complete message sent (Msg=%d,Sent=%d). Need to arm POLLOUT!", len, ret);
|
|
#endif
|
|
client->mqtt_didnt_finish_write = 1;
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
mqtt_wss_client mqtt_wss_new(const char *log_prefix,
|
|
mqtt_wss_log_callback_t log_callback,
|
|
msg_callback_fnc_t msg_callback,
|
|
void (*puback_callback)(uint16_t packet_id))
|
|
{
|
|
mqtt_wss_log_ctx_t log;
|
|
|
|
log = mqtt_wss_log_ctx_create(log_prefix, log_callback);
|
|
if(!log)
|
|
return NULL;
|
|
|
|
SSL_library_init();
|
|
SSL_load_error_strings();
|
|
|
|
mqtt_wss_client client = mw_calloc(1, sizeof(struct mqtt_wss_client_struct));
|
|
if (!client) {
|
|
mws_error(log, "OOM alocating mqtt_wss_client");
|
|
goto fail;
|
|
}
|
|
|
|
pthread_mutex_init(&client->pub_lock, NULL);
|
|
pthread_mutex_init(&client->stat_lock, NULL);
|
|
|
|
client->msg_callback = msg_callback;
|
|
client->puback_callback = puback_callback;
|
|
|
|
client->ws_client = ws_client_new(0, &client->target_host, log);
|
|
if (!client->ws_client) {
|
|
mws_error(log, "Error creating ws_client");
|
|
goto fail_1;
|
|
}
|
|
|
|
client->log = log;
|
|
|
|
#ifdef __APPLE__
|
|
if (pipe(client->write_notif_pipe)) {
|
|
#else
|
|
if (pipe2(client->write_notif_pipe, O_CLOEXEC /*| O_DIRECT*/)) {
|
|
#endif
|
|
mws_error(log, "Couldn't create pipe");
|
|
goto fail_2;
|
|
}
|
|
|
|
client->poll_fds[POLLFD_PIPE].fd = client->write_notif_pipe[PIPE_READ_END];
|
|
client->poll_fds[POLLFD_PIPE].events = POLLIN;
|
|
|
|
client->poll_fds[POLLFD_SOCKET].events = POLLIN;
|
|
|
|
struct mqtt_ng_init settings = {
|
|
.log = log,
|
|
.data_in = client->ws_client->buf_to_mqtt,
|
|
.data_out_fnc = &mqtt_send_cb,
|
|
.user_ctx = client,
|
|
.connack_callback = &mws_connack_callback_ng,
|
|
.puback_callback = puback_callback,
|
|
.msg_callback = msg_callback
|
|
};
|
|
if ( (client->mqtt = mqtt_ng_init(&settings)) == NULL ) {
|
|
mws_error(log, "Error initializing internal MQTT client");
|
|
goto fail_3;
|
|
}
|
|
|
|
return client;
|
|
|
|
fail_3:
|
|
close(client->write_notif_pipe[PIPE_WRITE_END]);
|
|
close(client->write_notif_pipe[PIPE_READ_END]);
|
|
fail_2:
|
|
ws_client_destroy(client->ws_client);
|
|
fail_1:
|
|
mw_free(client);
|
|
fail:
|
|
mqtt_wss_log_ctx_destroy(log);
|
|
return NULL;
|
|
}
|
|
|
|
void mqtt_wss_set_max_buf_size(mqtt_wss_client client, size_t size)
|
|
{
|
|
mqtt_ng_set_max_mem(client->mqtt, size);
|
|
}
|
|
|
|
void mqtt_wss_destroy(mqtt_wss_client client)
|
|
{
|
|
mqtt_ng_destroy(client->mqtt);
|
|
|
|
close(client->write_notif_pipe[PIPE_WRITE_END]);
|
|
close(client->write_notif_pipe[PIPE_READ_END]);
|
|
|
|
ws_client_destroy(client->ws_client);
|
|
|
|
// deleted after client->ws_client
|
|
// as it "borrows" this pointer and might use it
|
|
if (client->target_host == client->host)
|
|
client->target_host = NULL;
|
|
if (client->target_host)
|
|
mw_free(client->target_host);
|
|
if (client->host)
|
|
mw_free(client->host);
|
|
mw_free(client->proxy_passwd);
|
|
mw_free(client->proxy_uname);
|
|
|
|
if (client->ssl)
|
|
SSL_free(client->ssl);
|
|
|
|
if (client->ssl_ctx)
|
|
SSL_CTX_free(client->ssl_ctx);
|
|
|
|
if (client->sockfd > 0)
|
|
close(client->sockfd);
|
|
|
|
pthread_mutex_destroy(&client->pub_lock);
|
|
pthread_mutex_destroy(&client->stat_lock);
|
|
|
|
mqtt_wss_log_ctx_destroy(client->log);
|
|
mw_free(client);
|
|
}
|
|
|
|
static int cert_verify_callback(int preverify_ok, X509_STORE_CTX *ctx)
|
|
{
|
|
SSL *ssl;
|
|
X509 *err_cert;
|
|
mqtt_wss_client client;
|
|
int err = 0, depth;
|
|
char *err_str;
|
|
|
|
ssl = X509_STORE_CTX_get_ex_data(ctx, SSL_get_ex_data_X509_STORE_CTX_idx());
|
|
client = SSL_get_ex_data(ssl, 0);
|
|
|
|
// TODO handle depth as per https://www.openssl.org/docs/man1.0.2/man3/SSL_CTX_set_verify.html
|
|
|
|
if (!preverify_ok) {
|
|
err = X509_STORE_CTX_get_error(ctx);
|
|
depth = X509_STORE_CTX_get_error_depth(ctx);
|
|
err_cert = X509_STORE_CTX_get_current_cert(ctx);
|
|
err_str = X509_NAME_oneline(X509_get_subject_name(err_cert), NULL, 0);
|
|
|
|
mws_error(client->log, "verify error:num=%d:%s:depth=%d:%s", err,
|
|
X509_verify_cert_error_string(err), depth, err_str);
|
|
|
|
mw_free(err_str);
|
|
}
|
|
|
|
if (!preverify_ok && err == X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT &&
|
|
client->ssl_flags & MQTT_WSS_SSL_ALLOW_SELF_SIGNED)
|
|
{
|
|
preverify_ok = 1;
|
|
mws_error(client->log, "Self Signed Certificate Accepted as the connection was "
|
|
"requested with MQTT_WSS_SSL_ALLOW_SELF_SIGNED");
|
|
}
|
|
|
|
return preverify_ok;
|
|
}
|
|
|
|
#define PROXY_CONNECT "CONNECT"
|
|
#define PROXY_HTTP "HTTP/1.1"
|
|
#define HTTP_ENDLINE "\x0D\x0A"
|
|
#define HTTP_HDR_TERMINATOR "\x0D\x0A\x0D\x0A"
|
|
#define HTTP_CODE_LEN 4
|
|
#define HTTP_REASON_MAX_LEN 512
|
|
static int http_parse_reply(mqtt_wss_client client, rbuf_t buf)
|
|
{
|
|
char *ptr;
|
|
char http_code_s[4];
|
|
int http_code;
|
|
int idx;
|
|
|
|
if (rbuf_memcmp_n(buf, PROXY_HTTP, strlen(PROXY_HTTP))) {
|
|
mws_error(client->log, "http_proxy expected reply with \"" PROXY_HTTP "\"");
|
|
return 1;
|
|
}
|
|
|
|
rbuf_bump_tail(buf, strlen(PROXY_HTTP));
|
|
|
|
if (!rbuf_pop(buf, http_code_s, 1) || http_code_s[0] != 0x20) {
|
|
mws_error(client->log, "http_proxy missing space after \"" PROXY_HTTP "\"");
|
|
return 2;
|
|
}
|
|
|
|
if (!rbuf_pop(buf, http_code_s, HTTP_CODE_LEN)) {
|
|
mws_error(client->log, "http_proxy missing HTTP code");
|
|
return 3;
|
|
}
|
|
|
|
for (int i = 0; i < HTTP_CODE_LEN - 1; i++)
|
|
if (http_code_s[i] > 0x39 || http_code_s[i] < 0x30) {
|
|
mws_error(client->log, "http_proxy HTTP code non numeric");
|
|
return 4;
|
|
}
|
|
|
|
http_code_s[HTTP_CODE_LEN - 1] = 0;
|
|
http_code = atoi(http_code_s);
|
|
|
|
// TODO check if we ever have more headers here
|
|
rbuf_find_bytes(buf, HTTP_ENDLINE, strlen(HTTP_ENDLINE), &idx);
|
|
if (idx >= HTTP_REASON_MAX_LEN) {
|
|
mws_error(client->log, "http_proxy returned reason that is too long");
|
|
return 5;
|
|
}
|
|
|
|
if (http_code != 200) {
|
|
ptr = mw_malloc(idx + 1);
|
|
if (!ptr)
|
|
return 6;
|
|
rbuf_pop(buf, ptr, idx);
|
|
ptr[idx] = 0;
|
|
|
|
mws_error(client->log, "http_proxy returned error code %d \"%s\"", http_code, ptr);
|
|
mw_free(ptr);
|
|
return 7;
|
|
}/* else
|
|
rbuf_bump_tail(buf, idx);*/
|
|
|
|
rbuf_find_bytes(buf, HTTP_HDR_TERMINATOR, strlen(HTTP_HDR_TERMINATOR), &idx);
|
|
if (idx)
|
|
rbuf_bump_tail(buf, idx);
|
|
|
|
rbuf_bump_tail(buf, strlen(HTTP_HDR_TERMINATOR));
|
|
|
|
if (rbuf_bytes_available(buf)) {
|
|
mws_error(client->log, "http_proxy unexpected trailing bytes after end of HTTP hdr");
|
|
return 8;
|
|
}
|
|
|
|
mws_debug(client->log, "http_proxy CONNECT succeeded");
|
|
return 0;
|
|
}
|
|
|
|
#if defined(OPENSSL_VERSION_NUMBER) && OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110
|
|
static EVP_ENCODE_CTX *EVP_ENCODE_CTX_new(void)
|
|
{
|
|
EVP_ENCODE_CTX *ctx = OPENSSL_malloc(sizeof(*ctx));
|
|
|
|
if (ctx != NULL) {
|
|
memset(ctx, 0, sizeof(*ctx));
|
|
}
|
|
return ctx;
|
|
}
|
|
static void EVP_ENCODE_CTX_free(EVP_ENCODE_CTX *ctx)
|
|
{
|
|
OPENSSL_free(ctx);
|
|
return;
|
|
}
|
|
#endif
|
|
|
|
inline static int base64_encode_helper(unsigned char *out, int *outl, const unsigned char *in, int in_len)
|
|
{
|
|
int len;
|
|
unsigned char *str = out;
|
|
EVP_ENCODE_CTX *ctx = EVP_ENCODE_CTX_new();
|
|
EVP_EncodeInit(ctx);
|
|
EVP_EncodeUpdate(ctx, str, outl, in, in_len);
|
|
str += *outl;
|
|
EVP_EncodeFinal(ctx, str, &len);
|
|
*outl += len;
|
|
|
|
str = out;
|
|
while(*str) {
|
|
if (*str != 0x0D && *str != 0x0A)
|
|
*out++ = *str++;
|
|
else
|
|
str++;
|
|
}
|
|
*out = 0;
|
|
|
|
EVP_ENCODE_CTX_free(ctx);
|
|
return 0;
|
|
}
|
|
|
|
static int http_proxy_connect(mqtt_wss_client client)
|
|
{
|
|
int rc;
|
|
struct pollfd poll_fd;
|
|
rbuf_t r_buf = rbuf_create(4096);
|
|
if (!r_buf)
|
|
return 1;
|
|
char *r_buf_ptr;
|
|
size_t r_buf_linear_insert_capacity;
|
|
|
|
poll_fd.fd = client->sockfd;
|
|
poll_fd.events = POLLIN;
|
|
|
|
r_buf_ptr = rbuf_get_linear_insert_range(r_buf, &r_buf_linear_insert_capacity);
|
|
snprintf(r_buf_ptr, r_buf_linear_insert_capacity,"%s %s:%d %s" HTTP_ENDLINE, PROXY_CONNECT, client->target_host, client->target_port, PROXY_HTTP);
|
|
write(client->sockfd, r_buf_ptr, strlen(r_buf_ptr));
|
|
|
|
if (client->proxy_uname) {
|
|
size_t creds_plain_len = strlen(client->proxy_uname) + strlen(client->proxy_passwd) + 2;
|
|
char *creds_plain = mw_malloc(creds_plain_len);
|
|
if (!creds_plain) {
|
|
mws_error(client->log, "OOM creds_plain");
|
|
rc = 6;
|
|
goto cleanup;
|
|
}
|
|
int creds_base64_len = (((4 * creds_plain_len / 3) + 3) & ~3);
|
|
// OpenSSL encoder puts newline every 64 output bytes
|
|
// we remove those but during encoding we need that space in the buffer
|
|
creds_base64_len += (1+(creds_base64_len/64)) * strlen("\n");
|
|
char *creds_base64 = mw_malloc(creds_base64_len + 1);
|
|
if (!creds_base64) {
|
|
mw_free(creds_plain);
|
|
mws_error(client->log, "OOM creds_base64");
|
|
rc = 6;
|
|
goto cleanup;
|
|
}
|
|
char *ptr = creds_plain;
|
|
strcpy(ptr, client->proxy_uname);
|
|
ptr += strlen(client->proxy_uname);
|
|
*ptr++ = ':';
|
|
strcpy(ptr, client->proxy_passwd);
|
|
|
|
int b64_len;
|
|
base64_encode_helper((unsigned char*)creds_base64, &b64_len, (unsigned char*)creds_plain, strlen(creds_plain));
|
|
mw_free(creds_plain);
|
|
|
|
r_buf_ptr = rbuf_get_linear_insert_range(r_buf, &r_buf_linear_insert_capacity);
|
|
snprintf(r_buf_ptr, r_buf_linear_insert_capacity,"Proxy-Authorization: Basic %s" HTTP_ENDLINE, creds_base64);
|
|
write(client->sockfd, r_buf_ptr, strlen(r_buf_ptr));
|
|
mw_free(creds_base64);
|
|
}
|
|
write(client->sockfd, HTTP_ENDLINE, strlen(HTTP_ENDLINE));
|
|
|
|
// read until you find CRLF, CRLF (HTTP HDR end)
|
|
// or ring buffer is full
|
|
// or timeout
|
|
while ((rc = poll(&poll_fd, 1, 1000)) >= 0) {
|
|
if (!rc) {
|
|
mws_error(client->log, "http_proxy timeout waiting reply from proxy server");
|
|
rc = 2;
|
|
goto cleanup;
|
|
}
|
|
r_buf_ptr = rbuf_get_linear_insert_range(r_buf, &r_buf_linear_insert_capacity);
|
|
if (!r_buf_ptr) {
|
|
mws_error(client->log, "http_proxy read ring buffer full");
|
|
rc = 3;
|
|
goto cleanup;
|
|
}
|
|
if ((rc = read(client->sockfd, r_buf_ptr, r_buf_linear_insert_capacity)) < 0) {
|
|
if (errno == EWOULDBLOCK || errno == EAGAIN) {
|
|
continue;
|
|
}
|
|
mws_error(client->log, "http_proxy error reading from socket \"%s\"", strerror(errno));
|
|
rc = 4;
|
|
goto cleanup;
|
|
}
|
|
rbuf_bump_head(r_buf, rc);
|
|
if (rbuf_find_bytes(r_buf, HTTP_HDR_TERMINATOR, strlen(HTTP_HDR_TERMINATOR), &rc)) {
|
|
rc = 0;
|
|
if (http_parse_reply(client, r_buf))
|
|
rc = 5;
|
|
|
|
goto cleanup;
|
|
}
|
|
}
|
|
mws_error(client->log, "proxy negotiation poll error \"%s\"", strerror(errno));
|
|
rc = 5;
|
|
cleanup:
|
|
rbuf_free(r_buf);
|
|
return rc;
|
|
}
|
|
|
|
int mqtt_wss_connect(mqtt_wss_client client, char *host, int port, struct mqtt_connect_params *mqtt_params, int ssl_flags, struct mqtt_wss_proxy *proxy)
|
|
{
|
|
struct sockaddr_in addr;
|
|
memset(&addr, 0, sizeof(addr));
|
|
addr.sin_family = AF_INET;
|
|
|
|
struct hostent *he;
|
|
struct in_addr **addr_list;
|
|
|
|
if (!mqtt_params) {
|
|
mws_error(client->log, "mqtt_params can't be null!");
|
|
return -1;
|
|
}
|
|
|
|
// reset state in case this is reconnect
|
|
client->mqtt_didnt_finish_write = 0;
|
|
client->mqtt_connected = 0;
|
|
client->mqtt_disconnecting = 0;
|
|
ws_client_reset(client->ws_client);
|
|
|
|
if (client->target_host == client->host)
|
|
client->target_host = NULL;
|
|
if (client->target_host)
|
|
mw_free(client->target_host);
|
|
if (client->host)
|
|
mw_free(client->host);
|
|
|
|
if (proxy && proxy->type != MQTT_WSS_DIRECT) {
|
|
client->host = mw_strdup(proxy->host);
|
|
client->port = proxy->port;
|
|
client->target_host = mw_strdup(host);
|
|
client->target_port = port;
|
|
client->proxy_type = proxy->type;
|
|
if (proxy->username)
|
|
client->proxy_uname = mw_strdup(proxy->username);
|
|
if (proxy->password)
|
|
client->proxy_passwd = mw_strdup(proxy->password);
|
|
} else {
|
|
client->host = mw_strdup(host);
|
|
client->port = port;
|
|
client->target_host = client->host;
|
|
client->target_port = port;
|
|
}
|
|
|
|
client->ssl_flags = ssl_flags;
|
|
|
|
//TODO gethostbyname -> getaddinfo
|
|
// hstrerror -> gai_strerror
|
|
if ((he = gethostbyname(client->host)) == NULL) {
|
|
mws_error(client->log, "gethostbyname() error \"%s\"", hstrerror(h_errno));
|
|
return -1;
|
|
}
|
|
|
|
addr_list = (struct in_addr **)he->h_addr_list;
|
|
if(!addr_list[0]) {
|
|
mws_error(client->log, "No IP addr resolved");
|
|
return -1;
|
|
}
|
|
mws_debug(client->log, "Resolved IP: %s", inet_ntoa(*addr_list[0]));
|
|
addr.sin_addr = *addr_list[0];
|
|
addr.sin_port = htons(client->port);
|
|
|
|
if (client->sockfd > 0)
|
|
close(client->sockfd);
|
|
client->sockfd = socket(AF_INET, SOCK_STREAM, 0);
|
|
if (client->sockfd < 0) {
|
|
mws_error(client->log, "Couldn't create socket()");
|
|
return -1;
|
|
}
|
|
|
|
int flag = 1;
|
|
int result = setsockopt(client->sockfd,
|
|
IPPROTO_TCP,
|
|
TCP_NODELAY,
|
|
&flag,
|
|
sizeof(int));
|
|
if (result < 0)
|
|
mws_error(client->log, "Could not dissable NAGLE");
|
|
|
|
if (connect(client->sockfd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
|
|
mws_error(client->log, "Could not connect to remote endpoint \"%s\", port %d.\n", client->host, client->port);
|
|
return -3;
|
|
}
|
|
|
|
client->poll_fds[POLLFD_SOCKET].fd = client->sockfd;
|
|
|
|
if (fcntl(client->sockfd, F_SETFL, fcntl(client->sockfd, F_GETFL, 0) | O_NONBLOCK) == -1) {
|
|
mws_error(client->log, "Error setting O_NONBLOCK to TCP socket. \"%s\"", strerror(errno));
|
|
return -8;
|
|
}
|
|
|
|
if (client->proxy_type != MQTT_WSS_DIRECT)
|
|
if (http_proxy_connect(client))
|
|
return -4;
|
|
|
|
#if OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110
|
|
#if (SSLEAY_VERSION_NUMBER >= OPENSSL_VERSION_097)
|
|
OPENSSL_config(NULL);
|
|
#endif
|
|
SSL_load_error_strings();
|
|
SSL_library_init();
|
|
#else
|
|
if (OPENSSL_init_ssl(OPENSSL_INIT_LOAD_CONFIG, NULL) != 1) {
|
|
mws_error(client->log, "Failed to initialize SSL");
|
|
return -1;
|
|
};
|
|
#endif
|
|
|
|
// free SSL structs from possible previous connections
|
|
if (client->ssl)
|
|
SSL_free(client->ssl);
|
|
if (client->ssl_ctx)
|
|
SSL_CTX_free(client->ssl_ctx);
|
|
|
|
client->ssl_ctx = SSL_CTX_new(SSLv23_client_method());
|
|
if (!(client->ssl_flags & MQTT_WSS_SSL_DONT_CHECK_CERTS)) {
|
|
SSL_CTX_set_default_verify_paths(client->ssl_ctx);
|
|
SSL_CTX_set_verify(client->ssl_ctx, SSL_VERIFY_PEER | SSL_VERIFY_CLIENT_ONCE, cert_verify_callback);
|
|
} else
|
|
mws_error(client->log, "SSL Certificate checking completely disabled!!!");
|
|
|
|
#ifdef MQTT_WSS_DEBUG
|
|
if(client->ssl_ctx_keylog_cb)
|
|
SSL_CTX_set_keylog_callback(client->ssl_ctx, client->ssl_ctx_keylog_cb);
|
|
#endif
|
|
|
|
client->ssl = SSL_new(client->ssl_ctx);
|
|
if (!(client->ssl_flags & MQTT_WSS_SSL_DONT_CHECK_CERTS)) {
|
|
if (!SSL_set_ex_data(client->ssl, 0, client)) {
|
|
mws_error(client->log, "Could not SSL_set_ex_data");
|
|
return -4;
|
|
}
|
|
}
|
|
SSL_set_fd(client->ssl, client->sockfd);
|
|
SSL_set_connect_state(client->ssl);
|
|
|
|
if (!SSL_set_tlsext_host_name(client->ssl, client->target_host)) {
|
|
mws_error(client->log, "Error setting TLS SNI host");
|
|
return -7;
|
|
}
|
|
|
|
result = SSL_connect(client->ssl);
|
|
if (result != -1 && result != 1) {
|
|
mws_error(client->log, "SSL could not connect");
|
|
return -5;
|
|
}
|
|
if (result == -1) {
|
|
int ec = SSL_get_error(client->ssl, result);
|
|
if (ec != SSL_ERROR_WANT_READ && ec != SSL_ERROR_WANT_WRITE) {
|
|
mws_error(client->log, "Failed to start SSL connection");
|
|
return -6;
|
|
}
|
|
}
|
|
|
|
client->mqtt_keepalive = (mqtt_params->keep_alive ? mqtt_params->keep_alive : 400);
|
|
|
|
mws_info(client->log, "Going to connect using internal MQTT 5 implementation");
|
|
struct mqtt_auth_properties auth;
|
|
auth.client_id = (char*)mqtt_params->clientid;
|
|
auth.client_id_free = NULL;
|
|
auth.username = (char*)mqtt_params->username;
|
|
auth.username_free = NULL;
|
|
auth.password = (char*)mqtt_params->password;
|
|
auth.password_free = NULL;
|
|
struct mqtt_lwt_properties lwt;
|
|
lwt.will_topic = (char*)mqtt_params->will_topic;
|
|
lwt.will_topic_free = NULL;
|
|
lwt.will_message = (void*)mqtt_params->will_msg;
|
|
lwt.will_message_free = NULL; // TODO expose no copy version to API
|
|
lwt.will_message_size = mqtt_params->will_msg_len;
|
|
lwt.will_qos = (mqtt_params->will_flags & MQTT_WSS_PUB_QOSMASK);
|
|
lwt.will_retain = mqtt_params->will_flags & MQTT_WSS_PUB_RETAIN;
|
|
int ret = mqtt_ng_connect(client->mqtt, &auth, mqtt_params->will_msg ? &lwt : NULL, 1, client->mqtt_keepalive);
|
|
if (ret) {
|
|
mws_error(client->log, "Error generating MQTT connect");
|
|
return 1;
|
|
}
|
|
|
|
client->poll_fds[POLLFD_PIPE].events = POLLIN;
|
|
client->poll_fds[POLLFD_SOCKET].events = POLLIN;
|
|
// wait till MQTT connection is established
|
|
while (!client->mqtt_connected) {
|
|
if(mqtt_wss_service(client, -1)) {
|
|
mws_error(client->log, "Error connecting to MQTT WSS server \"%s\", port %d.", host, port);
|
|
return 2;
|
|
}
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
#define NSEC_PER_USEC 1000ULL
|
|
#define USEC_PER_SEC 1000000ULL
|
|
#define NSEC_PER_MSEC 1000000ULL
|
|
#define NSEC_PER_SEC 1000000000ULL
|
|
|
|
static inline uint64_t boottime_usec(mqtt_wss_client client) {
|
|
struct timespec ts;
|
|
#if defined(__APPLE__) || defined(__FreeBSD__)
|
|
if (clock_gettime(CLOCK_MONOTONIC, &ts) == -1) {
|
|
#else
|
|
if (clock_gettime(CLOCK_BOOTTIME, &ts) == -1) {
|
|
#endif
|
|
mws_error(client->log, "clock_gettimte failed");
|
|
return 0;
|
|
}
|
|
return (uint64_t)ts.tv_sec * USEC_PER_SEC + (ts.tv_nsec % NSEC_PER_SEC) / NSEC_PER_USEC;
|
|
}
|
|
|
|
#define MWS_TIMED_OUT 1
|
|
#define MWS_ERROR 2
|
|
#define MWS_OK 0
|
|
static inline const char *mqtt_wss_error_tos(int ec)
|
|
{
|
|
switch(ec) {
|
|
case MWS_TIMED_OUT:
|
|
return "Error: Operation was not able to finish in time";
|
|
case MWS_ERROR:
|
|
return "Unspecified Error";
|
|
default:
|
|
return "Unknown Error Code!";
|
|
}
|
|
|
|
}
|
|
|
|
static inline int mqtt_wss_service_all(mqtt_wss_client client, int timeout_ms)
|
|
{
|
|
uint64_t exit_by = boottime_usec(client) + (timeout_ms * NSEC_PER_MSEC);
|
|
uint64_t now;
|
|
client->poll_fds[POLLFD_SOCKET].events |= POLLOUT; // TODO when entering mwtt_wss_service use out buffer size to arm POLLOUT
|
|
while (rbuf_bytes_available(client->ws_client->buf_write)) {
|
|
now = boottime_usec(client);
|
|
if (now >= exit_by)
|
|
return MWS_TIMED_OUT;
|
|
if (mqtt_wss_service(client, exit_by - now))
|
|
return MWS_ERROR;
|
|
}
|
|
return MWS_OK;
|
|
}
|
|
|
|
void mqtt_wss_disconnect(mqtt_wss_client client, int timeout_ms)
|
|
{
|
|
int ret;
|
|
|
|
// block application from sending more MQTT messages
|
|
client->mqtt_disconnecting = 1;
|
|
|
|
// send whatever was left at the time of calling this function
|
|
ret = mqtt_wss_service_all(client, timeout_ms / 4);
|
|
if(ret)
|
|
mws_error(client->log,
|
|
"Error while trying to send all remaining data in an attempt "
|
|
"to gracefully disconnect! EC=%d Desc:\"%s\"",
|
|
ret,
|
|
mqtt_wss_error_tos(ret));
|
|
|
|
// schedule and send MQTT disconnect
|
|
mqtt_ng_disconnect(client->mqtt, 0);
|
|
mqtt_ng_sync(client->mqtt);
|
|
|
|
ret = mqtt_wss_service_all(client, timeout_ms / 4);
|
|
if(ret)
|
|
mws_error(client->log,
|
|
"Error while trying to send MQTT disconnect message in an attempt "
|
|
"to gracefully disconnect! EC=%d Desc:\"%s\"",
|
|
ret,
|
|
mqtt_wss_error_tos(ret));
|
|
|
|
// send WebSockets close message
|
|
uint16_t ws_rc = htobe16(1000);
|
|
ws_client_send(client->ws_client, WS_OP_CONNECTION_CLOSE, (const char*)&ws_rc, sizeof(ws_rc));
|
|
ret = mqtt_wss_service_all(client, timeout_ms / 4);
|
|
if(ret) {
|
|
// Some MQTT/WSS servers will close socket on receipt of MQTT disconnect and
|
|
// do not wait for WebSocket to be closed properly
|
|
mws_warn(client->log,
|
|
"Error while trying to send WebSocket disconnect message in an attempt "
|
|
"to gracefully disconnect! EC=%d Desc:\"%s\".",
|
|
ret,
|
|
mqtt_wss_error_tos(ret));
|
|
}
|
|
|
|
// Service WSS connection until remote closes connection (usual)
|
|
// or timeout happens (unusual) in which case we close
|
|
mqtt_wss_service_all(client, timeout_ms / 4);
|
|
|
|
close(client->sockfd);
|
|
client->sockfd = -1;
|
|
}
|
|
|
|
static inline void mqtt_wss_wakeup(mqtt_wss_client client)
|
|
{
|
|
#ifdef DEBUG_ULTRA_VERBOSE
|
|
mws_debug(client->log, "mqtt_wss_wakup - forcing wake up of main loop");
|
|
#endif
|
|
write(client->write_notif_pipe[PIPE_WRITE_END], " ", 1);
|
|
}
|
|
|
|
#define THROWAWAY_BUF_SIZE 32
|
|
char throwaway[THROWAWAY_BUF_SIZE];
|
|
static inline void util_clear_pipe(int fd)
|
|
{
|
|
(void)read(fd, throwaway, THROWAWAY_BUF_SIZE);
|
|
}
|
|
|
|
static inline void set_socket_pollfds(mqtt_wss_client client, int ssl_ret) {
|
|
if (ssl_ret == SSL_ERROR_WANT_WRITE)
|
|
client->poll_fds[POLLFD_SOCKET].events |= POLLOUT;
|
|
if (ssl_ret == SSL_ERROR_WANT_READ)
|
|
client->poll_fds[POLLFD_SOCKET].events |= POLLIN;
|
|
}
|
|
|
|
static int handle_mqtt_internal(mqtt_wss_client client)
|
|
{
|
|
int rc = mqtt_ng_sync(client->mqtt);
|
|
if (rc) {
|
|
mws_error(client->log, "mqtt_ng_sync returned %d != 0", rc);
|
|
client->mqtt_connected = 0;
|
|
return 1;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
#define SEC_TO_MSEC 1000
|
|
static inline long long int t_till_next_keepalive_ms(mqtt_wss_client client)
|
|
{
|
|
time_t last_send = mqtt_ng_last_send_time(client->mqtt);
|
|
long long int next_mqtt_keep_alive = (last_send * SEC_TO_MSEC)
|
|
+ (client->mqtt_keepalive * (SEC_TO_MSEC * 0.75 /* SEND IN ADVANCE */));
|
|
return(next_mqtt_keep_alive - (time(NULL) * SEC_TO_MSEC));
|
|
}
|
|
|
|
#ifdef MQTT_WSS_CPUSTATS
|
|
static inline uint64_t mqtt_wss_now_usec(mqtt_wss_client client) {
|
|
struct timespec ts;
|
|
if(clock_gettime(CLOCK_MONOTONIC, &ts) == -1) {
|
|
mws_error(client->log, "clock_gettime(CLOCK_MONOTONIC, ×pec) failed.");
|
|
return 0;
|
|
}
|
|
return (uint64_t)ts.tv_sec * USEC_PER_SEC + (ts.tv_nsec % NSEC_PER_SEC) / NSEC_PER_USEC;
|
|
}
|
|
#endif
|
|
|
|
int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
|
|
{
|
|
char *ptr;
|
|
size_t size;
|
|
int ret;
|
|
int send_keepalive = 0;
|
|
|
|
#ifdef MQTT_WSS_CPUSTATS
|
|
uint64_t t1,t2;
|
|
t1 = mqtt_wss_now_usec(client);
|
|
#endif
|
|
|
|
#ifdef DEBUG_ULTRA_VERBOSE
|
|
mws_debug(client->log, ">>>>> mqtt_wss_service <<<<<");
|
|
mws_debug(client->log, "Waiting for events: %s%s%s",
|
|
(client->poll_fds[POLLFD_SOCKET].events & POLLIN) ? "SOCKET_POLLIN " : "",
|
|
(client->poll_fds[POLLFD_SOCKET].events & POLLOUT) ? "SOCKET_POLLOUT " : "",
|
|
(client->poll_fds[POLLFD_PIPE].events & POLLIN) ? "PIPE_POLLIN" : "" );
|
|
#endif
|
|
|
|
// Check user requested TO doesn't interfere with MQTT keep alives
|
|
long long int till_next_keep_alive = t_till_next_keepalive_ms(client);
|
|
if (client->mqtt_connected && (timeout_ms < 0 || timeout_ms >= till_next_keep_alive)) {
|
|
#ifdef DEBUG_ULTRA_VERBOSE
|
|
mws_debug(client->log, "Shortening Timeout requested %d to %lld to ensure keep-alive can be sent", timeout_ms, till_next_keep_alive);
|
|
#endif
|
|
timeout_ms = till_next_keep_alive;
|
|
send_keepalive = 1;
|
|
}
|
|
|
|
#ifdef MQTT_WSS_CPUSTATS
|
|
t2 = mqtt_wss_now_usec(client);
|
|
client->stats.time_keepalive += t2 - t1;
|
|
#endif
|
|
|
|
if ((ret = poll(client->poll_fds, 2, timeout_ms >= 0 ? timeout_ms : -1)) < 0) {
|
|
if (errno == EINTR) {
|
|
mws_warn(client->log, "poll interrupted by EINTR");
|
|
return 0;
|
|
}
|
|
mws_error(client->log, "poll error \"%s\"", strerror(errno));
|
|
return -2;
|
|
}
|
|
|
|
#ifdef DEBUG_ULTRA_VERBOSE
|
|
mws_debug(client->log, "Poll events happened: %s%s%s%s",
|
|
(client->poll_fds[POLLFD_SOCKET].revents & POLLIN) ? "SOCKET_POLLIN " : "",
|
|
(client->poll_fds[POLLFD_SOCKET].revents & POLLOUT) ? "SOCKET_POLLOUT " : "",
|
|
(client->poll_fds[POLLFD_PIPE].revents & POLLIN) ? "PIPE_POLLIN " : "",
|
|
(!ret) ? "POLL_TIMEOUT" : "");
|
|
#endif
|
|
|
|
#ifdef MQTT_WSS_CPUSTATS
|
|
t1 = mqtt_wss_now_usec(client);
|
|
#endif
|
|
|
|
if (ret == 0) {
|
|
if (send_keepalive) {
|
|
// otherwise we shortened the timeout ourselves to take care of
|
|
// MQTT keep alives
|
|
#ifdef DEBUG_ULTRA_VERBOSE
|
|
mws_debug(client->log, "Forcing MQTT Ping/keep-alive");
|
|
#endif
|
|
mqtt_ng_ping(client->mqtt);
|
|
} else {
|
|
// if poll timed out and user requested timeout was being used
|
|
// return here let user do his work and he will call us back soon
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
#ifdef MQTT_WSS_CPUSTATS
|
|
t2 = mqtt_wss_now_usec(client);
|
|
client->stats.time_keepalive += t2 - t1;
|
|
#endif
|
|
|
|
client->poll_fds[POLLFD_SOCKET].events = 0;
|
|
|
|
if ((ptr = rbuf_get_linear_insert_range(client->ws_client->buf_read, &size))) {
|
|
if((ret = SSL_read(client->ssl, ptr, size)) > 0) {
|
|
#ifdef DEBUG_ULTRA_VERBOSE
|
|
mws_debug(client->log, "SSL_Read: Read %d.", ret);
|
|
#endif
|
|
pthread_mutex_lock(&client->stat_lock);
|
|
client->stats.bytes_rx += ret;
|
|
pthread_mutex_unlock(&client->stat_lock);
|
|
rbuf_bump_head(client->ws_client->buf_read, ret);
|
|
} else {
|
|
int errnobkp = errno;
|
|
ret = SSL_get_error(client->ssl, ret);
|
|
#ifdef DEBUG_ULTRA_VERBOSE
|
|
mws_debug(client->log, "Read Err: %s", util_openssl_ret_err(ret));
|
|
#endif
|
|
set_socket_pollfds(client, ret);
|
|
if (ret != SSL_ERROR_WANT_READ &&
|
|
ret != SSL_ERROR_WANT_WRITE) {
|
|
mws_error(client->log, "SSL_read error: %d %s", ret, util_openssl_ret_err(ret));
|
|
if (ret == SSL_ERROR_SYSCALL)
|
|
mws_error(client->log, "SSL_read SYSCALL errno: %d %s", errnobkp, strerror(errnobkp));
|
|
return MQTT_WSS_ERR_CONN_DROP;
|
|
}
|
|
}
|
|
}
|
|
|
|
#ifdef MQTT_WSS_CPUSTATS
|
|
t1 = mqtt_wss_now_usec(client);
|
|
client->stats.time_read_socket += t1 - t2;
|
|
#endif
|
|
|
|
ret = ws_client_process(client->ws_client);
|
|
switch(ret) {
|
|
case WS_CLIENT_PROTOCOL_ERROR:
|
|
return MQTT_WSS_ERR_PROTO_WS;
|
|
case WS_CLIENT_NEED_MORE_BYTES:
|
|
#ifdef DEBUG_ULTRA_VERBOSE
|
|
mws_debug(client->log, "WSCLIENT WANT READ");
|
|
#endif
|
|
client->poll_fds[POLLFD_SOCKET].events |= POLLIN;
|
|
break;
|
|
case WS_CLIENT_CONNECTION_CLOSED:
|
|
return MQTT_WSS_ERR_CONN_DROP;
|
|
}
|
|
|
|
#ifdef MQTT_WSS_CPUSTATS
|
|
t2 = mqtt_wss_now_usec(client);
|
|
client->stats.time_process_websocket += t2 - t1;
|
|
#endif
|
|
|
|
// process MQTT stuff
|
|
if(client->ws_client->state == WS_ESTABLISHED)
|
|
if (handle_mqtt_internal(client))
|
|
return MQTT_WSS_ERR_PROTO_MQTT;
|
|
|
|
if (client->mqtt_didnt_finish_write) {
|
|
client->mqtt_didnt_finish_write = 0;
|
|
client->poll_fds[POLLFD_SOCKET].events |= POLLOUT;
|
|
}
|
|
|
|
#ifdef MQTT_WSS_CPUSTATS
|
|
t1 = mqtt_wss_now_usec(client);
|
|
client->stats.time_process_mqtt += t1 - t2;
|
|
#endif
|
|
|
|
if ((ptr = rbuf_get_linear_read_range(client->ws_client->buf_write, &size))) {
|
|
#ifdef DEBUG_ULTRA_VERBOSE
|
|
mws_debug(client->log, "Have data to write to SSL");
|
|
#endif
|
|
if ((ret = SSL_write(client->ssl, ptr, size)) > 0) {
|
|
#ifdef DEBUG_ULTRA_VERBOSE
|
|
mws_debug(client->log, "SSL_Write: Written %d of avail %d.", ret, size);
|
|
#endif
|
|
pthread_mutex_lock(&client->stat_lock);
|
|
client->stats.bytes_tx += ret;
|
|
pthread_mutex_unlock(&client->stat_lock);
|
|
rbuf_bump_tail(client->ws_client->buf_write, ret);
|
|
} else {
|
|
int errnobkp = errno;
|
|
ret = SSL_get_error(client->ssl, ret);
|
|
#ifdef DEBUG_ULTRA_VERBOSE
|
|
mws_debug(client->log, "Write Err: %s", util_openssl_ret_err(ret));
|
|
#endif
|
|
set_socket_pollfds(client, ret);
|
|
if (ret != SSL_ERROR_WANT_READ &&
|
|
ret != SSL_ERROR_WANT_WRITE) {
|
|
mws_error(client->log, "SSL_write error: %d %s", ret, util_openssl_ret_err(ret));
|
|
if (ret == SSL_ERROR_SYSCALL)
|
|
mws_error(client->log, "SSL_write SYSCALL errno: %d %s", errnobkp, strerror(errnobkp));
|
|
return MQTT_WSS_ERR_CONN_DROP;
|
|
}
|
|
}
|
|
}
|
|
|
|
if(client->poll_fds[POLLFD_PIPE].revents & POLLIN)
|
|
util_clear_pipe(client->write_notif_pipe[PIPE_READ_END]);
|
|
|
|
#ifdef MQTT_WSS_CPUSTATS
|
|
t2 = mqtt_wss_now_usec(client);
|
|
client->stats.time_write_socket += t2 - t1;
|
|
#endif
|
|
|
|
return MQTT_WSS_OK;
|
|
}
|
|
|
|
int mqtt_wss_publish5(mqtt_wss_client client,
|
|
char *topic,
|
|
free_fnc_t topic_free,
|
|
void *msg,
|
|
free_fnc_t msg_free,
|
|
size_t msg_len,
|
|
uint8_t publish_flags,
|
|
uint16_t *packet_id)
|
|
{
|
|
if (client->mqtt_disconnecting) {
|
|
mws_error(client->log, "mqtt_wss is disconnecting can't publish");
|
|
return 1;
|
|
}
|
|
|
|
if (!client->mqtt_connected) {
|
|
mws_error(client->log, "MQTT is offline. Can't send message.");
|
|
return 1;
|
|
}
|
|
uint8_t mqtt_flags = 0;
|
|
|
|
mqtt_flags = (publish_flags & MQTT_WSS_PUB_QOSMASK) << 1;
|
|
if (publish_flags & MQTT_WSS_PUB_RETAIN)
|
|
mqtt_flags |= MQTT_PUBLISH_RETAIN;
|
|
|
|
int rc = mqtt_ng_publish(client->mqtt, topic, topic_free, msg, msg_free, msg_len, mqtt_flags, packet_id);
|
|
if (rc == MQTT_NG_MSGGEN_MSG_TOO_BIG)
|
|
return MQTT_WSS_ERR_TOO_BIG_FOR_SERVER;
|
|
|
|
mqtt_wss_wakeup(client);
|
|
|
|
return rc;
|
|
}
|
|
|
|
int mqtt_wss_subscribe(mqtt_wss_client client, char *topic, int max_qos_level)
|
|
{
|
|
(void)max_qos_level; //TODO now hardcoded
|
|
if (!client->mqtt_connected) {
|
|
mws_error(client->log, "MQTT is offline. Can't subscribe.");
|
|
return 1;
|
|
}
|
|
|
|
if (client->mqtt_disconnecting) {
|
|
mws_error(client->log, "mqtt_wss is disconnecting can't subscribe");
|
|
return 1;
|
|
}
|
|
|
|
struct mqtt_sub sub = {
|
|
.topic = topic,
|
|
.topic_free = NULL,
|
|
.options = /* max_qos_level & 0x3 TODO when QOS > 1 implemented */ 0x01 | (0x01 << 3)
|
|
};
|
|
mqtt_ng_subscribe(client->mqtt, &sub, 1);
|
|
|
|
mqtt_wss_wakeup(client);
|
|
return 0;
|
|
}
|
|
|
|
struct mqtt_wss_stats mqtt_wss_get_stats(mqtt_wss_client client)
|
|
{
|
|
struct mqtt_wss_stats current;
|
|
pthread_mutex_lock(&client->stat_lock);
|
|
current = client->stats;
|
|
memset(&client->stats, 0, sizeof(client->stats));
|
|
pthread_mutex_unlock(&client->stat_lock);
|
|
mqtt_ng_get_stats(client->mqtt, ¤t.mqtt);
|
|
return current;
|
|
}
|
|
|
|
int mqtt_wss_set_topic_alias(mqtt_wss_client client, const char *topic)
|
|
{
|
|
return mqtt_ng_set_topic_alias(client->mqtt, topic);
|
|
}
|
|
|
|
#ifdef MQTT_WSS_DEBUG
|
|
void mqtt_wss_set_SSL_CTX_keylog_cb(mqtt_wss_client client, void (*ssl_ctx_keylog_cb)(const SSL *ssl, const char *line))
|
|
{
|
|
client->ssl_ctx_keylog_cb = ssl_ctx_keylog_cb;
|
|
}
|
|
#endif
|