0
0
Fork 0
mirror of https://github.com/netdata/netdata.git synced 2025-04-14 01:29:11 +00:00

WEBRTC for communication between agents and browsers ()

* initial webrtc setup

* missing files

* rewrite of webrtc integration

* initialization and cleanup of webrtc connections

* make it compile without libdatachannel

* add missing webrtc_initialize() function when webrtc is not enabled

* make c++17 optional

* add build/m4/ax_compiler_vendor.m4

* add ax_cxx_compile_stdcxx.m4

* added new m4 files to makefile.am

* id all webrtc connections

* show warning when webrtc is disabled

* fixed message

* moved all webrtc error checking inside webrtc.cpp

* working webrtc connection establishment and cleanup

* remove obsolete code

* rewrote webrtc code in C to remove dependency for c++17

* fixed left-over reference

* detect binary and text messages

* minor fix

* naming of webrtc threads

* added webrtc configuration

* fix for thread_get_name_np()

* smaller web_client memory footprint

* universal web clients cache

* free web clients every 100 uses

* webrtc is now enabled by default only when compiled with internal checks

* webrtc responses to /api/ requests, including LZ4 compression

* fix for binary and text messages

* web_client_cache is now global

* unification of the internal web server API, for web requests, aclk request, webrtc requests

* more cleanup and unification of web client timings

* fixed compiler warnings

* update sent and received bytes

* eliminated of almost all big buffers in web client

* registry now uses the new json generation

* cookies are now an array; fixed redirects

* fix redirects, again

* write cookies directly to the header buffer, eliminating the need for cookie structures in web client

* reset the has_cookies flag

* gathered all web client cleanup to one function

* fixes redirects

* added summary.globals in /api/v2/data response

* ars to arc in /api/v2/data

* properly handle host impersonation

* set the context of mem.numa_nodes
This commit is contained in:
Costa Tsaousis 2023-04-20 20:49:06 +03:00 committed by GitHub
parent 5b676d5f91
commit c3d70ffcb4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
55 changed files with 3120 additions and 1270 deletions

View file

@ -863,6 +863,8 @@ set(API_PLUGIN_FILES
web/api/formatters/rrdset2json.c
web/api/formatters/rrdset2json.h
web/api/health/health_cmdapi.c
web/rtc/webrtc.c
web/rtc/webrtc.h
)
set(STREAMING_PLUGIN_FILES

View file

@ -37,6 +37,8 @@ EXTRA_DIST = \
build/m4/ax_c_mallopt.m4 \
build/m4/tcmalloc.m4 \
build/m4/ax_c__generic.m4 \
build/m4/ax_compiler_vendor.m4 \
build/m4/ax_cxx_compile_stdcxx.m4 \
ml/dlib \
README.md \
LICENSE \
@ -119,6 +121,7 @@ AM_CFLAGS = \
$(OPTIONAL_UUID_CFLAGS) \
$(OPTIONAL_MQTT_CFLAGS) \
$(OPTIONAL_LIBCAP_LIBS) \
$(OPTIONAL_DATACHANNEL_CFLAGS) \
$(OPTIONAL_IPMIMONITORING_CFLAGS) \
$(OPTIONAL_CUPS_CFLAGS) \
$(OPTIONAL_XENSTAT_CFLAGS) \
@ -659,6 +662,8 @@ STATSD_PLUGIN_FILES = \
$(NULL)
WEB_PLUGIN_FILES = \
web/rtc/webrtc.c \
web/rtc/webrtc.h \
web/server/web_client.c \
web/server/web_client.h \
web/server/web_server.c \
@ -980,6 +985,7 @@ NETDATA_COMMON_LIBS = \
$(OPTIONAL_MQTT_LIBS) \
$(OPTIONAL_UV_LIBS) \
$(OPTIONAL_LZ4_LIBS) \
$(OPTIONAL_DATACHANNEL_LIBS) \
libjudy.a \
$(OPTIONAL_SSL_LIBS) \
$(OPTIONAL_JSONC_LIBS) \

View file

@ -3,6 +3,7 @@
#include "aclk_query.h"
#include "aclk_stats.h"
#include "aclk_tx_msgs.h"
#include "../../web/server/web_client_cache.h"
#define WEB_HDR_ACCEPT_ENC "Accept-Encoding:"
#define ACLK_MAX_WEB_RESPONSE_SIZE (30 * 1024 * 1024)
@ -12,16 +13,46 @@ pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER;
#define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait)
#define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait)
static usec_t aclk_web_api_request(RRDHOST *host, struct web_client *w, char *url, const size_t api_version)
{
static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) {
int retval = 0;
BUFFER *local_buffer = NULL;
size_t size = 0;
size_t sent = 0;
#ifdef NETDATA_WITH_ZLIB
int z_ret;
BUFFER *z_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE, &netdata_buffers_statistics.buffers_aclk);
char *start, *end;
#endif
struct web_client *w = web_client_get_from_cache();
w->acl = WEB_CLIENT_ACL_ACLK;
w->mode = WEB_CLIENT_MODE_GET;
w->timings.tv_in = query->created_tv;
usec_t t;
web_client_timeout_checkpoint_set(w, query->timeout);
if(web_client_timeout_checkpoint_and_check(w, &t)) {
log_access("QUERY CANCELED: QUEUE TIME EXCEEDED %llu ms (LIMIT %d ms)", t / USEC_PER_MS, query->timeout);
retval = 1;
w->response.code = HTTP_RESP_BACKEND_FETCH_FAILED;
aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_SND_TIMEOUT, CLOUD_EMSG_SND_TIMEOUT, NULL, 0);
goto cleanup;
}
t = now_monotonic_high_precision_usec();
web_client_decode_path_and_query_string(w, query->data.http_api_v2.query);
char *path = (char *)buffer_tostring(w->url_path_decoded);
if(api_version == 2)
w->response.code = web_client_api_request_v2(host, w, url);
else
w->response.code = web_client_api_request_v1(host, w, url);
if (aclk_stats_enabled) {
char *url_path_endpoint = strrchr(path, '/');
ACLK_STATS_LOCK;
int stat_idx = aclk_cloud_req_http_type_to_idx(url_path_endpoint ? url_path_endpoint + 1 : "other");
aclk_metrics_per_sample.cloud_req_http_by_type[stat_idx]++;
ACLK_STATS_UNLOCK;
}
w->response.code = web_client_api_request_with_node_selection(localhost, w, path);
web_client_timeout_checkpoint_response_ready(w, &t);
if(buffer_strlen(w->response.data) > ACLK_MAX_WEB_RESPONSE_SIZE) {
buffer_flush(w->response.data);
@ -30,8 +61,6 @@ static usec_t aclk_web_api_request(RRDHOST *host, struct web_client *w, char *ur
w->response.code = HTTP_RESP_CONTENT_TOO_LONG;
}
t = now_monotonic_high_precision_usec() - t;
if (aclk_stats_enabled) {
ACLK_STATS_LOCK;
aclk_metrics_per_sample.cloud_q_process_total += t;
@ -41,126 +70,7 @@ static usec_t aclk_web_api_request(RRDHOST *host, struct web_client *w, char *ur
ACLK_STATS_UNLOCK;
}
return t;
}
static RRDHOST *node_id_2_rrdhost(const char *node_id)
{
int res;
uuid_t node_id_bin, host_id_bin;
RRDHOST *host = find_host_by_node_id((char *)node_id);
if (host)
return host;
char host_id[UUID_STR_LEN];
if (uuid_parse(node_id, node_id_bin)) {
error("Couldn't parse UUID %s", node_id);
return NULL;
}
if ((res = get_host_id(&node_id_bin, &host_id_bin))) {
error("node not found rc=%d", res);
return NULL;
}
uuid_unparse_lower(host_id_bin, host_id);
return rrdhost_find_by_guid(host_id);
}
#define NODE_ID_QUERY "/node/"
// TODO this function should be quarantied and written nicely
// lots of skeletons from initial ACLK Legacy impl.
// quick and dirty from the start
static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
{
int retval = 0;
usec_t t;
BUFFER *local_buffer = NULL;
BUFFER *log_buffer = buffer_create(NETDATA_WEB_REQUEST_URL_SIZE, &netdata_buffers_statistics.buffers_aclk);
RRDHOST *query_host = localhost;
#ifdef NETDATA_WITH_ZLIB
int z_ret;
BUFFER *z_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE, &netdata_buffers_statistics.buffers_aclk);
char *start, *end;
#endif
struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client));
w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE, &netdata_buffers_statistics.buffers_aclk);
w->response.header = buffer_create(NETDATA_WEB_RESPONSE_HEADER_INITIAL_SIZE, &netdata_buffers_statistics.buffers_aclk);
w->response.header_output = buffer_create(NETDATA_WEB_RESPONSE_HEADER_INITIAL_SIZE, &netdata_buffers_statistics.buffers_aclk);
strcpy(w->origin, "*"); // Simulate web_client_create_on_fd()
w->cookie1[0] = 0; // Simulate web_client_create_on_fd()
w->cookie2[0] = 0; // Simulate web_client_create_on_fd()
w->acl = WEB_CLIENT_ACL_ACLK;
buffer_strcat(log_buffer, query->data.http_api_v2.query);
size_t size = 0;
size_t sent = 0;
w->tv_in = query->created_tv;
now_realtime_timeval(&w->tv_ready);
if (query->timeout) {
int in_queue = (int) (dt_usec(&w->tv_in, &w->tv_ready) / 1000);
if (in_queue > query->timeout) {
log_access("QUERY CANCELED: QUEUE TIME EXCEEDED %d ms (LIMIT %d ms)", in_queue, query->timeout);
retval = 1;
w->response.code = HTTP_RESP_BACKEND_FETCH_FAILED;
aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_SND_TIMEOUT, CLOUD_EMSG_SND_TIMEOUT, NULL, 0);
goto cleanup;
}
}
if (!strncmp(query->data.http_api_v2.query, NODE_ID_QUERY, strlen(NODE_ID_QUERY))) {
char *node_uuid = query->data.http_api_v2.query + strlen(NODE_ID_QUERY);
char nodeid[UUID_STR_LEN];
if (strlen(node_uuid) < (UUID_STR_LEN - 1)) {
error_report(CLOUD_EMSG_MALFORMED_NODE_ID);
retval = 1;
w->response.code = 404;
aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_MALFORMED_NODE_ID, CLOUD_EMSG_MALFORMED_NODE_ID, NULL, 0);
goto cleanup;
}
strncpyz(nodeid, node_uuid, UUID_STR_LEN - 1);
query_host = node_id_2_rrdhost(nodeid);
if (!query_host) {
error_report("Host with node_id \"%s\" not found! Returning 404 to Cloud!", nodeid);
retval = 1;
w->response.code = 404;
aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_NODE_NOT_FOUND, CLOUD_EMSG_NODE_NOT_FOUND, NULL, 0);
goto cleanup;
}
}
size_t api_version = 1;
{
char *s = strstr(query->data.http_api_v2.query, "/api/v");
if(s && s[6]) {
api_version = str2u(&s[6]);
if(api_version != 1 && api_version != 2)
api_version = 1;
}
}
char *mysep = strchr(query->data.http_api_v2.query, '?');
if (mysep) {
url_decode_r(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE + 1);
*mysep = '\0';
} else
url_decode_r(w->decoded_query_string, query->data.http_api_v2.query, NETDATA_WEB_REQUEST_URL_SIZE + 1);
mysep = strrchr(query->data.http_api_v2.query, '/');
if (aclk_stats_enabled) {
ACLK_STATS_LOCK;
int stat_idx = aclk_cloud_req_http_type_to_idx(mysep ? mysep + 1 : "other");
aclk_metrics_per_sample.cloud_req_http_by_type[stat_idx]++;
ACLK_STATS_UNLOCK;
}
// execute the query
t = aclk_web_api_request(query_host, w, mysep ? mysep + 1 : "noop", api_version);
size = (w->mode == WEB_CLIENT_MODE_FILECOPY) ? w->response.rlen : w->response.data->len;
size = w->response.data->len;
sent = size;
#ifdef NETDATA_WITH_ZLIB
@ -175,8 +85,8 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
w->response.zstream.zfree = Z_NULL;
w->response.zstream.opaque = Z_NULL;
if(deflateInit2(&w->response.zstream, web_gzip_level, Z_DEFLATED, 15 + 16, 8, web_gzip_strategy) == Z_OK) {
w->response.zinitialized = 1;
w->response.zoutput = 1;
w->response.zinitialized = true;
w->response.zoutput = true;
} else
error("Failed to initialize zlib. Proceeding without compression.");
}
@ -212,7 +122,7 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
}
#endif
w->response.data->date = w->tv_ready.tv_sec;
w->response.data->date = w->timings.tv_ready.tv_sec;
web_client_build_http_header(w);
local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE, &netdata_buffers_statistics.buffers_aclk);
local_buffer->content_type = CT_APPLICATION_JSON;
@ -240,7 +150,7 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
struct timeval tv;
cleanup:
now_realtime_timeval(&tv);
now_monotonic_high_precision_timeval(&tv);
log_access("%llu: %d '[ACLK]:%d' '%s' (sent/all = %zu/%zu bytes %0.0f%%, prep/sent/total = %0.2f/%0.2f/%0.2f ms) %d '%s'",
w->id
, gettid()
@ -249,24 +159,19 @@ cleanup:
, sent
, size
, size > sent ? -(((size - sent) / (double)size) * 100.0) : ((size > 0) ? (((sent - size ) / (double)size) * 100.0) : 0.0)
, dt_usec(&w->tv_ready, &w->tv_in) / 1000.0
, dt_usec(&tv, &w->tv_ready) / 1000.0
, dt_usec(&tv, &w->tv_in) / 1000.0
, dt_usec(&w->timings.tv_ready, &w->timings.tv_in) / 1000.0
, dt_usec(&tv, &w->timings.tv_ready) / 1000.0
, dt_usec(&tv, &w->timings.tv_in) / 1000.0
, w->response.code
, strip_control_characters((char *)buffer_tostring(log_buffer))
, strip_control_characters((char *)buffer_tostring(w->url_as_received))
);
web_client_release_to_cache(w);
#ifdef NETDATA_WITH_ZLIB
if(w->response.zinitialized)
deflateEnd(&w->response.zstream);
buffer_free(z_buffer);
#endif
buffer_free(w->response.data);
buffer_free(w->response.header);
buffer_free(w->response.header_output);
freez(w);
buffer_free(local_buffer);
buffer_free(log_buffer);
return retval;
}

View file

@ -20,7 +20,7 @@ static struct aclk_query_queue {
static inline int _aclk_queue_query(aclk_query_t query)
{
now_realtime_timeval(&query->created_tv);
now_monotonic_high_precision_timeval(&query->created_tv);
query->created = now_realtime_usec();
ACLK_QUEUE_LOCK;

View file

@ -0,0 +1,119 @@
# ===========================================================================
# https://www.gnu.org/software/autoconf-archive/ax_compiler_vendor.html
# ===========================================================================
#
# SYNOPSIS
#
# AX_COMPILER_VENDOR
#
# DESCRIPTION
#
# Determine the vendor of the C, C++ or Fortran compiler. The vendor is
# returned in the cache variable $ax_cv_c_compiler_vendor for C,
# $ax_cv_cxx_compiler_vendor for C++ or $ax_cv_fc_compiler_vendor for
# (modern) Fortran. The value is one of "intel", "ibm", "pathscale",
# "clang" (LLVM), "cray", "fujitsu", "sdcc", "sx", "nvhpc" (NVIDIA HPC
# Compiler), "portland" (PGI), "gnu" (GCC), "sun" (Oracle Developer
# Studio), "hp", "dec", "borland", "comeau", "kai", "lcc", "sgi",
# "microsoft", "metrowerks", "watcom", "tcc" (Tiny CC) or "unknown" (if
# the compiler cannot be determined).
#
# To check for a Fortran compiler, you must first call AC_FC_PP_SRCEXT
# with an appropriate preprocessor-enabled extension. For example:
#
# AC_LANG_PUSH([Fortran])
# AC_PROG_FC
# AC_FC_PP_SRCEXT([F])
# AX_COMPILER_VENDOR
# AC_LANG_POP([Fortran])
#
# LICENSE
#
# Copyright (c) 2008 Steven G. Johnson <stevenj@alum.mit.edu>
# Copyright (c) 2008 Matteo Frigo
# Copyright (c) 2018-19 John Zaitseff <J.Zaitseff@zap.org.au>
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program. If not, see <https://www.gnu.org/licenses/>.
#
# As a special exception, the respective Autoconf Macro's copyright owner
# gives unlimited permission to copy, distribute and modify the configure
# scripts that are the output of Autoconf when processing the Macro. You
# need not follow the terms of the GNU General Public License when using
# or distributing such scripts, even though portions of the text of the
# Macro appear in them. The GNU General Public License (GPL) does govern
# all other use of the material that constitutes the Autoconf Macro.
#
# This special exception to the GPL applies to versions of the Autoconf
# Macro released by the Autoconf Archive. When you make and distribute a
# modified version of the Autoconf Macro, you may extend this special
# exception to the GPL to apply to your modified version as well.
#serial 32
AC_DEFUN([AX_COMPILER_VENDOR], [dnl
AC_CACHE_CHECK([for _AC_LANG compiler vendor], ax_cv_[]_AC_LANG_ABBREV[]_compiler_vendor, [dnl
dnl If you modify this list of vendors, please add similar support
dnl to ax_compiler_version.m4 if at all possible.
dnl
dnl Note: Do NOT check for GCC first since some other compilers
dnl define __GNUC__ to remain compatible with it. Compilers that
dnl are very slow to start (such as Intel) are listed first.
vendors="
intel: __ICC,__ECC,__INTEL_COMPILER
ibm: __xlc__,__xlC__,__IBMC__,__IBMCPP__,__ibmxl__
pathscale: __PATHCC__,__PATHSCALE__
clang: __clang__
cray: _CRAYC
fujitsu: __FUJITSU
sdcc: SDCC,__SDCC
sx: _SX
nvhpc: __NVCOMPILER
portland: __PGI
gnu: __GNUC__
sun: __SUNPRO_C,__SUNPRO_CC,__SUNPRO_F90,__SUNPRO_F95
hp: __HP_cc,__HP_aCC
dec: __DECC,__DECCXX,__DECC_VER,__DECCXX_VER
borland: __BORLANDC__,__CODEGEARC__,__TURBOC__
comeau: __COMO__
kai: __KCC
lcc: __LCC__
sgi: __sgi,sgi
microsoft: _MSC_VER
metrowerks: __MWERKS__
watcom: __WATCOMC__
tcc: __TINYC__
unknown: UNKNOWN
"
for ventest in $vendors; do
case $ventest in
*:)
vendor=$ventest
continue
;;
*)
vencpp="defined("`echo $ventest | sed 's/,/) || defined(/g'`")"
;;
esac
AC_COMPILE_IFELSE([AC_LANG_PROGRAM([], [[
#if !($vencpp)
thisisanerror;
#endif
]])], [break])
done
ax_cv_[]_AC_LANG_ABBREV[]_compiler_vendor=`echo $vendor | cut -d: -f1`
])
])dnl

File diff suppressed because it is too large Load diff

View file

