0
0
Fork 0
mirror of https://github.com/netdata/netdata.git synced 2025-04-13 01:08:11 +00:00

Honor maximum message size limit of MQTT server ()

This commit is contained in:
Timotej S 2023-05-18 13:56:20 +02:00 committed by GitHub
parent b20abcbc1e
commit 5827732aa7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 11 additions and 7 deletions

View file

@ -216,7 +216,7 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
}
// send msg.
aclk_http_msg_v2(query_thr->client, query->callback_topic, query->msg_id, t, query->created, w->response.code, local_buffer->buffer, local_buffer->len);
w->response.code = aclk_http_msg_v2(query_thr->client, query->callback_topic, query->msg_id, t, query->created, w->response.code, local_buffer->buffer, local_buffer->len);
struct timeval tv;

View file

@ -83,7 +83,10 @@ static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_objec
memcpy(&full_msg[len], payload, payload_len);
}
mqtt_wss_publish5(client, (char*)topic, NULL, full_msg, &freez_aclk_publish5b, full_msg_len, MQTT_WSS_PUB_QOS1, &packet_id);
int rc = mqtt_wss_publish5(client, (char*)topic, NULL, full_msg, &freez_aclk_publish5b, full_msg_len, MQTT_WSS_PUB_QOS1, &packet_id);
if (rc == MQTT_WSS_ERR_TOO_BIG_FOR_SERVER)
return HTTP_RESP_FORBIDDEN;
#ifdef NETDATA_INTERNAL_CHECKS
aclk_stats_msg_published(packet_id);
@ -169,11 +172,11 @@ void aclk_http_msg_v2_err(mqtt_wss_client client, const char *topic, const char
json_object_object_add(msg, "error-description", tmp);
if (aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len)) {
error("Failed to send cancelation message for http reply");
error("Failed to send cancellation message for http reply %zu %s", payload_len, payload);
}
}
void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len)
int aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len)
{
json_object *tmp, *msg;
@ -192,7 +195,7 @@ void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg
switch (rc) {
case HTTP_RESP_FORBIDDEN:
aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_REQ_REPLY_TOO_BIG, CLOUD_EMSG_REQ_REPLY_TOO_BIG, payload, payload_len);
aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_REQ_REPLY_TOO_BIG, CLOUD_EMSG_REQ_REPLY_TOO_BIG, NULL, 0);
break;
case HTTP_RESP_INTERNAL_SERVER_ERROR:
aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_FAIL_TOPIC, CLOUD_EMSG_FAIL_TOPIC, payload, payload_len);
@ -201,6 +204,7 @@ void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg
aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_SND_TIMEOUT, CLOUD_EMSG_SND_TIMEOUT, payload, payload_len);
break;
}
return rc ? rc : http_code;
}
uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable) {

View file

@ -12,7 +12,7 @@
uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname);
void aclk_http_msg_v2_err(mqtt_wss_client client, const char *topic, const char *msg_id, int http_code, int ec, const char* emsg, const char *payload, size_t payload_len);
void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len);
int aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len);
uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable);
char *aclk_generate_lwt(size_t *size);

@ -1 +1 @@
Subproject commit 684fa17afb14ef5a46dde18802548615fdd441ac
Subproject commit cb553fd1a6929b107f36ba0c320575449b9752b6