[fix] disallow transferring data larger than 2GB via brpc (#9770)

Because brpc and protobuf cannot transfer data larger than 2GB, and anything above that size overflows protobuf's size bookkeeping, add a check before sending.
Author: Zhengguo Yang
Date: 2022-05-25 18:41:13 +08:00
Committed by: GitHub
Parent: be026addde
Commit: f5bef328fe

2 changed files with 30 additions and 16 deletions
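
For context: protobuf tracks serialized message sizes in signed 32-bit integers, so any payload at or above INT32_MAX bytes (~2GB) cannot be represented and must be rejected before the brpc call. A minimal standalone sketch of that guard (hypothetical names, not the Doris code itself):

    #include <cstddef>
    #include <cstdint>
    #include <limits>

    // Returns true when a serialized payload can be carried by protobuf.
    // Anything above INT32_MAX bytes overflows protobuf's signed 32-bit
    // size fields, which is why this commit checks before sending.
    inline bool fits_protobuf_limit(size_t serialized_size) {
        return serialized_size <= static_cast<size_t>(std::numeric_limits<std::int32_t>::max());
    }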

@@ -273,10 +273,6 @@ Status RowBatch::serialize(PRowBatch* output_batch, size_t* uncompressed_size,
     try {
         // Allocation of extra-long contiguous memory may fail, and data compression cannot be used if it fails
         _compression_scratch.resize(max_compressed_size);
-    } catch (const std::bad_alloc& e) {
-        can_compress = false;
-        LOG(WARNING) << "Try to alloc " << max_compressed_size
-                     << " bytes for compression scratch failed. " << e.what();
     } catch (...) {
         can_compress = false;
         std::exception_ptr p = std::current_exception();
@@ -309,11 +305,8 @@ Status RowBatch::serialize(PRowBatch* output_batch, size_t* uncompressed_size,
         *compressed_size = pb_size;
         if (pb_size > std::numeric_limits<int32_t>::max()) {
             // the protobuf has a hard limit of 2GB for serialized data.
-            return Status::InternalError(
-                    fmt::format("The rowbatch is large than 2GB({}), can not send by Protobuf. "
-                                "please set BE config 'transfer_data_by_brpc_attachment' to true "
-                                "and restart BE.",
-                                pb_size));
+            return Status::InternalError(fmt::format(
+                    "The rowbatch is large than 2GB({}), can not send by Protobuf.", pb_size));
         }
     } else {
         *uncompressed_size = pb_size + tuple_byte_size;
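
Both files rely on a catch-all handler rather than a dedicated std::bad_alloc branch; the thrown type's name is recovered through __cxa_exception_type(), a libstdc++/libc++ ABI extension on std::exception_ptr, not portable C++. A self-contained sketch of that pattern, with hypothetical names:

    #include <cstddef>
    #include <exception>
    #include <string>
    #include <typeinfo>
    #include <vector>

    // Attempt a possibly huge allocation; on any exception, report the
    // dynamic type of what was thrown instead of aborting.
    std::string try_resize(std::vector<char>& buf, size_t n) {
        try {
            buf.resize(n);
            return "ok";
        } catch (...) {
            std::exception_ptr p = std::current_exception();
            // __cxa_exception_type() yields the type_info of the in-flight
            // exception; name() is typically mangled (e.g. "St9bad_alloc").
            return p ? p.__cxa_exception_type()->name() : "null";
        }
    }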

@@ -669,7 +669,16 @@ Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* comp
     // serialize data values
     // when data type is HLL, content_uncompressed_size maybe larger than real size.
-    allocated_buf->resize(content_uncompressed_size);
+    try {
+        allocated_buf->resize(content_uncompressed_size);
+    } catch (...) {
+        std::exception_ptr p = std::current_exception();
+        std::string msg = fmt::format("Try to alloc {} bytes for allocated_buf failed. reason {}",
+                                      content_uncompressed_size,
+                                      p ? p.__cxa_exception_type()->name() : "null");
+        LOG(WARNING) << msg;
+        return Status::BufferAllocFailed(msg);
+    }
     char* buf = allocated_buf->data();
     for (const auto& c : *this) {
         buf = c.type->serialize(*(c.column), buf);
@@ -678,12 +687,21 @@ Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* comp
     // compress
     if (config::compress_rowbatches && content_uncompressed_size > 0) {
-        // Try compressing the content to compression_scratch,
-        // swap if compressed data is smaller
-        size_t max_compressed_size = snappy::MaxCompressedLength(content_uncompressed_size);
-        compression_scratch.resize(max_compressed_size);
+        std::string compression_scratch;
+        uint32_t max_compressed_size = snappy::MaxCompressedLength(content_uncompressed_size);
+        try {
+            // Try compressing the content to compression_scratch,
+            // swap if compressed data is smaller
+            // Allocation of extra-long contiguous memory may fail, and data compression cannot be used if it fails
+            compression_scratch.resize(max_compressed_size);
+        } catch (...) {
+            std::exception_ptr p = std::current_exception();
+            std::string msg =
+                    fmt::format("Try to alloc {} bytes for compression scratch failed. reason {}",
+                                max_compressed_size, p ? p.__cxa_exception_type()->name() : "null");
+            LOG(WARNING) << msg;
+            return Status::BufferAllocFailed(msg);
+        }
         size_t compressed_size = 0;
         char* compressed_output = compression_scratch.data();
         snappy::RawCompress(allocated_buf->data(), content_uncompressed_size, compressed_output,
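
The surrounding logic follows snappy's standard two-step API: size the scratch buffer with MaxCompressedLength(), then let RawCompress() report the true compressed length, keeping the compressed form only when it is smaller. A hedged standalone sketch of that pattern (hypothetical wrapper; C++17 for the non-const std::string::data()):

    #include <snappy.h>
    #include <string>

    // Compress `input` and swap it into `out` only if compression won.
    bool compress_if_smaller(const std::string& input, std::string* out) {
        std::string scratch;
        // Upper bound for any input of this size; this resize may throw
        // bad_alloc for multi-GB inputs, which is what the try/catch above guards.
        scratch.resize(snappy::MaxCompressedLength(input.size()));
        size_t compressed_size = 0;
        snappy::RawCompress(input.data(), input.size(), scratch.data(), &compressed_size);
        if (compressed_size >= input.size()) return false;  // incompressible: keep original
        scratch.resize(compressed_size);
        out->swap(scratch);
        return true;
    }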
@@ -701,7 +719,10 @@ Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* comp
         VLOG_ROW << "uncompressed size: " << content_uncompressed_size
                  << ", compressed size: " << compressed_size;
     }
+    if (*compressed_bytes >= std::numeric_limits<int32_t>::max()) {
+        return Status::InternalError(fmt::format(
+                "The block is large than 2GB({}), can not send by Protobuf.", *compressed_bytes));
+    }
     return Status::OK();
 }
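
Since Block::serialize() can now fail for oversized or unallocatable blocks, callers must check the returned Status rather than assume success. A hedged usage fragment (argument list abbreviated, since the full signature is truncated in the hunk header):

    Status st = block.serialize(&pblock, &uncompressed_bytes, &compressed_bytes);
    if (!st.ok()) {
        // Oversized (>2GB) or allocation-failed blocks surface here instead
        // of overflowing inside brpc/protobuf.
        return st;
    }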