164 lines
5.0 KiB
C
164 lines
5.0 KiB
C
// SPDX-License-Identifier: GPL-3.0-or-later
|
|
|
|
#include "compression_zstd.h"
|
|
|
|
#ifdef ENABLE_ZSTD
|
|
#include <zstd.h>
|
|
|
|
void rrdpush_compressor_init_zstd(struct compressor_state *state) {
|
|
if(!state->initialized) {
|
|
state->initialized = true;
|
|
state->stream = ZSTD_createCStream();
|
|
|
|
if(state->level < 1)
|
|
state->level = 1;
|
|
|
|
if(state->level > ZSTD_maxCLevel())
|
|
state->level = ZSTD_maxCLevel();
|
|
|
|
size_t ret = ZSTD_initCStream(state->stream, state->level);
|
|
if(ZSTD_isError(ret))
|
|
netdata_log_error("STREAM: ZSTD_initCStream() returned error: %s", ZSTD_getErrorName(ret));
|
|
|
|
// ZSTD_CCtx_setParameter(state->stream, ZSTD_c_compressionLevel, 1);
|
|
// ZSTD_CCtx_setParameter(state->stream, ZSTD_c_strategy, ZSTD_fast);
|
|
}
|
|
}
|
|
|
|
void rrdpush_compressor_destroy_zstd(struct compressor_state *state) {
|
|
if(state->stream) {
|
|
ZSTD_freeCStream(state->stream);
|
|
state->stream = NULL;
|
|
}
|
|
}
|
|
|
|
size_t rrdpush_compress_zstd(struct compressor_state *state, const char *data, size_t size, const char **out) {
|
|
if(unlikely(!state || !size || !out))
|
|
return 0;
|
|
|
|
ZSTD_inBuffer inBuffer = {
|
|
.pos = 0,
|
|
.size = size,
|
|
.src = data,
|
|
};
|
|
|
|
size_t wanted_size = MAX(ZSTD_compressBound(inBuffer.size - inBuffer.pos), ZSTD_CStreamOutSize());
|
|
simple_ring_buffer_make_room(&state->output, wanted_size);
|
|
|
|
ZSTD_outBuffer outBuffer = {
|
|
.pos = 0,
|
|
.size = state->output.size,
|
|
.dst = (void *)state->output.data,
|
|
};
|
|
|
|
// compress
|
|
size_t ret = ZSTD_compressStream(state->stream, &outBuffer, &inBuffer);
|
|
|
|
// error handling
|
|
if(ZSTD_isError(ret)) {
|
|
netdata_log_error("STREAM: ZSTD_compressStream() return error: %s", ZSTD_getErrorName(ret));
|
|
return 0;
|
|
}
|
|
|
|
if(inBuffer.pos < inBuffer.size) {
|
|
netdata_log_error("STREAM: ZSTD_compressStream() left unprocessed input (source payload %zu bytes, consumed %zu bytes)",
|
|
inBuffer.size, inBuffer.pos);
|
|
return 0;
|
|
}
|
|
|
|
if(outBuffer.pos == 0) {
|
|
// ZSTD needs more input to flush the output, so let's flush it manually
|
|
ret = ZSTD_flushStream(state->stream, &outBuffer);
|
|
|
|
if(ZSTD_isError(ret)) {
|
|
netdata_log_error("STREAM: ZSTD_flushStream() return error: %s", ZSTD_getErrorName(ret));
|
|
return 0;
|
|
}
|
|
|
|
if(outBuffer.pos == 0) {
|
|
netdata_log_error("STREAM: ZSTD_compressStream() returned zero compressed bytes "
|
|
"(source is %zu bytes, output buffer can fit %zu bytes) "
|
|
, size, outBuffer.size);
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
state->sender_locked.total_compressions++;
|
|
state->sender_locked.total_uncompressed += size;
|
|
state->sender_locked.total_compressed += outBuffer.pos;
|
|
|
|
// return values
|
|
*out = state->output.data;
|
|
return outBuffer.pos;
|
|
}
|
|
|
|
void rrdpush_decompressor_init_zstd(struct decompressor_state *state) {
|
|
if(!state->initialized) {
|
|
state->initialized = true;
|
|
state->stream = ZSTD_createDStream();
|
|
|
|
size_t ret = ZSTD_initDStream(state->stream);
|
|
if(ZSTD_isError(ret))
|
|
netdata_log_error("STREAM: ZSTD_initDStream() returned error: %s", ZSTD_getErrorName(ret));
|
|
|
|
simple_ring_buffer_make_room(&state->output, MAX(COMPRESSION_MAX_CHUNK, ZSTD_DStreamOutSize()));
|
|
}
|
|
}
|
|
|
|
void rrdpush_decompressor_destroy_zstd(struct decompressor_state *state) {
|
|
if (state->stream) {
|
|
ZSTD_freeDStream(state->stream);
|
|
state->stream = NULL;
|
|
}
|
|
}
|
|
|
|
size_t rrdpush_decompress_zstd(struct decompressor_state *state, const char *compressed_data, size_t compressed_size) {
|
|
if (unlikely(!state || !compressed_data || !compressed_size))
|
|
return 0;
|
|
|
|
// The state.output ring buffer is always EMPTY at this point,
|
|
// meaning that (state->output.read_pos == state->output.write_pos)
|
|
// However, THEY ARE NOT ZERO.
|
|
|
|
ZSTD_inBuffer inBuffer = {
|
|
.pos = 0,
|
|
.size = compressed_size,
|
|
.src = compressed_data,
|
|
};
|
|
|
|
ZSTD_outBuffer outBuffer = {
|
|
.pos = 0,
|
|
.dst = (char *)state->output.data,
|
|
.size = state->output.size,
|
|
};
|
|
|
|
size_t ret = ZSTD_decompressStream(
|
|
state->stream
|
|
, &outBuffer
|
|
, &inBuffer);
|
|
|
|
if(ZSTD_isError(ret)) {
|
|
netdata_log_error("STREAM: ZSTD_decompressStream() return error: %s", ZSTD_getErrorName(ret));
|
|
return 0;
|
|
}
|
|
|
|
if(inBuffer.pos < inBuffer.size)
|
|
fatal("RRDPUSH DECOMPRESS: ZSTD ZSTD_decompressStream() decompressed %zu bytes, "
|
|
"but %zu bytes of compressed data remain",
|
|
inBuffer.pos, inBuffer.size);
|
|
|
|
size_t decompressed_size = outBuffer.pos;
|
|
|
|
state->output.read_pos = 0;
|
|
state->output.write_pos = outBuffer.pos;
|
|
|
|
// statistics
|
|
state->total_compressed += compressed_size;
|
|
state->total_uncompressed += decompressed_size;
|
|
state->total_compressions++;
|
|
|
|
return decompressed_size;
|
|
}
|
|
|
|
#endif // ENABLE_ZSTD
|