libwebsockets/plugins/protocol_lws_openmetrics_export.c

1201 lines
31 KiB
C

/*
* libwebsockets-test-server - libwebsockets test implementation
*
* Written in 2010-2021 by Andy Green <andy@warmcat.com>
*
* This file is made available under the Creative Commons CC0 1.0
* Universal Public Domain Dedication.
*
* The person who associated a work with this deed has dedicated
* the work to the public domain by waiving all of his or her rights
* to the work worldwide under copyright law, including all related
* and neighboring rights, to the extent allowed by law. You can copy,
* modify, distribute and perform the work, even for commercial purposes,
* all without asking permission.
*
* The test apps are intended to be adapted for use in your code, which
* may be proprietary. So unlike the library itself, they are licensed
* Public Domain.
*
* Scrapeable, proxiable OpenMetrics metrics (compatible with Prometheus)
*
* https://tools.ietf.org/html/draft-richih-opsawg-openmetrics-00
*
* This plugin provides four protocols related to openmetrics handling:
*
* 1) "lws-openmetrics" direct http listener so scraper can directly get metrics
*
* 2) "lws-openmetrics-prox-agg" metrics proxy server that scraper can connect
* to locally to proxy through to connected remote clients at 3)
*
* 3) "lws-openmetrics-prox-server" metrics proxy server that remote clients can
* connect to, providing a path where scrapers at 2) can get metrics from
* clients connected us
*
* 4) "lws-openmetrics-prox-client" nailed-up metrics proxy client that tries to
* keep up a connection to the server at 3), allowing to scraper to reach
* clients that have no reachable way to serve.
*
* These are provided like this to maximize flexibility in being able to add
* openmetrics serving, proxying, or client->proxy to existing lws code.
*
* Openmetrics supports a "metric" at the top of its report that describes the
* source aka "target metadata".
*
* Since we want to enable collection from devices that are not externally
* reachable, we must provide a reachable server that the clients can attach to
* and have their stats aggregated and then read by Prometheus or whatever.
* Openmetrics says that it wants to present the aggregated stats in a flat
* summary with only the aggregator's "target metadata" and contributor targets
* getting their data tagged with the source
*
* "The above discussion is in the context of individual exposers. An
* exposition from a general purpose monitoring system may contain
* metrics from many individual targets, and thus may expose multiple
* target info Metrics. The metrics may already have had target
* metadata added to them as labels as part of ingestion. The metric
* names MUST NOT be varied based on target metadata. For example it
* would be incorrect for all metrics to end up being prefixed with
* staging_ even if they all originated from targets in a staging
* environment)."
*/
#if !defined (LWS_PLUGIN_STATIC)
#if !defined(LWS_DLL)
#define LWS_DLL
#endif
#if !defined(LWS_INTERNAL)
#define LWS_INTERNAL
#endif
#include <libwebsockets.h>
#endif
#include <string.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <fcntl.h>
#if !defined(WIN32)
#include <unistd.h>
#endif
#include <assert.h>
struct vhd {
struct lws_context *cx;
struct lws_vhost *vhost;
char ws_server_uri[128];
char metrics_proxy_path[128];
char ba_secret[128];
const char *proxy_side_bind_name;
/**< name used to bind the two halves of the proxy together, must be
* the same name given in a pvo for both "lws-openmetrics-prox-agg"
* (the side local to the scraper) and "lws-openmetrics-prox-server"
* (the side the clients connect to)
*/
char sanity[8];
lws_dll2_owner_t clients;
lws_sorted_usec_list_t sul; /* schedule connection retry */
struct vhd *bind_partner_vhd;
struct lws *wsi; /* related wsi if any */
uint16_t retry_count; /* count of consequetive retries */
};
struct pss {
lws_dll2_t list;
char proxy_path[64];
struct lwsac *ac; /* the translated metrics, one ac per line */
struct lwsac *walk; /* iterator for ac when writing */
size_t tot; /* content-length computation */
struct lws *wsi;
uint8_t greet:1; /* set if client needs to send proxy path */
uint8_t trigger:1; /* we want to ask the client to dump */
};
#if defined(LWS_WITH_CLIENT)
static const uint32_t backoff_ms[] = { 1000, 2000, 3000, 4000, 5000 };
static const lws_retry_bo_t retry = {
.retry_ms_table = backoff_ms,
.retry_ms_table_count = LWS_ARRAY_SIZE(backoff_ms),
.conceal_count = LWS_ARRAY_SIZE(backoff_ms),
.secs_since_valid_ping = 400, /* force PINGs after secs idle */
.secs_since_valid_hangup = 400, /* hangup after secs idle */
.jitter_percent = 0,
};
static void
omc_connect_client(lws_sorted_usec_list_t *sul)
{
struct vhd *vhd = lws_container_of(sul, struct vhd, sul);
struct lws_client_connect_info i;
const char *prot;
char url[128];
memset(&i, 0, sizeof(i));
lwsl_notice("%s: %s %s %s\n", __func__, vhd->ws_server_uri, vhd->metrics_proxy_path, vhd->ba_secret);
lws_strncpy(url, vhd->ws_server_uri, sizeof(url));
if (lws_parse_uri(url, &prot, &i.address, &i.port, &i.path)) {
lwsl_err("%s: unable to parse uri %s\n", __func__,
vhd->ws_server_uri);
return;
}
i.context = vhd->cx;
i.origin = i.address;
i.host = i.address;
i.ssl_connection = LCCSCF_USE_SSL;
i.protocol = "lws-openmetrics-prox-server"; /* public subprot */
i.local_protocol_name = "lws-openmetrics-prox-client";
i.pwsi = &vhd->wsi;
i.retry_and_idle_policy = &retry;
i.userdata = vhd;
i.vhost = vhd->vhost;
lwsl_notice("%s: %s %u %s\n", __func__, i.address, i.port, i.path);
if (lws_client_connect_via_info(&i))
return;
/*
* Failed... schedule a retry... we can't use the _retry_wsi()
* convenience wrapper api here because no valid wsi at this
* point.
*/
if (!lws_retry_sul_schedule(vhd->cx, 0, sul, &retry,
omc_connect_client, &vhd->retry_count))
return;
vhd->retry_count = 0;
lws_retry_sul_schedule(vhd->cx, 0, sul, &retry,
omc_connect_client, &vhd->retry_count);
}
#endif
static void
openmetrics_san(char *nm, size_t nl)
{
size_t m;
/* Openmetrics has a very restricted token charset */
for (m = 0; m < nl; m++)
if ((nm[m] < 'A' || nm[m] > 'Z') &&
(nm[m] < 'a' || nm[m] > 'z') &&
(nm[m] < '0' || nm[m] > '9') &&
nm[m] != '_')
nm[m] = '_';
}
static int
lws_metrics_om_format_agg(lws_metric_pub_t *pub, const char *nm, lws_usec_t now,
int gng, char *buf, size_t len)
{
const char *_gng = gng ? "_nogo" : "_go";
char *end = buf + len - 1, *obuf = buf;
if (pub->flags & LWSMTFL_REPORT_ONLY_GO)
_gng = "";
if (!(pub->flags & LWSMTFL_REPORT_MEAN)) {
/* only the sum is meaningful */
if (pub->flags & LWSMTFL_REPORT_DUTY_WALLCLOCK_US) {
buf += lws_snprintf(buf, lws_ptr_diff_size_t(end, buf),
"%s_count %u\n"
"%s_us_sum %llu\n"
"%s_created %lu.%06u\n",
nm, (unsigned int)pub->u.agg.count[gng],
nm, (unsigned long long)pub->u.agg.sum[gng],
nm, (unsigned long)(pub->us_first / 1000000),
(unsigned int)(pub->us_first % 1000000));
return lws_ptr_diff(buf, obuf);
}
/* it's a monotonic ordinal, like total tx */
buf += lws_snprintf(buf, lws_ptr_diff_size_t(end, buf),
"%s%s_count %u\n"
"%s%s_sum %llu\n",
nm, _gng,
(unsigned int)pub->u.agg.count[gng],
nm, _gng,
(unsigned long long)pub->u.agg.sum[gng]);
} else
buf += lws_snprintf(buf, lws_ptr_diff_size_t(end, buf),
"%s%s_count %u\n"
"%s%s_mean %llu\n",
nm, _gng,
(unsigned int)pub->u.agg.count[gng],
nm, _gng, (unsigned long long)
(pub->u.agg.count[gng] ?
pub->u.agg.sum[gng] /
pub->u.agg.count[gng] : 0));
return lws_ptr_diff(buf, obuf);
}
static int
lws_metrics_om_ac_stash(struct pss *pss, const char *buf, size_t len)
{
char *q;
q = lwsac_use(&pss->ac, LWS_PRE + len + 2, LWS_PRE + len + 2);
if (!q) {
lwsac_free(&pss->ac);
return -1;
}
q[LWS_PRE] = (char)((len >> 8) & 0xff);
q[LWS_PRE + 1] = (char)(len & 0xff);
memcpy(q + LWS_PRE + 2, buf, len);
pss->tot += len;
return 0;
}
/*
* We have to do the ac listing at this level, because there can be too large
* a number to metrics tags to iterate that can fit in a reasonable buffer.
*/
static int
lws_metrics_om_format(struct pss *pss, lws_metric_pub_t *pub, const char *nm)
{
char buf[1200], *p = buf, *end = buf + sizeof(buf) - 1, tmp[512];
lws_usec_t t = lws_now_usecs();
if (pub->flags & LWSMTFL_REPORT_HIST) {
lws_metric_bucket_t *buck = pub->u.hist.head;
p += lws_snprintf(p, lws_ptr_diff_size_t(end, p),
"%s_count %llu\n",
nm, (unsigned long long)
pub->u.hist.total_count);
while (buck) {
lws_strncpy(tmp, lws_metric_bucket_name(buck),
sizeof(tmp));
p += lws_snprintf(p, lws_ptr_diff_size_t(end, p),
"%s{%s} %llu\n", nm, tmp,
(unsigned long long)buck->count);
lws_metrics_om_ac_stash(pss, buf,
lws_ptr_diff_size_t(p, buf));
p = buf;
buck = buck->next;
}
goto happy;
}
if (!pub->u.agg.count[METRES_GO] && !pub->u.agg.count[METRES_NOGO])
return 0;
if (pub->u.agg.count[METRES_GO])
p += lws_metrics_om_format_agg(pub, nm, t, METRES_GO, p,
lws_ptr_diff_size_t(end, p));
if (!(pub->flags & LWSMTFL_REPORT_ONLY_GO) &&
pub->u.agg.count[METRES_NOGO])
p += lws_metrics_om_format_agg(pub, nm, t, METRES_NOGO, p,
lws_ptr_diff_size_t(end, p));
if (pub->flags & LWSMTFL_REPORT_MEAN)
p += lws_snprintf(p, lws_ptr_diff_size_t(end, p),
"%s_min %llu\n"
"%s_max %llu\n",
nm, (unsigned long long)pub->u.agg.min,
nm, (unsigned long long)pub->u.agg.max);
happy:
return lws_metrics_om_ac_stash(pss, buf, lws_ptr_diff_size_t(p, buf));
}
static int
append_om_metric(lws_metric_pub_t *pub, void *user)
{
struct pss *pss = (struct pss *)user;
char nm[64];
size_t nl;
/*
* Convert lws_metrics to openmetrics metrics data, stashing into an
* lwsac without backfill. Since it's not backfilling, use areas are in
* linear sequence simplifying walking them. Limiting the lwsac alloc
* to less than a typical mtu means we can write one per write
* efficiently
*/
lws_strncpy(nm, pub->name, sizeof(nm));
nl = strlen(nm);
openmetrics_san(nm, nl);
return lws_metrics_om_format(pss, pub, nm);
}
#if defined(__linux__)
static int
grabfile(const char *fi, char *buf, size_t len)
{
int n, fd = lws_open(fi, LWS_O_RDONLY);
buf[0] = '\0';
if (fd < 0)
return -1;
n = (int)read(fd, buf, len - 1);
close(fd);
if (n < 0) {
buf[0] = '\0';
return -1;
}
buf[n] = '\0';
if (n > 0 && buf[n - 1] == '\n')
buf[--n] = '\0';
return n;
}
#endif
/*
* Let's pregenerate the output into an lwsac all at once and
* then spool it back to the peer afterwards
*
* - there's not going to be that much of it (a few kB)
* - we then know the content-length for the headers
* - it's stretchy to arbitrary numbers of metrics
* - lwsac block list provides the per-metric structure to
* hold the data in a way we can walk to write it simply
*/
int
ome_prepare(struct lws_context *ctx, struct pss *pss)
{
char buf[1224], *start = buf + LWS_PRE, *p = start,
*end = buf + sizeof(buf) - 1;
char hn[64];
pss->tot = 0;
/*
* Target metadata
*/
hn[0] = '\0';
gethostname(hn, sizeof(hn) - 1);
p += lws_snprintf(p, lws_ptr_diff_size_t(end, p),
"# TYPE target info\n"
"# HELP target Target metadata\n"
"target_info{hostname=\"%s\"", hn);
#if defined(__linux__)
if (grabfile("/proc/self/cmdline", hn, sizeof(hn)))
p += lws_snprintf((char *)p, lws_ptr_diff_size_t(end, p),
",cmdline=\"%s\"", hn);
#endif
p += lws_snprintf(p, lws_ptr_diff_size_t(end, p), "} 1\n");
if (lws_metrics_om_ac_stash(pss, (const char *)buf + LWS_PRE,
lws_ptr_diff_size_t(p, buf + LWS_PRE)))
return 1;
/* lws version */
p = start;
p += lws_snprintf(p, lws_ptr_diff_size_t(end, p),
"# TYPE lws_info info\n"
"# HELP lws_info Version of lws producing this\n"
"lws_info{version=\"%s\"} 1\n", LWS_BUILD_HASH);
if (lws_metrics_om_ac_stash(pss, (const char *)buf + LWS_PRE,
lws_ptr_diff_size_t(p, buf + LWS_PRE)))
return 1;
/* system scalars */
#if defined(__linux__)
if (grabfile("/proc/loadavg", hn, sizeof(hn))) {
char *sp = strchr(hn, ' ');
if (sp) {
p = start;
p += lws_snprintf(p, lws_ptr_diff_size_t(end, p),
"load_1m %.*s\n",
lws_ptr_diff(sp, hn), hn);
if (lws_metrics_om_ac_stash(pss,
(char *)buf + LWS_PRE,
lws_ptr_diff_size_t(p,
start)))
return 1;
}
}
#endif
if (lws_metrics_foreach(ctx, pss, append_om_metric))
return 1;
p = start;
p += lws_snprintf(p, lws_ptr_diff_size_t(end, p),
"# EOF\n");
if (lws_metrics_om_ac_stash(pss, (char *)buf + LWS_PRE,
lws_ptr_diff_size_t(p, buf + LWS_PRE)))
return 1;
pss->walk = pss->ac;
return 0;
}
#if defined(LWS_WITH_SERVER)
/* 1) direct http export for scraper */
static int
callback_lws_openmetrics_export(struct lws *wsi,
enum lws_callback_reasons reason,
void *user, void *in, size_t len)
{
unsigned char buf[1224], *start = buf + LWS_PRE, *p = start,
*end = buf + sizeof(buf) - 1, *ip;
struct lws_context *cx = lws_get_context(wsi);
struct pss *pss = (struct pss *)user;
unsigned int m, wm;
switch (reason) {
case LWS_CALLBACK_HTTP:
ome_prepare(cx, pss);
p = start;
if (lws_add_http_common_headers(wsi, HTTP_STATUS_OK,
"application/openmetrics-text; "
"version=1.0.0; charset=utf-8",
pss->tot, &p, end) ||
lws_finalize_write_http_header(wsi, start, &p, end))
return 1;
lws_callback_on_writable(wsi);
return 0;
case LWS_CALLBACK_CLOSED_HTTP:
lwsac_free(&pss->ac);
break;
case LWS_CALLBACK_HTTP_WRITEABLE:
if (!pss->walk)
return 0;
do {
ip = (uint8_t *)pss->walk +
lwsac_sizeof(pss->walk == pss->ac) + LWS_PRE;
m = (unsigned int)((ip[0] << 8) | ip[1]);
/* coverity */
if (m > lwsac_get_tail_pos(pss->walk) -
lwsac_sizeof(pss->walk == pss->ac))
return -1;
if (lws_ptr_diff_size_t(end, p) < m)
break;
memcpy(p, ip + 2, m);
p += m;
pss->walk = lwsac_get_next(pss->walk);
} while (pss->walk);
if (!lws_ptr_diff_size_t(p, start)) {
lwsl_err("%s: stuck\n", __func__);
return -1;
}
wm = pss->walk ? LWS_WRITE_HTTP : LWS_WRITE_HTTP_FINAL;
if (lws_write(wsi, start, lws_ptr_diff_size_t(p, start),
(enum lws_write_protocol)wm) < 0)
return 1;
if (!pss->walk) {
if (lws_http_transaction_completed(wsi))
return -1;
} else
lws_callback_on_writable(wsi);
return 0;
default:
break;
}
return lws_callback_http_dummy(wsi, reason, user, in, len);
}
static struct pss *
omc_lws_om_get_other_side_pss_client(struct vhd *vhd, struct pss *pss)
{
/*
* Search through our partner's clients list looking for one with the
* same proxy path
*/
lws_start_foreach_dll(struct lws_dll2 *, d,
vhd->bind_partner_vhd->clients.head) {
struct pss *apss = lws_container_of(d, struct pss, list);
if (!strcmp(pss->proxy_path, apss->proxy_path))
return apss;
} lws_end_foreach_dll(d);
return NULL;
}
/* 2) "lws-openmetrics-prox-agg": http server export via proxy to connected clients */
static int
callback_lws_openmetrics_prox_agg(struct lws *wsi,
enum lws_callback_reasons reason,
void *user, void *in, size_t len)
{
unsigned char buf[1224], *start = buf + LWS_PRE, *p = start,
*end = buf + sizeof(buf) - 1, *ip;
struct vhd *vhd = (struct vhd *)lws_protocol_vh_priv_get(
lws_get_vhost(wsi), lws_get_protocol(wsi));
struct lws_context *cx = lws_get_context(wsi);
struct pss *pss = (struct pss *)user, *partner_pss;
unsigned int m, wm;
switch (reason) {
case LWS_CALLBACK_PROTOCOL_INIT:
lwsl_notice("%s: PROTOCOL_INIT on %s\n", __func__, lws_vh_tag(lws_get_vhost(wsi)));
/*
* We get told what to do when we are bound to the vhost
*/
vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
lws_get_protocol(wsi), sizeof(struct vhd));
if (!vhd) {
lwsl_err("%s: vhd alloc failed\n", __func__);
return 0;
}
vhd->cx = cx;
/*
* Try to bind to the counterpart server in the proxy, binding
* to the right one by having a common bind name set in a pvo.
* We don't know who will get instantiated last, so both parts
* try to bind if not already bound
*/
if (!lws_pvo_get_str(in, "proxy-side-bind-name",
&vhd->proxy_side_bind_name)) {
/*
* Attempt to find the vhd that belongs to a vhost
* that has instantiated protocol
* "lws-openmetrics-prox-server", and has set pvo
* "proxy-side-bind-name" on it to whatever our
* vhd->proxy_side_bind_name was also set to.
*
* If found, inform the two sides of the same proxy
* what their partner vhd is
*/
lws_strncpy(vhd->sanity, "isagg", sizeof(vhd->sanity));
vhd->bind_partner_vhd = lws_vhd_find_by_pvo(cx,
"lws-openmetrics-prox-server",
"proxy-side-bind-name",
vhd->proxy_side_bind_name);
if (vhd->bind_partner_vhd) {
assert(!strcmp(vhd->bind_partner_vhd->sanity, "isws"));
lwsl_notice("%s: proxy binding OK\n", __func__);
vhd->bind_partner_vhd->bind_partner_vhd = vhd;
}
} else {
lwsl_warn("%s: proxy-side-bind-name required\n", __func__);
return 0;
}
break;
case LWS_CALLBACK_PROTOCOL_DESTROY:
if (vhd)
lws_sul_cancel(&vhd->sul);
break;
case LWS_CALLBACK_HTTP:
/*
* The scraper has connected to us, the local side of the proxy,
* we need to match what it wants to
*/
if (!vhd->bind_partner_vhd)
return 0;
lws_strnncpy(pss->proxy_path, (const char *)in, len,
sizeof(pss->proxy_path));
if (pss->list.owner) {
lwsl_warn("%s: double HTTP?\n", __func__);
return 0;
}
pss->wsi = wsi;
lws_start_foreach_dll(struct lws_dll2 *, d,
vhd->bind_partner_vhd->clients.head) {
struct pss *apss = lws_container_of(d, struct pss, list);
if (!strcmp((const char *)in, apss->proxy_path)) {
apss->trigger = 1;
lws_callback_on_writable(apss->wsi);
/* let's add him on the http server vhd list */
lws_dll2_add_tail(&pss->list, &vhd->clients);
return 0;
}
} lws_end_foreach_dll(d);
return 0;
case LWS_CALLBACK_CLOSED_HTTP:
lwsac_free(&pss->ac);
lws_dll2_remove(&pss->list);
break;
case LWS_CALLBACK_HTTP_WRITEABLE:
if (!pss->walk)
return 0;
/* locate the wss side if it's still around */
partner_pss = omc_lws_om_get_other_side_pss_client(vhd, pss);
if (!partner_pss)
return -1;
do {
ip = (uint8_t *)pss->walk +
lwsac_sizeof(pss->walk == partner_pss->ac) + LWS_PRE;
m = (unsigned int)((ip[0] << 8) | ip[1]);
/* coverity */
if (m > lwsac_get_tail_pos(pss->walk) -
lwsac_sizeof(pss->walk == partner_pss->ac))
return -1;
if (lws_ptr_diff_size_t(end, p) < m)
break;
memcpy(p, ip + 2, m);
p += m;
pss->walk = lwsac_get_next(pss->walk);
} while (pss->walk);
if (!lws_ptr_diff_size_t(p, start)) {
lwsl_err("%s: stuck\n", __func__);
return -1;
}
wm = pss->walk ? LWS_WRITE_HTTP : LWS_WRITE_HTTP_FINAL;
if (lws_write(wsi, start, lws_ptr_diff_size_t(p, start),
(enum lws_write_protocol)wm) < 0)
return 1;
if (!pss->walk) {
lwsl_info("%s: whole msg proxied to scraper\n", __func__);
lws_dll2_remove(&pss->list);
lwsac_free(&partner_pss->ac);
// if (lws_http_transaction_completed(wsi))
return -1;
} else
lws_callback_on_writable(wsi);
return 0;
default:
break;
}
return lws_callback_http_dummy(wsi, reason, user, in, len);
}
/* 3) "lws-openmetrics-prox-server": ws server side of metrics proxy, for
* ws clients to connect to */
static int
callback_lws_openmetrics_prox_server(struct lws *wsi,
enum lws_callback_reasons reason,
void *user, void *in, size_t len)
{
unsigned char buf[1224], *start = buf + LWS_PRE, *p = start,
*end = buf + sizeof(buf) - 1;
struct vhd *vhd = (struct vhd *)lws_protocol_vh_priv_get(
lws_get_vhost(wsi), lws_get_protocol(wsi));
struct lws_context *cx = lws_get_context(wsi);
struct pss *pss = (struct pss *)user, *partner_pss;
switch (reason) {
case LWS_CALLBACK_PROTOCOL_INIT:
/*
* We get told what to do when we are bound to the vhost
*/
lwsl_notice("%s: PROTOCOL_INIT on %s\n", __func__, lws_vh_tag(lws_get_vhost(wsi)));
vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
lws_get_protocol(wsi), sizeof(struct vhd));
if (!vhd) {
lwsl_err("%s: vhd alloc failed\n", __func__);
return 0;
}
vhd->cx = cx;
/*
* Try to bind to the counterpart server in the proxy, binding
* to the right one by having a common bind name set in a pvo.
* We don't know who will get instantiated last, so both parts
* try to bind if not already bound
*/
if (!lws_pvo_get_str(in, "proxy-side-bind-name",
&vhd->proxy_side_bind_name)) {
/*
* Attempt to find the vhd that belongs to a vhost
* that has instantiated protocol
* "lws-openmetrics-prox-server", and has set pvo
* "proxy-side-bind-name" on it to whatever our
* vhd->proxy_side_bind_name was also set to.
*
* If found, inform the two sides of the same proxy
* what their partner vhd is
*/
lws_strncpy(vhd->sanity, "isws", sizeof(vhd->sanity));
vhd->bind_partner_vhd = lws_vhd_find_by_pvo(cx,
"lws-openmetrics-prox-agg",
"proxy-side-bind-name",
vhd->proxy_side_bind_name);
if (vhd->bind_partner_vhd) {
assert(!strcmp(vhd->bind_partner_vhd->sanity, "isagg"));
lwsl_notice("%s: proxy binding OK\n", __func__);
vhd->bind_partner_vhd->bind_partner_vhd = vhd;
}
} else {
lwsl_warn("%s: proxy-side-bind-name required\n", __func__);
return 0;
}
break;
case LWS_CALLBACK_PROTOCOL_DESTROY:
break;
case LWS_CALLBACK_ESTABLISHED:
/*
* a client has joined... we need to add his pss to our list
* of live, joined clients
*/
/* mark us as waiting for the reference name from the client */
pss->greet = 1;
pss->wsi = wsi;
lws_validity_confirmed(wsi);
return 0;
case LWS_CALLBACK_CLOSED:
/*
* a client has parted
*/
lws_dll2_remove(&pss->list);
lwsl_warn("%s: client %s left (%u)\n", __func__,
pss->proxy_path,
(unsigned int)vhd->clients.count);
lwsac_free(&pss->ac);
/* let's kill the scraper connection accordingly, if still up */
partner_pss = omc_lws_om_get_other_side_pss_client(vhd, pss);
if (partner_pss)
lws_wsi_close(partner_pss->wsi, LWS_TO_KILL_ASYNC);
break;
case LWS_CALLBACK_RECEIVE:
if (pss->greet) {
pss->greet = 0;
lws_strnncpy(pss->proxy_path, (const char *)in, len,
sizeof(pss->proxy_path));
lws_validity_confirmed(wsi);
lwsl_notice("%s: received greet '%s'\n", __func__,
pss->proxy_path);
/*
* we need to add his pss to our list of configured,
* live, joined clients
*/
lws_dll2_add_tail(&pss->list, &vhd->clients);
return 0;
}
/*
* He's sending us his results... let's collect chunks into the
* pss lwsac before worrying about anything else
*/
if (lws_is_first_fragment(wsi))
pss->tot = 0;
lws_metrics_om_ac_stash(pss, (const char *)in, len);
if (lws_is_final_fragment(wsi)) {
struct pss *partner_pss;
lwsl_info("%s: ws side received complete msg\n",
__func__);
/* the lwsac is complete */
pss->walk = pss->ac;
partner_pss = omc_lws_om_get_other_side_pss_client(vhd, pss);
if (!partner_pss) {
lwsl_notice("%s: no partner A\n", __func__);
return -1;
}
/* indicate to scraper side we want to issue now */
p = start;
if (lws_add_http_common_headers(partner_pss->wsi, HTTP_STATUS_OK,
"application/openmetrics-text; "
"version=1.0.0; charset=utf-8",
pss->tot, &p, end) ||
lws_finalize_write_http_header(partner_pss->wsi,
start, &p, end))
return -1;
/* indicate to scraper side we want to issue now */
partner_pss->walk = pss->ac;
partner_pss->trigger = 1;
lws_callback_on_writable(partner_pss->wsi);
}
return 0;
case LWS_CALLBACK_SERVER_WRITEABLE:
if (!pss->trigger)
return 0;
pss->trigger = 0;
partner_pss = omc_lws_om_get_other_side_pss_client(vhd, pss);
if (!partner_pss) {
lwsl_err("%s: no partner\n", __func__);
return 0;
}
lwsl_info("%s: sending trigger to client\n", __func__);
*start = 'x';
if (lws_write(wsi, start, 1,
(enum lws_write_protocol)LWS_WRITE_TEXT) < 0)
return 1;
lws_validity_confirmed(wsi);
return 0;
default:
break;
}
return lws_callback_http_dummy(wsi, reason, user, in, len);
}
#endif
#if defined(LWS_WITH_CLIENT) && defined(LWS_ROLE_WS)
/* 4) ws client that keeps wss connection up to metrics proxy ws server */
static int
callback_lws_openmetrics_prox_client(struct lws *wsi,
enum lws_callback_reasons reason,
void *user, void *in, size_t len)
{
unsigned char buf[1224], *start = buf + LWS_PRE, *p = start,
*end = buf + sizeof(buf) - 1, *ip;
struct vhd *vhd = (struct vhd *)lws_protocol_vh_priv_get(
lws_get_vhost(wsi), lws_get_protocol(wsi));
struct lws_context *cx = lws_get_context(wsi);
struct pss *pss = (struct pss *)user;
unsigned int m, wm;
const char *cp;
char first;
switch (reason) {
case LWS_CALLBACK_PROTOCOL_INIT:
lwsl_notice("%s: PROTOCOL_INIT on %s\n", __func__,
lws_vh_tag(lws_get_vhost(wsi)));
/*
* We get told what to do when we are bound to the vhost
*/
vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
lws_get_protocol(wsi), sizeof(struct vhd));
if (!vhd)
return 0;
vhd->cx = cx;
vhd->vhost = lws_get_vhost(wsi);
/* the proxy server uri */
if (lws_pvo_get_str(in, "ws-server-uri", &cp) || !cp) {
lwsl_warn("%s: ws-server-uri pvo required\n", __func__);
return 0;
}
lws_strncpy(vhd->ws_server_uri, cp, sizeof(vhd->ws_server_uri));
/* how we should be referenced at the proxy */
if (lws_pvo_get_str(in, "metrics-proxy-path", &cp)) {
lwsl_err("%s: metrics-proxy-path pvo required\n", __func__);
return 1;
}
lws_strncpy(vhd->metrics_proxy_path, cp, sizeof(vhd->metrics_proxy_path));
/* the shared secret to authenticate us as allowed to join */
if (lws_pvo_get_str(in, "ba-secret", &cp)) {
lwsl_err("%s: ba-secret pvo required\n", __func__);
return 1;
}
lws_strncpy(vhd->ba_secret, cp, sizeof(vhd->ba_secret));
lwsl_notice("%s: scheduling connect %s %s %s\n", __func__,
vhd->ws_server_uri, vhd->metrics_proxy_path, vhd->ba_secret);
lws_validity_confirmed(wsi);
lws_sul_schedule(cx, 0, &vhd->sul, omc_connect_client, 1);
break;
case LWS_CALLBACK_PROTOCOL_DESTROY:
if (vhd)
lws_sul_cancel(&vhd->sul);
break;
case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER:
{
unsigned char **pp = (unsigned char **)in, *pend = (*pp) + len;
char b[128];
/* authorize ourselves to the metrics proxy using basic auth */
if (lws_http_basic_auth_gen("metricsclient", vhd->ba_secret,
b, sizeof(b)))
break;
if (lws_add_http_header_by_token(wsi,
WSI_TOKEN_HTTP_AUTHORIZATION,
(unsigned char *)b,
(int)strlen(b), pp, pend))
return -1;
break;
}
case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
lwsl_err("CLIENT_CONNECTION_ERROR: %s\n",
in ? (char *)in : "(null)");
goto do_retry;
case LWS_CALLBACK_CLIENT_ESTABLISHED:
lwsl_warn("%s: connected to ws metrics agg server\n", __func__);
pss->greet = 1;
lws_callback_on_writable(wsi);
lws_validity_confirmed(wsi);
return 0;
case LWS_CALLBACK_CLIENT_CLOSED:
lwsl_notice("%s: client closed\n", __func__);
lwsac_free(&pss->ac);
goto do_retry;
case LWS_CALLBACK_CLIENT_RECEIVE:
/*
* Proxy serverside sends us something to trigger us to create
* our metrics message and send it back over the ws link
*/
ome_prepare(cx, pss);
pss->walk = pss->ac;
lws_callback_on_writable(wsi);
lwsl_info("%s: dump requested\n", __func__);
break;
case LWS_CALLBACK_CLIENT_WRITEABLE:
if (pss->greet) {
/*
* At first after establishing the we link, we send a
* message indicating to the metrics proxy how we
* should be referred to by the scraper to particularly
* select to talk to us
*/
lwsl_info("%s: sending greet '%s'\n", __func__,
vhd->metrics_proxy_path);
lws_strncpy((char *)start, vhd->metrics_proxy_path,
sizeof(buf) - LWS_PRE);
if (lws_write(wsi, start,
strlen(vhd->metrics_proxy_path),
LWS_WRITE_TEXT) < 0)
return 1;
lws_validity_confirmed(wsi);
pss->greet = 0;
return 0;
}
if (!pss->walk)
return 0;
/*
* We send the metrics dump in a single logical ws message,
* using ws fragmentation to split it around 1 mtu boundary
* and keep coming back until it's finished
*/
first = pss->walk == pss->ac;
do {
ip = (uint8_t *)pss->walk +
lwsac_sizeof(pss->walk == pss->ac) + LWS_PRE;
m = (unsigned int)((ip[0] << 8) | ip[1]);
/* coverity */
if (m > lwsac_get_tail_pos(pss->walk) -
lwsac_sizeof(pss->walk == pss->ac)) {
lwsl_err("%s: size blow\n", __func__);
return -1;
}
if (lws_ptr_diff_size_t(end, p) < m)
break;
memcpy(p, ip + 2, m);
p += m;
pss->walk = lwsac_get_next(pss->walk);
} while (pss->walk);
if (!lws_ptr_diff_size_t(p, start)) {
lwsl_err("%s: stuck\n", __func__);
return -1;
}
wm = (unsigned int)lws_write_ws_flags(LWS_WRITE_TEXT, first,
!pss->walk);
if (lws_write(wsi, start, lws_ptr_diff_size_t(p, start),
(enum lws_write_protocol)wm) < 0) {
lwsl_notice("%s: write fail\n", __func__);
return 1;
}
lws_validity_confirmed(wsi);
lwsl_info("%s: forwarded %d\n", __func__, lws_ptr_diff(p, start));
if (!pss->walk) {
lwsl_info("%s: dump send completed\n", __func__);
lwsac_free(&pss->ac);
} else
lws_callback_on_writable(wsi);
return 0;
default:
break;
}
return lws_callback_http_dummy(wsi, reason, user, in, len);
do_retry:
if (!lws_retry_sul_schedule(cx, 0, &vhd->sul, &retry,
omc_connect_client, &vhd->retry_count))
return 0;
vhd->retry_count = 0;
lws_retry_sul_schedule(cx, 0, &vhd->sul, &retry,
omc_connect_client, &vhd->retry_count);
return 0;
}
#endif
LWS_VISIBLE const struct lws_protocols lws_openmetrics_export_protocols[] = {
#if defined(LWS_WITH_SERVER)
{ /* for scraper directly: http export on listen socket */
"lws-openmetrics",
callback_lws_openmetrics_export,
sizeof(struct pss),
1024, 0, NULL, 0
},
{ /* for scraper via ws proxy: http export on listen socket */
"lws-openmetrics-prox-agg",
callback_lws_openmetrics_prox_agg,
sizeof(struct pss),
1024, 0, NULL, 0
},
{ /* metrics proxy server side: ws server for clients to connect to */
"lws-openmetrics-prox-server",
callback_lws_openmetrics_prox_server,
sizeof(struct pss),
1024, 0, NULL, 0
},
#endif
#if defined(LWS_WITH_CLIENT) && defined(LWS_ROLE_WS)
{ /* client to metrics proxy: ws client to connect to metrics proxy*/
"lws-openmetrics-prox-client",
callback_lws_openmetrics_prox_client,
sizeof(struct pss),
1024, 0, NULL, 0
},
#endif
};
LWS_VISIBLE const lws_plugin_protocol_t lws_openmetrics_export = {
.hdr = {
"lws OpenMetrics export",
"lws_protocol_plugin",
LWS_BUILD_HASH,
LWS_PLUGIN_API_MAGIC
},
.protocols = lws_openmetrics_export_protocols,
.count_protocols = LWS_ARRAY_SIZE(lws_openmetrics_export_protocols),
};