diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c index 4c2e918a3a..0698c2d606 100644 --- a/aclk/aclk_query.c +++ b/aclk/aclk_query.c @@ -216,7 +216,7 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) } // send msg. - aclk_http_msg_v2(query_thr->client, query->callback_topic, query->msg_id, t, query->created, w->response.code, local_buffer->buffer, local_buffer->len); + w->response.code = aclk_http_msg_v2(query_thr->client, query->callback_topic, query->msg_id, t, query->created, w->response.code, local_buffer->buffer, local_buffer->len); struct timeval tv; diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c index 86ee818edc..d11e96cfba 100644 --- a/aclk/aclk_tx_msgs.c +++ b/aclk/aclk_tx_msgs.c @@ -83,7 +83,10 @@ static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_objec memcpy(&full_msg[len], payload, payload_len); } - mqtt_wss_publish5(client, (char*)topic, NULL, full_msg, &freez_aclk_publish5b, full_msg_len, MQTT_WSS_PUB_QOS1, &packet_id); + int rc = mqtt_wss_publish5(client, (char*)topic, NULL, full_msg, &freez_aclk_publish5b, full_msg_len, MQTT_WSS_PUB_QOS1, &packet_id); + + if (rc == MQTT_WSS_ERR_TOO_BIG_FOR_SERVER) + return HTTP_RESP_FORBIDDEN; #ifdef NETDATA_INTERNAL_CHECKS aclk_stats_msg_published(packet_id); @@ -169,11 +172,11 @@ void aclk_http_msg_v2_err(mqtt_wss_client client, const char *topic, const char json_object_object_add(msg, "error-description", tmp); if (aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len)) { - error("Failed to send cancelation message for http reply"); + error("Failed to send cancellation message for http reply %zu %s", payload_len, payload); } } -void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len) +int aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len) { json_object *tmp, *msg; @@ -192,7 +195,7 @@ void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg switch (rc) { case HTTP_RESP_FORBIDDEN: - aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_REQ_REPLY_TOO_BIG, CLOUD_EMSG_REQ_REPLY_TOO_BIG, payload, payload_len); + aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_REQ_REPLY_TOO_BIG, CLOUD_EMSG_REQ_REPLY_TOO_BIG, NULL, 0); break; case HTTP_RESP_INTERNAL_SERVER_ERROR: aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_FAIL_TOPIC, CLOUD_EMSG_FAIL_TOPIC, payload, payload_len); @@ -201,6 +204,7 @@ void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_SND_TIMEOUT, CLOUD_EMSG_SND_TIMEOUT, payload, payload_len); break; } + return rc ? rc : http_code; } uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable) { diff --git a/aclk/aclk_tx_msgs.h b/aclk/aclk_tx_msgs.h index 31e5924100..9e7d890772 100644 --- a/aclk/aclk_tx_msgs.h +++ b/aclk/aclk_tx_msgs.h @@ -12,7 +12,7 @@ uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname); void aclk_http_msg_v2_err(mqtt_wss_client client, const char *topic, const char *msg_id, int http_code, int ec, const char* emsg, const char *payload, size_t payload_len); -void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len); +int aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len); uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable); char *aclk_generate_lwt(size_t *size); diff --git a/mqtt_websockets b/mqtt_websockets index 684fa17afb..cb553fd1a6 160000 --- a/mqtt_websockets +++ b/mqtt_websockets @@ -1 +1 @@ -Subproject commit 684fa17afb14ef5a46dde18802548615fdd441ac +Subproject commit cb553fd1a6929b107f36ba0c320575449b9752b6