From e3ab2caef8cdc3872cf42ce709b3e252b9505bd3 Mon Sep 17 00:00:00 2001 From: Jerry Hu Date: Thu, 25 Aug 2022 19:28:23 +0800 Subject: [PATCH] [improvement](sink) Support local exchange for multi fragment instances (#12017) --- be/src/vec/sink/vdata_stream_sender.cpp | 21 ++++++++++++------- be/src/vec/sink/vdata_stream_sender.h | 3 +++ .../org/apache/doris/qe/SessionVariable.java | 10 +++++++++ gensrc/thrift/PaloInternalService.thrift | 2 ++ 4 files changed, 29 insertions(+), 7 deletions(-) diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index 00fd63c692..b2ef88af49 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -76,6 +76,10 @@ Status VDataStreamSender::Channel::init(RuntimeState* state) { return Status::InternalError(msg); } + if (state->query_options().__isset.enable_local_exchange) { + _enable_local_exchange = state->query_options().enable_local_exchange; + } + // In bucket shuffle join will set fragment_instance_id (-1, -1) // to build a camouflaged empty channel. the ip and port is '0.0.0.0:0" // so the empty channel not need call function close_internal() @@ -85,11 +89,11 @@ Status VDataStreamSender::Channel::init(RuntimeState* state) { } Status VDataStreamSender::Channel::send_current_block(bool eos) { - // TODO: Now, local exchange will cause the performance problem is in a multi-threaded scenario - // so this feature is turned off here. We need to re-examine this logic - // if (is_local()) { - // return send_local_block(eos); - // } + // FIXME: Now, local exchange will cause the performance problem is in a multi-threaded scenario + // so this feature is turned off here by default. We need to re-examine this logic + if (_enable_local_exchange && is_local()) { + return send_local_block(eos); + } auto block = _mutable_block->to_block(); RETURN_IF_ERROR(_parent->serialize_block(&block, _ch_cur_pb_block)); block.clear_column_data(); @@ -103,15 +107,16 @@ Status VDataStreamSender::Channel::send_local_block(bool eos) { std::shared_ptr recvr = _parent->state()->exec_env()->vstream_mgr()->find_recvr(_fragment_instance_id, _dest_node_id); + Block block = _mutable_block->to_block(); + _mutable_block->set_muatable_columns(block.clone_empty_columns()); if (recvr != nullptr) { - Block block = _mutable_block->to_block(); COUNTER_UPDATE(_parent->_local_bytes_send_counter, block.bytes()); + COUNTER_UPDATE(_parent->_local_sent_rows, block.rows()); recvr->add_block(&block, _parent->_sender_id, true); if (eos) { recvr->remove_sender(_parent->_sender_id, _be_number); } } - _mutable_block->clear(); return Status::OK(); } @@ -121,6 +126,7 @@ Status VDataStreamSender::Channel::send_local_block(Block* block) { _dest_node_id); if (recvr != nullptr) { COUNTER_UPDATE(_parent->_local_bytes_send_counter, block->bytes()); + COUNTER_UPDATE(_parent->_local_sent_rows, block->rows()); recvr->add_block(block, _parent->_sender_id, false); } return Status::OK(); @@ -426,6 +432,7 @@ Status VDataStreamSender::prepare(RuntimeState* state) { _bytes_sent_counter = ADD_COUNTER(profile(), "BytesSent", TUnit::BYTES); _uncompressed_bytes_counter = ADD_COUNTER(profile(), "UncompressedRowBatchSize", TUnit::BYTES); _ignore_rows = ADD_COUNTER(profile(), "IgnoreRows", TUnit::UNIT); + _local_sent_rows = ADD_COUNTER(profile(), "LocalSentRows", TUnit::UNIT); _serialize_batch_timer = ADD_TIMER(profile(), "SerializeBatchTime"); _compress_timer = ADD_TIMER(profile(), "CompressTime"); _overall_throughput = profile()->add_derived_counter( diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index be9861caf5..c03eb0804c 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -135,6 +135,7 @@ protected: RuntimeProfile::Counter* _bytes_sent_counter; RuntimeProfile::Counter* _uncompressed_bytes_counter; RuntimeProfile::Counter* _ignore_rows; + RuntimeProfile::Counter* _local_sent_rows; std::unique_ptr _mem_tracker; @@ -302,6 +303,8 @@ private: PBlock* _ch_cur_pb_block = nullptr; PBlock _ch_pb_block1; PBlock _ch_pb_block2; + + bool _enable_local_exchange = false; }; template diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 70be8869d2..026920ccb5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -209,6 +209,8 @@ public class SessionVariable implements Serializable, Writable { public static final String FRAGMENT_TRANSMISSION_COMPRESSION_CODEC = "fragment_transmission_compression_codec"; + public static final String ENABLE_LOCAL_EXCHANGE = "enable_local_exchange"; + // session origin value public Map sessionOriginValue = new HashMap(); // check stmt is or not [select /*+ SET_VAR(...)*/ ...] @@ -525,6 +527,9 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = ENABLE_FUNCTION_PUSHDOWN) public boolean enableFunctionPushdown; + @VariableMgr.VarAttr(name = ENABLE_LOCAL_EXCHANGE) + public boolean enableLocalExchange = false; + public String getBlockEncryptionMode() { return blockEncryptionMode; } @@ -918,6 +923,10 @@ public class SessionVariable implements Serializable, Writable { return this.enableFunctionPushdown; } + public boolean getEnableLocalExchange() { + return enableLocalExchange; + } + /** * getInsertVisibleTimeoutMs. **/ @@ -1113,6 +1122,7 @@ public class SessionVariable implements Serializable, Writable { tResult.setEnableFunctionPushdown(enableFunctionPushdown); tResult.setFragmentTransmissionCompressionCodec(fragmentTransmissionCompressionCodec); + tResult.setEnableLocalExchange(enableLocalExchange); return tResult; } diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 92d41abbd7..b9ea800554 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -167,6 +167,8 @@ struct TQueryOptions { 45: optional bool enable_function_pushdown; 46: optional string fragment_transmission_compression_codec; + + 47: optional bool enable_local_exchange; }