From fa0ad568176827c2774bde0bb83b0be296eb541b Mon Sep 17 00:00:00 2001 From: HappenLee Date: Fri, 22 Dec 2023 19:50:57 +0800 Subject: [PATCH] [exec](compress) use FragmentTransmissionCompressionCodec control the exchange compress behavior (#28818) --- be/src/common/config.cpp | 2 -- be/src/common/config.h | 2 -- be/src/pipeline/exec/exchange_sink_operator.cpp | 2 +- be/src/pipeline/exec/exchange_sink_operator.h | 2 +- be/src/runtime/runtime_state.h | 6 +++++- be/src/vec/core/block.cpp | 2 +- be/src/vec/sink/vdata_stream_sender.h | 2 +- be/test/vec/core/block_test.cpp | 2 -- .../src/main/java/org/apache/doris/qe/SessionVariable.java | 2 +- 9 files changed, 10 insertions(+), 12 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index ecc44a08e4..11921eac8b 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -222,8 +222,6 @@ DEFINE_Int32(be_service_threads, "64"); // or 3x the number of cores. This keeps the cores busy without causing excessive // thrashing. DEFINE_Int32(num_threads_per_core, "3"); -// if true, compresses tuple data in Serialize -DEFINE_mBool(compress_rowbatches, "true"); DEFINE_mBool(rowbatch_align_tuple_offset, "false"); // interval between profile reports; in seconds DEFINE_mInt32(status_report_interval, "5"); diff --git a/be/src/common/config.h b/be/src/common/config.h index a9508c6e8a..c73637200a 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -271,8 +271,6 @@ DECLARE_Int32(be_service_threads); // or 3x the number of cores. This keeps the cores busy without causing excessive // thrashing. DECLARE_Int32(num_threads_per_core); -// if true, compresses tuple data in Serialize -DECLARE_mBool(compress_rowbatches); DECLARE_mBool(rowbatch_align_tuple_offset); // interval between profile reports; in seconds DECLARE_mInt32(status_report_interval); diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 548680a25c..43bec0bd92 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -252,7 +252,7 @@ std::string ExchangeSinkLocalState::name_suffix() { return name; } -segment_v2::CompressionTypePB& ExchangeSinkLocalState::compression_type() { +segment_v2::CompressionTypePB ExchangeSinkLocalState::compression_type() const { return _parent->cast()._compression_type; } diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 70ae126fca..5df03ea777 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -145,7 +145,7 @@ public: [[nodiscard]] int sender_id() const { return _sender_id; } std::string name_suffix() override; - segment_v2::CompressionTypePB& compression_type(); + segment_v2::CompressionTypePB compression_type() const; std::string debug_string(int indentation_level) const override; std::vector*> channels; diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 989655a36c..6ced139eb4 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -393,9 +393,13 @@ public: if (_query_options.__isset.fragment_transmission_compression_codec) { if (_query_options.fragment_transmission_compression_codec == "lz4") { return segment_v2::CompressionTypePB::LZ4; + } else if (_query_options.fragment_transmission_compression_codec == "snappy") { + return segment_v2::CompressionTypePB::SNAPPY; + } else { + return segment_v2::CompressionTypePB::NO_COMPRESSION; } } - return segment_v2::CompressionTypePB::SNAPPY; + return segment_v2::CompressionTypePB::NO_COMPRESSION; } bool skip_storage_engine_merge() const { diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index d3d7523c0f..a7965d03ce 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -866,7 +866,7 @@ Status Block::serialize(int be_exec_version, PBlock* pblock, *uncompressed_bytes = content_uncompressed_size; // compress - if (config::compress_rowbatches && content_uncompressed_size > 0) { + if (compression_type != segment_v2::NO_COMPRESSION && content_uncompressed_size > 0) { SCOPED_RAW_TIMER(&_compress_time_ns); pblock->set_compression_type(compression_type); pblock->set_uncompressed_size(content_uncompressed_size); diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 54822f8968..f59dad266f 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -149,7 +149,7 @@ public: QueryStatisticsPtr query_statisticsPtr() { return _query_statistics; } bool transfer_large_data_by_brpc() { return _transfer_large_data_by_brpc; } RuntimeProfile::Counter* merge_block_timer() { return _merge_block_timer; } - segment_v2::CompressionTypePB& compression_type() { return _compression_type; } + segment_v2::CompressionTypePB compression_type() const { return _compression_type; } protected: friend class BlockSerializer; diff --git a/be/test/vec/core/block_test.cpp b/be/test/vec/core/block_test.cpp index 54dd6c1136..020c4f2e92 100644 --- a/be/test/vec/core/block_test.cpp +++ b/be/test/vec/core/block_test.cpp @@ -114,7 +114,6 @@ void fill_block_with_array_string(vectorized::Block& block) { } void serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_type) { - config::compress_rowbatches = true; // int { auto vec = vectorized::ColumnVector::create(); @@ -296,7 +295,6 @@ void serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty } TEST(BlockTest, SerializeAndDeserializeBlock) { - config::compress_rowbatches = true; serialize_and_deserialize_test(segment_v2::CompressionTypePB::SNAPPY); serialize_and_deserialize_test(segment_v2::CompressionTypePB::LZ4); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index b0e20d21ad..3331971744 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -2720,7 +2720,7 @@ public class SessionVariable implements Serializable, Writable { tResult.setEnableFunctionPushdown(enableFunctionPushdown); tResult.setEnableCommonExprPushdown(enableCommonExprPushdown); tResult.setCheckOverflowForDecimal(checkOverflowForDecimal); - tResult.setFragmentTransmissionCompressionCodec(fragmentTransmissionCompressionCodec); + tResult.setFragmentTransmissionCompressionCodec(fragmentTransmissionCompressionCodec.trim().toLowerCase()); tResult.setEnableLocalExchange(enableLocalExchange); tResult.setSkipStorageEngineMerge(skipStorageEngineMerge);