* [fix](compression) handle exception to reuse compression context Otherwise, there is memleak and new context is allocated, then flush tlb consumes a lot sys cpu.
This commit is contained in:
@ -128,31 +128,39 @@ public:
|
||||
_release_compression_ctx(std::move(context));
|
||||
}
|
||||
}};
|
||||
Slice compressed_buf;
|
||||
size_t max_len = max_compressed_len(input.size);
|
||||
if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
|
||||
// use output directly
|
||||
output->resize(max_len);
|
||||
compressed_buf.data = reinterpret_cast<char*>(output->data());
|
||||
compressed_buf.size = max_len;
|
||||
} else {
|
||||
// reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE
|
||||
context->buffer.resize(max_len);
|
||||
compressed_buf.data = reinterpret_cast<char*>(context->buffer.data());
|
||||
compressed_buf.size = max_len;
|
||||
}
|
||||
|
||||
size_t compressed_len =
|
||||
LZ4_compress_fast_continue(context->ctx, input.data, compressed_buf.data,
|
||||
input.size, compressed_buf.size, ACCELARATION);
|
||||
if (compressed_len == 0) {
|
||||
compress_failed = true;
|
||||
return Status::InvalidArgument("Output buffer's capacity is not enough, size={}",
|
||||
compressed_buf.size);
|
||||
}
|
||||
output->resize(compressed_len);
|
||||
if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
|
||||
output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), compressed_len);
|
||||
try {
|
||||
Slice compressed_buf;
|
||||
size_t max_len = max_compressed_len(input.size);
|
||||
if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
|
||||
// use output directly
|
||||
output->resize(max_len);
|
||||
compressed_buf.data = reinterpret_cast<char*>(output->data());
|
||||
compressed_buf.size = max_len;
|
||||
} else {
|
||||
// reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE
|
||||
context->buffer->resize(max_len);
|
||||
compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
|
||||
compressed_buf.size = max_len;
|
||||
}
|
||||
|
||||
size_t compressed_len =
|
||||
LZ4_compress_fast_continue(context->ctx, input.data, compressed_buf.data,
|
||||
input.size, compressed_buf.size, ACCELARATION);
|
||||
if (compressed_len == 0) {
|
||||
compress_failed = true;
|
||||
return Status::InvalidArgument("Output buffer's capacity is not enough, size={}",
|
||||
compressed_buf.size);
|
||||
}
|
||||
output->resize(compressed_len);
|
||||
if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
|
||||
output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data),
|
||||
compressed_len);
|
||||
}
|
||||
} catch (...) {
|
||||
// Do not set compress_failed to release context
|
||||
DCHECK(!compress_failed);
|
||||
return Status::InternalError("Fail to do LZ4Block compress due to exception");
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
@ -296,50 +304,57 @@ private:
|
||||
_release_compression_ctx(std::move(context));
|
||||
}
|
||||
}};
|
||||
Slice compressed_buf;
|
||||
size_t max_len = max_compressed_len(uncompressed_size);
|
||||
if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
|
||||
// use output directly
|
||||
output->resize(max_len);
|
||||
compressed_buf.data = reinterpret_cast<char*>(output->data());
|
||||
compressed_buf.size = max_len;
|
||||
} else {
|
||||
// reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE
|
||||
context->buffer.resize(max_len);
|
||||
compressed_buf.data = reinterpret_cast<char*>(context->buffer.data());
|
||||
compressed_buf.size = max_len;
|
||||
}
|
||||
|
||||
auto wbytes = LZ4F_compressBegin(context->ctx, compressed_buf.data, compressed_buf.size,
|
||||
&_s_preferences);
|
||||
if (LZ4F_isError(wbytes)) {
|
||||
compress_failed = true;
|
||||
return Status::InvalidArgument("Fail to do LZ4F compress begin, res={}",
|
||||
LZ4F_getErrorName(wbytes));
|
||||
}
|
||||
size_t offset = wbytes;
|
||||
for (auto input : inputs) {
|
||||
wbytes = LZ4F_compressUpdate(context->ctx, compressed_buf.data + offset,
|
||||
compressed_buf.size - offset, input.data, input.size,
|
||||
nullptr);
|
||||
try {
|
||||
Slice compressed_buf;
|
||||
size_t max_len = max_compressed_len(uncompressed_size);
|
||||
if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
|
||||
// use output directly
|
||||
output->resize(max_len);
|
||||
compressed_buf.data = reinterpret_cast<char*>(output->data());
|
||||
compressed_buf.size = max_len;
|
||||
} else {
|
||||
// reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE
|
||||
context->buffer->resize(max_len);
|
||||
compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
|
||||
compressed_buf.size = max_len;
|
||||
}
|
||||
|
||||
auto wbytes = LZ4F_compressBegin(context->ctx, compressed_buf.data, compressed_buf.size,
|
||||
&_s_preferences);
|
||||
if (LZ4F_isError(wbytes)) {
|
||||
compress_failed = true;
|
||||
return Status::InvalidArgument("Fail to do LZ4F compress update, res={}",
|
||||
return Status::InvalidArgument("Fail to do LZ4F compress begin, res={}",
|
||||
LZ4F_getErrorName(wbytes));
|
||||
}
|
||||
size_t offset = wbytes;
|
||||
for (auto input : inputs) {
|
||||
wbytes = LZ4F_compressUpdate(context->ctx, compressed_buf.data + offset,
|
||||
compressed_buf.size - offset, input.data, input.size,
|
||||
nullptr);
|
||||
if (LZ4F_isError(wbytes)) {
|
||||
compress_failed = true;
|
||||
return Status::InvalidArgument("Fail to do LZ4F compress update, res={}",
|
||||
LZ4F_getErrorName(wbytes));
|
||||
}
|
||||
offset += wbytes;
|
||||
}
|
||||
wbytes = LZ4F_compressEnd(context->ctx, compressed_buf.data + offset,
|
||||
compressed_buf.size - offset, nullptr);
|
||||
if (LZ4F_isError(wbytes)) {
|
||||
compress_failed = true;
|
||||
return Status::InvalidArgument("Fail to do LZ4F compress end, res={}",
|
||||
LZ4F_getErrorName(wbytes));
|
||||
}
|
||||
offset += wbytes;
|
||||
}
|
||||
wbytes = LZ4F_compressEnd(context->ctx, compressed_buf.data + offset,
|
||||
compressed_buf.size - offset, nullptr);
|
||||
if (LZ4F_isError(wbytes)) {
|
||||
compress_failed = true;
|
||||
return Status::InvalidArgument("Fail to do LZ4F compress end, res={}",
|
||||
LZ4F_getErrorName(wbytes));
|
||||
}
|
||||
offset += wbytes;
|
||||
output->resize(offset);
|
||||
if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
|
||||
output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), offset);
|
||||
output->resize(offset);
|
||||
if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
|
||||
output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), offset);
|
||||
}
|
||||
} catch (...) {
|
||||
// Do not set compress_failed to release context
|
||||
DCHECK(!compress_failed);
|
||||
return Status::InternalError("Fail to do LZ4F compress due to exception");
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
@ -481,30 +496,37 @@ public:
|
||||
_release_compression_ctx(std::move(context));
|
||||
}
|
||||
}};
|
||||
Slice compressed_buf;
|
||||
size_t max_len = max_compressed_len(input.size);
|
||||
if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
|
||||
// use output directly
|
||||
output->resize(max_len);
|
||||
compressed_buf.data = reinterpret_cast<char*>(output->data());
|
||||
compressed_buf.size = max_len;
|
||||
} else {
|
||||
// reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE
|
||||
context->buffer.resize(max_len);
|
||||
compressed_buf.data = reinterpret_cast<char*>(context->buffer.data());
|
||||
compressed_buf.size = max_len;
|
||||
}
|
||||
|
||||
size_t compressed_len = LZ4_compress_HC_continue(
|
||||
context->ctx, input.data, compressed_buf.data, input.size, compressed_buf.size);
|
||||
if (compressed_len == 0) {
|
||||
compress_failed = true;
|
||||
return Status::InvalidArgument("Output buffer's capacity is not enough, size={}",
|
||||
compressed_buf.size);
|
||||
}
|
||||
output->resize(compressed_len);
|
||||
if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
|
||||
output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), compressed_len);
|
||||
try {
|
||||
Slice compressed_buf;
|
||||
size_t max_len = max_compressed_len(input.size);
|
||||
if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
|
||||
// use output directly
|
||||
output->resize(max_len);
|
||||
compressed_buf.data = reinterpret_cast<char*>(output->data());
|
||||
compressed_buf.size = max_len;
|
||||
} else {
|
||||
context->buffer->resize(max_len);
|
||||
compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
|
||||
compressed_buf.size = max_len;
|
||||
}
|
||||
|
||||
size_t compressed_len = LZ4_compress_HC_continue(
|
||||
context->ctx, input.data, compressed_buf.data, input.size, compressed_buf.size);
|
||||
if (compressed_len == 0) {
|
||||
compress_failed = true;
|
||||
return Status::InvalidArgument("Output buffer's capacity is not enough, size={}",
|
||||
compressed_buf.size);
|
||||
}
|
||||
output->resize(compressed_len);
|
||||
if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
|
||||
output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data),
|
||||
compressed_len);
|
||||
}
|
||||
} catch (...) {
|
||||
// Do not set compress_failed to release context
|
||||
DCHECK(!compress_failed);
|
||||
return Status::InternalError("Fail to do LZ4HC compress due to exception");
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
@ -798,67 +820,73 @@ public:
|
||||
}
|
||||
}};
|
||||
|
||||
size_t max_len = max_compressed_len(uncompressed_size);
|
||||
Slice compressed_buf;
|
||||
if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
|
||||
// use output directly
|
||||
output->resize(max_len);
|
||||
compressed_buf.data = reinterpret_cast<char*>(output->data());
|
||||
compressed_buf.size = max_len;
|
||||
} else {
|
||||
// reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE
|
||||
context->buffer.resize(max_len);
|
||||
compressed_buf.data = reinterpret_cast<char*>(context->buffer.data());
|
||||
compressed_buf.size = max_len;
|
||||
}
|
||||
try {
|
||||
size_t max_len = max_compressed_len(uncompressed_size);
|
||||
Slice compressed_buf;
|
||||
if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
|
||||
// use output directly
|
||||
output->resize(max_len);
|
||||
compressed_buf.data = reinterpret_cast<char*>(output->data());
|
||||
compressed_buf.size = max_len;
|
||||
} else {
|
||||
// reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE
|
||||
context->buffer->resize(max_len);
|
||||
compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
|
||||
compressed_buf.size = max_len;
|
||||
}
|
||||
|
||||
// set compression level to default 3
|
||||
auto ret =
|
||||
ZSTD_CCtx_setParameter(context->ctx, ZSTD_c_compressionLevel, ZSTD_CLEVEL_DEFAULT);
|
||||
if (ZSTD_isError(ret)) {
|
||||
return Status::InvalidArgument("ZSTD_CCtx_setParameter compression level error: {}",
|
||||
ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
|
||||
}
|
||||
// set checksum flag to 1
|
||||
ret = ZSTD_CCtx_setParameter(context->ctx, ZSTD_c_checksumFlag, 1);
|
||||
if (ZSTD_isError(ret)) {
|
||||
return Status::InvalidArgument("ZSTD_CCtx_setParameter checksumFlag error: {}",
|
||||
ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
|
||||
}
|
||||
// set compression level to default 3
|
||||
auto ret = ZSTD_CCtx_setParameter(context->ctx, ZSTD_c_compressionLevel,
|
||||
ZSTD_CLEVEL_DEFAULT);
|
||||
if (ZSTD_isError(ret)) {
|
||||
return Status::InvalidArgument("ZSTD_CCtx_setParameter compression level error: {}",
|
||||
ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
|
||||
}
|
||||
// set checksum flag to 1
|
||||
ret = ZSTD_CCtx_setParameter(context->ctx, ZSTD_c_checksumFlag, 1);
|
||||
if (ZSTD_isError(ret)) {
|
||||
return Status::InvalidArgument("ZSTD_CCtx_setParameter checksumFlag error: {}",
|
||||
ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
|
||||
}
|
||||
|
||||
ZSTD_outBuffer out_buf = {compressed_buf.data, compressed_buf.size, 0};
|
||||
ZSTD_outBuffer out_buf = {compressed_buf.data, compressed_buf.size, 0};
|
||||
|
||||
for (size_t i = 0; i < inputs.size(); i++) {
|
||||
ZSTD_inBuffer in_buf = {inputs[i].data, inputs[i].size, 0};
|
||||
for (size_t i = 0; i < inputs.size(); i++) {
|
||||
ZSTD_inBuffer in_buf = {inputs[i].data, inputs[i].size, 0};
|
||||
|
||||
bool last_input = (i == inputs.size() - 1);
|
||||
auto mode = last_input ? ZSTD_e_end : ZSTD_e_continue;
|
||||
bool last_input = (i == inputs.size() - 1);
|
||||
auto mode = last_input ? ZSTD_e_end : ZSTD_e_continue;
|
||||
|
||||
bool finished = false;
|
||||
do {
|
||||
// do compress
|
||||
auto ret = ZSTD_compressStream2(context->ctx, &out_buf, &in_buf, mode);
|
||||
bool finished = false;
|
||||
do {
|
||||
// do compress
|
||||
auto ret = ZSTD_compressStream2(context->ctx, &out_buf, &in_buf, mode);
|
||||
|
||||
if (ZSTD_isError(ret)) {
|
||||
compress_failed = true;
|
||||
return Status::InvalidArgument("ZSTD_compressStream2 error: {}",
|
||||
ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
|
||||
}
|
||||
if (ZSTD_isError(ret)) {
|
||||
compress_failed = true;
|
||||
return Status::InvalidArgument("ZSTD_compressStream2 error: {}",
|
||||
ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
|
||||
}
|
||||
|
||||
// ret is ZSTD hint for needed output buffer size
|
||||
if (ret > 0 && out_buf.pos == out_buf.size) {
|
||||
compress_failed = true;
|
||||
return Status::InvalidArgument("ZSTD_compressStream2 output buffer full");
|
||||
}
|
||||
// ret is ZSTD hint for needed output buffer size
|
||||
if (ret > 0 && out_buf.pos == out_buf.size) {
|
||||
compress_failed = true;
|
||||
return Status::InvalidArgument("ZSTD_compressStream2 output buffer full");
|
||||
}
|
||||
|
||||
finished = last_input ? (ret == 0) : (in_buf.pos == inputs[i].size);
|
||||
} while (!finished);
|
||||
}
|
||||
finished = last_input ? (ret == 0) : (in_buf.pos == inputs[i].size);
|
||||
} while (!finished);
|
||||
}
|
||||
|
||||
// set compressed size for caller
|
||||
output->resize(out_buf.pos);
|
||||
if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
|
||||
output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), out_buf.pos);
|
||||
// set compressed size for caller
|
||||
output->resize(out_buf.pos);
|
||||
if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
|
||||
output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), out_buf.pos);
|
||||
}
|
||||
} catch (...) {
|
||||
// Do not set compress_failed to release context
|
||||
DCHECK(!compress_failed);
|
||||
return Status::InternalError("Fail to do ZSTD compress due to exception");
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
|
||||
Reference in New Issue
Block a user