mirror of
https://libwebsockets.org/repo/libwebsockets
synced 2024-11-21 16:47:52 +00:00
1201 lines
31 KiB
C
1201 lines
31 KiB
C
/*
|
|
* libwebsockets-test-server - libwebsockets test implementation
|
|
*
|
|
* Written in 2010-2021 by Andy Green <andy@warmcat.com>
|
|
*
|
|
* This file is made available under the Creative Commons CC0 1.0
|
|
* Universal Public Domain Dedication.
|
|
*
|
|
* The person who associated a work with this deed has dedicated
|
|
* the work to the public domain by waiving all of his or her rights
|
|
* to the work worldwide under copyright law, including all related
|
|
* and neighboring rights, to the extent allowed by law. You can copy,
|
|
* modify, distribute and perform the work, even for commercial purposes,
|
|
* all without asking permission.
|
|
*
|
|
* The test apps are intended to be adapted for use in your code, which
|
|
* may be proprietary. So unlike the library itself, they are licensed
|
|
* Public Domain.
|
|
*
|
|
* Scrapeable, proxiable OpenMetrics metrics (compatible with Prometheus)
|
|
*
|
|
* https://tools.ietf.org/html/draft-richih-opsawg-openmetrics-00
|
|
*
|
|
* This plugin provides four protocols related to openmetrics handling:
|
|
*
|
|
* 1) "lws-openmetrics" direct http listener so scraper can directly get metrics
|
|
*
|
|
* 2) "lws-openmetrics-prox-agg" metrics proxy server that scraper can connect
|
|
* to locally to proxy through to connected remote clients at 3)
|
|
*
|
|
* 3) "lws-openmetrics-prox-server" metrics proxy server that remote clients can
|
|
* connect to, providing a path where scrapers at 2) can get metrics from
|
|
* clients connected to us
|
|
*
|
|
* 4) "lws-openmetrics-prox-client" nailed-up metrics proxy client that tries to
|
|
* keep up a connection to the server at 3), allowing the scraper to reach
|
|
* clients that have no reachable way to serve.
|
|
*
|
|
* These are provided like this to maximize flexibility in being able to add
|
|
* openmetrics serving, proxying, or client->proxy to existing lws code.
|
|
*
|
|
* Openmetrics supports a "metric" at the top of its report that describes the
|
|
* source aka "target metadata".
|
|
*
|
|
* Since we want to enable collection from devices that are not externally
|
|
* reachable, we must provide a reachable server that the clients can attach to
|
|
* and have their stats aggregated and then read by Prometheus or whatever.
|
|
* Openmetrics says that it wants to present the aggregated stats in a flat
|
|
* summary with only the aggregator's "target metadata" and contributor targets
|
|
* getting their data tagged with the source
|
|
*
|
|
* "The above discussion is in the context of individual exposers. An
|
|
* exposition from a general purpose monitoring system may contain
|
|
* metrics from many individual targets, and thus may expose multiple
|
|
* target info Metrics. The metrics may already have had target
|
|
* metadata added to them as labels as part of ingestion. The metric
|
|
* names MUST NOT be varied based on target metadata. For example it
|
|
* would be incorrect for all metrics to end up being prefixed with
|
|
* staging_ even if they all originated from targets in a staging
|
|
* environment)."
|
|
*/
|
|
|
|
#if !defined (LWS_PLUGIN_STATIC)
|
|
#if !defined(LWS_DLL)
|
|
#define LWS_DLL
|
|
#endif
|
|
#if !defined(LWS_INTERNAL)
|
|
#define LWS_INTERNAL
|
|
#endif
|
|
#include <libwebsockets.h>
|
|
#endif
|
|
#include <string.h>
|
|
#include <stdlib.h>
|
|
#include <sys/stat.h>
|
|
#include <fcntl.h>
|
|
#if !defined(WIN32)
|
|
#include <unistd.h>
|
|
#endif
|
|
#include <assert.h>
|
|
|
|
struct vhd {
|
|
struct lws_context *cx;
|
|
struct lws_vhost *vhost;
|
|
|
|
char ws_server_uri[128];
|
|
char metrics_proxy_path[128];
|
|
char ba_secret[128];
|
|
|
|
const char *proxy_side_bind_name;
|
|
/**< name used to bind the two halves of the proxy together, must be
|
|
* the same name given in a pvo for both "lws-openmetrics-prox-agg"
|
|
* (the side local to the scraper) and "lws-openmetrics-prox-server"
|
|
* (the side the clients connect to)
|
|
*/
|
|
|
|
char sanity[8];
|
|
|
|
lws_dll2_owner_t clients;
|
|
|
|
lws_sorted_usec_list_t sul; /* schedule connection retry */
|
|
|
|
struct vhd *bind_partner_vhd;
|
|
|
|
struct lws *wsi; /* related wsi if any */
|
|
uint16_t retry_count; /* count of consequetive retries */
|
|
};
|
|
|
|
struct pss {
|
|
lws_dll2_t list;
|
|
char proxy_path[64];
|
|
struct lwsac *ac; /* the translated metrics, one ac per line */
|
|
struct lwsac *walk; /* iterator for ac when writing */
|
|
size_t tot; /* content-length computation */
|
|
struct lws *wsi;
|
|
|
|
uint8_t greet:1; /* set if client needs to send proxy path */
|
|
uint8_t trigger:1; /* we want to ask the client to dump */
|
|
};
|
|
|
|
#if defined(LWS_WITH_CLIENT)
|
|
static const uint32_t backoff_ms[] = { 1000, 2000, 3000, 4000, 5000 };
|
|
|
|
static const lws_retry_bo_t retry = {
|
|
.retry_ms_table = backoff_ms,
|
|
.retry_ms_table_count = LWS_ARRAY_SIZE(backoff_ms),
|
|
.conceal_count = LWS_ARRAY_SIZE(backoff_ms),
|
|
|
|
.secs_since_valid_ping = 400, /* force PINGs after secs idle */
|
|
.secs_since_valid_hangup = 400, /* hangup after secs idle */
|
|
|
|
.jitter_percent = 0,
|
|
};
|
|
|
|
static void
|
|
omc_connect_client(lws_sorted_usec_list_t *sul)
|
|
{
|
|
struct vhd *vhd = lws_container_of(sul, struct vhd, sul);
|
|
struct lws_client_connect_info i;
|
|
const char *prot;
|
|
char url[128];
|
|
|
|
memset(&i, 0, sizeof(i));
|
|
|
|
lwsl_notice("%s: %s %s %s\n", __func__, vhd->ws_server_uri, vhd->metrics_proxy_path, vhd->ba_secret);
|
|
|
|
lws_strncpy(url, vhd->ws_server_uri, sizeof(url));
|
|
|
|
if (lws_parse_uri(url, &prot, &i.address, &i.port, &i.path)) {
|
|
lwsl_err("%s: unable to parse uri %s\n", __func__,
|
|
vhd->ws_server_uri);
|
|
return;
|
|
}
|
|
|
|
i.context = vhd->cx;
|
|
i.origin = i.address;
|
|
i.host = i.address;
|
|
i.ssl_connection = LCCSCF_USE_SSL;
|
|
i.protocol = "lws-openmetrics-prox-server"; /* public subprot */
|
|
i.local_protocol_name = "lws-openmetrics-prox-client";
|
|
i.pwsi = &vhd->wsi;
|
|
i.retry_and_idle_policy = &retry;
|
|
i.userdata = vhd;
|
|
i.vhost = vhd->vhost;
|
|
|
|
lwsl_notice("%s: %s %u %s\n", __func__, i.address, i.port, i.path);
|
|
|
|
if (lws_client_connect_via_info(&i))
|
|
return;
|
|
|
|
/*
|
|
* Failed... schedule a retry... we can't use the _retry_wsi()
|
|
* convenience wrapper api here because no valid wsi at this
|
|
* point.
|
|
*/
|
|
if (!lws_retry_sul_schedule(vhd->cx, 0, sul, &retry,
|
|
omc_connect_client, &vhd->retry_count))
|
|
return;
|
|
|
|
vhd->retry_count = 0;
|
|
lws_retry_sul_schedule(vhd->cx, 0, sul, &retry,
|
|
omc_connect_client, &vhd->retry_count);
|
|
}
|
|
#endif
|
|
|
|
/*
 * Sanitize a metric name in place for OpenMetrics, which has a very
 * restricted token charset: every one of the first nl bytes of nm that is
 * not [A-Za-z0-9_] is overwritten with '_'.
 *
 * nm: buffer to rewrite (it need not be NUL-terminated within nl bytes)
 * nl: number of bytes to examine
 */
