0
0
Fork 0
mirror of https://github.com/netdata/netdata.git synced 2025-04-28 22:52:30 +00:00

logs-management: Add function cancellability ()

* Add function cancellability

* Do not comment out removal of persistent dirs in run_stress_test.sh
This commit is contained in:
Dimitris P 2023-11-28 15:55:51 +00:00 committed by GitHub
parent f6a4ce8972
commit f6d9792fe8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 41 additions and 13 deletions

View file

@ -91,7 +91,7 @@ void circ_buff_search(logs_query_params_t *const p_query_params, struct File_inf
/* If exceeding quota or timeout is reached and new timestamp is different than previous,
* terminate query but inform caller about act_to_ts to continue from (its next value) in next call. */
if((res_buff->len >= p_query_params->quota || now_monotonic_usec() > p_query_params->stop_monotonic_ut) &&
if( (res_buff->len >= p_query_params->quota || terminate_logs_manag_query(p_query_params)) &&
items[i].cbi->timestamp != res_hdr.timestamp){
p_query_params->act_to_ts = res_hdr.timestamp;
break;

View file

@ -1288,7 +1288,7 @@ void db_search(logs_query_params_t *const p_query_params, struct File_info *cons
/* If exceeding quota or timeout is reached and new timestamp
* is different than previous, terminate query. */
if((res_buff->len >= p_query_params->quota || now_monotonic_usec() > p_query_params->stop_monotonic_ut) &&
if((res_buff->len >= p_query_params->quota || terminate_logs_manag_query(p_query_params)) &&
tmp_itm.timestamp != res_hdr.timestamp){
p_query_params->act_to_ts = res_hdr.timestamp;
break;

View file

@ -121,7 +121,6 @@ typedef struct function_query_status {
usec_t started_monotonic_ut;
// request
// SD_JOURNAL_FILE_SOURCE_TYPE source_type;
STRING *source;
usec_t after_ut;
usec_t before_ut;
@ -249,7 +248,6 @@ static void logsmanagement_function_facets(const char *transaction, char *functi
const char *chart = NULL;
const char *source = NULL;
const char *progress_id = NULL;
// SD_JOURNAL_FILE_SOURCE_TYPE source_type = SDJF_ALL;
// size_t filters = 0;
buffer_json_member_add_object(wb, "_request");
@ -444,7 +442,6 @@ static void logsmanagement_function_facets(const char *transaction, char *functi
fqs->delta = (fqs->data_only) ? delta : false;
fqs->tail = (fqs->data_only && fqs->if_modified_since) ? tail : false;
fqs->source = string_strdupz(source);
// fqs->source_type = source_type;
fqs->entries = last;
fqs->last_modified = 0;
// fqs->filters = filters;
@ -516,7 +513,6 @@ static void logsmanagement_function_facets(const char *transaction, char *functi
buffer_json_member_add_boolean(wb, LOGS_MANAG_FUNC_PARAM_TAIL, fqs->tail);
buffer_json_member_add_string(wb, LOGS_MANAG_FUNC_PARAM_ID, progress_id);
buffer_json_member_add_string(wb, LOGS_MANAG_FUNC_PARAM_SOURCE, string2str(fqs->source));
// buffer_json_member_add_uint64(wb, "source_type", fqs->source_type);
buffer_json_member_add_uint64(wb, LOGS_MANAG_FUNC_PARAM_AFTER, fqs->after_ut / USEC_PER_SEC);
buffer_json_member_add_uint64(wb, LOGS_MANAG_FUNC_PARAM_BEFORE, fqs->before_ut / USEC_PER_SEC);
buffer_json_member_add_uint64(wb, LOGS_MANAG_FUNC_PARAM_IF_MODIFIED_SINCE, fqs->if_modified_since);
@ -593,6 +589,7 @@ static void logsmanagement_function_facets(const char *transaction, char *functi
(fqs->data_only && fqs->anchor.start_ut) ? fqs->anchor.start_ut / USEC_PER_MS : after_s * MSEC_PER_SEC;
}
query_params.cancelled = cancelled;
query_params.stop_monotonic_ut = now_monotonic_usec() + (timeout - 1) * USEC_PER_SEC;
query_params.results_buff = buffer_create(query_params.quota, NULL);
@ -604,6 +601,7 @@ static void logsmanagement_function_facets(const char *transaction, char *functi
ret = execute_logs_manag_query(&query_params);
size_t res_off = 0;
logs_query_res_hdr_t *p_res_hdr;
while(query_params.results_buff->len - res_off > 0){
@ -680,7 +678,8 @@ static void logsmanagement_function_facets(const char *transaction, char *functi
buffer_json_object_close(wb); // logs_management_meta
buffer_json_member_add_uint64(wb, "status", ret->http_code);
buffer_json_member_add_boolean(wb, "partial", ret->http_code != HTTP_RESP_OK);
buffer_json_member_add_boolean(wb, "partial", ret->http_code != HTTP_RESP_OK ||
ret->err_code == LOGS_QRY_RES_ERR_CODE_TIMEOUT);
buffer_json_member_add_string(wb, "type", "table");

View file

@ -102,6 +102,17 @@ const logs_qry_res_err_t *fetch_log_sources(BUFFER *wb){
return &logs_qry_res_err[LOGS_QRY_RES_ERR_CODE_OK];
}
bool terminate_logs_manag_query(logs_query_params_t *const p_query_params){
if(p_query_params->cancelled && __atomic_load_n(p_query_params->cancelled, __ATOMIC_RELAXED)) {
return true;
}
if(now_monotonic_usec() > p_query_params->stop_monotonic_ut)
return true;
return false;
}
const logs_qry_res_err_t *execute_logs_manag_query(logs_query_params_t *p_query_params) {
struct File_info *p_file_infos[LOGS_MANAG_MAX_COMPOUND_QUERY_SOURCES] = {NULL};
@ -182,12 +193,12 @@ const logs_qry_res_err_t *execute_logs_manag_query(logs_query_params_t *p_query_
db_search(p_query_params, p_file_infos);
if( p_query_params->results_buff->len < p_query_params->quota &&
now_monotonic_usec() <= p_query_params->stop_monotonic_ut)
!terminate_logs_manag_query(p_query_params))
circ_buff_search(p_query_params, p_file_infos);
if(!p_query_params->order_by_asc &&
p_query_params->results_buff->len < p_query_params->quota &&
now_monotonic_usec() <= p_query_params->stop_monotonic_ut)
!terminate_logs_manag_query(p_query_params))
db_search(p_query_params, p_file_infos);
for(int pfi_off = 0; p_file_infos[pfi_off]; pfi_off++)
@ -214,6 +225,13 @@ const logs_qry_res_err_t *execute_logs_manag_query(logs_query_params_t *p_query_
freez(p_query_params->keyword);
}
if(terminate_logs_manag_query(p_query_params)){
return (p_query_params->cancelled &&
__atomic_load_n(p_query_params->cancelled, __ATOMIC_RELAXED)) ?
&logs_qry_res_err[LOGS_QRY_RES_ERR_CODE_CANCELLED] /* cancelled */ :
&logs_qry_res_err[LOGS_QRY_RES_ERR_CODE_TIMEOUT] /* timed out */ ;
}
if(!p_query_params->results_buff->len)
return &logs_qry_res_err[LOGS_QRY_RES_ERR_CODE_NOT_FOUND_ERR];

View file

@ -29,7 +29,9 @@ typedef struct {
LOGS_QRY_RES_ERR_CODE_NOT_FOUND_ERR,
LOGS_QRY_RES_ERR_CODE_NOT_INIT_ERR,
LOGS_QRY_RES_ERR_CODE_SERVER_ERR,
LOGS_QRY_RES_ERR_CODE_UNMODIFIED } err_code;
LOGS_QRY_RES_ERR_CODE_UNMODIFIED,
LOGS_QRY_RES_ERR_CODE_CANCELLED,
LOGS_QRY_RES_ERR_CODE_TIMEOUT } err_code;
char const *const err_str;
const int http_code;
} logs_qry_res_err_t;
@ -40,7 +42,9 @@ static const logs_qry_res_err_t logs_qry_res_err[] = {
{ LOGS_QRY_RES_ERR_CODE_NOT_FOUND_ERR, "no results found", HTTP_RESP_OK },
{ LOGS_QRY_RES_ERR_CODE_NOT_INIT_ERR, "logs management engine not running", HTTP_RESP_SERVICE_UNAVAILABLE },
{ LOGS_QRY_RES_ERR_CODE_SERVER_ERR, "server error", HTTP_RESP_INTERNAL_SERVER_ERROR },
{ LOGS_QRY_RES_ERR_CODE_UNMODIFIED, "not modified", HTTP_RESP_NOT_MODIFIED }
{ LOGS_QRY_RES_ERR_CODE_UNMODIFIED, "not modified", HTTP_RESP_NOT_MODIFIED },
{ LOGS_QRY_RES_ERR_CODE_CANCELLED, "cancelled", HTTP_RESP_CLIENT_CLOSED_REQUEST },
{ LOGS_QRY_RES_ERR_CODE_TIMEOUT, "query timed out", HTTP_RESP_OK }
};
const logs_qry_res_err_t *fetch_log_sources(BUFFER *wb);
@ -108,6 +112,7 @@ typedef struct logs_query_params {
msec_t act_to_ts;
int order_by_asc;
unsigned long quota;
bool *cancelled;
usec_t stop_monotonic_ut;
char *chartname[LOGS_MANAG_MAX_COMPOUND_QUERY_SOURCES];
char *filename[LOGS_MANAG_MAX_COMPOUND_QUERY_SOURCES];
@ -129,10 +134,16 @@ typedef struct logs_query_res_hdr {
char chartname[20];
} logs_query_res_hdr_t;
/**
* @brief Check if query should be terminated.
* @param p_query_params See documentation of logs_query_params_t struct.
* @return true if query should be terminated of false otherwise.
*/
bool terminate_logs_manag_query(logs_query_params_t *p_query_params);
/**
* @brief Primary query API.
* @param p_query_params See documentation of logs_query_params_t struct on how
* to use argument.
* @param p_query_params See documentation of logs_query_params_t struct.
* @return enum of LOGS_QRY_RES_ERR_CODE with result of query
* @todo Cornercase if filename not found in DB? Return specific message?
*/