@ -1771,11 +1771,11 @@ static inline void substitute_dots_in_id(char *s) {
char *cgroup_parse_resolved_name_and_labels(DICTIONARY *labels, char *data) {
// the first word, up to the first space is the name
char *name = mystrsep(&data, " ");
char *name = strsep_skip_consecutive_separators(&data, " ");
// the rest are key=value pairs separated by comma
while(data) {
char *pair = mystrsep(&data, ",");
char *pair = strsep_skip_consecutive_separators(&data, ",");
rrdlabels_add_pair(labels, pair, RRDLABEL_SRC_AUTO| RRDLABEL_SRC_K8S);
}

View file

@ -105,7 +105,7 @@ int do_proc_sys_devices_system_node(int update_every, usec_t dt) {
, m->name
, NULL
, "numa"
, NULL
, "mem.numa_nodes"
, "NUMA events"
, "events/s"
, PLUGIN_PROC_NAME

View file

@ -135,9 +135,9 @@ static inline int collect_btrfs_error_stats(BTRFS_DEVICE *device){
char *p = buffer;
while(p){
char *val = mystrsep(&p, "\n");
char *val = strsep_skip_consecutive_separators(&p, "\n");
if(unlikely(!val || !*val)) break;
char *key = mystrsep(&val, " ");
char *key = strsep_skip_consecutive_separators(&val, " ");
if(!strcmp(key, "write_errs")) device->write_errs = str2ull(val, NULL);
else if(!strcmp(key, "read_errs")) device->read_errs = str2ull(val, NULL);
@ -166,9 +166,9 @@ static inline int collect_btrfs_commits_stats(BTRFS_NODE *node, int update_every
char *p = buffer;
while(p){
char *val = mystrsep(&p, "\n");
char *val = strsep_skip_consecutive_separators(&p, "\n");
if(unlikely(!val || !*val)) break;
char *key = mystrsep(&val, " ");
char *key = strsep_skip_consecutive_separators(&val, " ");
if(!strcmp(key, "commits")){
long long commits_total_new = str2ull(val, NULL);

View file

@ -233,6 +233,55 @@ if test "${enable_cloud}" = "no"; then
AC_DEFINE([DISABLE_CLOUD], [1], [disable netdata cloud functionality])
fi
# -----------------------------------------------------------------------------
# C++ version check
# Check for C++17 support (optional)
# AX_CXX_COMPILE_STDCXX(17, noext, optional)
if test "x$HAVE_CXX17" != "x1"; then
# Check for C++11 support (optional)
AX_CXX_COMPILE_STDCXX(11, noext, optional)
fi
AC_MSG_CHECKING([c++ standard to use])
if test "x$HAVE_CXX17" = "x1"; then
have_cxx17="yes"
have_cxx11="yes"
CPP_STD_FLAG="-std=c++17"
cpp_std_to_use="c++17"
AM_CONDITIONAL([HAVE_CXX17], [true])
AM_CONDITIONAL([HAVE_CXX11], [true])
elif test "x$HAVE_CXX11" = "x1"; then
have_cxx17="no"
have_cxx11="yes"
CPP_STD_FLAG="-std=c++11"
cpp_std_to_use="c++11"
AM_CONDITIONAL([HAVE_CXX17], [false])
AM_CONDITIONAL([HAVE_CXX11], [true])
else
have_cxx17="no"
have_cxx11="no"
CPP_STD_FLAG=""
cpp_std_to_use="no c++"
AM_CONDITIONAL([HAVE_CXX17], [false])
AM_CONDITIONAL([HAVE_CXX11], [false])
fi
# PPC64LE needs -std=gnu++11 in order to build dlib. However, the rest of
# the agent's components use and have been tested only with -std=c++11.
# Skip ML compilation on that CPU until we reorganize and test the C++ flags.
if test "${host_cpu}" = "powerpc64le"; then
have_cxx17="no"
have_cxx11="no"
CPP_STD_FLAG=""
cpp_std_to_use="no c++ on powerpc64le"
AM_CONDITIONAL([HAVE_CXX17], [false])
AM_CONDITIONAL([HAVE_CXX11], [false])
fi
AC_MSG_RESULT([${cpp_std_to_use}])
# -----------------------------------------------------------------------------
# netdata required checks
@ -347,6 +396,17 @@ AC_CHECK_LIB(
[AC_DEFINE([HAVE_PTHREAD_GETNAME_NP], [1], [Is set if pthread_getname_np is available])]
)
# -----------------------------------------------------------------------------
# libdatachannel
AC_CHECK_LIB([datachannel], [rtcCreatePeerConnection],
[LIBDATACHANNEL_FOUND=yes],
[LIBDATACHANNEL_FOUND=no])
if test "x$LIBDATACHANNEL_FOUND" = "xyes"; then
AC_DEFINE([HAVE_LIBDATACHANNEL], [1], [libdatachannel usability])
OPTIONAL_DATACHANNEL_LIBS="-ldatachannel"
fi
# -----------------------------------------------------------------------------
# libm
@ -858,7 +918,6 @@ if test "$enable_cloud" != "no"; then
AC_DEFINE([ENABLE_ACLK], [1], [netdata ACLK])
OPTIONAL_ACLK_CFLAGS="-I \$(abs_top_srcdir)/mqtt_websockets/src/include -I \$(abs_top_srcdir)/mqtt_websockets/c-rbuf/include -I \$(abs_top_srcdir)/aclk/aclk-schemas"
OPTIONAL_PROTOBUF_CFLAGS="${PROTOBUF_CFLAGS}"
CXX11FLAG="-std=c++11"
OPTIONAL_PROTOBUF_LIBS="${PROTOBUF_LIBS}"
fi
fi
@ -1195,22 +1254,8 @@ if test "${enable_ml}" = "yes" -a "${have_ml_submodules}" = "no"; then
AC_MSG_ERROR([You have explicitly requested --enable-ml functionality but it cannot be built because the required git submodules are missing.])
fi
# Check if C++ toolchain does not support C++11. Fail if ML was explicitly requested.
AC_LANG_PUSH([C++])
AX_CHECK_COMPILE_FLAG([-std=c++11], [have_cxx11=yes], [have_cxx11=no])
AC_LANG_POP([C++])
# PPC64LE needs -std=gnu++11 in order to build dlib. However, the rest of
# the agent's components use and have been tested only with -std=c++11.
# Skip ML compilation on that CPU until we reorganize and test the C++ flags.
if test "${host_cpu}" = "powerpc64le"; then
have_cxx11="no"
fi
if test "${enable_ml}" = "yes" -a "${have_cxx11}" = "no"; then
AC_MSG_ERROR([You have explicitly requested --enable-ml functionality but it cannot be built without a C++11 toolchain.])
else
CXX11FLAG="$CXX11FLAG -std=c++11"
fi
# Decide if we should build ML
@ -1360,7 +1405,7 @@ if test "${enable_exporting_kinesis}" != "no" -a "${have_libaws_cpp_sdk_kinesis}
enable_exporting_kinesis="yes"
AC_DEFINE([HAVE_KINESIS], [1], [libaws-cpp-sdk-kinesis usability])
OPTIONAL_KINESIS_CFLAGS="${LIBCRYPTO_CFLAGS} ${LIBSSL_CFLAGS} ${LIBCURL_CFLAGS}"
CXX11FLAG="${AWS_CPP_SDK_KINESIS_CFLAGS} ${AWS_CPP_SDK_CORE_CFLAGS}"
OPTIONAL_KINESIS_CXXFLAGS="${AWS_CPP_SDK_KINESIS_CFLAGS} ${AWS_CPP_SDK_CORE_CFLAGS}"
OPTIONAL_KINESIS_LIBS="${AWS_CPP_SDK_KINESIS_LIBS} ${AWS_CPP_SDK_CORE_LIBS} \
${LIBCRYPTO_LIBS} ${LIBSSL_LIBS} ${LIBCURL_LIBS}"
else
@ -1411,7 +1456,6 @@ if test "${enable_exporting_pubsub}" != "no" -a "${have_pubsub_protos}" = "yes"
enable_exporting_pubsub="yes"
AC_DEFINE([ENABLE_EXPORTING_PUBSUB], [1], [Pub/Sub API usability])
OPTIONAL_PUBSUB_CFLAGS="${GRPC_CFLAGS} ${PUBSUB_CFLAGS}"
CXX11FLAG="-std=c++11"
OPTIONAL_PUBSUB_LIBS="${GRPC_LIBS} ${PUBSUB_LIBS}"
else
enable_pubsub="no"
@ -1431,7 +1475,7 @@ AC_MSG_CHECKING([for snappy::RawCompress in -lsnappy])
save_LIBS="${LIBS}"
LIBS="-lsnappy"
save_CXXFLAGS="${CXXFLAGS}"
CXXFLAGS="${CXXFLAGS} -std=c++11"
CXXFLAGS="${CXXFLAGS} ${CPP_STD_FLAG}"
AC_TRY_LINK(
[
@ -1477,7 +1521,6 @@ if test "${enable_exporting_prometheus_remote_write}" != "no" -a "${have_libprot
enable_exporting_prometheus_remote_write="yes"
AC_DEFINE([ENABLE_PROMETHEUS_REMOTE_WRITE], [1], [Prometheus remote write API usability])
OPTIONAL_PROMETHEUS_REMOTE_WRITE_CFLAGS="${SNAPPY_CFLAGS} -I \$(abs_top_srcdir)/exporting/prometheus/remote_write"
CXX11FLAG="-std=c++11"
OPTIONAL_PROMETHEUS_REMOTE_WRITE_LIBS="${SNAPPY_LIBS}"
OPTIONAL_PROTOBUF_CFLAGS="${PROTOBUF_CFLAGS}"
OPTIONAL_PROTOBUF_LIBS="${PROTOBUF_LIBS}"
@ -1648,7 +1691,7 @@ CFLAGS="${originalCFLAGS} ${OPTIONAL_LTO_CFLAGS} ${OPTIONAL_PROTOBUF_CFLAGS} ${O
${OPTIONAL_MONGOC_CFLAGS} ${LWS_CFLAGS} ${OPTIONAL_JSONC_STATIC_CFLAGS} ${OPTIONAL_YAML_STATIC_CFLAGS} ${OPTIONAL_BPF_CFLAGS} ${JUDY_CFLAGS} \
${OPTIONAL_ACLK_CFLAGS} ${OPTIONAL_ML_CFLAGS} ${OPTIONAL_OS_DEP_CFLAGS}"
CXXFLAGS="${CFLAGS} ${CXX11FLAG}"
CXXFLAGS="${CFLAGS} ${OPTIONAL_KINESIS_CXXFLAGS} ${CPP_STD_FLAG}"
CPPFLAGS="\
-DVARLIB_DIR=\"\\\"${varlibdir}\\\"\" \
@ -1663,6 +1706,7 @@ CPPFLAGS="\
AC_SUBST([OPTIONAL_MATH_CFLAGS])
AC_SUBST([OPTIONAL_MATH_LIBS])
AC_SUBST([OPTIONAL_DATACHANNEL_LIBS])
AC_SUBST([OPTIONAL_UV_LIBS])
AC_SUBST([OPTIONAL_LZ4_LIBS])
AC_SUBST([OPTIONAL_SSL_LIBS])
@ -1842,6 +1886,7 @@ AC_CONFIG_FILES([
web/api/health/Makefile
web/gui/Makefile
web/gui/dashboard/Makefile
web/rtc/Makefile
web/server/Makefile
web/server/static/Makefile
claim/Makefile

View file

@ -59,6 +59,7 @@
// the netdata API
#include "web/server/web_client.h"
#include "web/rtc/webrtc.h"
// all data collection plugins
#include "collectors/all.h"

View file

@ -309,6 +309,8 @@ static bool service_wait_exit(SERVICE_TYPE service, usec_t timeout_ut) {
timeout = false; \
}
void web_client_cache_destroy(void);
void netdata_cleanup_and_exit(int ret) {
usec_t started_ut = now_monotonic_usec();
usec_t last_ut = started_ut;
@ -336,6 +338,10 @@ void netdata_cleanup_and_exit(int ret) {
}
#endif
delta_shutdown_time("close webrtc connections");
webrtc_close_all_connections();
delta_shutdown_time("disable ML detection and training threads");
ml_stop_threads();
@ -380,6 +386,10 @@ void netdata_cleanup_and_exit(int ret) {
SERVICE_MAINTENANCE
, 3 * USEC_PER_SEC);
delta_shutdown_time("clear web client cache");
web_client_cache_destroy();
delta_shutdown_time("clean rrdhost database");
rrdhost_cleanup_all();
@ -2136,6 +2146,11 @@ int main(int argc, char **argv) {
}
#endif
// ------------------------------------------------------------------------
// initialize WebRTC
webrtc_initialize();
// ------------------------------------------------------------------------
// unblock signals

View file

@ -1295,6 +1295,7 @@ int rrd_init(char *hostname, struct rrdhost_system_info *system_info, bool unitt
RRDHOST *rrdhost_find_by_hostname(const char *hostname);
RRDHOST *rrdhost_find_by_guid(const char *guid);
RRDHOST *find_host_by_node_id(char *node_id);
RRDHOST *rrdhost_find_or_create(
const char *hostname

View file

@ -41,6 +41,22 @@ bool is_storage_engine_shared(STORAGE_INSTANCE *engine __maybe_unused) {
return false;
}
RRDHOST *find_host_by_node_id(char *node_id) {
uuid_t node_uuid;
if (unlikely(!node_id || uuid_parse(node_id, node_uuid)))
return NULL;
RRDHOST *host, *ret = NULL;
dfe_start_read(rrdhost_root_index, host) {
if (host->node_id && !(uuid_memcmp(host->node_id, &node_uuid))) {
ret = host;
break;
}
}
dfe_done(host);
return ret;
}
// ----------------------------------------------------------------------------
// RRDHOST indexes management

View file

@ -82,26 +82,6 @@ struct aclk_sync_host_config {
uint64_t alerts_ack_sequence_id; // last sequence_id ack'ed from cloud via sendsnapshot message
};
static inline RRDHOST *find_host_by_node_id(char *node_id)
{
uuid_t node_uuid;
if (unlikely(!node_id || uuid_parse(node_id, node_uuid)))
return NULL;
rrd_rdlock();
RRDHOST *host, *ret = NULL;
rrdhost_foreach_read(host) {
if (host->node_id && !(uuid_memcmp(host->node_id, &node_uuid))) {
ret = host;
break;
}
}
rrd_unlock();
return ret;
}
extern sqlite3 *db_meta;
int aclk_database_enq_cmd_noblock(struct aclk_database_cmd *cmd);

View file

@ -309,7 +309,7 @@ void health_aggregate_alarms(RRDHOST *host, BUFFER *wb, BUFFER* contexts, RRDCAL
if (contexts) {
p = (char*)buffer_tostring(contexts);
while(p && *p && (tok = mystrsep(&p, ", |"))) {
while(p && *p && (tok = strsep_skip_consecutive_separators(&p, ", |"))) {
if(!*tok) continue;
STRING *tok_string = string_strdupz(tok);

View file

@ -201,7 +201,7 @@ fi
[ -z "${NETDATA_STOCK_CONFIG_DIR}" ] && NETDATA_STOCK_CONFIG_DIR="@libconfigdir_POST@"
[ -z "${NETDATA_CACHE_DIR}" ] && NETDATA_CACHE_DIR="@cachedir_POST@"
[ -z "${NETDATA_REGISTRY_URL}" ] && NETDATA_REGISTRY_URL="https://registry.my-netdata.io"
[ -z "${NETDATA_REGISTRY_CLOUD_BASE_URL}" ] && NETDATA_REGISTRY_CLOUD_BASE_URL="https://api.netdata.cloud"
[ -z "${NETDATA_REGISTRY_CLOUD_BASE_URL}" ] && NETDATA_REGISTRY_CLOUD_BASE_URL="https://app.netdata.cloud"
# -----------------------------------------------------------------------------
# parse command line parameters

View file

@ -201,6 +201,28 @@ static inline void buffer_strcat(BUFFER *wb, const char *txt) {
buffer_overflow_check(wb);
}
static inline void buffer_strncat(BUFFER *wb, const char *txt, size_t len) {
if(unlikely(!txt || !*txt)) return;
const char *t = txt;
while(*t) {
buffer_need_bytes(wb, len);
char *s = &wb->buffer[wb->len];
char *d = s;
const char *e = &wb->buffer[wb->len + len];
while(*t && d < e)
*d++ = *t++;
wb->len += d - s;
}
buffer_need_bytes(wb, 1);
wb->buffer[wb->len] = '\0';
buffer_overflow_check(wb);
}
static inline void buffer_json_strcat(BUFFER *wb, const char *txt) {
if(unlikely(!txt || !*txt)) return;

View file

@ -845,13 +845,14 @@ void appconfig_generate(struct config *root, BUFFER *wb, int only_changed)
else if(!strcmp(co->name, CONFIG_SECTION_ML)) pri = 8;
else if(!strcmp(co->name, CONFIG_SECTION_HEALTH)) pri = 9;
else if(!strcmp(co->name, CONFIG_SECTION_WEB)) pri = 10;
// by default, new sections will get pri = 11 (set at the end, below)
else if(!strcmp(co->name, CONFIG_SECTION_REGISTRY)) pri = 12;
else if(!strcmp(co->name, CONFIG_SECTION_GLOBAL_STATISTICS)) pri = 13;
else if(!strcmp(co->name, CONFIG_SECTION_PLUGINS)) pri = 14;
else if(!strcmp(co->name, CONFIG_SECTION_STATSD)) pri = 15;
else if(!strncmp(co->name, "plugin:", 7)) pri = 16; // << change the loop too if you change this
else pri = 11; // this is used for any new (currently unknown) sections
else if(!strcmp(co->name, CONFIG_SECTION_WEBRTC)) pri = 11;
// by default, new sections will get pri = 12 (set at the end, below)
else if(!strcmp(co->name, CONFIG_SECTION_REGISTRY)) pri = 13;
else if(!strcmp(co->name, CONFIG_SECTION_GLOBAL_STATISTICS)) pri = 14;
else if(!strcmp(co->name, CONFIG_SECTION_PLUGINS)) pri = 15;
else if(!strcmp(co->name, CONFIG_SECTION_STATSD)) pri = 16;
else if(!strncmp(co->name, "plugin:", 7)) pri = 17; // << change the loop too if you change this
else pri = 12; // this is used for any new (currently unknown) sections
if(i == pri) {
int loaded = 0;

View file

@ -88,6 +88,7 @@
#define CONFIG_SECTION_ENV_VARS "environment variables"
#define CONFIG_SECTION_SQLITE "sqlite"
#define CONFIG_SECTION_WEB "web"
#define CONFIG_SECTION_WEBRTC "webrtc"
#define CONFIG_SECTION_STATSD "statsd"
#define CONFIG_SECTION_PLUGINS "plugins"
#define CONFIG_SECTION_CLOUD "cloud"

View file

@ -514,4 +514,62 @@ static inline int uuid_memcmp(const uuid_t *uu1, const uuid_t *uu2) {
return memcmp(uu1, uu2, sizeof(uuid_t));
}
static inline char *strsep_skip_consecutive_separators(char **ptr, char *s) {
char *p = (char *)"";
while (p && !p[0] && *ptr) p = strsep(ptr, s);
return (p);
}
// remove leading and trailing spaces; may return NULL
static inline char *trim(char *s) {
// skip leading spaces
while (*s && isspace(*s)) s++;
if (!*s) return NULL;
// skip tailing spaces
// this way is way faster. Writes only one NUL char.
ssize_t l = (ssize_t)strlen(s);
if (--l >= 0) {
char *p = s + l;
while (p > s && isspace(*p)) p--;
*++p = '\0';
}
if (!*s) return NULL;
return s;
}
// like trim(), but also remove duplicate spaces inside the string; may return NULL
static inline char *trim_all(char *buffer) {
char *d = buffer, *s = buffer;
// skip spaces
while(isspace(*s)) s++;
while(*s) {
// copy the non-space part
while(*s && !isspace(*s)) *d++ = *s++;
// add a space if we have to
if(*s && isspace(*s)) {
*d++ = ' ';
s++;
}
// skip spaces
while(isspace(*s)) s++;
}
*d = '\0';
if(d > buffer) {
d--;
if(isspace(*d)) *d = '\0';
}
if(!buffer[0]) return NULL;
return buffer;
}
#endif //NETDATA_INLINED_H

View file

@ -1042,171 +1042,6 @@ void netdata_fix_chart_id(char *s) {
while ((*s = netdata_map_chart_ids[(unsigned char) *s])) s++;
}
/*
// http://stackoverflow.com/questions/7666509/hash-function-for-string
uint32_t simple_hash(const char *name)
{
const char *s = name;
uint32_t hash = 5381;
int i;
while((i = *s++)) hash = ((hash << 5) + hash) + i;
// fprintf(stderr, "HASH: %lu %s\n", hash, name);
return hash;
}
*/
/*
// http://isthe.com/chongo/tech/comp/fnv/#FNV-1a
uint32_t simple_hash(const char *name) {
unsigned char *s = (unsigned char *) name;
uint32_t hval = 0x811c9dc5;
// FNV-1a algorithm
while (*s) {
// multiply by the 32 bit FNV magic prime mod 2^32
// NOTE: No need to optimize with left shifts.
// GCC will use imul instruction anyway.
// Tested with 'gcc -O3 -S'
//hval += (hval<<1) + (hval<<4) + (hval<<7) + (hval<<8) + (hval<<24);
hval *= 16777619;
// xor the bottom with the current octet
hval ^= (uint32_t) *s++;
}
// fprintf(stderr, "HASH: %u = %s\n", hval, name);
return hval;
}
uint32_t simple_uhash(const char *name) {
unsigned char *s = (unsigned char *) name;
uint32_t hval = 0x811c9dc5, c;
// FNV-1a algorithm
while ((c = *s++)) {
if (unlikely(c >= 'A' && c <= 'Z')) c += 'a' - 'A';
hval *= 16777619;
hval ^= c;
}
return hval;
}
*/
/*
// http://eternallyconfuzzled.com/tuts/algorithms/jsw_tut_hashing.aspx
// one at a time hash
uint32_t simple_hash(const char *name) {
unsigned char *s = (unsigned char *)name;
uint32_t h = 0;
while(*s) {
h += *s++;
h += (h << 10);
h ^= (h >> 6);
}
h += (h << 3);
h ^= (h >> 11);
h += (h << 15);
// fprintf(stderr, "HASH: %u = %s\n", h, name);
return h;
}
*/
void strreverse(char *begin, char *end) {
while (end > begin) {
// clearer code.
char aux = *end;
*end-- = *begin;
*begin++ = aux;
}
}
char *strsep_on_1char(char **ptr, char c) {
if(unlikely(!ptr || !*ptr))
return NULL;
// remember the position we started
char *s = *ptr;
// skip separators in front
while(*s == c) s++;
char *ret = s;
// find the next separator
while(*s++) {
if(unlikely(*s == c)) {
*s++ = '\0';
*ptr = s;
return ret;
}
}
*ptr = NULL;
return ret;
}
char *mystrsep(char **ptr, char *s) {
char *p = "";
while (p && !p[0] && *ptr) p = strsep(ptr, s);
return (p);
}
char *trim(char *s) {
// skip leading spaces
while (*s && isspace(*s)) s++;
if (!*s) return NULL;
// skip tailing spaces
// this way is way faster. Writes only one NUL char.
ssize_t l = strlen(s);
if (--l >= 0) {
char *p = s + l;
while (p > s && isspace(*p)) p--;
*++p = '\0';
}
if (!*s) return NULL;
return s;
}
inline char *trim_all(char *buffer) {
char *d = buffer, *s = buffer;
// skip spaces
while(isspace(*s)) s++;
while(*s) {
// copy the non-space part
while(*s && !isspace(*s)) *d++ = *s++;
// add a space if we have to
if(*s && isspace(*s)) {
*d++ = ' ';
s++;
}
// skip spaces
while(isspace(*s)) s++;
}
*d = '\0';
if(d > buffer) {
d--;
if(isspace(*d)) *d = '\0';
}
if(!buffer[0]) return NULL;
return buffer;
}
static int memory_file_open(const char *filename, size_t size) {
// info("memory_file_open('%s', %zu", filename, size);

View file

@ -491,11 +491,6 @@ typedef struct storage_point {
void netdata_fix_chart_id(char *s);
void netdata_fix_chart_name(char *s);
void strreverse(char* begin, char* end);
char *mystrsep(char **ptr, char *s);
char *trim(char *s); // remove leading and trailing spaces; may return NULL
char *trim_all(char *buffer); // like trim(), but also remove duplicate spaces inside the string; may return NULL
int madvise_sequential(void *mem, size_t len);
int madvise_random(void *mem, size_t len);
int madvise_dontfork(void *mem, size_t len);
@ -673,9 +668,8 @@ extern char *netdata_configured_host_prefix;
#include "parser/parser.h"
#include "yaml.h"
// BEWARE: Outside of the C code this also exists in alarm-notify.sh
#define DEFAULT_CLOUD_BASE_URL "https://api.netdata.cloud"
#define DEFAULT_CLOUD_UI_URL "https://app.netdata.cloud"
// BEWARE: this exists in alarm-notify.sh
#define DEFAULT_CLOUD_BASE_URL "https://app.netdata.cloud"
#define RRD_STORAGE_TIERS 5

View file

@ -1149,8 +1149,8 @@ int accept4(int sock, struct sockaddr *addr, socklen_t *addrlen, int flags) {
* update the client_host if uninitialized - ensure the hostsize is the number
* of *writable* bytes (i.e. be aware of the strdup used to compact the pollinfo).
*/
extern int connection_allowed(int fd, char *client_ip, char *client_host, size_t hostsize, SIMPLE_PATTERN *access_list,
const char *patname, int allow_dns)
int connection_allowed(int fd, char *client_ip, char *client_host, size_t hostsize, SIMPLE_PATTERN *access_list,
const char *patname, int allow_dns)
{
debug(D_LISTENER,"checking %s... (allow_dns=%d)", patname, allow_dns);
if (!access_list)

View file

@ -22,8 +22,11 @@ typedef enum web_client_acl {
WEB_CLIENT_ACL_SSL_FORCE = (1 << 7),
WEB_CLIENT_ACL_SSL_DEFAULT = (1 << 8),
WEB_CLIENT_ACL_ACLK = (1 << 9),
WEB_CLIENT_ACL_WEBRTC = (1 << 10),
} WEB_CLIENT_ACL;
#define WEB_CLIENT_ACL_DASHBOARD_ACLK_WEBRTC (WEB_CLIENT_ACL_DASHBOARD | WEB_CLIENT_ACL_ACLK | WEB_CLIENT_ACL_WEBRTC)
#define WEB_CLIENT_ACL_ALL 0xFFFF
#define web_client_can_access_dashboard(w) ((w)->acl & WEB_CLIENT_ACL_DASHBOARD)

View file

@ -21,8 +21,63 @@ inline int netdata_thread_tag_exists(void) {
return (netdata_thread && netdata_thread->tag && *netdata_thread->tag);
}
static const char *thread_name_get(bool recheck) {
static __thread char threadname[NETDATA_THREAD_NAME_MAX + 1] = "";
if(netdata_thread_tag_exists())
strncpyz(threadname, netdata_thread->tag, NETDATA_THREAD_NAME_MAX + 1);
else {
if(!recheck && threadname[0])
return threadname;
#if defined(__FreeBSD__)
pthread_get_name_np(pthread_self(), threadname, NETDATA_THREAD_NAME_MAX + 1);
if(strcmp(threadname, "netdata") == 0)
strncpyz(threadname, "MAIN", NETDATA_THREAD_NAME_MAX + 1);
#elif defined(__APPLE__)
strncpyz(threadname, "MAIN", NETDATA_THREAD_NAME_MAX + 1);
#elif defined(HAVE_PTHREAD_GETNAME_NP)
pthread_getname_np(pthread_self(), threadname, NETDATA_THREAD_NAME_MAX + 1);
if(strcmp(threadname, "netdata") == 0)
strncpyz(threadname, "MAIN", NETDATA_THREAD_NAME_MAX + 1);
#else
strncpyz(threadname, "MAIN", NETDATA_THREAD_NAME_MAX + 1);
#endif
}
return threadname;
}
const char *netdata_thread_tag(void) {
return (netdata_thread_tag_exists() ? netdata_thread->tag : "MAIN");
return thread_name_get(false);
}
static size_t webrtc_id = 0;
static __thread bool webrtc_name_set = false;
void webrtc_set_thread_name(void) {
if(!netdata_thread && !webrtc_name_set) {
webrtc_name_set = true;
char threadname[NETDATA_THREAD_NAME_MAX + 1];
#if defined(__FreeBSD__)
snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "WEBRTC[%zu]", __atomic_fetch_add(&webrtc_id, 1, __ATOMIC_RELAXED));
pthread_set_name_np(pthread_self(), threadname);
#elif defined(__APPLE__)
snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "WEBRTC[%zu]", __atomic_fetch_add(&webrtc_id, 1, __ATOMIC_RELAXED));
pthread_setname_np(threadname);
#elif defined(HAVE_PTHREAD_GETNAME_NP)
pthread_getname_np(pthread_self(), threadname, NETDATA_THREAD_NAME_MAX+1);
if(strcmp(threadname, "netdata") == 0) {
snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "WEBRTC[%zu]", __atomic_fetch_add(&webrtc_id, 1, __ATOMIC_RELAXED));
pthread_setname_np(pthread_self(), threadname);
}
#else
snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "WEBRTC[%zu]", __atomic_fetch_add(&webrtc_id, 1, __ATOMIC_RELAXED));
pthread_setname_np(pthread_self(), threadname);
#endif
thread_name_get(true);
}
}
// ----------------------------------------------------------------------------
@ -173,6 +228,8 @@ void uv_thread_set_name_np(uv_thread_t ut, const char* name) {
ret = pthread_setname_np(ut, threadname);
#endif
thread_name_get(true);
if (ret)
info("cannot set libuv thread name to %s. Err: %d", threadname, ret);
}

View file

@ -43,6 +43,8 @@ int netdata_thread_detach(pthread_t thread);
void uv_thread_set_name_np(uv_thread_t ut, const char* name);
void os_thread_get_current_name_np(char threadname[NETDATA_THREAD_NAME_MAX + 1]);
void webrtc_set_thread_name(void);
#define netdata_thread_self pthread_self
#define netdata_thread_testcancel pthread_testcancel

View file

@ -17,7 +17,7 @@ char to_hex(char code) {
return hex[code & 15];
}
/* Returns a url-encoded version of str */
/* Returns an url-encoded version of str */
/* IMPORTANT: be sure to free() the returned string after use */
char *url_encode(char *str) {
char *buf, *pbuf;
@ -33,8 +33,8 @@ char *url_encode(char *str) {
else{
*pbuf++ = '%';
*pbuf++ = to_hex(*str >> 4);
*pbuf++ = to_hex(*str & 15);
*pbuf++ = to_hex((char)(*str >> 4));
*pbuf++ = to_hex((char)(*str & 15));
}
str++;
@ -55,9 +55,9 @@ char *url_encode(char *str) {
*
* @return The character decoded on success and 0 otherwise
*/
char url_percent_escape_decode(char *s) {
char url_percent_escape_decode(const char *s) {
if(likely(s[1] && s[2]))
return from_hex(s[1]) << 4 | from_hex(s[2]);
return (char)(from_hex(s[1]) << 4 | from_hex(s[2]));
return 0;
}
@ -98,7 +98,7 @@ char url_utf8_get_byte_length(char c) {
*
* @return count of bytes written to *d
*/
char url_decode_multibyte_utf8(char *s, char *d, char *d_end) {
char url_decode_multibyte_utf8(const char *s, char *d, const char *d_end) {
char first_byte = url_percent_escape_decode(s);
if(unlikely(!first_byte || !IS_UTF8_STARTBYTE(first_byte)))
@ -189,9 +189,9 @@ unsigned char *utf8_check(unsigned char *s)
return NULL;
}
char *url_decode_r(char *to, char *url, size_t size) {
char *s = url, // source
*d = to, // destination
char *url_decode_r(char *to, const char *url, size_t size) {
const char *s = url; // source
char *d = to, // destination
*e = &to[size - 1]; // destination end
while(*s && d < e) {
@ -236,31 +236,45 @@ fail_cleanup:
return NULL;
}
/**
* Is request complete?
*
* Check whether the request is complete.
* This function cannot check all the requests METHODS, for example, case you are working with POST, it will fail.
*
* @param begin is the first character of the sequence to analyse.
* @param end is the last character of the sequence
* @param length is the length of the total of bytes read, it is not the difference between end and begin.
*
* @return It returns 1 when the request is complete and 0 otherwise.
*/
inline int url_is_request_complete(char *begin, char *end, size_t length) {
inline bool url_is_request_complete(char *begin, char *end, size_t length, char **post_payload, size_t *post_payload_size) {
if (begin == end || length < 4)
return false;
if ( begin == end) {
//Message cannot be complete when first and last address are the same
return 0;
if(likely(strncmp(begin, "GET ", 4)) == 0) {
return strstr(end - 4, "\r\n\r\n");
}
else if(unlikely(strncmp(begin, "POST ", 5) == 0)) {
char *cl = strstr(begin, "Content-Length: ");
if(!cl) return false;
cl = &cl[16];
//This math to verify the last is valid, because we are discarding the POST
if (length > 4) {
begin = end - 4;
size_t content_length = str2ul(cl);
char *payload = strstr(cl, "\r\n\r\n");
if(!payload) return false;
payload += 4;
size_t payload_length = length - (payload - begin);
if(payload_length == content_length) {
if(post_payload && post_payload_size) {
if (*post_payload)
freez(*post_payload);
*post_payload = mallocz(payload_length + 1);
memcpy(*post_payload, payload, payload_length);
(*post_payload)[payload_length] = '\0';
*post_payload_size = payload_length;
}
return true;
}
return false;
}
else {
return strstr(end - 4, "\r\n\r\n");
}
return (strstr(begin, "\r\n\r\n"))?1:0;
}
/**
@ -283,109 +297,3 @@ inline char *url_find_protocol(char *s) {
return s;
}
/**
* Map query string
*
* Map the query string fields that will be decoded.
* This functions must be called after to check the presence of query strings,
* here we are assuming that you already tested this.
*
* @param out the pointer to pointers that will be used to map
* @param url the input url that we are decoding.
*
* @return It returns the number of total variables in the query string.
*/
int url_map_query_string(char **out, char *url) {
(void)out;
(void)url;
int count = 0;
//First we try to parse considering that there was not URL encode process
char *moveme = url;
char *ptr;
//We always we have at least one here, so I can set this.
out[count++] = moveme;
while(moveme) {
ptr = strchr((moveme+1), '&');
if(ptr) {
out[count++] = ptr;
}
moveme = ptr;
}
//I could not find any '&', so I am assuming now it is like '%26'
if (count == 1) {
moveme = url;
while(moveme) {
ptr = strchr((moveme+1), '%');
if(ptr) {
char *test = (ptr+1);
if (!strncmp(test, "3f", 2) || !strncmp(test, "3F", 2)) {
out[count++] = ptr;
}
}
moveme = ptr;
}
}
return count;
}
/**
* Parse query string
*
* Parse the query string mapped and store it inside output.
*
* @param output is a vector where I will store the string.
* @param max is the maximum length of the output
* @param map the map done by the function url_map_query_string.
* @param total the total number of variables inside map
*
* @return It returns 0 on success and -1 otherwise
*/
int url_parse_query_string(char *output, size_t max, char **map, int total) {
if(!total) {
return 0;
}
int counter, next;
size_t length;
char *end;
char *begin = map[0];
char save;
size_t copied = 0;
for(counter = 0, next=1 ; next <= total ; ++counter, ++next) {
if (next != total) {
end = map[next];
length = (size_t) (end - begin);
save = *end;
*end = 0x00;
} else {
length = strlen(begin);
end = NULL;
}
length++;
if (length > (max - copied)) {
error("Parsing query string: we cannot parse a query string so big");
break;
}
if(!url_decode_r(output, begin, length)) {
return -1;
}
length = strlen(output);
copied += length;
output += length;
begin = end;
if (begin) {
*begin = save;
}
}
return 0;
}

View file

@ -23,13 +23,9 @@ char *url_encode(char *str);
/* IMPORTANT: be sure to free() the returned string after use */
char *url_decode(char *str);
char *url_decode_r(char *to, char *url, size_t size);
char *url_decode_r(char *to, const char *url, size_t size);
#define WEB_FIELDS_MAX 400
int url_map_query_string(char **out, char *url);
int url_parse_query_string(char *output, size_t max, char **map, int total);
int url_is_request_complete(char *begin,char *end,size_t length);
bool url_is_request_complete(char *begin, char *end, size_t length, char **post_payload, size_t *post_payload_length);
char *url_find_protocol(char *s);
#endif /* NETDATA_URL_H */

View file

@ -4,6 +4,7 @@
#include "registry_internals.h"
#define REGISTRY_STATUS_OK "ok"
#define REGISTRY_STATUS_REDIRECT "redirect"
#define REGISTRY_STATUS_FAILED "failed"
#define REGISTRY_STATUS_DISABLED "disabled"
@ -18,35 +19,26 @@ static inline void registry_unlock(void) {
netdata_mutex_unlock(&registry.lock);
}
// ----------------------------------------------------------------------------
// COOKIES
static void registry_set_cookie(struct web_client *w, const char *guid) {
char edate[100], domain[512];
char edate[100];
time_t et = now_realtime_sec() + registry.persons_expiration;
struct tm etmbuf, *etm = gmtime_r(&et, &etmbuf);
strftime(edate, sizeof(edate), "%a, %d %b %Y %H:%M:%S %Z", etm);
snprintfz(w->cookie1, NETDATA_WEB_REQUEST_COOKIE_SIZE, NETDATA_REGISTRY_COOKIE_NAME "=%s; Expires=%s", guid, edate);
buffer_sprintf(w->response.header, "Set-Cookie: " NETDATA_REGISTRY_COOKIE_NAME "=%s; Expires=%s\r\n", guid, edate);
if(registry.enable_cookies_samesite_secure)
buffer_sprintf(w->response.header, "Set-Cookie: " NETDATA_REGISTRY_COOKIE_NAME "=%s; Expires=%s; SameSite=None; Secure\r\n", guid, edate);
if(registry.registry_domain && registry.registry_domain[0])
snprintfz(domain, 511, "Domain=%s", registry.registry_domain);
else
domain[0]='\0';
int length = snprintfz(w->cookie2, NETDATA_WEB_REQUEST_COOKIE_SIZE,
NETDATA_REGISTRY_COOKIE_NAME "=%s; Expires=%s; %s",
guid, edate, domain);
size_t remaining_length = NETDATA_WEB_REQUEST_COOKIE_SIZE - length;
// 25 is the necessary length to add new cookies
if (registry.enable_cookies_samesite_secure) {
if (length > 0 && remaining_length > 25)
snprintfz(&w->cookie2[length], remaining_length, "; SameSite=None; Secure");
else
error("Netdata does not have enough space to store cookies SameSite and Secure");
if(registry.registry_domain && *registry.registry_domain) {
buffer_sprintf(w->response.header, "Set-Cookie: " NETDATA_REGISTRY_COOKIE_NAME "=%s; Expires=%s; Domain=%s\r\n", guid, edate, registry.registry_domain);
if(registry.enable_cookies_samesite_secure)
buffer_sprintf(w->response.header, "Set-Cookie: " NETDATA_REGISTRY_COOKIE_NAME "=%s; Expires=%s; Domain=%s; SameSite=None; Secure\r\n", guid, edate, registry.registry_domain);
}
w->response.has_cookies = true;
}
static inline void registry_set_person_cookie(struct web_client *w, REGISTRY_PERSON *p) {
@ -60,22 +52,24 @@ static inline void registry_set_person_cookie(struct web_client *w, REGISTRY_PER
static inline void registry_json_header(RRDHOST *host, struct web_client *w, const char *action, const char *status) {
buffer_flush(w->response.data);
w->response.data->content_type = CT_APPLICATION_JSON;
buffer_sprintf(w->response.data, "{\n\t\"action\": \"%s\",\n\t\"status\": \"%s\",\n\t\"hostname\": \"%s\",\n\t\"machine_guid\": \"%s\"",
action, status, rrdhost_registry_hostname(host), host->machine_guid);
buffer_json_initialize(w->response.data, "\"", "\"", 0, true, false);
buffer_json_member_add_string(w->response.data, "action", action);
buffer_json_member_add_string(w->response.data, "status", status);
buffer_json_member_add_string(w->response.data, "hostname", rrdhost_registry_hostname(host));
buffer_json_member_add_string(w->response.data, "machine_guid", host->machine_guid);
}
static inline void registry_json_footer(struct web_client *w) {
buffer_strcat(w->response.data, "\n}\n");
buffer_json_finalize(w->response.data);
}
static inline int registry_json_disabled(RRDHOST *host, struct web_client *w, const char *action) {
registry_json_header(host, w, action, REGISTRY_STATUS_DISABLED);
buffer_sprintf(w->response.data, ",\n\t\"registry\": \"%s\"",
registry.registry_to_announce);
buffer_json_member_add_string(w->response.data, "registry", registry.registry_to_announce);
registry_json_footer(w);
return 200;
return HTTP_RESP_OK;
}
@ -97,14 +91,16 @@ static int registry_json_person_url_callback(void *entry, void *data) {
struct web_client *w = c->w;
if (!strcmp(pu->url->url,"***")) return 0;
if(unlikely(c->count++))
buffer_strcat(w->response.data, ",");
buffer_sprintf(w->response.data, "\n\t\t[ \"%s\", \"%s\", %u000, %u, \"%s\" ]",
pu->machine->guid, pu->url->url, pu->last_t, pu->usages, pu->machine_name);
buffer_json_add_array_item_array(w->response.data);
buffer_json_add_array_item_string(w->response.data, pu->machine->guid);
buffer_json_add_array_item_string(w->response.data, pu->url->url);
buffer_json_add_array_item_uint64(w->response.data, pu->last_t * 1000);
buffer_json_add_array_item_uint64(w->response.data, pu->usages);
buffer_json_add_array_item_string(w->response.data, pu->machine_name);
buffer_json_array_close(w->response.data);
return 0;
return 1;
}
// callback for rendering MACHINE_URLs
@ -114,13 +110,14 @@ static int registry_json_machine_url_callback(const DICTIONARY_ITEM *item __mayb
struct web_client *w = c->w;
REGISTRY_MACHINE *m = c->m;
if (!strcmp(mu->url->url,"***")) return 1;
if (!strcmp(mu->url->url,"***")) return 0;
if(unlikely(c->count++))
buffer_strcat(w->response.data, ",");
buffer_sprintf(w->response.data, "\n\t\t[ \"%s\", \"%s\", %u000, %u ]",
m->guid, mu->url->url, mu->last_t, mu->usages);
buffer_json_add_array_item_array(w->response.data);
buffer_json_add_array_item_string(w->response.data, m->guid);
buffer_json_add_array_item_string(w->response.data, mu->url->url);
buffer_json_add_array_item_uint64(w->response.data, mu->last_t * 1000);
buffer_json_add_array_item_uint64(w->response.data, mu->usages);
buffer_json_array_close(w->response.data);
return 1;
}
@ -149,30 +146,23 @@ static inline int registry_person_url_callback_verify_machine_exists(void *entry
// The registry does not seem to be designed to support this and I cannot see any concurrency protection
// that could make this safe, so try to be as atomic as possible.
void registry_update_cloud_base_url()
{
// This is guaranteed to be set early in main via post_conf_load()
registry.cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
if (registry.cloud_base_url == NULL)
fatal("Do not move the cloud base url out of post_conf_load!!");
void registry_update_cloud_base_url() {
registry.cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", DEFAULT_CLOUD_BASE_URL);
setenv("NETDATA_REGISTRY_CLOUD_BASE_URL", registry.cloud_base_url, 1);
}
// ----------------------------------------------------------------------------
// public HELLO request
int registry_request_hello_json(RRDHOST *host, struct web_client *w) {
registry_json_header(host, w, "hello", REGISTRY_STATUS_OK);
const char *cloud_ui_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud ui url", DEFAULT_CLOUD_UI_URL);
buffer_sprintf(w->response.data,
",\n\t\"registry\": \"%s\",\n\t\"cloud_base_url\": \"%s\",\n\t\"anonymous_statistics\": %s",
registry.registry_to_announce,
cloud_ui_url, netdata_anonymous_statistics_enabled?"true":"false");
buffer_json_member_add_string(w->response.data, "registry", registry.registry_to_announce);
buffer_json_member_add_string(w->response.data, "cloud_base_url", registry.cloud_base_url);
buffer_json_member_add_boolean(w->response.data, "anonymous_statistics", netdata_anonymous_statistics_enabled);
registry_json_footer(w);
return 200;
return HTTP_RESP_OK;
}
// ----------------------------------------------------------------------------
@ -192,8 +182,10 @@ int registry_request_access_json(RRDHOST *host, struct web_client *w, char *pers
buffer_flush(w->response.data);
registry_set_cookie(w, REGISTRY_VERIFY_COOKIES_GUID);
w->response.data->content_type = CT_APPLICATION_JSON;
buffer_sprintf(w->response.data, "{ \"status\": \"redirect\", \"registry\": \"%s\" }", registry.registry_to_announce);
return 200;
registry_json_header(host, w, "access", REGISTRY_STATUS_REDIRECT);
buffer_json_member_add_string(w->response.data, "registry", registry.registry_to_announce);
registry_json_footer(w);
return HTTP_RESP_OK;
}
if(unlikely(person_guid[0] && !strcmp(person_guid, REGISTRY_VERIFY_COOKIES_GUID)))
@ -208,7 +200,7 @@ int registry_request_access_json(RRDHOST *host, struct web_client *w, char *pers
registry_json_header(host, w, "access", REGISTRY_STATUS_FAILED);
registry_json_footer(w);
registry_unlock();
return 412;
return HTTP_RESP_PRECOND_FAIL;
}
// set the cookie
@ -216,15 +208,16 @@ int registry_request_access_json(RRDHOST *host, struct web_client *w, char *pers
// generate the response
registry_json_header(host, w, "access", REGISTRY_STATUS_OK);
buffer_json_member_add_string(w->response.data, "person_guid", p->guid);
buffer_json_member_add_array(w->response.data, "urls");
buffer_sprintf(w->response.data, ",\n\t\"person_guid\": \"%s\",\n\t\"urls\": [", p->guid);
struct registry_json_walk_person_urls_callback c = { p, NULL, w, 0 };
avl_traverse(&p->person_urls, registry_json_person_url_callback, &c);
buffer_strcat(w->response.data, "\n\t]\n");
buffer_json_array_close(w->response.data); // urls
registry_json_footer(w);
registry_unlock();
return 200;
return HTTP_RESP_OK;
}
// ----------------------------------------------------------------------------
@ -242,14 +235,14 @@ int registry_request_delete_json(RRDHOST *host, struct web_client *w, char *pers
registry_json_header(host, w, "delete", REGISTRY_STATUS_FAILED);
registry_json_footer(w);
registry_unlock();
return 412;
return HTTP_RESP_PRECOND_FAIL;
}
// generate the response
registry_json_header(host, w, "delete", REGISTRY_STATUS_OK);
registry_json_footer(w);
registry_unlock();
return 200;
return HTTP_RESP_OK;
}
// ----------------------------------------------------------------------------
@ -267,19 +260,19 @@ int registry_request_search_json(RRDHOST *host, struct web_client *w, char *pers
registry_json_header(host, w, "search", REGISTRY_STATUS_FAILED);
registry_json_footer(w);
registry_unlock();
return 404;
return HTTP_RESP_NOT_FOUND;
}
registry_json_header(host, w, "search", REGISTRY_STATUS_OK);
buffer_strcat(w->response.data, ",\n\t\"urls\": [");
buffer_json_member_add_array(w->response.data, "urls");
struct registry_json_walk_person_urls_callback c = { NULL, m, w, 0 };
dictionary_walkthrough_read(m->machine_urls, registry_json_machine_url_callback, &c);
buffer_strcat(w->response.data, "\n\t]\n");
buffer_json_array_close(w->response.data);
registry_json_footer(w);
registry_unlock();
return 200;
return HTTP_RESP_OK;
}
// ----------------------------------------------------------------------------
@ -346,11 +339,11 @@ int registry_request_switch_json(RRDHOST *host, struct web_client *w, char *pers
// generate the response
registry_json_header(host, w, "switch", REGISTRY_STATUS_OK);
buffer_sprintf(w->response.data, ",\n\t\"person_guid\": \"%s\"", np->guid);
buffer_json_member_add_string(w->response.data, "person_guid", np->guid);
registry_json_footer(w);
registry_unlock();
return 200;
return HTTP_RESP_OK;
}
// ----------------------------------------------------------------------------
@ -380,7 +373,7 @@ void registry_statistics(void) {
rrddim_add(sts, "sessions", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
}
rrddim_set(sts, "sessions", registry.usages_count);
rrddim_set(sts, "sessions", (collected_number)registry.usages_count);
rrdset_done(sts);
// ------------------------------------------------------------------------
@ -408,11 +401,11 @@ void registry_statistics(void) {
rrddim_add(stc, "machines_urls", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
}
rrddim_set(stc, "persons", registry.persons_count);
rrddim_set(stc, "machines", registry.machines_count);
rrddim_set(stc, "urls", registry.urls_count);
rrddim_set(stc, "persons_urls", registry.persons_urls_count);
rrddim_set(stc, "machines_urls", registry.machines_urls_count);
rrddim_set(stc, "persons", (collected_number)registry.persons_count);
rrddim_set(stc, "machines", (collected_number)registry.machines_count);
rrddim_set(stc, "urls", (collected_number)registry.urls_count);
rrddim_set(stc, "persons_urls", (collected_number)registry.persons_urls_count);
rrddim_set(stc, "machines_urls", (collected_number)registry.machines_urls_count);
rrdset_done(stc);
// ------------------------------------------------------------------------
@ -440,10 +433,10 @@ void registry_statistics(void) {
rrddim_add(stm, "machines_urls", NULL, 1, 1024, RRD_ALGORITHM_ABSOLUTE);
}
rrddim_set(stm, "persons", registry.persons_memory + dictionary_stats_for_registry(registry.persons));
rrddim_set(stm, "machines", registry.machines_memory + dictionary_stats_for_registry(registry.machines));
rrddim_set(stm, "urls", registry.urls_memory);
rrddim_set(stm, "persons_urls", registry.persons_urls_memory);
rrddim_set(stm, "machines_urls", registry.machines_urls_memory);
rrddim_set(stm, "persons", (collected_number)registry.persons_memory + dictionary_stats_for_registry(registry.persons));
rrddim_set(stm, "machines", (collected_number)registry.machines_memory + dictionary_stats_for_registry(registry.machines));
rrddim_set(stm, "urls", (collected_number)registry.urls_memory);
rrddim_set(stm, "persons_urls", (collected_number)registry.persons_urls_memory);
rrddim_set(stm, "machines_urls", (collected_number)registry.machines_urls_memory);
rrdset_done(stm);
}

View file

@ -725,7 +725,7 @@ int rrdpush_receiver_too_busy_now(struct web_client *w) {
}
void *rrdpush_receiver_thread(void *ptr);
int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string) {
if(!service_running(ABILITY_STREAMING_CONNECTIONS))
return rrdpush_receiver_too_busy_now(w);
@ -757,11 +757,11 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
// parse the parameters and fill rpt and rpt->system_info
while(url) {
char *value = mystrsep(&url, "&");
while(decoded_query_string) {
char *value = strsep_skip_consecutive_separators(&decoded_query_string, "&");
if(!value || !*value) continue;
char *name = mystrsep(&value, "=");
char *name = strsep_skip_consecutive_separators(&value, "=");
if(!name || !*name) continue;
if(!value || !*value) continue;

View file

@ -331,7 +331,7 @@ void rrdpush_claimed_id(RRDHOST *host);
#define THREAD_TAG_STREAM_RECEIVER "RCVR" // "[host]" is appended
#define THREAD_TAG_STREAM_SENDER "SNDR" // "[host]" is appended
int rrdpush_receiver_thread_spawn(struct web_client *w, char *url);
int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string);
void rrdpush_sender_thread_stop(RRDHOST *host, const char *reason, bool wait);
void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQUIRED *rva);

View file

@ -7,6 +7,7 @@ SUBDIRS = \
api \
gui \
server \
rtc \
$(NULL)
usersslconfigdir=$(configdir)/ssl

View file

@ -898,10 +898,10 @@ int web_client_api_request_v1_badge(RRDHOST *host, struct web_client *w, char *u
RRDSET *st = NULL;
while(url) {
char *value = mystrsep(&url, "&");
char *value = strsep_skip_consecutive_separators(&url, "&");
if(!value || !*value) continue;
char *name = mystrsep(&value, "=");
char *name = strsep_skip_consecutive_separators(&value, "=");
if(!name || !*name) continue;
if(!value || !*value) continue;

View file

@ -39,10 +39,10 @@ inline int web_client_api_request_v1_allmetrics(RRDHOST *host, struct web_client
prometheus_prefix = global_exporting_prefix;
while(url) {
char *value = mystrsep(&url, "&");
char *value = strsep_skip_consecutive_separators(&url, "&");
if (!value || !*value) continue;
char *name = mystrsep(&value, "=");
char *name = strsep_skip_consecutive_separators(&value, "=");
if(!name || !*name) continue;
if(!value || !*value) continue;

View file

@ -226,7 +226,7 @@ static inline void query_target_points_statistics(BUFFER *wb, QUERY_TARGET *qt,
}
if(sp->anomaly_count != 0)
buffer_json_member_add_double(wb, "ars", storage_point_anomaly_rate(*sp));
buffer_json_member_add_uint64(wb, "arc", sp->anomaly_count);
}
else {
NETDATA_DOUBLE avg = (sp->count) ? sp->sum / (NETDATA_DOUBLE)sp->count : 0.0;
@ -804,12 +804,12 @@ static inline void rrdr_dimension_query_points_statistics(BUFFER *wb, const char
}
buffer_json_array_close(wb);
buffer_json_member_add_array(wb, "ars");
buffer_json_member_add_array(wb, "arc");
for(size_t c = 0; c < r->d ; c++) {
if (!rrdr_dimension_should_be_exposed(r->od[c], options))
continue;
buffer_json_add_array_item_uint64(wb, sp[c].anomaly_count * 100 / anomaly_rate_multiplier);
buffer_json_add_array_item_uint64(wb, storage_point_anomaly_rate(sp[c]) / anomaly_rate_multiplier / 100.0 * sp[c].count);
}
buffer_json_array_close(wb);
}
@ -1384,6 +1384,11 @@ void rrdr_json_wrapper_begin2(RRDR *r, BUFFER *wb) {
query_target_summary_labels_v12(wb, qt, "labels", true, &label_key_totals, &label_key_value_totals);
query_target_summary_alerts_v2(wb, qt, "alerts");
}
if(query_target_aggregatable(qt)) {
buffer_json_member_add_object(wb, "globals");
query_target_points_statistics(wb, qt, &qt->query_points);
buffer_json_object_close(wb); // globals
}
buffer_json_object_close(wb); // summary
buffer_json_member_add_object(wb, "totals");

View file

@ -139,10 +139,10 @@ int web_client_api_request_v1_mgmt_health(RRDHOST *host, struct web_client *w, c
ret = HTTP_RESP_FORBIDDEN;
} else {
while (url) {
char *value = mystrsep(&url, "&");
char *value = strsep_skip_consecutive_separators(&url, "&");
if (!value || !*value) continue;
char *key = mystrsep(&value, "=");
char *key = strsep_skip_consecutive_separators(&value, "=");
if (!key || !*key) continue;
if (!value || !*value) continue;

View file

@ -798,7 +798,7 @@ RRDR_GROUP_BY group_by_parse(char *s) {
RRDR_GROUP_BY group_by = RRDR_GROUP_BY_NONE;
while(s) {
char *key = mystrsep(&s, ",| ");
char *key = strsep_skip_consecutive_separators(&s, ",| ");
if (!key || !*key) continue;
if (strcmp(key, "selected") == 0)
@ -2720,6 +2720,7 @@ struct rrdr_group_by_entry {
};
static RRDR *rrd2rrdr_group_by_initialize(ONEWAYALLOC *owa, QUERY_TARGET *qt) {
RRDR *r_tmp = NULL;
RRDR_OPTIONS options = qt->window.options;
if(qt->request.version < 2) {
@ -3013,7 +3014,7 @@ static RRDR *rrd2rrdr_group_by_initialize(ONEWAYALLOC *owa, QUERY_TARGET *qt) {
if(!first_r || !last_r)
goto cleanup;
RRDR *r_tmp = rrdr_create(owa, qt, 1, qt->window.points);
r_tmp = rrdr_create(owa, qt, 1, qt->window.points);
if (!r_tmp) {
internal_error(true,
"QUERY: cannot create group by temporary RRDR for %s, after=%ld, before=%ld, dimensions=%d, points=%zu",

View file

@ -2,27 +2,32 @@
#include "web_api.h"
int web_client_api_request_vX(RRDHOST *host, struct web_client *w, char *url, struct web_api_command *api_commands) {
if(unlikely(!url || !*url)) {
int web_client_api_request_vX(RRDHOST *host, struct web_client *w, char *url_path_endpoint, struct web_api_command *api_commands) {
if(unlikely(!url_path_endpoint || !*url_path_endpoint)) {
buffer_flush(w->response.data);
buffer_sprintf(w->response.data, "Which API command?");
return HTTP_RESP_BAD_REQUEST;
}
uint32_t hash = simple_hash(url);
uint32_t hash = simple_hash(url_path_endpoint);
for(int i = 0; api_commands[i].command ; i++) {
if(unlikely(hash == api_commands[i].hash && !strcmp(url, api_commands[i].command))) {
if(unlikely(hash == api_commands[i].hash && !strcmp(url_path_endpoint, api_commands[i].command))) {
if(unlikely(api_commands[i].acl != WEB_CLIENT_ACL_NOCHECK) && !(w->acl & api_commands[i].acl))
return web_client_permission_denied(w);
return api_commands[i].callback(host, w, (w->decoded_query_string + 1));
char *query_string = (char *)buffer_tostring(w->url_query_string_decoded);
if(*query_string == '?')
query_string = &query_string[1];
return api_commands[i].callback(host, w, query_string);
}
}
buffer_flush(w->response.data);
buffer_strcat(w->response.data, "Unsupported API command: ");
buffer_strcat_htmlescape(w->response.data, url);
buffer_strcat_htmlescape(w->response.data, url_path_endpoint);
return HTTP_RESP_NOT_FOUND;
}
@ -30,7 +35,7 @@ RRDCONTEXT_TO_JSON_OPTIONS rrdcontext_to_json_parse_options(char *o) {
RRDCONTEXT_TO_JSON_OPTIONS options = RRDCONTEXT_OPTION_NONE;
char *tok;
while(o && *o && (tok = mystrsep(&o, ", |"))) {
while(o && *o && (tok = strsep_skip_consecutive_separators(&o, ", |"))) {
if(!*tok) continue;
if(!strcmp(tok, "full") || !strcmp(tok, "all"))
@ -78,11 +83,11 @@ int web_client_api_request_weights(RRDHOST *host, struct web_client *w, char *ur
};
while (url) {
char *value = mystrsep(&url, "&");
char *value = strsep_skip_consecutive_separators(&url, "&");
if (!value || !*value)
continue;
char *name = mystrsep(&value, "=");
char *name = strsep_skip_consecutive_separators(&value, "=");
if (!name || !*name)
continue;
if (!value || !*value)
@ -203,5 +208,9 @@ int web_client_api_request_weights(RRDHOST *host, struct web_client *w, char *ur
bool web_client_interrupt_callback(void *data) {
struct web_client *w = data;
if(w->interrupt.callback)
return w->interrupt.callback(w, w->interrupt.callback_data);
return sock_has_output_error(w->ofd);
}
}

View file

@ -18,7 +18,7 @@ struct web_api_command {
struct web_client;
int web_client_api_request_vX(RRDHOST *host, struct web_client *w, char *url, struct web_api_command *api_commands);
int web_client_api_request_vX(RRDHOST *host, struct web_client *w, char *url_path_endpoint, struct web_api_command *api_commands);
static inline void fix_google_param(char *s) {
if(unlikely(!s || !*s)) return;

View file

@ -178,7 +178,7 @@ inline RRDR_OPTIONS web_client_api_request_v1_data_options(char *o) {
RRDR_OPTIONS ret = 0x00000000;
char *tok;
while(o && *o && (tok = mystrsep(&o, ", |"))) {
while(o && *o && (tok = strsep_skip_consecutive_separators(&o, ", |"))) {
if(!*tok) continue;
uint32_t hash = simple_hash(tok);
@ -262,7 +262,7 @@ inline uint32_t web_client_api_request_v1_data_google_format(char *name) {
int web_client_api_request_v1_alarms_select (char *url) {
int all = 0;
while(url) {
char *value = mystrsep(&url, "&");
char *value = strsep_skip_consecutive_separators(&url, "&");
if (!value || !*value) continue;
if(!strcmp(value, "all") || !strcmp(value, "all=true")) all = 1;
@ -300,10 +300,10 @@ inline int web_client_api_request_v1_alarm_count(RRDHOST *host, struct web_clien
buffer_sprintf(w->response.data, "[");
while(url) {
char *value = mystrsep(&url, "&");
char *value = strsep_skip_consecutive_separators(&url, "&");
if(!value || !*value) continue;
char *name = mystrsep(&value, "=");
char *name = strsep_skip_consecutive_separators(&value, "=");
if(!name || !*name) continue;
if(!value || !*value) continue;
@ -341,10 +341,10 @@ inline int web_client_api_request_v1_alarm_log(RRDHOST *host, struct web_client
char *chart = NULL;
while(url) {
char *value = mystrsep(&url, "&");
char *value = strsep_skip_consecutive_separators(&url, "&");
if (!value || !*value) continue;
char *name = mystrsep(&value, "=");
char *name = strsep_skip_consecutive_separators(&value, "=");
if(!name || !*name) continue;
if(!value || !*value) continue;
@ -365,10 +365,10 @@ inline int web_client_api_request_single_chart(RRDHOST *host, struct web_client
buffer_flush(w->response.data);
while(url) {
char *value = mystrsep(&url, "&");
char *value = strsep_skip_consecutive_separators(&url, "&");
if(!value || !*value) continue;
char *name = mystrsep(&value, "=");
char *name = strsep_skip_consecutive_separators(&value, "=");
if(!name || !*name) continue;
if(!value || !*value) continue;
@ -419,10 +419,10 @@ static int web_client_api_request_v1_context(RRDHOST *host, struct web_client *w
buffer_flush(w->response.data);
while(url) {
char *value = mystrsep(&url, "&");
char *value = strsep_skip_consecutive_separators(&url, "&");
if(!value || !*value) continue;
char *name = mystrsep(&value, "=");
char *name = strsep_skip_consecutive_separators(&value, "=");
if(!name || !*name) continue;
if(!value || !*value) continue;
@ -483,10 +483,10 @@ static int web_client_api_request_v1_contexts(RRDHOST *host, struct web_client *
buffer_flush(w->response.data);
while(url) {
char *value = mystrsep(&url, "&");
char *value = strsep_skip_consecutive_separators(&url, "&");
if(!value || !*value) continue;
char *name = mystrsep(&value, "=");
char *name = strsep_skip_consecutive_separators(&value, "=");
if(!name || !*name) continue;
if(!value || !*value) continue;
@ -579,10 +579,10 @@ static inline int web_client_api_request_v1_data(RRDHOST *host, struct web_clien
RRDR_OPTIONS options = 0;
while(url) {
char *value = mystrsep(&url, "&");
char *value = strsep_skip_consecutive_separators(&url, "&");
if(!value || !*value) continue;
char *name = mystrsep(&value, "=");
char *name = strsep_skip_consecutive_separators(&value, "=");
if(!name || !*name) continue;
if(!value || !*value) continue;
@ -628,10 +628,10 @@ static inline int web_client_api_request_v1_data(RRDHOST *host, struct web_clien
char *tqx_name, *tqx_value;
while(value) {
tqx_value = mystrsep(&value, ";");
tqx_value = strsep_skip_consecutive_separators(&value, ";");
if(!tqx_value || !*tqx_value) continue;
tqx_name = mystrsep(&tqx_value, ":");
tqx_name = strsep_skip_consecutive_separators(&tqx_value, ":");
if(!tqx_name || !*tqx_name) continue;
if(!tqx_value || !*tqx_value) continue;
@ -722,17 +722,10 @@ static inline int web_client_api_request_v1_data(RRDHOST *host, struct web_clien
goto cleanup;
}
if (timeout) {
struct timeval now;
now_realtime_timeval(&now);
int inqueue = (int)dt_usec(&w->tv_in, &now) / 1000;
timeout -= inqueue;
if (timeout <= 0) {
buffer_flush(w->response.data);
buffer_strcat(w->response.data, "Query timeout exceeded");
ret = HTTP_RESP_BACKEND_FETCH_FAILED;
goto cleanup;
}
web_client_timeout_checkpoint_set(w, timeout);
if(web_client_timeout_checkpoint_and_check(w, NULL)) {
ret = w->response.code;
goto cleanup;
}
if(outFileName && *outFileName) {
@ -851,10 +844,10 @@ inline int web_client_api_request_v1_registry(RRDHOST *host, struct web_client *
buffer_no_cacheable(w->response.data);
while(url) {
char *value = mystrsep(&url, "&");
char *value = strsep_skip_consecutive_separators(&url, "&");
if (!value || !*value) continue;
char *name = mystrsep(&value, "=");
char *name = strsep_skip_consecutive_separators(&value, "=");
if (!name || !*name) continue;
if (!value || !*value) continue;
@ -1287,11 +1280,11 @@ int web_client_api_request_v1_function(RRDHOST *host, struct web_client *w, char
const char *function = NULL;
while (url) {
char *value = mystrsep(&url, "&");
char *value = strsep_skip_consecutive_separators(&url, "&");
if (!value || !*value)
continue;
char *name = mystrsep(&value, "=");
char *name = strsep_skip_consecutive_separators(&value, "=");
if (!name || !*name)
continue;
@ -1426,46 +1419,46 @@ int web_client_api_request_v1_dbengine_stats(RRDHOST *host __maybe_unused, struc
#endif
static struct web_api_command api_commands_v1[] = {
{ "info", 0, WEB_CLIENT_ACL_DASHBOARD | WEB_CLIENT_ACL_ACLK, web_client_api_request_v1_info },
{ "data", 0, WEB_CLIENT_ACL_DASHBOARD | WEB_CLIENT_ACL_ACLK, web_client_api_request_v1_data },
{ "chart", 0, WEB_CLIENT_ACL_DASHBOARD | WEB_CLIENT_ACL_ACLK, web_client_api_request_v1_chart },
{ "charts", 0, WEB_CLIENT_ACL_DASHBOARD | WEB_CLIENT_ACL_ACLK, web_client_api_request_v1_charts },
{ "context", 0, WEB_CLIENT_ACL_DASHBOARD | WEB_CLIENT_ACL_ACLK, web_client_api_request_v1_context },
{ "contexts", 0, WEB_CLIENT_ACL_DASHBOARD | WEB_CLIENT_ACL_ACLK, web_client_api_request_v1_contexts },
{ "info", 0, WEB_CLIENT_ACL_DASHBOARD_ACLK_WEBRTC, web_client_api_request_v1_info },
{ "data", 0, WEB_CLIENT_ACL_DASHBOARD_ACLK_WEBRTC, web_client_api_request_v1_data },
{ "chart", 0, WEB_CLIENT_ACL_DASHBOARD_ACLK_WEBRTC, web_client_api_request_v1_chart },
{ "charts", 0, WEB_CLIENT_ACL_DASHBOARD_ACLK_WEBRTC, web_client_api_request_v1_charts },
{ "context", 0, WEB_CLIENT_ACL_DASHBOARD_ACLK_WEBRTC, web_client_api_request_v1_context },
{ "contexts", 0, WEB_CLIENT_ACL_DASHBOARD_ACLK_WEBRTC, web_client_api_request_v1_contexts },
// registry checks the ACL by itself, so we allow everything
{ "registry", 0, WEB_CLIENT_ACL_NOCHECK, web_client_api_request_v1_registry },
// badges can be fetched with both dashboard and badge permissions
{ "badge.svg", 0, WEB_CLIENT_ACL_DASHBOARD | WEB_CLIENT_ACL_BADGE | WEB_CLIENT_ACL_ACLK, web_client_api_request_v1_badge },
{ "badge.svg", 0, WEB_CLIENT_ACL_DASHBOARD_ACLK_WEBRTC | WEB_CLIENT_ACL_BADGE, web_client_api_request_v1_badge },
{ "alarms", 0, WEB_CLIENT_ACL_DASHBOARD | WEB_CLIENT_ACL_ACLK, web_client_api_request_v1_alarms },
{ "alarms_values", 0, WEB_CLIENT_ACL_DASHBOARD | WEB_CLIENT_ACL_ACLK, web_client_api_request_v1_alarms_values },
{ "alarm_log", 0, WEB_CLIENT_ACL_DASHBOARD | WEB_CLIENT_ACL_ACLK, web_client_api_request_v1_alarm_log },
{ "alarm_variables", 0, WEB_CLIENT_ACL_DASHBOARD | WEB_CLIENT_ACL_ACLK, web_client_api_request_v1_alarm_variables },
{ "alarm_count", 0, WEB_CLIENT_ACL_DASHBOARD | WEB_CLIENT_ACL_ACLK, web_client_api_request_v1_alarm_count },
{ "allmetrics", 0, WEB_CLIENT_ACL_DASHBOARD | WEB_CLIENT_ACL_ACLK, web_client_api_request_v1_allmetrics },
{ "alarms", 0, WEB_CLIENT_ACL_DASHBOARD_ACLK_WEBRTC, web_client_api_request_v1_alarms },
{ "alarms_values", 0, WEB_CLIENT_ACL_DASHBOARD_ACLK_WEBRTC, web_client_api_request_v1_alarms_values },
{ "alarm_log", 0, WEB_CLIENT_ACL_DASHBOARD_ACLK_WEBRTC, web_client_api_request_v1_alarm_log },
{ "alarm_variables", 0, WEB_CLIENT_ACL_DASHBOARD_ACLK_WEBRTC, web_client_api_request_v1_alarm_variables },
{ "alarm_count", 0, WEB_CLIENT_ACL_DASHBOARD_ACLK_WEBRTC, web_client_api_request_v1_alarm_count },
{ "allmetrics", 0, WEB_CLIENT_ACL_DASHBOARD_ACLK_WEBRTC, web_client_api_request_v1_allmetrics },
#if defined(ENABLE_ML)
{ "ml_info", 0, WEB_CLIENT_ACL_DASHBOARD | WEB_CLIENT_ACL_ACLK, web_client_api_request_v1_ml_info },
{ "ml_info", 0, WEB_CLIENT_ACL_DASHBOARD_ACLK_WEBRTC, web_client_api_request_v1_ml_info },
{ "ml_models", 0, WEB_CLIENT_ACL_DASHBOARD, web_client_api_request_v1_ml_models },
#endif
{ "manage/health", 0, WEB_CLIENT_ACL_MGMT | WEB_CLIENT_ACL_ACLK, web_client_api_request_v1_mgmt_health },
{ "aclk", 0, WEB_CLIENT_ACL_DASHBOARD | WEB_CLIENT_ACL_ACLK, web_client_api_request_v1_aclk_state },
{ "metric_correlations", 0, WEB_CLIENT_ACL_DASHBOARD | WEB_CLIENT_ACL_ACLK, web_client_api_request_v1_metric_correlations },
{ "weights", 0, WEB_CLIENT_ACL_DASHBOARD | WEB_CLIENT_ACL_ACLK, web_client_api_request_v1_weights },
{ "aclk", 0, WEB_CLIENT_ACL_DASHBOARD_ACLK_WEBRTC, web_client_api_request_v1_aclk_state },
{ "metric_correlations", 0, WEB_CLIENT_ACL_DASHBOARD_ACLK_WEBRTC, web_client_api_request_v1_metric_correlations },
{ "weights", 0, WEB_CLIENT_ACL_DASHBOARD_ACLK_WEBRTC, web_client_api_request_v1_weights },
{ "function", 0, WEB_CLIENT_ACL_ACLK | ACL_DEV_OPEN_ACCESS, web_client_api_request_v1_function },
{ "functions", 0, WEB_CLIENT_ACL_ACLK | ACL_DEV_OPEN_ACCESS, web_client_api_request_v1_functions },
{ "dbengine_stats", 0, WEB_CLIENT_ACL_DASHBOARD | WEB_CLIENT_ACL_ACLK, web_client_api_request_v1_dbengine_stats },
{ "dbengine_stats", 0, WEB_CLIENT_ACL_DASHBOARD_ACLK_WEBRTC, web_client_api_request_v1_dbengine_stats },
// terminator
{ NULL, 0, WEB_CLIENT_ACL_NONE, NULL },
};
inline int web_client_api_request_v1(RRDHOST *host, struct web_client *w, char *url) {
inline int web_client_api_request_v1(RRDHOST *host, struct web_client *w, char *url_path_endpoint) {
static int initialized = 0;
if(unlikely(initialized == 0)) {
@ -1475,5 +1468,5 @@ inline int web_client_api_request_v1(RRDHOST *host, struct web_client *w, char *
api_commands_v1[i].hash = simple_hash(api_commands_v1[i].command);
}
return web_client_api_request_vX(host, w, url, api_commands_v1);
return web_client_api_request_vX(host, w, url_path_endpoint, api_commands_v1);
}

View file

@ -24,7 +24,7 @@ int web_client_api_request_v1_charts(RRDHOST *host, struct web_client *w, char *
int web_client_api_request_v1_chart(RRDHOST *host, struct web_client *w, char *url);
int web_client_api_request_v1_registry(RRDHOST *host, struct web_client *w, char *url);
int web_client_api_request_v1_info(RRDHOST *host, struct web_client *w, char *url);
int web_client_api_request_v1(RRDHOST *host, struct web_client *w, char *url);
int web_client_api_request_v1(RRDHOST *host, struct web_client *w, char *url_path_endpoint);
int web_client_api_request_v1_info_fill_buffer(RRDHOST *host, BUFFER *wb);
void web_client_api_v1_init(void);

View file

@ -1,15 +1,16 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "web_api_v2.h"
#include "../rtc/webrtc.h"
static int web_client_api_request_v2_contexts_internal(RRDHOST *host __maybe_unused, struct web_client *w, char *url, CONTEXTS_V2_OPTIONS options) {
struct api_v2_contexts_request req = { 0 };
while(url) {
char *value = mystrsep(&url, "&");
char *value = strsep_skip_consecutive_separators(&url, "&");
if(!value || !*value) continue;
char *name = mystrsep(&value, "=");
char *name = strsep_skip_consecutive_separators(&value, "=");
if(!name || !*name) continue;
if(!value || !*value) continue;
@ -110,10 +111,10 @@ static int web_client_api_request_v2_data(RRDHOST *host __maybe_unused, struct w
size_t group_by_idx = 0, group_by_label_idx = 0, aggregation_idx = 0;
while(url) {
char *value = mystrsep(&url, "&");
char *value = strsep_skip_consecutive_separators(&url, "&");
if(!value || !*value) continue;
char *name = mystrsep(&value, "=");
char *name = strsep_skip_consecutive_separators(&value, "=");
if(!name || !*name) continue;
if(!value || !*value) continue;
@ -161,10 +162,10 @@ static int web_client_api_request_v2_data(RRDHOST *host __maybe_unused, struct w
char *tqx_name, *tqx_value;
while(value) {
tqx_value = mystrsep(&value, ";");
tqx_value = strsep_skip_consecutive_separators(&value, ";");
if(!tqx_value || !*tqx_value) continue;
tqx_name = mystrsep(&tqx_value, ":");
tqx_name = strsep_skip_consecutive_separators(&tqx_value, ":");
if(!tqx_name || !*tqx_name) continue;
if(!tqx_value || !*tqx_value) continue;
@ -235,7 +236,7 @@ static int web_client_api_request_v2_data(RRDHOST *host __maybe_unused, struct w
time_t before = (before_str && *before_str)?str2l(before_str):0;
time_t after = (after_str && *after_str) ?str2l(after_str):-600;
size_t points = (points_str && *points_str)?str2u(points_str):0;
time_t timeout = (timeout_str && *timeout_str)?str2l(timeout_str): 0;
int timeout = (timeout_str && *timeout_str)?str2i(timeout_str): 0;
time_t resampling_time = (resampling_time_str && *resampling_time_str) ? str2l(resampling_time_str) : 0;
QUERY_TARGET_REQUEST qtr = {
@ -281,17 +282,10 @@ static int web_client_api_request_v2_data(RRDHOST *host __maybe_unused, struct w
goto cleanup;
}
if (timeout) {
struct timeval now;
now_realtime_timeval(&now);
int inqueue = (int)dt_usec(&w->tv_in, &now) / 1000;
timeout -= inqueue;
if (timeout <= 0) {
buffer_flush(w->response.data);
buffer_strcat(w->response.data, "Query timeout exceeded");
ret = HTTP_RESP_BACKEND_FETCH_FAILED;
goto cleanup;
}
web_client_timeout_checkpoint_set(w, timeout);
if(web_client_timeout_checkpoint_and_check(w, NULL)) {
ret = w->response.code;
goto cleanup;
}
if(outFileName && *outFileName) {
@ -347,20 +341,24 @@ cleanup:
return ret;
}
static int web_client_api_request_v2_webrtc(RRDHOST *host __maybe_unused, struct web_client *w, char *url __maybe_unused) {
return webrtc_new_connection(w->post_payload, w->response.data);
}
static struct web_api_command api_commands_v2[] = {
{"data", 0, WEB_CLIENT_ACL_DASHBOARD | WEB_CLIENT_ACL_ACLK, web_client_api_request_v2_data},
{"nodes", 0, WEB_CLIENT_ACL_DASHBOARD | WEB_CLIENT_ACL_ACLK, web_client_api_request_v2_nodes},
{"contexts", 0, WEB_CLIENT_ACL_DASHBOARD | WEB_CLIENT_ACL_ACLK, web_client_api_request_v2_contexts},
{"weights", 0, WEB_CLIENT_ACL_DASHBOARD | WEB_CLIENT_ACL_ACLK, web_client_api_request_v2_weights},
{"q", 0, WEB_CLIENT_ACL_DASHBOARD | WEB_CLIENT_ACL_ACLK, web_client_api_request_v2_q},
{"data", 0, WEB_CLIENT_ACL_DASHBOARD_ACLK_WEBRTC, web_client_api_request_v2_data},
{"nodes", 0, WEB_CLIENT_ACL_DASHBOARD_ACLK_WEBRTC, web_client_api_request_v2_nodes},
{"contexts", 0, WEB_CLIENT_ACL_DASHBOARD_ACLK_WEBRTC, web_client_api_request_v2_contexts},
{"weights", 0, WEB_CLIENT_ACL_DASHBOARD_ACLK_WEBRTC, web_client_api_request_v2_weights},
{"q", 0, WEB_CLIENT_ACL_DASHBOARD_ACLK_WEBRTC, web_client_api_request_v2_q},
{"rtc_offer", 0, WEB_CLIENT_ACL_DASHBOARD | WEB_CLIENT_ACL_ACLK, web_client_api_request_v2_webrtc},
// terminator
{NULL, 0, WEB_CLIENT_ACL_NONE, NULL},
};
inline int web_client_api_request_v2(RRDHOST *host, struct web_client *w, char *url) {
inline int web_client_api_request_v2(RRDHOST *host, struct web_client *w, char *url_path_endpoint) {
static int initialized = 0;
if(unlikely(initialized == 0)) {
@ -370,5 +368,5 @@ inline int web_client_api_request_v2(RRDHOST *host, struct web_client *w, char *
api_commands_v2[i].hash = simple_hash(api_commands_v2[i].command);
}
return web_client_api_request_vX(host, w, url, api_commands_v2);
return web_client_api_request_vX(host, w, url_path_endpoint, api_commands_v2);
}

View file

@ -7,6 +7,6 @@
struct web_client;
int web_client_api_request_v2(RRDHOST *host, struct web_client *w, char *url);
int web_client_api_request_v2(RRDHOST *host, struct web_client *w, char *url_path_endpoint);
#endif //NETDATA_WEB_API_V2_H

11
web/rtc/Makefile.am Normal file
View file

@ -0,0 +1,11 @@
# SPDX-License-Identifier: GPL-3.0-or-later
AUTOMAKE_OPTIONS = subdir-objects
MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
SUBDIRS = \
$(NULL)
dist_noinst_DATA = \
README.md \
$(NULL)

0
web/rtc/README.md Normal file
View file

740
web/rtc/webrtc.c Normal file
View file

@ -0,0 +1,740 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "webrtc.h"
#include "../server/web_client.h"
#include "../server/web_client_cache.h"
#ifdef HAVE_LIBDATACHANNEL
#include "rtc/rtc.h"
#define WEBRTC_OUR_MAX_MESSAGE_SIZE (5 * 1024 * 1024)
#define WEBRTC_DEFAULT_REMOTE_MAX_MESSAGE_SIZE (65536)
#define WEBRTC_COMPRESSED_HEADER_SIZE 200
static void webrtc_log(rtcLogLevel level, const char *message) {
switch(level) {
case RTC_LOG_NONE:
break;
case RTC_LOG_WARNING:
case RTC_LOG_ERROR:
case RTC_LOG_FATAL:
error("WEBRTC: %s", message);
break;
case RTC_LOG_INFO:
info("WEBRTC: %s", message);
break;
default:
case RTC_LOG_DEBUG:
case RTC_LOG_VERBOSE:
internal_error(true, "WEBRTC: %s", message);
break;
}
}
typedef struct webrtc_datachannel {
int dc;
char *label;
struct webrtc_connection *conn;
bool open; // atomic
struct {
struct webrtc_datachannel *prev;
struct webrtc_datachannel *next;
} link;
} WEBRTC_DC;
typedef struct webrtc_connection {
int pc;
rtcConfiguration config;
rtcState state;
rtcGatheringState gathering_state;
size_t max_message_size;
size_t local_max_message_size;
size_t remote_max_message_size;
struct {
SPINLOCK spinlock;
BUFFER *wb;
bool sdp;
bool candidates;
} response;
struct {
SPINLOCK spinlock;
WEBRTC_DC *head;
} channels;
struct {
struct webrtc_connection *prev;
struct webrtc_connection *next;
} link;
} WEBRTC_CONN;
#define WEBRTC_MAX_ICE_SERVERS 100
static struct {
bool enabled;
char *iceServers[WEBRTC_MAX_ICE_SERVERS];
int iceServersCount;
char *proxyServer;
char *bindAddress;
struct {
SPINLOCK spinlock;
WEBRTC_CONN *head;
} unsafe;
} webrtc_base = {
#ifdef NETDATA_INTERNAL_CHECKS
.enabled = true,
#else
.enabled = false,
#endif
.iceServers = {
// Format:
// [("stun"|"turn"|"turns") (":"|"://")][username ":" password "@"]hostname[":" port]["?transport=" ("udp"|"tcp"|"tls")]
//
// Note transports TCP and TLS are only available for a TURN server with libnice as ICE backend and govern only the
// TURN control connection, meaning relaying is always performed over UDP.
//
// If the username or password of a URI contains reserved special characters, they must be percent-encoded.
// In particular, ":" must be encoded as "%3A" and "@" must by encoded as "%40".
"stun://stun.l.google.com:19302",
NULL, // terminator
},
.iceServersCount = 1,
.proxyServer = NULL, // [("http"|"socks5") (":"|"://")][username ":" password "@"]hostname[" :" port]
.bindAddress = NULL,
.unsafe = {
.spinlock = NETDATA_SPINLOCK_INITIALIZER,
.head = NULL,
},
};
static inline bool webrtc_dc_is_open(WEBRTC_DC *chan) {
return __atomic_load_n(&chan->open, __ATOMIC_RELAXED);
}
static void webrtc_config_ice_servers(void) {
BUFFER *wb = buffer_create(0, NULL);
int i;
for(i = 0; i < WEBRTC_MAX_ICE_SERVERS ;i++) {
if (webrtc_base.iceServers[i]) {
if (buffer_strlen(wb))
buffer_strcat(wb, " ");
internal_error(true, "WEBRTC: default ice server No %d is: '%s'", i, webrtc_base.iceServers[i]);
buffer_strcat(wb, webrtc_base.iceServers[i]);
}
else
break;
}
webrtc_base.iceServersCount = i;
internal_error(true, "WEBRTC: there are %d default ice servers: '%s'", webrtc_base.iceServersCount, buffer_tostring(wb));
char *servers = config_get(CONFIG_SECTION_WEBRTC, "ice servers", buffer_tostring(wb));
webrtc_base.iceServersCount = 0;
char *s = servers, *e;
while(*s) {
if(isspace(*s))
s++;
e = s;
while(*e && !isspace(*e))
e++;
if(s != e && webrtc_base.iceServersCount < WEBRTC_MAX_ICE_SERVERS) {
char old = *e;
*e = '\0';
internal_error(true, "WEBRTC: ice server No %d is: '%s'", webrtc_base.iceServersCount, s);
webrtc_base.iceServers[webrtc_base.iceServersCount++] = strdupz(s);
*e = old;
}
if(*e)
s = e + 1;
else
break;
}
buffer_free(wb);
}
void webrtc_initialize() {
webrtc_base.enabled = config_get_boolean(CONFIG_SECTION_WEBRTC, "enabled", webrtc_base.enabled);
internal_error(true, "WEBRTC: is %s", webrtc_base.enabled ? "enabled" : "disabled");
webrtc_config_ice_servers();
webrtc_base.proxyServer = config_get(CONFIG_SECTION_WEBRTC, "proxy server", webrtc_base.proxyServer ? webrtc_base.proxyServer : "");
if(!webrtc_base.proxyServer || !*webrtc_base.proxyServer)
webrtc_base.proxyServer = NULL;
internal_error(true, "WEBRTC: proxy server is: '%s'", webrtc_base.proxyServer ? webrtc_base.proxyServer : "");
webrtc_base.bindAddress = config_get(CONFIG_SECTION_WEBRTC, "bind address", webrtc_base.bindAddress ? webrtc_base.bindAddress : "");
if(!webrtc_base.bindAddress || !*webrtc_base.bindAddress)
webrtc_base.bindAddress = NULL;
internal_error(true, "WEBRTC: bind address is: '%s'", webrtc_base.bindAddress ? webrtc_base.bindAddress : "");
if(!webrtc_base.enabled)
return;
rtcLogLevel level;
#ifdef NETDATA_INTERNAL_CHECKS
level = RTC_LOG_INFO;
#else
level = RTC_LOG_WARNING;
#endif
rtcInitLogger(level, webrtc_log);
rtcPreload();
}
void webrtc_close_all_connections() {
if(!webrtc_base.enabled)
return;
rtcCleanup();
}
size_t find_max_message_size_in_sdp(const char *sdp) {
char *s = strstr(sdp, "a=max-message-size:");
if(s)
return str2ul(&s[19]);
return WEBRTC_DEFAULT_REMOTE_MAX_MESSAGE_SIZE;
}
// ----------------------------------------------------------------------------
// execute web API requests
static bool web_client_stop_callback(struct web_client *w __maybe_unused, void *data) {
WEBRTC_DC *chan = data;
return !webrtc_dc_is_open(chan);
}
static size_t webrtc_send_in_chunks(WEBRTC_DC *chan, const char *data, size_t size, int code, const char *message_type, HTTP_CONTENT_TYPE content_type, size_t max_message_size, bool binary) {
size_t sent_bytes = 0;
size_t chunk = 0;
size_t total_chunks = size / max_message_size;
if(total_chunks * max_message_size < size)
total_chunks++;
char *send_buffer = mallocz(chan->conn->max_message_size);
char *s = (char *)data;
size_t remaining = size;
while(remaining > 0) {
chunk++;
size_t message_size = MIN(remaining, max_message_size);
int len = snprintfz(send_buffer, WEBRTC_COMPRESSED_HEADER_SIZE, "%d %s %zu %zu %zu %s\r\n",
code,
message_type,
message_size,
chunk,
total_chunks,
web_content_type_to_string(content_type)
);
internal_fatal((size_t)len != strlen(send_buffer), "WEBRTC compressed header line mismatch");
internal_fatal(len + message_size > chan->conn->max_message_size, "WEBRTC message exceeds max message size");
memcpy(&send_buffer[len], s, message_size);
int total_message_size = (int)(len + message_size);
sent_bytes += total_message_size;
if(!binary)
total_message_size = -total_message_size;
if(rtcSendMessage(chan->dc, send_buffer, total_message_size) != RTC_ERR_SUCCESS)
error("WEBRTC[%d],DC[%d]: failed to send LZ4 chunk %zu of %zu", chan->conn->pc, chan->dc, chunk, total_chunks);
else
internal_error(true, "WEBRTC[%d],DC[%d]: sent chunk %zu of %zu, size %zu (total %d)",
chan->conn->pc, chan->dc, chunk, total_chunks, message_size, total_message_size);
s = s + message_size;
remaining -= message_size;
}
internal_fatal(chunk != total_chunks, "WEBRTC number of compressed chunks mismatch");
freez(send_buffer);
return sent_bytes;
}
static void webrtc_execute_api_request(WEBRTC_DC *chan, const char *request, size_t size __maybe_unused, bool binary __maybe_unused) {
struct timeval tv;
internal_error(true, "WEBRTC[%d],DC[%d]: got request '%s' of size %zu and type %s.",
chan->conn->pc, chan->dc, request, size, binary?"binary":"text");
struct web_client *w = web_client_get_from_cache();
w->statistics.received_bytes = size;
w->interrupt.callback = web_client_stop_callback;
w->interrupt.callback_data = chan;
w->acl = WEB_CLIENT_ACL_WEBRTC;
char *path = (char *)request;
if(strncmp(request, "POST ", 5) == 0) {
w->mode = WEB_CLIENT_MODE_POST;
path += 10;
}
else if(strncmp(request, "GET ", 4) == 0) {
w->mode = WEB_CLIENT_MODE_GET;
path += 4;
}
web_client_timeout_checkpoint_set(w, 0);
web_client_decode_path_and_query_string(w, path);
path = (char *)buffer_tostring(w->url_path_decoded);
w->response.code = web_client_api_request_with_node_selection(localhost, w, path);
web_client_timeout_checkpoint_response_ready(w, NULL);
size_t sent_bytes = 0;
size_t response_size = buffer_strlen(w->response.data);
bool send_plain = true;
int max_message_size = (int)chan->conn->max_message_size - WEBRTC_COMPRESSED_HEADER_SIZE;
if(!webrtc_dc_is_open(chan)) {
internal_error(true, "WEBRTC[%d],DC[%d]: ignoring API response on closed data channel.", chan->conn->pc, chan->dc);
goto cleanup;
}
else {
internal_error(true, "WEBRTC[%d],DC[%d]: prepared response with code %d, size %zu.",
chan->conn->pc, chan->dc, w->response.code, response_size);
}
#if defined(ENABLE_COMPRESSION)
int max_compressed_size = LZ4_compressBound((int)response_size);
char *compressed = mallocz(max_compressed_size);
int compressed_size = LZ4_compress_default(buffer_tostring(w->response.data), compressed,
(int)response_size, max_compressed_size);
if(compressed_size > 0) {
send_plain = false;
sent_bytes = webrtc_send_in_chunks(chan, compressed, compressed_size,
w->response.code, "LZ4", w->response.data->content_type,
max_message_size, true);
}
freez(compressed);
#endif
if(send_plain)
sent_bytes = webrtc_send_in_chunks(chan, buffer_tostring(w->response.data), buffer_strlen(w->response.data),
w->response.code, "PLAIN", w->response.data->content_type,
max_message_size, false);
w->statistics.sent_bytes = sent_bytes;
cleanup:
now_monotonic_high_precision_timeval(&tv);
log_access("%llu: %d '[RTC]:%d:%d' '%s' (sent/all = %zu/%zu bytes %0.0f%%, prep/sent/total = %0.2f/%0.2f/%0.2f ms) %d '%s'",
w->id
, gettid()
, chan->conn->pc, chan->dc
, "DATA"
, sent_bytes
, response_size
, response_size > sent_bytes ? -(((double)(response_size - sent_bytes) / (double)response_size) * 100.0) : ((response_size > 0) ? (((sent_bytes - response_size) / (double)response_size) * 100.0) : 0.0)
, dt_usec(&w->timings.tv_ready, &w->timings.tv_in) / 1000.0
, dt_usec(&tv, &w->timings.tv_ready) / 1000.0
, dt_usec(&tv, &w->timings.tv_in) / 1000.0
, w->response.code
, strip_control_characters((char *)buffer_tostring(w->url_as_received))
);
web_client_release_to_cache(w);
}
// ----------------------------------------------------------------------------
// webrtc data channel
static void myOpenCallback(int id, void *user_ptr) {
webrtc_set_thread_name();
WEBRTC_DC *chan = user_ptr;
internal_fatal(chan->dc != id, "WEBRTC[%d],DC[%d]: dc mismatch, expected %d, got %d", chan->conn->pc, chan->dc, chan->dc, id);
log_access("WEBRTC[%d],DC[%d]: %d DATA CHANNEL '%s' OPEN", chan->conn->pc, chan->dc, gettid(), chan->label);
internal_error(true, "WEBRTC[%d],DC[%d]: data channel opened.", chan->conn->pc, chan->dc);
chan->open = true;
}
static void myClosedCallback(int id, void *user_ptr) {
webrtc_set_thread_name();
WEBRTC_DC *chan = user_ptr;
internal_fatal(chan->dc != id, "WEBRTC[%d],DC[%d]: dc mismatch, expected %d, got %d", chan->conn->pc, chan->dc, chan->dc, id);
__atomic_store_n(&chan->open, false, __ATOMIC_RELAXED);
internal_error(true, "WEBRTC[%d],DC[%d]: data channel closed.", chan->conn->pc, chan->dc);
netdata_spinlock_lock(&chan->conn->channels.spinlock);
DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(chan->conn->channels.head, chan, link.prev, link.next);
netdata_spinlock_unlock(&chan->conn->channels.spinlock);
log_access("WEBRTC[%d],DC[%d]: %d DATA CHANNEL '%s' CLOSED", chan->conn->pc, chan->dc, gettid(), chan->label);
freez(chan->label);
freez(chan);
}
static void myErrorCallback(int id, const char *error, void *user_ptr) {
webrtc_set_thread_name();
WEBRTC_DC *chan = user_ptr;
internal_fatal(chan->dc != id, "WEBRTC[%d],DC[%d]: dc mismatch, expected %d, got %d", chan->conn->pc, chan->dc, chan->dc, id);
error("WEBRTC[%d],DC[%d]: ERROR: '%s'", chan->conn->pc, chan->dc, error);
}
static void myMessageCallback(int id, const char *message, int size, void *user_ptr) {
webrtc_set_thread_name();
WEBRTC_DC *chan = user_ptr;
internal_fatal(chan->dc != id, "WEBRTC[%d],DC[%d]: dc mismatch, expected %d, got %d", chan->conn->pc, chan->dc, chan->dc, id);
internal_fatal(!webrtc_dc_is_open(chan), "WEBRTC[%d],DC[%d]: received message on closed channel", chan->conn->pc, chan->dc);
bool binary = (size >= 0);
if(size < 0)
size = -size;
webrtc_execute_api_request(chan, message, size, binary);
}
//#define WEBRTC_MAX_REQUEST_SIZE 65536
//
//static void myAvailableCallback(int id, void *user_ptr) {
// webrtc_set_thread_name();
//
// WEBRTC_DC *chan = user_ptr;
// internal_fatal(chan->dc != id, "WEBRTC[%d],DC[%d]: dc mismatch, expected %d, got %d", chan->conn->pc, chan->dc, chan->dc, id);
//
// internal_fatal(!chan->open, "WEBRTC[%d],DC[%d]: received message on closed channel", chan->conn->pc, chan->dc);
//
// int size = WEBRTC_MAX_REQUEST_SIZE;
// char buffer[WEBRTC_MAX_REQUEST_SIZE];
// while(rtcReceiveMessage(id, buffer, &size) == RTC_ERR_SUCCESS) {
// bool binary = (size >= 0);
// if(size < 0)
// size = -size;
//
// webrtc_execute_api_request(chan, message, size, binary);
// }
//}
static void myDataChannelCallback(int pc, int dc, void *user_ptr) {
webrtc_set_thread_name();
WEBRTC_CONN *conn = user_ptr;
internal_fatal(conn->pc != pc, "WEBRTC[%d]: pc mismatch, expected %d, got %d", conn->pc, conn->pc, pc);
WEBRTC_DC *chan = callocz(1, sizeof(WEBRTC_DC));
chan->dc = dc;
chan->conn = conn;
netdata_spinlock_lock(&conn->channels.spinlock);
DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(conn->channels.head, chan, link.prev, link.next);
netdata_spinlock_unlock(&conn->channels.spinlock);
rtcSetUserPointer(dc, chan);
char label[1024 + 1];
rtcGetDataChannelLabel(dc, label, 1024);
label[1024] = '\0';
chan->label = strdupz(label);
if(rtcSetOpenCallback(dc, myOpenCallback) != RTC_ERR_SUCCESS)
error("WEBRTC[%d],DC[%d]: rtcSetOpenCallback() failed.", conn->pc, chan->dc);
if(rtcSetClosedCallback(dc, myClosedCallback) != RTC_ERR_SUCCESS)
error("WEBRTC[%d],DC[%d]: rtcSetClosedCallback() failed.", conn->pc, chan->dc);
if(rtcSetErrorCallback(dc, myErrorCallback) != RTC_ERR_SUCCESS)
error("WEBRTC[%d],DC[%d]: rtcSetErrorCallback() failed.", conn->pc, chan->dc);
if(rtcSetMessageCallback(dc, myMessageCallback) != RTC_ERR_SUCCESS)
error("WEBRTC[%d],DC[%d]: rtcSetMessageCallback() failed.", conn->pc, chan->dc);
// if(rtcSetAvailableCallback(dc, myAvailableCallback) != RTC_ERR_SUCCESS)
// error("WEBRTC[%d],DC[%d]: rtcSetAvailableCallback() failed.", conn->pc, chan->dc);
internal_error(true, "WEBRTC[%d],DC[%d]: new data channel with label '%s'", chan->conn->pc, chan->dc, chan->label);
}
// ----------------------------------------------------------------------------
// webrtc connection
static inline void webrtc_destroy_connection_unsafe(WEBRTC_CONN *conn) {
if(conn->state == RTC_CLOSED) {
netdata_spinlock_lock(&conn->channels.spinlock);
WEBRTC_DC *chan = conn->channels.head;
netdata_spinlock_unlock(&conn->channels.spinlock);
if(!chan) {
internal_error(true, "WEBRTC[%d]: destroying connection", conn->pc);
DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(webrtc_base.unsafe.head, conn, link.prev, link.next);
freez(conn);
}
else {
internal_error(true, "WEBRTC[%d]: not destroying closed connection because it has data channels running", conn->pc);
}
}
}
static void cleanupConnections() {
netdata_spinlock_lock(&webrtc_base.unsafe.spinlock);
WEBRTC_CONN *conn = webrtc_base.unsafe.head;
while(conn) {
WEBRTC_CONN *conn_next = conn->link.next;
webrtc_destroy_connection_unsafe(conn);
conn = conn_next;
}
netdata_spinlock_unlock(&webrtc_base.unsafe.spinlock);
}
static WEBRTC_CONN *webrtc_create_connection(void) {
WEBRTC_CONN *conn = callocz(1, sizeof(WEBRTC_CONN));
netdata_spinlock_init(&conn->response.spinlock);
netdata_spinlock_init(&conn->channels.spinlock);
netdata_spinlock_lock(&webrtc_base.unsafe.spinlock);
DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(webrtc_base.unsafe.head, conn, link.prev, link.next);
netdata_spinlock_unlock(&webrtc_base.unsafe.spinlock);
return conn;
}
static void myDescriptionCallback(int pc __maybe_unused, const char *sdp, const char *type, void *user_ptr) {
webrtc_set_thread_name();
WEBRTC_CONN *conn = user_ptr;
internal_fatal(conn->pc != pc, "WEBRTC[%d]: pc mismatch, expected %d, got %d", conn->pc, conn->pc, pc);
internal_error(true, "WEBRTC[%d]: local description type '%s': %s", conn->pc, type, sdp);
netdata_spinlock_lock(&conn->response.spinlock);
if(!conn->response.candidates) {
buffer_json_member_add_string(conn->response.wb, "sdp", sdp);
buffer_json_member_add_string(conn->response.wb, "type", type);
conn->response.sdp = true;
}
netdata_spinlock_unlock(&conn->response.spinlock);
conn->local_max_message_size = find_max_message_size_in_sdp(sdp);
}
static void myCandidateCallback(int pc __maybe_unused, const char *cand, const char *mid __maybe_unused, void *user_ptr) {
webrtc_set_thread_name();
WEBRTC_CONN *conn = user_ptr;
internal_fatal(conn->pc != pc, "WEBRTC[%d]: pc mismatch, expected %d, got %d", conn->pc, conn->pc, pc);
netdata_spinlock_lock(&conn->response.spinlock);
if(!conn->response.candidates) {
buffer_json_member_add_array(conn->response.wb, "candidates");
conn->response.candidates = true;
}
internal_error(true, "WEBRTC[%d]: local candidate '%s', mid '%s'", conn->pc, cand, mid);
buffer_json_add_array_item_string(conn->response.wb, cand);
netdata_spinlock_unlock(&conn->response.spinlock);
}
static void myStateChangeCallback(int pc __maybe_unused, rtcState state, void *user_ptr) {
webrtc_set_thread_name();
WEBRTC_CONN *conn = user_ptr;
internal_fatal(conn->pc != pc, "WEBRTC[%d]: pc mismatch, expected %d, got %d", conn->pc, conn->pc, pc);
conn->state = state;
switch(state) {
case RTC_NEW:
internal_error(true, "WEBRTC[%d]: new connection...", conn->pc);
break;
case RTC_CONNECTING:
log_access("WEBRTC[%d]: %d CONNECTING", conn->pc, gettid());
internal_error(true, "WEBRTC[%d]: connecting...", conn->pc);
break;
case RTC_CONNECTED:
log_access("WEBRTC[%d]: %d CONNECTED", conn->pc, gettid());
internal_error(true, "WEBRTC[%d]: connected!", conn->pc);
break;
case RTC_DISCONNECTED:
log_access("WEBRTC[%d]: %d DISCONNECTED", conn->pc, gettid());
internal_error(true, "WEBRTC[%d]: disconnected.", conn->pc);
break;
case RTC_FAILED:
log_access("WEBRTC[%d]: %d CONNECTION FAILED", conn->pc, gettid());
internal_error(true, "WEBRTC[%d]: failed.", conn->pc);
break;
case RTC_CLOSED:
log_access("WEBRTC[%d]: %d CONNECTION CLOSED", conn->pc, gettid());
internal_error(true, "WEBRTC[%d]: closed.", conn->pc);
netdata_spinlock_lock(&webrtc_base.unsafe.spinlock);
webrtc_destroy_connection_unsafe(conn);
netdata_spinlock_unlock(&webrtc_base.unsafe.spinlock);
break;
}
}
static void myGatheringStateCallback(int pc __maybe_unused, rtcGatheringState state, void *user_ptr) {
webrtc_set_thread_name();
WEBRTC_CONN *conn = user_ptr;
internal_fatal(conn->pc != pc, "WEBRTC[%d]: pc mismatch, expected %d, got %d", conn->pc, conn->pc, pc);
conn->gathering_state = state;
switch(state) {
case RTC_GATHERING_NEW:
internal_error(true, "WEBRTC[%d]: gathering...", conn->pc);
break;
case RTC_GATHERING_INPROGRESS:
internal_error(true, "WEBRTC[%d]: gathering in progress...", conn->pc);
break;
case RTC_GATHERING_COMPLETE:
internal_error(true, "WEBRTC[%d]: gathering complete!", conn->pc);
break;
}
}
int webrtc_new_connection(const char *sdp, BUFFER *wb) {
if(unlikely(!webrtc_base.enabled)) {
buffer_flush(wb);
buffer_strcat(wb, "WebRTC is not enabled on this agent.");
wb->content_type = CT_TEXT_PLAIN;
return HTTP_RESP_BAD_REQUEST;
}
cleanupConnections();
if(unlikely(!sdp || !*sdp)) {
buffer_flush(wb);
buffer_strcat(wb, "No SDP message posted with the request");
wb->content_type = CT_TEXT_PLAIN;
return HTTP_RESP_BAD_REQUEST;
}
buffer_flush(wb);
buffer_json_initialize(wb, "\"", "\"", 0, true, false);
wb->content_type = CT_APPLICATION_JSON;
WEBRTC_CONN *conn = webrtc_create_connection();
conn->response.wb = wb;
conn->max_message_size = WEBRTC_DEFAULT_REMOTE_MAX_MESSAGE_SIZE;
conn->local_max_message_size = WEBRTC_OUR_MAX_MESSAGE_SIZE;
conn->remote_max_message_size = find_max_message_size_in_sdp(sdp);
conn->config.iceServers = (const char **)webrtc_base.iceServers;
conn->config.iceServersCount = webrtc_base.iceServersCount;
conn->config.proxyServer = webrtc_base.proxyServer;
conn->config.bindAddress = webrtc_base.bindAddress;
conn->config.certificateType = RTC_CERTIFICATE_DEFAULT;
conn->config.iceTransportPolicy = RTC_TRANSPORT_POLICY_ALL;
conn->config.enableIceTcp = true; // libnice only
conn->config.enableIceUdpMux = true; // libjuice only
conn->config.disableAutoNegotiation = false;
conn->config.forceMediaTransport = false;
conn->config.portRangeBegin = 0; // 0 means automatic
conn->config.portRangeEnd = 0; // 0 means automatic
conn->config.mtu = 0; // <= 0 means automatic
conn->config.maxMessageSize = WEBRTC_OUR_MAX_MESSAGE_SIZE; // <= 0 means default
conn->pc = rtcCreatePeerConnection(&conn->config);
rtcSetUserPointer(conn->pc, conn);
if(rtcSetLocalDescriptionCallback(conn->pc, myDescriptionCallback) != RTC_ERR_SUCCESS)
error("WEBRTC[%d]: rtcSetLocalDescriptionCallback() failed", conn->pc);
if(rtcSetLocalCandidateCallback(conn->pc, myCandidateCallback) != RTC_ERR_SUCCESS)
error("WEBRTC[%d]: rtcSetLocalCandidateCallback() failed", conn->pc);
if(rtcSetStateChangeCallback(conn->pc, myStateChangeCallback) != RTC_ERR_SUCCESS)
error("WEBRTC[%d]: rtcSetStateChangeCallback() failed", conn->pc);
if(rtcSetGatheringStateChangeCallback(conn->pc, myGatheringStateCallback) != RTC_ERR_SUCCESS)
error("WEBRTC[%d]: rtcSetGatheringStateChangeCallback() failed", conn->pc);
if(rtcSetDataChannelCallback(conn->pc, myDataChannelCallback) != RTC_ERR_SUCCESS)
error("WEBRTC[%d]: rtcSetDataChannelCallback() failed", conn->pc);
// initialize the handshake
internal_error(true, "WEBRTC[%d]: setting remote sdp: %s", conn->pc, sdp);
if(rtcSetRemoteDescription(conn->pc, sdp, "offer") != RTC_ERR_SUCCESS)
error("WEBRTC[%d]: rtcSetRemoteDescription() failed", conn->pc);
// initiate the handshake process
if(conn->config.disableAutoNegotiation) {
if(rtcSetLocalDescription(conn->pc, NULL) != RTC_ERR_SUCCESS)
error("WEBRTC[%d]: rtcSetLocalDescription() failed", conn->pc);
}
bool logged = false;
while(conn->gathering_state != RTC_GATHERING_COMPLETE) {
if(!logged) {
logged = true;
internal_error(true, "WEBRTC[%d]: Waiting for gathering to complete", conn->pc);
}
usleep(1000);
}
if(logged)
internal_error(true, "WEBRTC[%d]: Gathering finished, our answer is ready", conn->pc);
internal_fatal(!conn->response.sdp, "WEBRTC[%d]: response does not have an SDP: %s", conn->pc, buffer_tostring(conn->response.wb));
internal_fatal(!conn->response.candidates, "WEBRTC[%d]: response does not have candidates: %s", conn->pc, buffer_tostring(conn->response.wb));
conn->max_message_size = MIN(conn->local_max_message_size, conn->remote_max_message_size);
if(conn->max_message_size < WEBRTC_COMPRESSED_HEADER_SIZE)
conn->max_message_size = WEBRTC_COMPRESSED_HEADER_SIZE;
buffer_json_finalize(wb);
return HTTP_RESP_OK;
}
#else // ! HAVE_LIBDATACHANNEL
void webrtc_initialize() {
;
}
int webrtc_new_connection(const char *sdp __maybe_unused, BUFFER *wb) {
buffer_flush(wb);
buffer_strcat(wb, "WEBRTC is not available on this server");
wb->content_type = CT_TEXT_PLAIN;
return HTTP_RESP_BAD_REQUEST;
}
void webrtc_close_all_connections() {
;
}
#endif // ! HAVE_LIBDATACHANNEL

12
web/rtc/webrtc.h Normal file
View file

@ -0,0 +1,12 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef NETDATA_WEBRTC_H
#define NETDATA_WEBRTC_H
#include "../../libnetdata/libnetdata.h"
int webrtc_new_connection(const char *sdp, BUFFER *wb);
void webrtc_close_all_connections();
void webrtc_initialize();
#endif //NETDATA_WEBRTC_H

View file

@ -28,7 +28,7 @@ long web_client_streaming_rate_t = 0L;
static struct web_client *web_client_create_on_fd(POLLINFO *pi) {
struct web_client *w;
w = web_client_get_from_cache_or_allocate();
w = web_client_get_from_cache();
w->ifd = w->ofd = pi->fd;
strncpyz(w->client_ip, pi->client_ip, sizeof(w->client_ip) - 1);
@ -39,7 +39,19 @@ static struct web_client *web_client_create_on_fd(POLLINFO *pi) {
if(unlikely(!*w->client_port)) strcpy(w->client_port, "-");
w->port_acl = pi->port_acl;
web_client_initialize_connection(w);
int flag = 1;
if(unlikely(web_client_check_tcp(w) && setsockopt(w->ifd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int)) != 0))
debug(D_WEB_CLIENT, "%llu: failed to enable TCP_NODELAY on socket fd %d.", w->id, w->ifd);
flag = 1;
if(unlikely(setsockopt(w->ifd, SOL_SOCKET, SO_KEEPALIVE, (char *) &flag, sizeof(int)) != 0))
debug(D_WEB_CLIENT, "%llu: failed to enable SO_KEEPALIVE on socket fd %d.", w->id, w->ifd);
web_client_update_acl_matches(w);
web_client_enable_wait_receive(w);
web_server_log_connection(w, "CONNECTED");
w->pollinfo_slot = pi->slot;
return(w);
}
@ -107,7 +119,10 @@ static void web_server_file_del_callback(POLLINFO *pi) {
if(unlikely(!w->pollinfo_slot)) {
debug(D_WEB_CLIENT, "%llu: CROSS WEB CLIENT CLEANUP (iFD %d, oFD %d)", w->id, pi->fd, w->ofd);
web_client_release(w);
web_server_log_connection(w, "DISCONNECTED");
web_client_request_done(w);
web_client_release_to_cache(w);
global_statistics_web_client_disconnected();
}
worker_is_idle();
@ -278,7 +293,10 @@ static void web_server_del_callback(POLLINFO *pi) {
pi->flags |= POLLINFO_FLAG_DONT_CLOSE;
debug(D_WEB_CLIENT, "%llu: CLOSING CLIENT FD %d", w->id, pi->fd);
web_client_release(w);
web_server_log_connection(w, "DISCONNECTED");
web_client_request_done(w);
web_client_release_to_cache(w);
global_statistics_web_client_disconnected();
}
worker_is_idle();
@ -402,9 +420,6 @@ cleanup:
static void socket_listen_main_static_threaded_worker_cleanup(void *ptr) {
worker_private = (struct web_server_static_threaded_worker *)ptr;
info("freeing local web clients cache...");
web_client_cache_destroy();
info("stopped after %zu connects, %zu disconnects (max concurrent %zu), %zu receptions and %zu sends",
worker_private->connected,
worker_private->disconnected,

File diff suppressed because it is too large Load diff

View file

@ -39,10 +39,11 @@ extern int respect_web_browser_do_not_track_policy;
extern char *web_x_frame_options;
typedef enum web_client_mode {
WEB_CLIENT_MODE_NORMAL = 0,
WEB_CLIENT_MODE_FILECOPY = 1,
WEB_CLIENT_MODE_OPTIONS = 2,
WEB_CLIENT_MODE_STREAM = 3
WEB_CLIENT_MODE_GET = 0,
WEB_CLIENT_MODE_POST = 1,
WEB_CLIENT_MODE_FILECOPY = 2,
WEB_CLIENT_MODE_OPTIONS = 3,
WEB_CLIENT_MODE_STREAM = 4,
} WEB_CLIENT_MODE;
typedef enum {
@ -122,8 +123,6 @@ typedef enum web_client_flags {
#define web_client_is_corkable(w) web_client_flag_check(w, WEB_CLIENT_FLAG_TCP_CLIENT)
#define NETDATA_WEB_REQUEST_URL_SIZE 65536 // static allocation
#define NETDATA_WEB_REQUEST_COOKIE_SIZE 1024 // static allocation
#define NETDATA_WEB_REQUEST_ORIGIN_HEADER_SIZE 1024 // static allocation
#define NETDATA_WEB_RESPONSE_ZLIB_CHUNK_SIZE 16384
@ -131,82 +130,99 @@ typedef enum web_client_flags {
#define NETDATA_WEB_RESPONSE_INITIAL_SIZE 8192
#define NETDATA_WEB_REQUEST_INITIAL_SIZE 8192
#define NETDATA_WEB_REQUEST_MAX_SIZE 65536
#define NETDATA_WEB_DECODED_URL_INITIAL_SIZE 512
struct response {
BUFFER *header; // our response header
BUFFER *header_output; // internal use
BUFFER *data; // our response data buffer
BUFFER *header; // our response header
BUFFER *header_output; // internal use
BUFFER *data; // our response data buffer
int code; // the HTTP response code
short int code; // the HTTP response code
bool has_cookies;
size_t rlen; // if non-zero, the excepted size of ifd (input of firecopy)
size_t sent; // current data length sent to output
int zoutput; // if set to 1, web_client_send() will send compressed data
bool zoutput; // if set to 1, web_client_send() will send compressed data
#ifdef NETDATA_WITH_ZLIB
bool zinitialized;
z_stream zstream; // zlib stream for sending compressed output to client
Bytef zbuffer[NETDATA_WEB_RESPONSE_ZLIB_CHUNK_SIZE]; // temporary buffer for storing compressed output
size_t zsent; // the compressed bytes we have sent to the client
size_t zhave; // the compressed bytes that we have received from zlib
unsigned int zinitialized : 1;
Bytef zbuffer[NETDATA_WEB_RESPONSE_ZLIB_CHUNK_SIZE]; // temporary buffer for storing compressed output
#endif /* NETDATA_WITH_ZLIB */
};
struct web_client;
typedef bool (*web_client_interrupt_t)(struct web_client *, void *data);
struct web_client {
unsigned long long id;
size_t use_count;
WEB_CLIENT_FLAGS flags; // status flags for the client
WEB_CLIENT_MODE mode; // the operational mode of the client
WEB_CLIENT_ACL acl; // the access list of the client
int port_acl; // the operations permitted on the port the client connected to
char *auth_bearer_token; // the Bearer auth token (if sent)
WEB_CLIENT_FLAGS flags; // status flags for the client
WEB_CLIENT_MODE mode; // the operational mode of the client
WEB_CLIENT_ACL acl; // the access list of the client
int port_acl; // the operations permitted on the port the client connected to
size_t header_parse_tries;
size_t header_parse_last_size;
int tcp_cork; // 1 = we have a cork on the socket
bool tcp_cork;
int ifd;
int ofd;
char client_ip[INET6_ADDRSTRLEN]; // Defined buffer sizes include null-terminators
char client_ip[INET6_ADDRSTRLEN]; // Defined buffer sizes include null-terminators
char client_port[NI_MAXSERV];
char server_host[NI_MAXHOST];
char client_host[NI_MAXHOST];
char forwarded_host[NI_MAXHOST]; //Used with proxy
char decoded_url[NETDATA_WEB_REQUEST_URL_SIZE + 1]; // we decode the URL in this buffer
char decoded_query_string[NETDATA_WEB_REQUEST_URL_SIZE + 1]; // we decode the Query String in this buffer
char last_url[NETDATA_WEB_REQUEST_URL_SIZE + 1]; // we keep a copy of the decoded URL here
size_t url_path_length;
char separator; // This value can be either '?' or 'f'
char *url_search_path; //A pointer to the search path sent by the client
BUFFER *url_as_received; // the entire URL as received, used for logging - DO NOT MODIFY
BUFFER *url_path_decoded; // the path, decoded - it is incrementally parsed and altered
BUFFER *url_query_string_decoded; // the query string, decoded - it is incrementally parsed and altered
struct timeval tv_in, tv_ready;
// THESE NEED TO BE FREED
char *auth_bearer_token; // the Bearer auth token (if sent)
char *server_host; // the Host: header
char *forwarded_host; // the X-Forwarded-For: header
char *origin; // the Origin: header
char *user_agent; // the User-Agent: header
char cookie1[NETDATA_WEB_REQUEST_COOKIE_SIZE + 1];
char cookie2[NETDATA_WEB_REQUEST_COOKIE_SIZE + 1];
char origin[NETDATA_WEB_REQUEST_ORIGIN_HEADER_SIZE + 1];
char *user_agent;
struct response response;
size_t stats_received_bytes;
size_t stats_sent_bytes;
// cache of web_client allocations
struct web_client *prev; // maintain a linked list of web clients
struct web_client *next; // for the web servers that need it
// MULTI-THREADED WEB SERVER MEMBERS
netdata_thread_t thread; // the thread servicing this client
volatile int running; // 1 when the thread runs, 0 otherwise
char *post_payload; // when this request is a POST, this has the payload
size_t post_payload_size; // the size of the buffer allocated for the payload
// the actual contents may be less than the size
// STATIC-THREADED WEB SERVER MEMBERS
size_t pollinfo_slot; // POLLINFO slot of the web client
size_t pollinfo_filecopy_slot; // POLLINFO slot of the file read
size_t pollinfo_slot; // POLLINFO slot of the web client
size_t pollinfo_filecopy_slot; // POLLINFO slot of the file read
#ifdef ENABLE_HTTPS
struct netdata_ssl ssl;
#endif
struct { // A callback to check if the query should be interrupted / stopped
web_client_interrupt_t callback;
void *callback_data;
} interrupt;
struct {
size_t received_bytes;
size_t sent_bytes;
size_t *memory_accounting; // temporary pointer for constructor to use
} statistics;
struct {
usec_t timeout_ut; // timeout if set, or zero
struct timeval tv_in; // request received
struct timeval tv_ready; // request processed - response ready
struct timeval tv_timeout_last_checkpoint; // last checkpoint
} timings;
struct {
struct web_client *prev;
struct web_client *next;
} cache;
struct response response;
};
int web_client_permission_denied(struct web_client *w);
@ -227,8 +243,28 @@ char *strip_control_characters(char *url);
int web_client_socket_is_now_used_for_streaming(struct web_client *w);
void web_client_zero(struct web_client *w);
struct web_client *web_client_create(size_t *statistics_memory_accounting);
void web_client_free(struct web_client *w);
#ifdef ENABLE_HTTPS
void web_client_reuse_ssl(struct web_client *w);
#endif
#include "web/api/web_api_v1.h"
#include "web/api/web_api_v2.h"
#include "daemon/common.h"
void web_client_decode_path_and_query_string(struct web_client *w, const char *path_and_query_string);
int web_client_api_request(RRDHOST *host, struct web_client *w, char *url_path_fragment);
const char *web_content_type_to_string(HTTP_CONTENT_TYPE content_type);
void web_client_enable_deflate(struct web_client *w, int gzip);
int web_client_api_request_with_node_selection(RRDHOST *host, struct web_client *w, char *decoded_url_path);
void web_client_timeout_checkpoint_init(struct web_client *w);
void web_client_timeout_checkpoint_set(struct web_client *w, int timeout_ms);
usec_t web_client_timeout_checkpoint(struct web_client *w);
bool web_client_timeout_checkpoint_and_check(struct web_client *w, usec_t *usec_since_last_checkpoint);
usec_t web_client_timeout_checkpoint_response_ready(struct web_client *w, usec_t *usec_since_last_checkpoint);
#endif

View file

@ -6,77 +6,6 @@
// ----------------------------------------------------------------------------
// allocate and free web_clients
#ifdef ENABLE_HTTPS
static void web_client_reuse_ssl(struct web_client *w) {
if (netdata_ssl_srv_ctx) {
if (w->ssl.conn) {
SSL_SESSION *session = SSL_get_session(w->ssl.conn);
SSL *old = w->ssl.conn;
w->ssl.conn = SSL_new(netdata_ssl_srv_ctx);
if (session) {
#if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_111
if (SSL_SESSION_is_resumable(session))
#endif
SSL_set_session(w->ssl.conn, session);
}
SSL_free(old);
}
}
}
#endif
static void web_client_zero(struct web_client *w) {
// zero everything about it - but keep the buffers
// remember the pointers to the buffers
BUFFER *b1 = w->response.data;
BUFFER *b2 = w->response.header;
BUFFER *b3 = w->response.header_output;
// empty the buffers
buffer_flush(b1);
buffer_flush(b2);
buffer_flush(b3);
freez(w->user_agent);
// zero everything
memset(w, 0, sizeof(struct web_client));
// restore the pointers of the buffers
w->response.data = b1;
w->response.header = b2;
w->response.header_output = b3;
}
static void web_client_free(struct web_client *w) {
buffer_free(w->response.header_output);
buffer_free(w->response.header);
buffer_free(w->response.data);
freez(w->user_agent);
#ifdef ENABLE_HTTPS
if ((!web_client_check_unix(w)) && (netdata_ssl_srv_ctx)) {
if (w->ssl.conn) {
SSL_free(w->ssl.conn);
w->ssl.conn = NULL;
}
}
#endif
freez(w);
__atomic_sub_fetch(&netdata_buffers_statistics.buffers_web, sizeof(struct web_client), __ATOMIC_RELAXED);
}
static struct web_client *web_client_alloc(void) {
struct web_client *w = callocz(1, sizeof(struct web_client));
__atomic_add_fetch(&netdata_buffers_statistics.buffers_web, sizeof(struct web_client), __ATOMIC_RELAXED);
w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE, &netdata_buffers_statistics.buffers_web);
w->response.header = buffer_create(NETDATA_WEB_RESPONSE_HEADER_INITIAL_SIZE, &netdata_buffers_statistics.buffers_web);
w->response.header_output = buffer_create(NETDATA_WEB_RESPONSE_HEADER_INITIAL_SIZE, &netdata_buffers_statistics.buffers_web);
return w;
}
// ----------------------------------------------------------------------------
// web clients caching
@ -87,194 +16,134 @@ static struct web_client *web_client_alloc(void) {
// The size of the cache is adaptive. It caches the structures of 2x
// the number of currently connected clients.
// Comments per server:
// SINGLE-THREADED : 1 cache is maintained
// MULTI-THREADED : 1 cache is maintained
// STATIC-THREADED : 1 cache for each thread of the web server
static struct clients_cache {
struct {
SPINLOCK spinlock;
struct web_client *head; // the structures of the currently connected clients
size_t count; // the count the currently connected clients
__thread struct clients_cache web_clients_cache = {
.pid = 0,
.used = NULL,
.used_count = 0,
.avail = NULL,
.avail_count = 0,
.allocated = 0,
.reused = 0
size_t allocated; // the number of allocations
size_t reused; // the number of re-uses
} used;
struct {
SPINLOCK spinlock;
struct web_client *head; // the cached structures, available for future clients
size_t count; // the number of cached structures
} avail;
} web_clients_cache = {
.used = {
.spinlock = NETDATA_SPINLOCK_INITIALIZER,
.head = NULL,
.count = 0,
.reused = 0,
.allocated = 0,
},
.avail = {
.spinlock = NETDATA_SPINLOCK_INITIALIZER,
.head = NULL,
.count = 0,
},
};
inline void web_client_cache_verify(int force) {
#ifdef NETDATA_INTERNAL_CHECKS
static __thread size_t count = 0;
count++;
if(unlikely(force || count > 1000)) {
count = 0;
struct web_client *w;
size_t used = 0, avail = 0;
for(w = web_clients_cache.used; w ; w = w->next) used++;
for(w = web_clients_cache.avail; w ; w = w->next) avail++;
info("web_client_cache has %zu (%zu) used and %zu (%zu) available clients, allocated %zu, reused %zu (hit %zu%%)."
, used, web_clients_cache.used_count
, avail, web_clients_cache.avail_count
, web_clients_cache.allocated
, web_clients_cache.reused
, (web_clients_cache.allocated + web_clients_cache.reused)?(web_clients_cache.reused * 100 / (web_clients_cache.allocated + web_clients_cache.reused)):0
);
}
#else
if(unlikely(force)) {
info("web_client_cache has %zu used and %zu available clients, allocated %zu, reused %zu (hit %zu%%)."
, web_clients_cache.used_count
, web_clients_cache.avail_count
, web_clients_cache.allocated
, web_clients_cache.reused
, (web_clients_cache.allocated + web_clients_cache.reused)?(web_clients_cache.reused * 100 / (web_clients_cache.allocated + web_clients_cache.reused)):0
);
}
#endif
}
// destroy the cache and free all the memory it uses
void web_client_cache_destroy(void) {
#ifdef NETDATA_INTERNAL_CHECKS
if(unlikely(web_clients_cache.pid != 0 && web_clients_cache.pid != gettid()))
error("Oops! wrong thread accessing the cache. Expected %d, found %d", (int)web_clients_cache.pid, (int)gettid());
web_client_cache_verify(1);
#endif
netdata_thread_disable_cancelability();
internal_error(true, "web_client_cache has %zu used and %zu available clients, allocated %zu, reused %zu (hit %zu%%)."
, web_clients_cache.used.count
, web_clients_cache.avail.count
, web_clients_cache.used.allocated
, web_clients_cache.used.reused
, (web_clients_cache.used.allocated + web_clients_cache.used.reused)?(web_clients_cache.used.reused * 100 / (web_clients_cache.used.allocated + web_clients_cache.used.reused)):0
);
struct web_client *w, *t;
w = web_clients_cache.used;
netdata_spinlock_lock(&web_clients_cache.avail.spinlock);
w = web_clients_cache.avail.head;
while(w) {
t = w;
w = w->next;
w = w->cache.next;
web_client_free(t);
}
web_clients_cache.used = NULL;
web_clients_cache.used_count = 0;
web_clients_cache.avail.head = NULL;
web_clients_cache.avail.count = 0;
netdata_spinlock_unlock(&web_clients_cache.avail.spinlock);
w = web_clients_cache.avail;
while(w) {
t = w;
w = w->next;
web_client_free(t);
}
web_clients_cache.avail = NULL;
web_clients_cache.avail_count = 0;
netdata_thread_enable_cancelability();
// DO NOT FREE THEM IF THEY ARE USED
// netdata_spinlock_lock(&web_clients_cache.used.spinlock);
// w = web_clients_cache.used.head;
// while(w) {
// t = w;
// w = w->next;
// web_client_free(t);
// }
// web_clients_cache.used.head = NULL;
// web_clients_cache.used.count = 0;
// web_clients_cache.used.reused = 0;
// web_clients_cache.used.allocated = 0;
// netdata_spinlock_unlock(&web_clients_cache.used.spinlock);
}
struct web_client *web_client_get_from_cache_or_allocate() {
#ifdef NETDATA_INTERNAL_CHECKS
if(unlikely(web_clients_cache.pid == 0))
web_clients_cache.pid = gettid();
if(unlikely(web_clients_cache.pid != 0 && web_clients_cache.pid != gettid()))
error("Oops! wrong thread accessing the cache. Expected %d, found %d", (int)web_clients_cache.pid, (int)gettid());
#endif
netdata_thread_disable_cancelability();
struct web_client *w = web_clients_cache.avail;
struct web_client *web_client_get_from_cache(void) {
netdata_spinlock_lock(&web_clients_cache.avail.spinlock);
struct web_client *w = web_clients_cache.avail.head;
if(w) {
// get it from avail
if (w == web_clients_cache.avail) web_clients_cache.avail = w->next;
if(w->prev) w->prev->next = w->next;
if(w->next) w->next->prev = w->prev;
web_clients_cache.avail_count--;
#ifdef ENABLE_HTTPS
web_client_reuse_ssl(w);
SSL *ssl = w->ssl.conn;
#endif
DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(web_clients_cache.avail.head, w, cache.prev, cache.next);
web_clients_cache.avail.count--;
netdata_spinlock_unlock(&web_clients_cache.avail.spinlock);
web_client_zero(w);
web_clients_cache.reused++;
#ifdef ENABLE_HTTPS
w->ssl.conn = ssl;
w->ssl.flags = NETDATA_SSL_START;
debug(D_WEB_CLIENT_ACCESS,"Reusing SSL structure with (w->ssl = NULL, w->accepted = %u)", w->ssl.flags);
#endif
netdata_spinlock_lock(&web_clients_cache.used.spinlock);
web_clients_cache.used.reused++;
}
else {
netdata_spinlock_unlock(&web_clients_cache.avail.spinlock);
// allocate it
w = web_client_alloc();
w = web_client_create(&netdata_buffers_statistics.buffers_web);
#ifdef ENABLE_HTTPS
w->ssl.flags = NETDATA_SSL_START;
debug(D_WEB_CLIENT_ACCESS,"Starting SSL structure with (w->ssl = NULL, w->accepted = %u)", w->ssl.flags);
#endif
web_clients_cache.allocated++;
netdata_spinlock_lock(&web_clients_cache.used.spinlock);
web_clients_cache.used.allocated++;
}
// link it to used web clients
if (web_clients_cache.used) web_clients_cache.used->prev = w;
w->next = web_clients_cache.used;
w->prev = NULL;
web_clients_cache.used = w;
web_clients_cache.used_count++;
DOUBLE_LINKED_LIST_PREPEND_ITEM_UNSAFE(web_clients_cache.used.head, w, cache.prev, cache.next);
web_clients_cache.used.count++;
netdata_spinlock_unlock(&web_clients_cache.used.spinlock);
// initialize it
w->use_count++;
w->id = global_statistics_web_client_connected();
w->mode = WEB_CLIENT_MODE_NORMAL;
netdata_thread_enable_cancelability();
w->mode = WEB_CLIENT_MODE_GET;
return w;
}
void web_client_release(struct web_client *w) {
#ifdef NETDATA_INTERNAL_CHECKS
if(unlikely(web_clients_cache.pid != 0 && web_clients_cache.pid != gettid()))
error("Oops! wrong thread accessing the cache. Expected %d, found %d", (int)web_clients_cache.pid, (int)gettid());
if(unlikely(w->running))
error("%llu: releasing web client from %s port %s, but it still running.", w->id, w->client_ip, w->client_port);
#endif
debug(D_WEB_CLIENT_ACCESS, "%llu: Closing web client from %s port %s.", w->id, w->client_ip, w->client_port);
web_server_log_connection(w, "DISCONNECTED");
web_client_request_done(w);
global_statistics_web_client_disconnected();
netdata_thread_disable_cancelability();
if(web_server_mode != WEB_SERVER_MODE_STATIC_THREADED) {
if (w->ifd != -1) close(w->ifd);
if (w->ofd != -1 && w->ofd != w->ifd) close(w->ofd);
w->ifd = w->ofd = -1;
#ifdef ENABLE_HTTPS
web_client_reuse_ssl(w);
w->ssl.flags = NETDATA_SSL_START;
#endif
}
void web_client_release_to_cache(struct web_client *w) {
// unlink it from the used
if (w == web_clients_cache.used) web_clients_cache.used = w->next;
if(w->prev) w->prev->next = w->next;
if(w->next) w->next->prev = w->prev;
web_clients_cache.used_count--;
netdata_spinlock_lock(&web_clients_cache.used.spinlock);
DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(web_clients_cache.used.head, w, cache.prev, cache.next);
ssize_t used_count = (ssize_t)--web_clients_cache.used.count;
netdata_spinlock_unlock(&web_clients_cache.used.spinlock);
netdata_spinlock_lock(&web_clients_cache.avail.spinlock);
if(w->use_count > 100 || (used_count > 0 && web_clients_cache.avail.count >= 2 * (size_t)used_count) || (used_count <= 10 && web_clients_cache.avail.count >= 20)) {
netdata_spinlock_unlock(&web_clients_cache.avail.spinlock);
if(web_clients_cache.avail_count >= 2 * web_clients_cache.used_count) {
// we have too many of them - free it
web_client_free(w);
}
else {
// link it to the avail
if (web_clients_cache.avail) web_clients_cache.avail->prev = w;
w->next = web_clients_cache.avail;
w->prev = NULL;
web_clients_cache.avail = w;
web_clients_cache.avail_count++;
DOUBLE_LINKED_LIST_PREPEND_ITEM_UNSAFE(web_clients_cache.avail.head, w, cache.prev, cache.next);
web_clients_cache.avail.count++;
netdata_spinlock_unlock(&web_clients_cache.avail.spinlock);
}
netdata_thread_enable_cancelability();
}

View file

@ -6,25 +6,9 @@
#include "libnetdata/libnetdata.h"
#include "web_client.h"
struct clients_cache {
pid_t pid;
struct web_client *used; // the structures of the currently connected clients
size_t used_count; // the count the currently connected clients
struct web_client *avail; // the cached structures, available for future clients
size_t avail_count; // the number of cached structures
size_t reused; // the number of re-uses
size_t allocated; // the number of allocations
};
extern __thread struct clients_cache web_clients_cache;
void web_client_release(struct web_client *w);
struct web_client *web_client_get_from_cache_or_allocate();
void web_client_release_to_cache(struct web_client *w);
struct web_client *web_client_get_from_cache(void);
void web_client_cache_destroy(void);
void web_client_cache_verify(int force);
#include "web_server.h"

View file

@ -132,28 +132,3 @@ void web_client_update_acl_matches(struct web_client *w) {
void web_server_log_connection(struct web_client *w, const char *msg) {
log_access("%llu: %d '[%s]:%s' '%s'", w->id, gettid(), w->client_ip, w->client_port, msg);
}
// --------------------------------------------------------------------------------------
void web_client_initialize_connection(struct web_client *w) {
int flag = 1;
if(unlikely(web_client_check_tcp(w) && setsockopt(w->ifd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int)) != 0))
debug(D_WEB_CLIENT, "%llu: failed to enable TCP_NODELAY on socket fd %d.", w->id, w->ifd);
flag = 1;
if(unlikely(setsockopt(w->ifd, SOL_SOCKET, SO_KEEPALIVE, (char *) &flag, sizeof(int)) != 0))
debug(D_WEB_CLIENT, "%llu: failed to enable SO_KEEPALIVE on socket fd %d.", w->id, w->ifd);
web_client_update_acl_matches(w);
w->origin[0] = '*'; w->origin[1] = '\0';
w->cookie1[0] = '\0'; w->cookie2[0] = '\0';
freez(w->user_agent); w->user_agent = NULL;
web_client_enable_wait_receive(w);
web_server_log_connection(w, "CONNECTED");
web_client_cache_verify(0);
}

View file

@ -51,7 +51,6 @@ extern long web_client_streaming_rate_t;
extern LISTEN_SOCKETS api_sockets;
void web_client_update_acl_matches(struct web_client *w);
void web_server_log_connection(struct web_client *w, const char *msg);
void web_client_initialize_connection(struct web_client *w);
struct web_client *web_client_create_on_listenfd(int listener);
#include "web_client_cache.h"