static void
openmetrics_san(char *nm, size_t nl)
{
	char *c = nm;
	size_t left;

	for (left = nl; left; left--, c++) {
		if (*c >= 'A' && *c <= 'Z')
			continue;
		if (*c >= 'a' && *c <= 'z')
			continue;
		if (*c >= '0' && *c <= '9')
			continue;
		if (*c == '_')
			continue;

		*c = '_';
	}
}
|
|
|
|
static int
|
|
lws_metrics_om_format_agg(lws_metric_pub_t *pub, const char *nm, lws_usec_t now,
|
|
int gng, char *buf, size_t len)
|
|
{
|
|
const char *_gng = gng ? "_nogo" : "_go";
|
|
char *end = buf + len - 1, *obuf = buf;
|
|
|
|
if (pub->flags & LWSMTFL_REPORT_ONLY_GO)
|
|
_gng = "";
|
|
|
|
if (!(pub->flags & LWSMTFL_REPORT_MEAN)) {
|
|
/* only the sum is meaningful */
|
|
if (pub->flags & LWSMTFL_REPORT_DUTY_WALLCLOCK_US) {
|
|
buf += lws_snprintf(buf, lws_ptr_diff_size_t(end, buf),
|
|
"%s_count %u\n"
|
|
"%s_us_sum %llu\n"
|
|
"%s_created %lu.%06u\n",
|
|
nm, (unsigned int)pub->u.agg.count[gng],
|
|
nm, (unsigned long long)pub->u.agg.sum[gng],
|
|
nm, (unsigned long)(pub->us_first / 1000000),
|
|
(unsigned int)(pub->us_first % 1000000));
|
|
|
|
return lws_ptr_diff(buf, obuf);
|
|
}
|
|
|
|
/* it's a monotonic ordinal, like total tx */
|
|
buf += lws_snprintf(buf, lws_ptr_diff_size_t(end, buf),
|
|
"%s%s_count %u\n"
|
|
"%s%s_sum %llu\n",
|
|
nm, _gng,
|
|
(unsigned int)pub->u.agg.count[gng],
|
|
nm, _gng,
|
|
(unsigned long long)pub->u.agg.sum[gng]);
|
|
|
|
} else
|
|
buf += lws_snprintf(buf, lws_ptr_diff_size_t(end, buf),
|
|
"%s%s_count %u\n"
|
|
"%s%s_mean %llu\n",
|
|
nm, _gng,
|
|
(unsigned int)pub->u.agg.count[gng],
|
|
nm, _gng, (unsigned long long)
|
|
(pub->u.agg.count[gng] ?
|
|
pub->u.agg.sum[gng] /
|
|
pub->u.agg.count[gng] : 0));
|
|
|
|
return lws_ptr_diff(buf, obuf);
|
|
}
|
|
|
|
static int
|
|
lws_metrics_om_ac_stash(struct pss *pss, const char *buf, size_t len)
|
|
{
|
|
char *q;
|
|
|
|
q = lwsac_use(&pss->ac, LWS_PRE + len + 2, LWS_PRE + len + 2);
|
|
if (!q) {
|
|
lwsac_free(&pss->ac);
|
|
|
|
return -1;
|
|
}
|
|
q[LWS_PRE] = (char)((len >> 8) & 0xff);
|
|
q[LWS_PRE + 1] = (char)(len & 0xff);
|
|
memcpy(q + LWS_PRE + 2, buf, len);
|
|
pss->tot += len;
|
|
|
|
return 0;
|
|
}
|
|
|
|
/*
|
|
* We have to do the ac listing at this level, because there can be too large
|
|
* a number of metrics tags to iterate that can fit in a reasonable buffer.
|
|
*/
|
|
|
|
static int
|
|
lws_metrics_om_format(struct pss *pss, lws_metric_pub_t *pub, const char *nm)
|
|
{
|
|
char buf[1200], *p = buf, *end = buf + sizeof(buf) - 1, tmp[512];
|
|
lws_usec_t t = lws_now_usecs();
|
|
|
|
if (pub->flags & LWSMTFL_REPORT_HIST) {
|
|
lws_metric_bucket_t *buck = pub->u.hist.head;
|
|
|
|
p += lws_snprintf(p, lws_ptr_diff_size_t(end, p),
|
|
"%s_count %llu\n",
|
|
nm, (unsigned long long)
|
|
pub->u.hist.total_count);
|
|
|
|
while (buck) {
|
|
lws_strncpy(tmp, lws_metric_bucket_name(buck),
|
|
sizeof(tmp));
|
|
|
|
p += lws_snprintf(p, lws_ptr_diff_size_t(end, p),
|
|
"%s{%s} %llu\n", nm, tmp,
|
|
(unsigned long long)buck->count);
|
|
|
|
lws_metrics_om_ac_stash(pss, buf,
|
|
lws_ptr_diff_size_t(p, buf));
|
|
p = buf;
|
|
|
|
buck = buck->next;
|
|
}
|
|
|
|
goto happy;
|
|
}
|
|
|
|
if (!pub->u.agg.count[METRES_GO] && !pub->u.agg.count[METRES_NOGO])
|
|
return 0;
|
|
|
|
if (pub->u.agg.count[METRES_GO])
|
|
p += lws_metrics_om_format_agg(pub, nm, t, METRES_GO, p,
|
|
lws_ptr_diff_size_t(end, p));
|
|
|
|
if (!(pub->flags & LWSMTFL_REPORT_ONLY_GO) &&
|
|
pub->u.agg.count[METRES_NOGO])
|
|
p += lws_metrics_om_format_agg(pub, nm, t, METRES_NOGO, p,
|
|
lws_ptr_diff_size_t(end, p));
|
|
|
|
if (pub->flags & LWSMTFL_REPORT_MEAN)
|
|
p += lws_snprintf(p, lws_ptr_diff_size_t(end, p),
|
|
"%s_min %llu\n"
|
|
"%s_max %llu\n",
|
|
nm, (unsigned long long)pub->u.agg.min,
|
|
nm, (unsigned long long)pub->u.agg.max);
|
|
|
|
happy:
|
|
return lws_metrics_om_ac_stash(pss, buf, lws_ptr_diff_size_t(p, buf));
|
|
}
|
|
|
|
static int
|
|
append_om_metric(lws_metric_pub_t *pub, void *user)
|
|
{
|
|
struct pss *pss = (struct pss *)user;
|
|
char nm[64];
|
|
size_t nl;
|
|
|
|
/*
|
|
* Convert lws_metrics to openmetrics metrics data, stashing into an
|
|
* lwsac without backfill. Since it's not backfilling, use areas are in
|
|
* linear sequence simplifying walking them. Limiting the lwsac alloc
|
|
* to less than a typical mtu means we can write one per write
|
|
* efficiently
|
|
*/
|
|
|
|
lws_strncpy(nm, pub->name, sizeof(nm));
|
|
nl = strlen(nm);
|
|
|
|
openmetrics_san(nm, nl);
|
|
|
|
return lws_metrics_om_format(pss, pub, nm);
|
|
}
|
|
|
|
#if defined(__linux__)
|
|
static int
|
|
grabfile(const char *fi, char *buf, size_t len)
|
|
{
|
|
int n, fd = lws_open(fi, LWS_O_RDONLY);
|
|
|
|
buf[0] = '\0';
|
|
if (fd < 0)
|
|
return -1;
|
|
|
|
n = (int)read(fd, buf, len - 1);
|
|
close(fd);
|
|
if (n < 0) {
|
|
buf[0] = '\0';
|
|
return -1;
|
|
}
|
|
|
|
buf[n] = '\0';
|
|
if (n > 0 && buf[n - 1] == '\n')
|
|
buf[--n] = '\0';
|
|
|
|
return n;
|
|
}
|
|
#endif
|
|
|
|
/*
|
|
* Let's pregenerate the output into an lwsac all at once and
|
|
* then spool it back to the peer afterwards
|
|
*
|
|
* - there's not going to be that much of it (a few kB)
|
|
* - we then know the content-length for the headers
|
|
* - it's stretchy to arbitrary numbers of metrics
|
|
* - lwsac block list provides the per-metric structure to
|
|
* hold the data in a way we can walk to write it simply
|
|
*/
|
|
|
|
int
|
|
ome_prepare(struct lws_context *ctx, struct pss *pss)
|
|
{
|
|
char buf[1224], *start = buf + LWS_PRE, *p = start,
|
|
*end = buf + sizeof(buf) - 1;
|
|
char hn[64];
|
|
|
|
pss->tot = 0;
|
|
|
|
/*
|
|
* Target metadata
|
|
*/
|
|
|
|
hn[0] = '\0';
|
|
gethostname(hn, sizeof(hn) - 1);
|
|
p += lws_snprintf(p, lws_ptr_diff_size_t(end, p),
|
|
"# TYPE target info\n"
|
|
"# HELP target Target metadata\n"
|
|
"target_info{hostname=\"%s\"", hn);
|
|
|
|
#if defined(__linux__)
|
|
if (grabfile("/proc/self/cmdline", hn, sizeof(hn)))
|
|
p += lws_snprintf((char *)p, lws_ptr_diff_size_t(end, p),
|
|
",cmdline=\"%s\"", hn);
|
|
#endif
|
|
|
|
p += lws_snprintf(p, lws_ptr_diff_size_t(end, p), "} 1\n");
|
|
|
|
if (lws_metrics_om_ac_stash(pss, (const char *)buf + LWS_PRE,
|
|
lws_ptr_diff_size_t(p, buf + LWS_PRE)))
|
|
return 1;
|
|
|
|
/* lws version */
|
|
|
|
p = start;
|
|
p += lws_snprintf(p, lws_ptr_diff_size_t(end, p),
|
|
"# TYPE lws_info info\n"
|
|
"# HELP lws_info Version of lws producing this\n"
|
|
"lws_info{version=\"%s\"} 1\n", LWS_BUILD_HASH);
|
|
if (lws_metrics_om_ac_stash(pss, (const char *)buf + LWS_PRE,
|
|
lws_ptr_diff_size_t(p, buf + LWS_PRE)))
|
|
return 1;
|
|
|
|
/* system scalars */
|
|
|
|
#if defined(__linux__)
|
|
if (grabfile("/proc/loadavg", hn, sizeof(hn))) {
|
|
char *sp = strchr(hn, ' ');
|
|
if (sp) {
|
|
p = start;
|
|
p += lws_snprintf(p, lws_ptr_diff_size_t(end, p),
|
|
"load_1m %.*s\n",
|
|
lws_ptr_diff(sp, hn), hn);
|
|
if (lws_metrics_om_ac_stash(pss,
|
|
(char *)buf + LWS_PRE,
|
|
lws_ptr_diff_size_t(p,
|
|
start)))
|
|
return 1;
|
|
}
|
|
}
|
|
#endif
|
|
|
|
if (lws_metrics_foreach(ctx, pss, append_om_metric))
|
|
return 1;
|
|
|
|
p = start;
|
|
p += lws_snprintf(p, lws_ptr_diff_size_t(end, p),
|
|
"# EOF\n");
|
|
if (lws_metrics_om_ac_stash(pss, (char *)buf + LWS_PRE,
|
|
lws_ptr_diff_size_t(p, buf + LWS_PRE)))
|
|
return 1;
|
|
|
|
pss->walk = pss->ac;
|
|
|
|
return 0;
|
|
}
|
|
|
|
#if defined(LWS_WITH_SERVER)
|
|
|
|
/* 1) direct http export for scraper */
|
|
|
|
static int
|
|
callback_lws_openmetrics_export(struct lws *wsi,
|
|
enum lws_callback_reasons reason,
|
|
void *user, void *in, size_t len)
|
|
{
|
|
unsigned char buf[1224], *start = buf + LWS_PRE, *p = start,
|
|
*end = buf + sizeof(buf) - 1, *ip;
|
|
struct lws_context *cx = lws_get_context(wsi);
|
|
struct pss *pss = (struct pss *)user;
|
|
unsigned int m, wm;
|
|
|
|
switch (reason) {
|
|
case LWS_CALLBACK_HTTP:
|
|
|
|
ome_prepare(cx, pss);
|
|
|
|
p = start;
|
|
if (lws_add_http_common_headers(wsi, HTTP_STATUS_OK,
|
|
"application/openmetrics-text; "
|
|
"version=1.0.0; charset=utf-8",
|
|
pss->tot, &p, end) ||
|
|
lws_finalize_write_http_header(wsi, start, &p, end))
|
|
return 1;
|
|
|
|
lws_callback_on_writable(wsi);
|
|
|
|
return 0;
|
|
|
|
case LWS_CALLBACK_CLOSED_HTTP:
|
|
lwsac_free(&pss->ac);
|
|
break;
|
|
|
|
case LWS_CALLBACK_HTTP_WRITEABLE:
|
|
if (!pss->walk)
|
|
return 0;
|
|
|
|
do {
|
|
ip = (uint8_t *)pss->walk +
|
|
lwsac_sizeof(pss->walk == pss->ac) + LWS_PRE;
|
|
m = (unsigned int)((ip[0] << 8) | ip[1]);
|
|
|
|
/* coverity */
|
|
if (m > lwsac_get_tail_pos(pss->walk) -
|
|
lwsac_sizeof(pss->walk == pss->ac))
|
|
return -1;
|
|
|
|
if (lws_ptr_diff_size_t(end, p) < m)
|
|
break;
|
|
|
|
memcpy(p, ip + 2, m);
|
|
p += m;
|
|
|
|
pss->walk = lwsac_get_next(pss->walk);
|
|
} while (pss->walk);
|
|
|
|
if (!lws_ptr_diff_size_t(p, start)) {
|
|
lwsl_err("%s: stuck\n", __func__);
|
|
return -1;
|
|
}
|
|
|
|
wm = pss->walk ? LWS_WRITE_HTTP : LWS_WRITE_HTTP_FINAL;
|
|
|
|
if (lws_write(wsi, start, lws_ptr_diff_size_t(p, start),
|
|
(enum lws_write_protocol)wm) < 0)
|
|
return 1;
|
|
|
|
if (!pss->walk) {
|
|
if (lws_http_transaction_completed(wsi))
|
|
return -1;
|
|
} else
|
|
lws_callback_on_writable(wsi);
|
|
|
|
return 0;
|
|
|
|
default:
|
|
break;
|
|
}
|
|
|
|
return lws_callback_http_dummy(wsi, reason, user, in, len);
|
|
}
|
|
|
|
static struct pss *
|
|
omc_lws_om_get_other_side_pss_client(struct vhd *vhd, struct pss *pss)
|
|
{
|
|
/*
|
|
* Search through our partner's clients list looking for one with the
|
|
* same proxy path
|
|
*/
|
|
lws_start_foreach_dll(struct lws_dll2 *, d,
|
|
vhd->bind_partner_vhd->clients.head) {
|
|
struct pss *apss = lws_container_of(d, struct pss, list);
|
|
|
|
if (!strcmp(pss->proxy_path, apss->proxy_path))
|
|
return apss;
|
|
|
|
} lws_end_foreach_dll(d);
|
|
|
|
return NULL;
|
|
}
|
|
|
|
/* 2) "lws-openmetrics-prox-agg": http server export via proxy to connected clients */
|
|
|
|
static int
|
|
callback_lws_openmetrics_prox_agg(struct lws *wsi,
|
|
enum lws_callback_reasons reason,
|
|
void *user, void *in, size_t len)
|
|
{
|
|
unsigned char buf[1224], *start = buf + LWS_PRE, *p = start,
|
|
*end = buf + sizeof(buf) - 1, *ip;
|
|
struct vhd *vhd = (struct vhd *)lws_protocol_vh_priv_get(
|
|
lws_get_vhost(wsi), lws_get_protocol(wsi));
|
|
struct lws_context *cx = lws_get_context(wsi);
|
|
struct pss *pss = (struct pss *)user, *partner_pss;
|
|
unsigned int m, wm;
|
|
|
|
switch (reason) {
|
|
|
|
case LWS_CALLBACK_PROTOCOL_INIT:
|
|
lwsl_notice("%s: PROTOCOL_INIT on %s\n", __func__, lws_vh_tag(lws_get_vhost(wsi)));
|
|
/*
|
|
* We get told what to do when we are bound to the vhost
|
|
*/
|
|
vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
|
|
lws_get_protocol(wsi), sizeof(struct vhd));
|
|
if (!vhd) {
|
|
lwsl_err("%s: vhd alloc failed\n", __func__);
|
|
return 0;
|
|
}
|
|
|
|
vhd->cx = cx;
|
|
|
|
/*
|
|
* Try to bind to the counterpart server in the proxy, binding
|
|
* to the right one by having a common bind name set in a pvo.
|
|
* We don't know who will get instantiated last, so both parts
|
|
* try to bind if not already bound
|
|
*/
|
|
|
|
if (!lws_pvo_get_str(in, "proxy-side-bind-name",
|
|
&vhd->proxy_side_bind_name)) {
|
|
/*
|
|
* Attempt to find the vhd that belongs to a vhost
|
|
* that has instantiated protocol
|
|
* "lws-openmetrics-prox-server", and has set pvo
|
|
* "proxy-side-bind-name" on it to whatever our
|
|
* vhd->proxy_side_bind_name was also set to.
|
|
*
|
|
* If found, inform the two sides of the same proxy
|
|
* what their partner vhd is
|
|
*/
|
|
lws_strncpy(vhd->sanity, "isagg", sizeof(vhd->sanity));
|
|
vhd->bind_partner_vhd = lws_vhd_find_by_pvo(cx,
|
|
"lws-openmetrics-prox-server",
|
|
"proxy-side-bind-name",
|
|
vhd->proxy_side_bind_name);
|
|
if (vhd->bind_partner_vhd) {
|
|
assert(!strcmp(vhd->bind_partner_vhd->sanity, "isws"));
|
|
lwsl_notice("%s: proxy binding OK\n", __func__);
|
|
vhd->bind_partner_vhd->bind_partner_vhd = vhd;
|
|
}
|
|
} else {
|
|
lwsl_warn("%s: proxy-side-bind-name required\n", __func__);
|
|
return 0;
|
|
}
|
|
|
|
break;
|
|
|
|
case LWS_CALLBACK_PROTOCOL_DESTROY:
|
|
if (vhd)
|
|
lws_sul_cancel(&vhd->sul);
|
|
break;
|
|
|
|
case LWS_CALLBACK_HTTP:
|
|
|
|
/*
|
|
* The scraper has connected to us, the local side of the proxy,
|
|
* we need to match what it wants to
|
|
*/
|
|
|
|
if (!vhd->bind_partner_vhd)
|
|
return 0;
|
|
|
|
lws_strnncpy(pss->proxy_path, (const char *)in, len,
|
|
sizeof(pss->proxy_path));
|
|
|
|
if (pss->list.owner) {
|
|
lwsl_warn("%s: double HTTP?\n", __func__);
|
|
return 0;
|
|
}
|
|
|
|
pss->wsi = wsi;
|
|
|
|
lws_start_foreach_dll(struct lws_dll2 *, d,
|
|
vhd->bind_partner_vhd->clients.head) {
|
|
struct pss *apss = lws_container_of(d, struct pss, list);
|
|
|
|
if (!strcmp((const char *)in, apss->proxy_path)) {
|
|
apss->trigger = 1;
|
|
lws_callback_on_writable(apss->wsi);
|
|
|
|
/* let's add him on the http server vhd list */
|
|
|
|
lws_dll2_add_tail(&pss->list, &vhd->clients);
|
|
return 0;
|
|
}
|
|
|
|
} lws_end_foreach_dll(d);
|
|
|
|
return 0;
|
|
|
|
case LWS_CALLBACK_CLOSED_HTTP:
|
|
lwsac_free(&pss->ac);
|
|
lws_dll2_remove(&pss->list);
|
|
break;
|
|
|
|
case LWS_CALLBACK_HTTP_WRITEABLE:
|
|
|
|
if (!pss->walk)
|
|
return 0;
|
|
|
|
/* locate the wss side if it's still around */
|
|
|
|
partner_pss = omc_lws_om_get_other_side_pss_client(vhd, pss);
|
|
if (!partner_pss)
|
|
return -1;
|
|
|
|
do {
|
|
ip = (uint8_t *)pss->walk +
|
|
lwsac_sizeof(pss->walk == partner_pss->ac) + LWS_PRE;
|
|
m = (unsigned int)((ip[0] << 8) | ip[1]);
|
|
|
|
/* coverity */
|
|
if (m > lwsac_get_tail_pos(pss->walk) -
|
|
lwsac_sizeof(pss->walk == partner_pss->ac))
|
|
return -1;
|
|
|
|
if (lws_ptr_diff_size_t(end, p) < m)
|
|
break;
|
|
|
|
memcpy(p, ip + 2, m);
|
|
p += m;
|
|
|
|
pss->walk = lwsac_get_next(pss->walk);
|
|
} while (pss->walk);
|
|
|
|
if (!lws_ptr_diff_size_t(p, start)) {
|
|
lwsl_err("%s: stuck\n", __func__);
|
|
return -1;
|
|
}
|
|
|
|
wm = pss->walk ? LWS_WRITE_HTTP : LWS_WRITE_HTTP_FINAL;
|
|
|
|
if (lws_write(wsi, start, lws_ptr_diff_size_t(p, start),
|
|
(enum lws_write_protocol)wm) < 0)
|
|
return 1;
|
|
|
|
if (!pss->walk) {
|
|
lwsl_info("%s: whole msg proxied to scraper\n", __func__);
|
|
lws_dll2_remove(&pss->list);
|
|
lwsac_free(&partner_pss->ac);
|
|
// if (lws_http_transaction_completed(wsi))
|
|
return -1;
|
|
} else
|
|
lws_callback_on_writable(wsi);
|
|
|
|
return 0;
|
|
|
|
default:
|
|
break;
|
|
}
|
|
|
|
return lws_callback_http_dummy(wsi, reason, user, in, len);
|
|
}
|
|
|
|
/* 3) "lws-openmetrics-prox-server": ws server side of metrics proxy, for
|
|
* ws clients to connect to */
|
|
|
|
static int
|
|
callback_lws_openmetrics_prox_server(struct lws *wsi,
|
|
enum lws_callback_reasons reason,
|
|
void *user, void *in, size_t len)
|
|
{
|
|
unsigned char buf[1224], *start = buf + LWS_PRE, *p = start,
|
|
*end = buf + sizeof(buf) - 1;
|
|
struct vhd *vhd = (struct vhd *)lws_protocol_vh_priv_get(
|
|
lws_get_vhost(wsi), lws_get_protocol(wsi));
|
|
struct lws_context *cx = lws_get_context(wsi);
|
|
struct pss *pss = (struct pss *)user, *partner_pss;
|
|
|
|
switch (reason) {
|
|
|
|
case LWS_CALLBACK_PROTOCOL_INIT:
|
|
/*
|
|
* We get told what to do when we are bound to the vhost
|
|
*/
|
|
|
|
lwsl_notice("%s: PROTOCOL_INIT on %s\n", __func__, lws_vh_tag(lws_get_vhost(wsi)));
|
|
|
|
vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
|
|
lws_get_protocol(wsi), sizeof(struct vhd));
|
|
if (!vhd) {
|
|
lwsl_err("%s: vhd alloc failed\n", __func__);
|
|
return 0;
|
|
}
|
|
|
|
vhd->cx = cx;
|
|
|
|
/*
|
|
* Try to bind to the counterpart server in the proxy, binding
|
|
* to the right one by having a common bind name set in a pvo.
|
|
* We don't know who will get instantiated last, so both parts
|
|
* try to bind if not already bound
|
|
*/
|
|
|
|
if (!lws_pvo_get_str(in, "proxy-side-bind-name",
|
|
&vhd->proxy_side_bind_name)) {
|
|
/*
|
|
* Attempt to find the vhd that belongs to a vhost
|
|
* that has instantiated protocol
|
|
* "lws-openmetrics-prox-server", and has set pvo
|
|
* "proxy-side-bind-name" on it to whatever our
|
|
* vhd->proxy_side_bind_name was also set to.
|
|
*
|
|
* If found, inform the two sides of the same proxy
|
|
* what their partner vhd is
|
|
*/
|
|
lws_strncpy(vhd->sanity, "isws", sizeof(vhd->sanity));
|
|
vhd->bind_partner_vhd = lws_vhd_find_by_pvo(cx,
|
|
"lws-openmetrics-prox-agg",
|
|
"proxy-side-bind-name",
|
|
vhd->proxy_side_bind_name);
|
|
if (vhd->bind_partner_vhd) {
|
|
assert(!strcmp(vhd->bind_partner_vhd->sanity, "isagg"));
|
|
lwsl_notice("%s: proxy binding OK\n", __func__);
|
|
vhd->bind_partner_vhd->bind_partner_vhd = vhd;
|
|
}
|
|
} else {
|
|
lwsl_warn("%s: proxy-side-bind-name required\n", __func__);
|
|
return 0;
|
|
}
|
|
|
|
break;
|
|
|
|
case LWS_CALLBACK_PROTOCOL_DESTROY:
|
|
break;
|
|
|
|
case LWS_CALLBACK_ESTABLISHED:
|
|
/*
|
|
* a client has joined... we need to add his pss to our list
|
|
* of live, joined clients
|
|
*/
|
|
|
|
/* mark us as waiting for the reference name from the client */
|
|
pss->greet = 1;
|
|
pss->wsi = wsi;
|
|
lws_validity_confirmed(wsi);
|
|
|
|
return 0;
|
|
|
|
case LWS_CALLBACK_CLOSED:
|
|
/*
|
|
* a client has parted
|
|
*/
|
|
lws_dll2_remove(&pss->list);
|
|
lwsl_warn("%s: client %s left (%u)\n", __func__,
|
|
pss->proxy_path,
|
|
(unsigned int)vhd->clients.count);
|
|
lwsac_free(&pss->ac);
|
|
|
|
/* let's kill the scraper connection accordingly, if still up */
|
|
partner_pss = omc_lws_om_get_other_side_pss_client(vhd, pss);
|
|
if (partner_pss)
|
|
lws_wsi_close(partner_pss->wsi, LWS_TO_KILL_ASYNC);
|
|
break;
|
|
|
|
case LWS_CALLBACK_RECEIVE:
|
|
if (pss->greet) {
|
|
pss->greet = 0;
|
|
lws_strnncpy(pss->proxy_path, (const char *)in, len,
|
|
sizeof(pss->proxy_path));
|
|
|
|
lws_validity_confirmed(wsi);
|
|
lwsl_notice("%s: received greet '%s'\n", __func__,
|
|
pss->proxy_path);
|
|
/*
|
|
* we need to add his pss to our list of configured,
|
|
* live, joined clients
|
|
*/
|
|
lws_dll2_add_tail(&pss->list, &vhd->clients);
|
|
return 0;
|
|
}
|
|
|
|
/*
|
|
* He's sending us his results... let's collect chunks into the
|
|
* pss lwsac before worrying about anything else
|
|
*/
|
|
|
|
if (lws_is_first_fragment(wsi))
|
|
pss->tot = 0;
|
|
|
|
lws_metrics_om_ac_stash(pss, (const char *)in, len);
|
|
|
|
if (lws_is_final_fragment(wsi)) {
|
|
struct pss *partner_pss;
|
|
|
|
lwsl_info("%s: ws side received complete msg\n",
|
|
__func__);
|
|
|
|
/* the lwsac is complete */
|
|
pss->walk = pss->ac;
|
|
partner_pss = omc_lws_om_get_other_side_pss_client(vhd, pss);
|
|
if (!partner_pss) {
|
|
lwsl_notice("%s: no partner A\n", __func__);
|
|
return -1;
|
|
}
|
|
|
|
/* indicate to scraper side we want to issue now */
|
|
|
|
p = start;
|
|
if (lws_add_http_common_headers(partner_pss->wsi, HTTP_STATUS_OK,
|
|
"application/openmetrics-text; "
|
|
"version=1.0.0; charset=utf-8",
|
|
pss->tot, &p, end) ||
|
|
lws_finalize_write_http_header(partner_pss->wsi,
|
|
start, &p, end))
|
|
return -1;
|
|
|
|
/* indicate to scraper side we want to issue now */
|
|
|
|
partner_pss->walk = pss->ac;
|
|
partner_pss->trigger = 1;
|
|
lws_callback_on_writable(partner_pss->wsi);
|
|
}
|
|
|
|
return 0;
|
|
|
|
case LWS_CALLBACK_SERVER_WRITEABLE:
|
|
if (!pss->trigger)
|
|
return 0;
|
|
|
|
pss->trigger = 0;
|
|
|
|
partner_pss = omc_lws_om_get_other_side_pss_client(vhd, pss);
|
|
if (!partner_pss) {
|
|
lwsl_err("%s: no partner\n", __func__);
|
|
return 0;
|
|
}
|
|
|
|
lwsl_info("%s: sending trigger to client\n", __func__);
|
|
|
|
*start = 'x';
|
|
if (lws_write(wsi, start, 1,
|
|
(enum lws_write_protocol)LWS_WRITE_TEXT) < 0)
|
|
return 1;
|
|
|
|
lws_validity_confirmed(wsi);
|
|
|
|
return 0;
|
|
|
|
default:
|
|
break;
|
|
}
|
|
|
|
return lws_callback_http_dummy(wsi, reason, user, in, len);
|
|
}
|
|
#endif
|
|
|
|
#if defined(LWS_WITH_CLIENT) && defined(LWS_ROLE_WS)
|
|
|
|
/* 4) ws client that keeps wss connection up to metrics proxy ws server */
|
|
|
|
static int
|
|
callback_lws_openmetrics_prox_client(struct lws *wsi,
|
|
enum lws_callback_reasons reason,
|
|
void *user, void *in, size_t len)
|
|
{
|
|
unsigned char buf[1224], *start = buf + LWS_PRE, *p = start,
|
|
*end = buf + sizeof(buf) - 1, *ip;
|
|
struct vhd *vhd = (struct vhd *)lws_protocol_vh_priv_get(
|
|
lws_get_vhost(wsi), lws_get_protocol(wsi));
|
|
struct lws_context *cx = lws_get_context(wsi);
|
|
struct pss *pss = (struct pss *)user;
|
|
unsigned int m, wm;
|
|
const char *cp;
|
|
char first;
|
|
|
|
switch (reason) {
|
|
|
|
case LWS_CALLBACK_PROTOCOL_INIT:
|
|
|
|
lwsl_notice("%s: PROTOCOL_INIT on %s\n", __func__,
|
|
lws_vh_tag(lws_get_vhost(wsi)));
|
|
|
|
|
|
/*
|
|
* We get told what to do when we are bound to the vhost
|
|
*/
|
|
vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
|
|
lws_get_protocol(wsi), sizeof(struct vhd));
|
|
if (!vhd)
|
|
return 0;
|
|
|
|
vhd->cx = cx;
|
|
vhd->vhost = lws_get_vhost(wsi);
|
|
|
|
/* the proxy server uri */
|
|
|
|
if (lws_pvo_get_str(in, "ws-server-uri", &cp) || !cp) {
|
|
lwsl_warn("%s: ws-server-uri pvo required\n", __func__);
|
|
|
|
return 0;
|
|
}
|
|
lws_strncpy(vhd->ws_server_uri, cp, sizeof(vhd->ws_server_uri));
|
|
|
|
/* how we should be referenced at the proxy */
|
|
|
|
if (lws_pvo_get_str(in, "metrics-proxy-path", &cp)) {
|
|
lwsl_err("%s: metrics-proxy-path pvo required\n", __func__);
|
|
|
|
return 1;
|
|
}
|
|
lws_strncpy(vhd->metrics_proxy_path, cp, sizeof(vhd->metrics_proxy_path));
|
|
|
|
/* the shared secret to authenticate us as allowed to join */
|
|
|
|
if (lws_pvo_get_str(in, "ba-secret", &cp)) {
|
|
lwsl_err("%s: ba-secret pvo required\n", __func__);
|
|
|
|
return 1;
|
|
}
|
|
lws_strncpy(vhd->ba_secret, cp, sizeof(vhd->ba_secret));
|
|
|
|
lwsl_notice("%s: scheduling connect %s %s %s\n", __func__,
|
|
vhd->ws_server_uri, vhd->metrics_proxy_path, vhd->ba_secret);
|
|
|
|
lws_validity_confirmed(wsi);
|
|
lws_sul_schedule(cx, 0, &vhd->sul, omc_connect_client, 1);
|
|
break;
|
|
|
|
case LWS_CALLBACK_PROTOCOL_DESTROY:
|
|
if (vhd)
|
|
lws_sul_cancel(&vhd->sul);
|
|
break;
|
|
|
|
case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER:
|
|
{
|
|
unsigned char **pp = (unsigned char **)in, *pend = (*pp) + len;
|
|
char b[128];
|
|
|
|
/* authorize ourselves to the metrics proxy using basic auth */
|
|
|
|
if (lws_http_basic_auth_gen("metricsclient", vhd->ba_secret,
|
|
b, sizeof(b)))
|
|
break;
|
|
|
|
if (lws_add_http_header_by_token(wsi,
|
|
WSI_TOKEN_HTTP_AUTHORIZATION,
|
|
(unsigned char *)b,
|
|
(int)strlen(b), pp, pend))
|
|
return -1;
|
|
|
|
break;
|
|
}
|
|
|
|
case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
|
|
lwsl_err("CLIENT_CONNECTION_ERROR: %s\n",
|
|
in ? (char *)in : "(null)");
|
|
goto do_retry;
|
|
|
|
case LWS_CALLBACK_CLIENT_ESTABLISHED:
|
|
lwsl_warn("%s: connected to ws metrics agg server\n", __func__);
|
|
pss->greet = 1;
|
|
lws_callback_on_writable(wsi);
|
|
lws_validity_confirmed(wsi);
|
|
return 0;
|
|
|
|
case LWS_CALLBACK_CLIENT_CLOSED:
|
|
lwsl_notice("%s: client closed\n", __func__);
|
|
lwsac_free(&pss->ac);
|
|
goto do_retry;
|
|
|
|
case LWS_CALLBACK_CLIENT_RECEIVE:
|
|
/*
|
|
* Proxy serverside sends us something to trigger us to create
|
|
* our metrics message and send it back over the ws link
|
|
*/
|
|
ome_prepare(cx, pss);
|
|
pss->walk = pss->ac;
|
|
lws_callback_on_writable(wsi);
|
|
lwsl_info("%s: dump requested\n", __func__);
|
|
break;
|
|
|
|
case LWS_CALLBACK_CLIENT_WRITEABLE:
|
|
if (pss->greet) {
|
|
/*
|
|
* At first after establishing the we link, we send a
|
|
* message indicating to the metrics proxy how we
|
|
* should be referred to by the scraper to particularly
|
|
* select to talk to us
|
|
*/
|
|
lwsl_info("%s: sending greet '%s'\n", __func__,
|
|
vhd->metrics_proxy_path);
|
|
lws_strncpy((char *)start, vhd->metrics_proxy_path,
|
|
sizeof(buf) - LWS_PRE);
|
|
if (lws_write(wsi, start,
|
|
strlen(vhd->metrics_proxy_path),
|
|
LWS_WRITE_TEXT) < 0)
|
|
return 1;
|
|
|
|
lws_validity_confirmed(wsi);
|
|
|
|
pss->greet = 0;
|
|
return 0;
|
|
}
|
|
|
|
if (!pss->walk)
|
|
return 0;
|
|
|
|
/*
|
|
* We send the metrics dump in a single logical ws message,
|
|
* using ws fragmentation to split it around 1 mtu boundary
|
|
* and keep coming back until it's finished
|
|
*/
|
|
|
|
first = pss->walk == pss->ac;
|
|
|
|
do {
|
|
ip = (uint8_t *)pss->walk +
|
|
lwsac_sizeof(pss->walk == pss->ac) + LWS_PRE;
|
|
m = (unsigned int)((ip[0] << 8) | ip[1]);
|
|
|
|
/* coverity */
|
|
if (m > lwsac_get_tail_pos(pss->walk) -
|
|
lwsac_sizeof(pss->walk == pss->ac)) {
|
|
lwsl_err("%s: size blow\n", __func__);
|
|
return -1;
|
|
}
|
|
|
|
if (lws_ptr_diff_size_t(end, p) < m)
|
|
break;
|
|
|
|
memcpy(p, ip + 2, m);
|
|
p += m;
|
|
|
|
pss->walk = lwsac_get_next(pss->walk);
|
|
} while (pss->walk);
|
|
|
|
if (!lws_ptr_diff_size_t(p, start)) {
|
|
lwsl_err("%s: stuck\n", __func__);
|
|
return -1;
|
|
}
|
|
|
|
wm = (unsigned int)lws_write_ws_flags(LWS_WRITE_TEXT, first,
|
|
!pss->walk);
|
|
|
|
if (lws_write(wsi, start, lws_ptr_diff_size_t(p, start),
|
|
(enum lws_write_protocol)wm) < 0) {
|
|
lwsl_notice("%s: write fail\n", __func__);
|
|
return 1;
|
|
}
|
|
|
|
lws_validity_confirmed(wsi);
|
|
lwsl_info("%s: forwarded %d\n", __func__, lws_ptr_diff(p, start));
|
|
|
|
if (!pss->walk) {
|
|
lwsl_info("%s: dump send completed\n", __func__);
|
|
lwsac_free(&pss->ac);
|
|
} else
|
|
lws_callback_on_writable(wsi);
|
|
|
|
return 0;
|
|
|
|
default:
|
|
break;
|
|
}
|
|
|
|
return lws_callback_http_dummy(wsi, reason, user, in, len);
|
|
|
|
do_retry:
|
|
if (!lws_retry_sul_schedule(cx, 0, &vhd->sul, &retry,
|
|
omc_connect_client, &vhd->retry_count))
|
|
return 0;
|
|
|
|
vhd->retry_count = 0;
|
|
lws_retry_sul_schedule(cx, 0, &vhd->sul, &retry,
|
|
omc_connect_client, &vhd->retry_count);
|
|
|
|
return 0;
|
|
}
|
|
#endif
|
|
|
|
|
|
/*
 * The protocols this plugin provides; which ones are present depends on
 * the server / client / ws role build options.  Members not named below
 * are left zero.
 */
