[improvement](sink) Support local exchange for multi fragment instances (#12017)
This commit is contained in:
@ -76,6 +76,10 @@ Status VDataStreamSender::Channel::init(RuntimeState* state) {
|
||||
return Status::InternalError(msg);
|
||||
}
|
||||
|
||||
if (state->query_options().__isset.enable_local_exchange) {
|
||||
_enable_local_exchange = state->query_options().enable_local_exchange;
|
||||
}
|
||||
|
||||
// In bucket shuffle join will set fragment_instance_id (-1, -1)
|
||||
// to build a camouflaged empty channel. the ip and port is '0.0.0.0:0"
|
||||
// so the empty channel not need call function close_internal()
|
||||
@ -85,11 +89,11 @@ Status VDataStreamSender::Channel::init(RuntimeState* state) {
|
||||
}
|
||||
|
||||
Status VDataStreamSender::Channel::send_current_block(bool eos) {
|
||||
// TODO: Now, local exchange will cause the performance problem is in a multi-threaded scenario
|
||||
// so this feature is turned off here. We need to re-examine this logic
|
||||
// if (is_local()) {
|
||||
// return send_local_block(eos);
|
||||
// }
|
||||
// FIXME: Now, local exchange will cause the performance problem is in a multi-threaded scenario
|
||||
// so this feature is turned off here by default. We need to re-examine this logic
|
||||
if (_enable_local_exchange && is_local()) {
|
||||
return send_local_block(eos);
|
||||
}
|
||||
auto block = _mutable_block->to_block();
|
||||
RETURN_IF_ERROR(_parent->serialize_block(&block, _ch_cur_pb_block));
|
||||
block.clear_column_data();
|
||||
@ -103,15 +107,16 @@ Status VDataStreamSender::Channel::send_local_block(bool eos) {
|
||||
std::shared_ptr<VDataStreamRecvr> recvr =
|
||||
_parent->state()->exec_env()->vstream_mgr()->find_recvr(_fragment_instance_id,
|
||||
_dest_node_id);
|
||||
Block block = _mutable_block->to_block();
|
||||
_mutable_block->set_muatable_columns(block.clone_empty_columns());
|
||||
if (recvr != nullptr) {
|
||||
Block block = _mutable_block->to_block();
|
||||
COUNTER_UPDATE(_parent->_local_bytes_send_counter, block.bytes());
|
||||
COUNTER_UPDATE(_parent->_local_sent_rows, block.rows());
|
||||
recvr->add_block(&block, _parent->_sender_id, true);
|
||||
if (eos) {
|
||||
recvr->remove_sender(_parent->_sender_id, _be_number);
|
||||
}
|
||||
}
|
||||
_mutable_block->clear();
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -121,6 +126,7 @@ Status VDataStreamSender::Channel::send_local_block(Block* block) {
|
||||
_dest_node_id);
|
||||
if (recvr != nullptr) {
|
||||
COUNTER_UPDATE(_parent->_local_bytes_send_counter, block->bytes());
|
||||
COUNTER_UPDATE(_parent->_local_sent_rows, block->rows());
|
||||
recvr->add_block(block, _parent->_sender_id, false);
|
||||
}
|
||||
return Status::OK();
|
||||
@ -426,6 +432,7 @@ Status VDataStreamSender::prepare(RuntimeState* state) {
|
||||
_bytes_sent_counter = ADD_COUNTER(profile(), "BytesSent", TUnit::BYTES);
|
||||
_uncompressed_bytes_counter = ADD_COUNTER(profile(), "UncompressedRowBatchSize", TUnit::BYTES);
|
||||
_ignore_rows = ADD_COUNTER(profile(), "IgnoreRows", TUnit::UNIT);
|
||||
_local_sent_rows = ADD_COUNTER(profile(), "LocalSentRows", TUnit::UNIT);
|
||||
_serialize_batch_timer = ADD_TIMER(profile(), "SerializeBatchTime");
|
||||
_compress_timer = ADD_TIMER(profile(), "CompressTime");
|
||||
_overall_throughput = profile()->add_derived_counter(
|
||||
|
||||
@ -135,6 +135,7 @@ protected:
|
||||
RuntimeProfile::Counter* _bytes_sent_counter;
|
||||
RuntimeProfile::Counter* _uncompressed_bytes_counter;
|
||||
RuntimeProfile::Counter* _ignore_rows;
|
||||
RuntimeProfile::Counter* _local_sent_rows;
|
||||
|
||||
std::unique_ptr<MemTracker> _mem_tracker;
|
||||
|
||||
@ -302,6 +303,8 @@ private:
|
||||
PBlock* _ch_cur_pb_block = nullptr;
|
||||
PBlock _ch_pb_block1;
|
||||
PBlock _ch_pb_block2;
|
||||
|
||||
bool _enable_local_exchange = false;
|
||||
};
|
||||
|
||||
template <typename Channels, typename HashVals>
|
||||
|
||||
@ -209,6 +209,8 @@ public class SessionVariable implements Serializable, Writable {
|
||||
|
||||
public static final String FRAGMENT_TRANSMISSION_COMPRESSION_CODEC = "fragment_transmission_compression_codec";
|
||||
|
||||
public static final String ENABLE_LOCAL_EXCHANGE = "enable_local_exchange";
|
||||
|
||||
// session origin value
|
||||
public Map<Field, String> sessionOriginValue = new HashMap<Field, String>();
|
||||
// check stmt is or not [select /*+ SET_VAR(...)*/ ...]
|
||||
@ -525,6 +527,9 @@ public class SessionVariable implements Serializable, Writable {
|
||||
@VariableMgr.VarAttr(name = ENABLE_FUNCTION_PUSHDOWN)
|
||||
public boolean enableFunctionPushdown;
|
||||
|
||||
@VariableMgr.VarAttr(name = ENABLE_LOCAL_EXCHANGE)
|
||||
public boolean enableLocalExchange = false;
|
||||
|
||||
public String getBlockEncryptionMode() {
|
||||
return blockEncryptionMode;
|
||||
}
|
||||
@ -918,6 +923,10 @@ public class SessionVariable implements Serializable, Writable {
|
||||
return this.enableFunctionPushdown;
|
||||
}
|
||||
|
||||
public boolean getEnableLocalExchange() {
|
||||
return enableLocalExchange;
|
||||
}
|
||||
|
||||
/**
|
||||
* getInsertVisibleTimeoutMs.
|
||||
**/
|
||||
@ -1113,6 +1122,7 @@ public class SessionVariable implements Serializable, Writable {
|
||||
|
||||
tResult.setEnableFunctionPushdown(enableFunctionPushdown);
|
||||
tResult.setFragmentTransmissionCompressionCodec(fragmentTransmissionCompressionCodec);
|
||||
tResult.setEnableLocalExchange(enableLocalExchange);
|
||||
|
||||
return tResult;
|
||||
}
|
||||
|
||||
@ -167,6 +167,8 @@ struct TQueryOptions {
|
||||
45: optional bool enable_function_pushdown;
|
||||
|
||||
46: optional string fragment_transmission_compression_codec;
|
||||
|
||||
47: optional bool enable_local_exchange;
|
||||
}
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user