[improvement](compress) Support compress/decompress block with lz4 (#11955)

Author: Jerry Hu
Date: 2022-08-22 17:35:43 +08:00
Committed by: GitHub
Parent: 0c5b4ecc7c
Commit: c22d097b59
13 changed files with 113 additions and 33 deletions

View File

@ -351,6 +351,15 @@ public:
        return _query_options.enable_enable_exchange_node_parallel_merge;
    }
    segment_v2::CompressionTypePB fragement_transmission_compression_type() {
        if (_query_options.__isset.fragment_transmission_compression_codec) {
            if (_query_options.fragment_transmission_compression_codec == "lz4") {
                return segment_v2::CompressionTypePB::LZ4;
            }
        }
        return segment_v2::CompressionTypePB::SNAPPY;
    }
    // the following getters are only valid after Prepare()
    InitialReservations* initial_reservations() const { return _initial_reservations; }

View File

@ -61,7 +61,8 @@ struct AggregateFunctionSortData {
PBlock pblock;
size_t uncompressed_bytes = 0;
size_t compressed_bytes = 0;
block.serialize(&pblock, &uncompressed_bytes, &compressed_bytes);
block.serialize(&pblock, &uncompressed_bytes, &compressed_bytes,
segment_v2::CompressionTypePB::SNAPPY);
write_string_binary(pblock.SerializeAsString(), buf);
}

View File

@ -34,6 +34,7 @@
#include "runtime/tuple.h"
#include "runtime/tuple_row.h"
#include "udf/udf.h"
#include "util/block_compression.h"
#include "vec/columns/column.h"
#include "vec/columns/column_const.h"
#include "vec/columns/column_nullable.h"
@ -79,16 +80,28 @@ Block::Block(const PBlock& pblock) {
std::string compression_scratch;
if (pblock.compressed()) {
// Decompress
SCOPED_RAW_TIMER(&_decompress_time_ns);
const char* compressed_data = pblock.column_values().c_str();
size_t compressed_size = pblock.column_values().size();
size_t uncompressed_size = 0;
bool success =
snappy::GetUncompressedLength(compressed_data, compressed_size, &uncompressed_size);
DCHECK(success) << "snappy::GetUncompressedLength failed";
compression_scratch.resize(uncompressed_size);
success =
snappy::RawUncompress(compressed_data, compressed_size, compression_scratch.data());
DCHECK(success) << "snappy::RawUncompress failed";
if (pblock.has_compression_type() && pblock.has_uncompressed_size()) {
std::unique_ptr<BlockCompressionCodec> codec;
get_block_compression_codec(pblock.compression_type(), codec);
uncompressed_size = pblock.uncompressed_size();
compression_scratch.resize(uncompressed_size);
Slice decompressed_slice(compression_scratch);
codec->decompress(Slice(compressed_data, compressed_size), &decompressed_slice);
DCHECK(uncompressed_size == decompressed_slice.size);
} else {
bool success = snappy::GetUncompressedLength(compressed_data, compressed_size,
&uncompressed_size);
DCHECK(success) << "snappy::GetUncompressedLength failed";
compression_scratch.resize(uncompressed_size);
success = snappy::RawUncompress(compressed_data, compressed_size,
compression_scratch.data());
DCHECK(success) << "snappy::RawUncompress failed";
}
_decompressed_bytes = uncompressed_size;
buf = compression_scratch.data();
} else {
buf = pblock.column_values().data();
@ -684,6 +697,7 @@ Status Block::filter_block(Block* block, int filter_column_id, int column_to_kee
}
Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* compressed_bytes,
segment_v2::CompressionTypePB compression_type,
bool allow_transfer_large_data) const {
// calc uncompressed size for allocation
size_t content_uncompressed_size = 0;
@ -717,7 +731,14 @@ Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* comp
// compress
if (config::compress_rowbatches && content_uncompressed_size > 0) {
size_t max_compressed_size = snappy::MaxCompressedLength(content_uncompressed_size);
SCOPED_RAW_TIMER(const_cast<int64_t*>(&_compress_time_ns));
pblock->set_compression_type(compression_type);
pblock->set_uncompressed_size(content_uncompressed_size);
std::unique_ptr<BlockCompressionCodec> codec;
RETURN_IF_ERROR(get_block_compression_codec(compression_type, codec));
size_t max_compressed_size = codec->max_compressed_len(content_uncompressed_size);
std::string compression_scratch;
try {
// Try compressing the content to compression_scratch,
@ -732,10 +753,10 @@ Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* comp
LOG(WARNING) << msg;
return Status::BufferAllocFailed(msg);
}
size_t compressed_size = 0;
char* compressed_output = compression_scratch.data();
snappy::RawCompress(column_values->data(), content_uncompressed_size, compressed_output,
&compressed_size);
Slice compressed_slice(compression_scratch);
codec->compress(Slice(column_values->data(), content_uncompressed_size), &compressed_slice);
size_t compressed_size = compressed_slice.size;
if (LIKELY(compressed_size < content_uncompressed_size)) {
compression_scratch.resize(compressed_size);
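A note on why the serialize path now stores content_uncompressed_size in the PBlock: snappy can recover the original length from the compressed bytes alone (snappy::GetUncompressedLength), but LZ4's raw block format does not embed it, so the receiver must be told how large a buffer to allocate before decompressing. Below is a minimal standalone sketch of that LZ4 round trip using the plain lz4 C API rather than Doris's BlockCompressionCodec wrapper; the payload and main() are illustrative only, not code from this commit.

// Sketch only: LZ4 block round trip with the raw lz4 C API (not Doris code).
// The decompress call needs the original size up front, which is why PBlock
// gains an explicit uncompressed_size field for non-snappy codecs.
#include <lz4.h>

#include <cassert>
#include <string>
#include <vector>

int main() {
    const std::string input(1024, 'x'); // compressible sample payload

    // Worst-case output size for the given input length.
    std::vector<char> compressed(LZ4_compressBound(static_cast<int>(input.size())));
    const int compressed_size =
            LZ4_compress_default(input.data(), compressed.data(),
                                 static_cast<int>(input.size()),
                                 static_cast<int>(compressed.size()));
    assert(compressed_size > 0);

    // The uncompressed size must travel out of band; LZ4 blocks do not
    // record it the way snappy does.
    std::vector<char> restored(input.size());
    const int restored_size =
            LZ4_decompress_safe(compressed.data(), restored.data(),
                                compressed_size, static_cast<int>(restored.size()));
    assert(restored_size == static_cast<int>(input.size()));
    assert(std::string(restored.begin(), restored.end()) == input);
    return 0;
}

Built against liblz4 (link with -llz4).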

View File

@ -65,6 +65,11 @@ private:
Container data;
IndexByName index_by_name;
int64_t _decompress_time_ns = 0;
int64_t _decompressed_bytes = 0;
int64_t _compress_time_ns = 0;
public:
BlockInfo info;
@ -262,6 +267,7 @@ public:
// serialize block to PBlock
Status serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* compressed_bytes,
segment_v2::CompressionTypePB compression_type,
bool allow_transfer_large_data = false) const;
// serialize block to PRowbatch
@ -335,6 +341,10 @@ public:
void shrink_char_type_column_suffix_zero(const std::vector<size_t>& char_type_idx);
int64_t get_decompress_time() const { return _decompress_time_ns; }
int64_t get_decompressed_bytes() const { return _decompressed_bytes; }
int64_t get_compress_time() const { return _compress_time_ns; }
private:
void erase_impl(size_t position);
void initialize_index_by_name();

View File

@ -125,6 +125,8 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_numbe
{
SCOPED_TIMER(_recvr->_deserialize_row_batch_timer);
block = new Block(pblock);
COUNTER_UPDATE(_recvr->_decompress_timer, block->get_decompress_time());
COUNTER_UPDATE(_recvr->_decompress_bytes, block->get_decompressed_bytes());
}
VLOG_ROW << "added #rows=" << block->rows() << " batch_size=" << block_byte_size << "\n";
@ -284,6 +286,8 @@ VDataStreamRecvr::VDataStreamRecvr(
_data_arrival_timer = ADD_TIMER(_profile, "DataArrivalWaitTime");
_buffer_full_total_timer = ADD_TIMER(_profile, "SendersBlockedTotalTimer(*)");
_first_batch_wait_total_timer = ADD_TIMER(_profile, "FirstBatchArrivalWaitTime");
_decompress_timer = ADD_TIMER(_profile, "DecompressTime");
_decompress_bytes = ADD_COUNTER(_profile, "DecompressBytes", TUnit::BYTES);
}
VDataStreamRecvr::~VDataStreamRecvr() {

View File

@ -128,6 +128,8 @@ private:
RuntimeProfile::Counter* _first_batch_wait_total_timer;
RuntimeProfile::Counter* _buffer_full_total_timer;
RuntimeProfile::Counter* _data_arrival_timer;
RuntimeProfile::Counter* _decompress_timer;
RuntimeProfile::Counter* _decompress_bytes;
std::shared_ptr<QueryStatisticsRecvr> _sub_plan_query_statistics_recvr;
};

View File

@ -330,6 +330,7 @@ VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const RowD
_cur_pb_block(&_pb_block1),
_profile(nullptr),
_serialize_batch_timer(nullptr),
_compress_timer(nullptr),
_bytes_sent_counter(nullptr),
_local_bytes_send_counter(nullptr),
_dest_node_id(0) {
@ -347,6 +348,7 @@ VDataStreamSender::VDataStreamSender(ObjectPool* pool, const RowDescriptor& row_
_cur_pb_block(&_pb_block1),
_profile(nullptr),
_serialize_batch_timer(nullptr),
_compress_timer(nullptr),
_bytes_sent_counter(nullptr),
_local_bytes_send_counter(nullptr),
_dest_node_id(0) {
@ -425,6 +427,7 @@ Status VDataStreamSender::prepare(RuntimeState* state) {
_uncompressed_bytes_counter = ADD_COUNTER(profile(), "UncompressedRowBatchSize", TUnit::BYTES);
_ignore_rows = ADD_COUNTER(profile(), "IgnoreRows", TUnit::UNIT);
_serialize_batch_timer = ADD_TIMER(profile(), "SerializeBatchTime");
_compress_timer = ADD_TIMER(profile(), "CompressTime");
_overall_throughput = profile()->add_derived_counter(
"OverallThroughput", TUnit::BYTES_PER_SECOND,
std::bind<int64_t>(&RuntimeProfile::units_per_second, _bytes_sent_counter,
@ -445,6 +448,8 @@ Status VDataStreamSender::open(RuntimeState* state) {
for (auto iter : _partition_infos) {
RETURN_IF_ERROR(iter->open(state));
}
_compression_type = state->fragement_transmission_compression_type();
return Status::OK();
}
@ -597,9 +602,10 @@ Status VDataStreamSender::serialize_block(Block* src, PBlock* dest, int num_rece
dest->Clear();
size_t uncompressed_bytes = 0, compressed_bytes = 0;
RETURN_IF_ERROR(src->serialize(dest, &uncompressed_bytes, &compressed_bytes,
_transfer_large_data_by_brpc));
_compression_type, _transfer_large_data_by_brpc));
COUNTER_UPDATE(_bytes_sent_counter, compressed_bytes * num_receivers);
COUNTER_UPDATE(_uncompressed_bytes_counter, uncompressed_bytes * num_receivers);
COUNTER_UPDATE(_compress_timer, src->get_compress_time());
}
return Status::OK();

View File

@ -131,6 +131,7 @@ protected:
RuntimeProfile* _profile; // Allocated from _pool
RuntimeProfile::Counter* _serialize_batch_timer;
RuntimeProfile::Counter* _compress_timer;
RuntimeProfile::Counter* _bytes_sent_counter;
RuntimeProfile::Counter* _uncompressed_bytes_counter;
RuntimeProfile::Counter* _ignore_rows;
@ -146,6 +147,8 @@ protected:
// User can change this config at runtime, avoid it being modified during query or loading process.
bool _transfer_large_data_by_brpc = false;
segment_v2::CompressionTypePB _compression_type;
};
// TODO: support local exchange

View File

@ -258,6 +258,7 @@ void VNodeChannel::try_send_block(RuntimeState* state) {
SCOPED_ATOMIC_TIMER(&_serialize_batch_ns);
size_t uncompressed_bytes = 0, compressed_bytes = 0;
Status st = block.serialize(request.mutable_block(), &uncompressed_bytes, &compressed_bytes,
state->fragement_transmission_compression_type(),
_parent->_transfer_large_data_by_brpc);
if (!st.ok()) {
cancel(fmt::format("{}, err: {}", channel_info(), st.get_error_msg()));

View File

@ -180,10 +180,12 @@ TEST(BlockTest, RowBatchCovertToBlock) {
}
}
void block_to_pb(const vectorized::Block& block, PBlock* pblock) {
void block_to_pb(
const vectorized::Block& block, PBlock* pblock,
segment_v2::CompressionTypePB compression_type = segment_v2::CompressionTypePB::SNAPPY) {
size_t uncompressed_bytes = 0;
size_t compressed_bytes = 0;
Status st = block.serialize(pblock, &uncompressed_bytes, &compressed_bytes);
Status st = block.serialize(pblock, &uncompressed_bytes, &compressed_bytes, compression_type);
EXPECT_TRUE(st.ok());
EXPECT_TRUE(uncompressed_bytes >= compressed_bytes);
EXPECT_EQ(compressed_bytes, pblock->column_values().size());
@ -237,7 +239,7 @@ void fill_block_with_array_string(vectorized::Block& block) {
block.insert(test_array_string);
}
TEST(BlockTest, SerializeAndDeserializeBlock) {
void serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_type) {
config::compress_rowbatches = true;
// int
{
@ -250,12 +252,12 @@ TEST(BlockTest, SerializeAndDeserializeBlock) {
vectorized::ColumnWithTypeAndName type_and_name(vec->get_ptr(), data_type, "test_int");
vectorized::Block block({type_and_name});
PBlock pblock;
block_to_pb(block, &pblock);
block_to_pb(block, &pblock, compression_type);
std::string s1 = pblock.DebugString();
vectorized::Block block2(pblock);
PBlock pblock2;
block_to_pb(block2, &pblock2);
block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
EXPECT_EQ(s1, s2);
}
@ -271,12 +273,12 @@ TEST(BlockTest, SerializeAndDeserializeBlock) {
"test_string");
vectorized::Block block({type_and_name});
PBlock pblock;
block_to_pb(block, &pblock);
block_to_pb(block, &pblock, compression_type);
std::string s1 = pblock.DebugString();
vectorized::Block block2(pblock);
PBlock pblock2;
block_to_pb(block2, &pblock2);
block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
EXPECT_EQ(s1, s2);
}
@ -295,12 +297,12 @@ TEST(BlockTest, SerializeAndDeserializeBlock) {
decimal_data_type, "test_decimal");
vectorized::Block block({type_and_name});
PBlock pblock;
block_to_pb(block, &pblock);
block_to_pb(block, &pblock, compression_type);
std::string s1 = pblock.DebugString();
vectorized::Block block2(pblock);
PBlock pblock2;
block_to_pb(block2, &pblock2);
block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
EXPECT_EQ(s1, s2);
}
@ -321,12 +323,12 @@ TEST(BlockTest, SerializeAndDeserializeBlock) {
"test_bitmap");
vectorized::Block block({type_and_name});
PBlock pblock;
block_to_pb(block, &pblock);
block_to_pb(block, &pblock, compression_type);
std::string s1 = pblock.DebugString();
vectorized::Block block2(pblock);
PBlock pblock2;
block_to_pb(block2, &pblock2);
block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
EXPECT_EQ(s1, s2);
}
@ -341,12 +343,12 @@ TEST(BlockTest, SerializeAndDeserializeBlock) {
nullable_data_type, "test_nullable");
vectorized::Block block({type_and_name});
PBlock pblock;
block_to_pb(block, &pblock);
block_to_pb(block, &pblock, compression_type);
std::string s1 = pblock.DebugString();
vectorized::Block block2(pblock);
PBlock pblock2;
block_to_pb(block2, &pblock2);
block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
EXPECT_EQ(s1, s2);
}
@ -361,14 +363,14 @@ TEST(BlockTest, SerializeAndDeserializeBlock) {
nullable_column->get_ptr(), nullable_data_type, "test_nullable_decimal");
vectorized::Block block({type_and_name});
PBlock pblock;
block_to_pb(block, &pblock);
block_to_pb(block, &pblock, compression_type);
EXPECT_EQ(1, pblock.column_metas_size());
EXPECT_TRUE(pblock.column_metas()[0].has_decimal_param());
std::string s1 = pblock.DebugString();
vectorized::Block block2(pblock);
PBlock pblock2;
block_to_pb(block2, &pblock2);
block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
EXPECT_EQ(s1, s2);
}
@ -385,12 +387,12 @@ TEST(BlockTest, SerializeAndDeserializeBlock) {
data_type, "test_nullable_int32");
vectorized::Block block({type_and_name});
PBlock pblock;
block_to_pb(block, &pblock);
block_to_pb(block, &pblock, compression_type);
std::string s1 = pblock.DebugString();
vectorized::Block block2(pblock);
PBlock pblock2;
block_to_pb(block2, &pblock2);
block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
EXPECT_EQ(s1, s2);
}
@ -400,17 +402,23 @@ TEST(BlockTest, SerializeAndDeserializeBlock) {
fill_block_with_array_int(block);
fill_block_with_array_string(block);
PBlock pblock;
block_to_pb(block, &pblock);
block_to_pb(block, &pblock, compression_type);
std::string s1 = pblock.DebugString();
vectorized::Block block2(pblock);
PBlock pblock2;
block_to_pb(block2, &pblock2);
block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
EXPECT_EQ(s1, s2);
}
}
TEST(BlockTest, SerializeAndDeserializeBlock) {
config::compress_rowbatches = true;
serialize_and_deserialize_test(segment_v2::CompressionTypePB::SNAPPY);
serialize_and_deserialize_test(segment_v2::CompressionTypePB::LZ4);
}
TEST(BlockTest, dump_data) {
auto vec = vectorized::ColumnVector<Int32>::create();
auto& int32_data = vec->get_data();

View File

@ -207,6 +207,8 @@ public class SessionVariable implements Serializable, Writable {
public static final String ENABLE_FUNCTION_PUSHDOWN = "enable_function_pushdown";
public static final String FRAGMENT_TRANSMISSION_COMPRESSION_CODEC = "fragment_transmission_compression_codec";
// session origin value
public Map<Field, String> sessionOriginValue = new HashMap<Field, String>();
// check stmt is or not [select /*+ SET_VAR(...)*/ ...]
@ -352,6 +354,9 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = PREFER_JOIN_METHOD)
public String preferJoinMethod = "broadcast";
@VariableMgr.VarAttr(name = FRAGMENT_TRANSMISSION_COMPRESSION_CODEC)
public String fragmentTransmissionCompressionCodec = "lz4";
/*
* the parallel exec instance num for one Fragment in one BE
* 1 means disable this feature
@ -1060,6 +1065,10 @@ public class SessionVariable implements Serializable, Writable {
this.enableRemoveNoConjunctsRuntimeFilterPolicy = enableRemoveNoConjunctsRuntimeFilterPolicy;
}
public void setFragmentTransmissionCompressionCodec(String codec) {
this.fragmentTransmissionCompressionCodec = codec;
}
// Serialize to thrift object
// used for rest api
public TQueryOptions toThrift() {
@ -1103,6 +1112,7 @@ public class SessionVariable implements Serializable, Writable {
}
tResult.setEnableFunctionPushdown(enableFunctionPushdown);
tResult.setFragmentTransmissionCompressionCodec(fragmentTransmissionCompressionCodec);
return tResult;
}

View File

@ -21,6 +21,7 @@ package doris;
option java_package = "org.apache.doris.proto";
import "types.proto";
import "segment_v2.proto";
message PNodeStatistics {
required int64 node_id = 1;
@ -63,4 +64,6 @@ message PBlock {
repeated PColumnMeta column_metas = 1;
optional bytes column_values = 2;
optional bool compressed = 3 [default = false];
optional int64 uncompressed_size = 4;
optional segment_v2.CompressionTypePB compression_type = 5 [default = SNAPPY];
}
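Both new PBlock fields are proto2 optional, so a block produced by an older sender sets neither; the has_compression_type()/has_uncompressed_size() check added in block.cpp then routes it to the original snappy branch. Snappy gets by without an explicit size field because its format records the uncompressed length itself. A minimal standalone sketch of that property follows, using the plain snappy C++ API rather than Doris code, with a hypothetical sample payload.

// Sketch only: snappy embeds the uncompressed length in its output, so the
// legacy path needs no extra size field in the message.
#include <snappy.h>

#include <cassert>
#include <string>

int main() {
    const std::string input(1024, 'y'); // compressible sample payload

    std::string compressed;
    snappy::Compress(input.data(), input.size(), &compressed);

    // The original length can be recovered from the compressed bytes alone.
    size_t uncompressed_size = 0;
    bool ok = snappy::GetUncompressedLength(compressed.data(), compressed.size(),
                                            &uncompressed_size);
    assert(ok && uncompressed_size == input.size());

    std::string restored(uncompressed_size, '\0');
    ok = snappy::RawUncompress(compressed.data(), compressed.size(), &restored[0]);
    assert(ok && restored == input);
    return 0;
}

Built against libsnappy (link with -lsnappy).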

View File

@ -165,6 +165,8 @@ struct TQueryOptions {
44: optional bool trim_tailing_spaces_for_external_table_query = false
45: optional bool enable_function_pushdown;
46: optional string fragment_transmission_compression_codec;
}