LWS_VISIBLE const struct lws_protocols lws_openmetrics_export_protocols[] = {
#if defined(LWS_WITH_SERVER)
	{ /* for scraper directly: http export on listen socket */
		.name			= "lws-openmetrics",
		.callback		= callback_lws_openmetrics_export,
		.per_session_data_size	= sizeof(struct pss),
		.rx_buffer_size		= 1024,
	},
	{ /* for scraper via ws proxy: http export on listen socket */
		.name			= "lws-openmetrics-prox-agg",
		.callback		= callback_lws_openmetrics_prox_agg,
		.per_session_data_size	= sizeof(struct pss),
		.rx_buffer_size		= 1024,
	},
	{ /* metrics proxy server side: ws server for clients to connect to */
		.name			= "lws-openmetrics-prox-server",
		.callback		= callback_lws_openmetrics_prox_server,
		.per_session_data_size	= sizeof(struct pss),
		.rx_buffer_size		= 1024,
	},
#endif
#if defined(LWS_WITH_CLIENT) && defined(LWS_ROLE_WS)
	{ /* client to metrics proxy: ws client to connect to metrics proxy */
		.name			= "lws-openmetrics-prox-client",
		.callback		= callback_lws_openmetrics_prox_client,
		.per_session_data_size	= sizeof(struct pss),
		.rx_buffer_size		= 1024,
	},
#endif
};
|
|
|
|
LWS_VISIBLE const lws_plugin_protocol_t lws_openmetrics_export = {
|
|
.hdr = {
|
|
"lws OpenMetrics export",
|
|
"lws_protocol_plugin",
|
|
LWS_BUILD_HASH,
|
|
LWS_PLUGIN_API_MAGIC
|
|
},
|
|
|
|
.protocols = lws_openmetrics_export_protocols,
|
|
.count_protocols = LWS_ARRAY_SIZE(lws_openmetrics_export_protocols),
|
|
};
|