From 8343abaad1b16249374cde52450c1abe2a004e45 Mon Sep 17 00:00:00 2001 From: stdpain <34912776+stdpain@users.noreply.github.com> Date: Sun, 21 Mar 2021 11:25:33 +0800 Subject: [PATCH] [Feature] Local Exchange (#5470) Avoid network transmission when the data stream sender node and destination exchange node are in the same BE, to improve performance and save CPU. --- be/src/exec/exchange_node.cpp | 2 +- be/src/exec/hash_join_node.cpp | 8 +-- be/src/runtime/data_stream_mgr.h | 1 + be/src/runtime/data_stream_recvr.cc | 47 +++++++++++++- be/src/runtime/data_stream_recvr.h | 10 +-- be/src/runtime/data_stream_sender.cpp | 89 ++++++++++++++++++++++++--- be/src/runtime/data_stream_sender.h | 5 +- 7 files changed, 139 insertions(+), 23 deletions(-) diff --git a/be/src/exec/exchange_node.cpp b/be/src/exec/exchange_node.cpp index d4314a9552..14386e8f16 100644 --- a/be/src/exec/exchange_node.cpp +++ b/be/src/exec/exchange_node.cpp @@ -153,7 +153,7 @@ Status ExchangeNode::get_next(RuntimeState* state, RowBatch* output_batch, bool* RETURN_IF_CANCELLED(state); // copy rows until we hit the limit/capacity or until we exhaust _input_batch while (!reached_limit() && !output_batch->at_capacity() && _input_batch != NULL && - _next_row_idx < _input_batch->capacity()) { + _next_row_idx < _input_batch->num_rows()) { TupleRow* src = _input_batch->get_row(_next_row_idx); if (ExecNode::eval_conjuncts(ctxs, num_ctxs, src)) { diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp index 266f08afb0..be1ee9a9be 100644 --- a/be/src/exec/hash_join_node.cpp +++ b/be/src/exec/hash_join_node.cpp @@ -193,13 +193,7 @@ Status HashJoinNode::construct_hash_table(RuntimeState* state) { // take ownership of tuple data of build_batch _build_pool->acquire_data(build_batch.tuple_data_pool(), false); RETURN_IF_LIMIT_EXCEEDED(state, "Hash join, while constructing the hash table."); - - // Call codegen version if possible - if (_process_build_batch_fn == NULL) { 
process_build_batch(&build_batch); - } else { - _process_build_batch_fn(this, &build_batch); - } + process_build_batch(&build_batch); VLOG_ROW << _hash_tbl->debug_string(true, &child(1)->row_desc()); diff --git a/be/src/runtime/data_stream_mgr.h b/be/src/runtime/data_stream_mgr.h index fbe6bfa20a..7758a46043 100644 --- a/be/src/runtime/data_stream_mgr.h +++ b/be/src/runtime/data_stream_mgr.h @@ -88,6 +88,7 @@ public: private: friend class DataStreamRecvr; + friend class DataStreamSender; // protects all fields below boost::mutex _lock; diff --git a/be/src/runtime/data_stream_recvr.cc b/be/src/runtime/data_stream_recvr.cc index c5ef74fe5d..944c7d7dc7 100644 --- a/be/src/runtime/data_stream_recvr.cc +++ b/be/src/runtime/data_stream_recvr.cc @@ -50,6 +50,14 @@ using boost::mem_fn; namespace doris { +class ThreadClosure : public google::protobuf::Closure { +public: + void Run() { _cv.notify_one(); } + void wait(unique_lock& lock) { _cv.wait(lock); } + +private: + condition_variable _cv; +}; // Implements a blocking queue of row batches from one or more senders. One queue // is maintained per sender if _is_merging is true for the enclosing receiver, otherwise // rows from all senders are placed in the same queue. @@ -74,6 +82,8 @@ public: void add_batch(const PRowBatch& pb_batch, int be_number, int64_t packet_seq, ::google::protobuf::Closure** done); + void add_batch(RowBatch* batch, bool use_move); + // Decrement the number of remaining senders for this queue and signal eos ("new data") // if the count drops to 0. The number of senders will be 1 for a merging // DataStreamRecvr. 
@@ -126,8 +136,8 @@ private: std::unordered_set _sender_eos_set; // sender_id std::unordered_map _packet_seq_map; // be_number => packet_seq - std::deque> _pending_closures; + std::unordered_map> _local_closure; }; DataStreamRecvr::SenderQueue::SenderQueue(DataStreamRecvr* parent_recvr, int num_senders, @@ -257,6 +267,36 @@ void DataStreamRecvr::SenderQueue::add_batch(const PRowBatch& pb_batch, int be_n _data_arrival_cv.notify_one(); } +void DataStreamRecvr::SenderQueue::add_batch(RowBatch* batch, bool use_move) { + unique_lock l(_lock); + if (_is_cancelled) { + return; + } + RowBatch* nbatch = + new RowBatch(_recvr->row_desc(), batch->capacity(), _recvr->mem_tracker().get()); + if (use_move) { + nbatch->acquire_state(batch); + } else { + batch->deep_copy_to(nbatch); + } + int batch_size = nbatch->total_byte_size(); + _batch_queue.emplace_back(batch_size, nbatch); + _data_arrival_cv.notify_one(); + if (_recvr->exceeds_limit(batch_size)) { + std::thread::id tid = std::this_thread::get_id(); + MonotonicStopWatch monotonicStopWatch; + monotonicStopWatch.start(); + auto iter = _local_closure.find(tid); + if (iter == _local_closure.end()) { + _local_closure.emplace(tid, new ThreadClosure); + iter = _local_closure.find(tid); + } + _pending_closures.emplace_back(iter->second.get(), monotonicStopWatch); + iter->second->wait(l); + } + _recvr->_num_buffered_bytes += batch_size; +} + void DataStreamRecvr::SenderQueue::decrement_senders(int be_number) { lock_guard l(_lock); if (_sender_eos_set.end() != _sender_eos_set.find(be_number)) { @@ -443,6 +483,11 @@ void DataStreamRecvr::add_batch(const PRowBatch& batch, int sender_id, int be_nu _sender_queues[use_sender_id]->add_batch(batch, be_number, packet_seq, done); } +void DataStreamRecvr::add_batch(RowBatch* batch, int sender_id, bool use_move) { + int use_sender_id = _is_merging ? 
sender_id : 0; + _sender_queues[use_sender_id]->add_batch(batch, use_move); +} + void DataStreamRecvr::remove_sender(int sender_id, int be_number) { int use_sender_id = _is_merging ? sender_id : 0; _sender_queues[use_sender_id]->decrement_senders(be_number); diff --git a/be/src/runtime/data_stream_recvr.h b/be/src/runtime/data_stream_recvr.h index 62d38c76bd..e090a69633 100644 --- a/be/src/runtime/data_stream_recvr.h +++ b/be/src/runtime/data_stream_recvr.h @@ -79,6 +79,8 @@ public: // Refactor so both merging and non-merging exchange use get_next(RowBatch*, bool* eos). Status get_batch(RowBatch** next_batch); + void add_batch(RowBatch* batch, int sender_id, bool use_move); + // Deregister from DataStreamMgr instance, which shares ownership of this instance. void close(); @@ -105,6 +107,10 @@ public: _sub_plan_query_statistics_recvr->insert(statistics, sender_id); } + // Indicate that a particular sender is done. Delegated to the appropriate + // sender queue. Called from DataStreamMgr. + void remove_sender(int sender_id, int be_number); + private: friend class DataStreamMgr; class SenderQueue; @@ -119,10 +125,6 @@ private: void add_batch(const PRowBatch& batch, int sender_id, int be_number, int64_t packet_seq, ::google::protobuf::Closure** done); - // Indicate that a particular sender is done. Delegated to the appropriate - // sender queue. Called from DataStreamMgr. - void remove_sender(int sender_id, int be_number); - // Empties the sender queues and notifies all waiting consumers of cancellation. 
void cancel_stream(); diff --git a/be/src/runtime/data_stream_sender.cpp b/be/src/runtime/data_stream_sender.cpp index 7f23ffd7f1..9b9f60a68b 100644 --- a/be/src/runtime/data_stream_sender.cpp +++ b/be/src/runtime/data_stream_sender.cpp @@ -20,10 +20,12 @@ #include #include +#include #include #include #include +#include "common/config.h" #include "common/logging.h" #include "exprs/expr.h" #include "exprs/expr_context.h" @@ -32,6 +34,8 @@ #include "gen_cpp/Types_types.h" #include "gen_cpp/internal_service.pb.h" #include "runtime/client_cache.h" +#include "runtime/data_stream_mgr.h" +#include "runtime/data_stream_recvr.h" #include "runtime/descriptors.h" #include "runtime/dpp_sink_internal.h" #include "runtime/exec_env.h" @@ -43,6 +47,7 @@ #include "service/backend_options.h" #include "service/brpc.h" #include "util/brpc_stub_cache.h" +#include "util/defer_op.h" #include "util/debug_util.h" #include "util/network_util.h" #include "util/ref_count_closure.h" @@ -79,7 +84,14 @@ public: _need_close(false), _brpc_dest_addr(brpc_dest), _is_transfer_chain(is_transfer_chain), - _send_query_statistics_with_every_batch(send_query_statistics_with_every_batch) {} + _send_query_statistics_with_every_batch(send_query_statistics_with_every_batch) { + std::string localhost = BackendOptions::get_localhost(); + _is_local = + _brpc_dest_addr.hostname == localhost && _brpc_dest_addr.port == config::brpc_port; + if (_is_local) { + LOG(INFO) << "will use local exechange, dest_node_id:" << _dest_node_id; + } + } virtual ~Channel() { if (_closure != nullptr && _closure->unref()) { @@ -104,6 +116,10 @@ public: // if batch is nullptr, send the eof packet Status send_batch(PRowBatch* batch, bool eos = false); + Status send_local_batch(bool eos); + + Status send_local_batch(RowBatch* batch, bool use_move); + // Flush buffered rows and close channel. This function don't wait the response // of close operation, client should call close_wait() to finish channel's close. 
// We split one close operation into two phases in order to make multiple channels @@ -124,8 +140,11 @@ public: TUniqueId get_fragment_instance_id() { return _fragment_instance_id; } + bool is_local() { return _is_local; } + private: inline Status _wait_last_brpc() { + if (_closure == nullptr) return Status::OK(); auto cntl = &_closure->cntl; brpc::Join(cntl->call_id()); if (cntl->Failed()) { @@ -174,6 +193,7 @@ private: // whether the dest can be treated as query statistics transfer chain. bool _is_transfer_chain; bool _send_query_statistics_with_every_batch; + bool _is_local; }; Status DataStreamSender::Channel::init(RuntimeState* state) { @@ -198,7 +218,12 @@ Status DataStreamSender::Channel::init(RuntimeState* state) { _brpc_request.set_be_number(_be_number); _brpc_timeout_ms = std::min(3600, state->query_options().query_timeout) * 1000; - _brpc_stub = state->exec_env()->brpc_stub_cache()->get_stub(_brpc_dest_addr); + if (_brpc_dest_addr.hostname == BackendOptions::get_localhost()) { + _brpc_stub = + state->exec_env()->brpc_stub_cache()->get_stub("127.0.0.1", _brpc_dest_addr.port); + } else { + _brpc_stub = state->exec_env()->brpc_stub_cache()->get_stub(_brpc_dest_addr); + } // In bucket shuffle join will set fragment_instance_id (-1, -1) // to build a camouflaged empty channel. 
the ip and port is '0.0.0.0:0" @@ -268,6 +293,9 @@ Status DataStreamSender::Channel::add_row(TupleRow* row) { } Status DataStreamSender::Channel::send_current_batch(bool eos) { + if (is_local()) { + return send_local_batch(eos); + } { SCOPED_TIMER(_parent->_serialize_batch_timer); int uncompressed_bytes = _batch->serialize(&_pb_batch); @@ -279,6 +307,30 @@ Status DataStreamSender::Channel::send_current_batch(bool eos) { return Status::OK(); } +Status DataStreamSender::Channel::send_local_batch(bool eos) { + boost::shared_ptr recvr = + _parent->state()->exec_env()->stream_mgr()->find_recvr(_fragment_instance_id, + _dest_node_id); + if (recvr != nullptr) { + recvr->add_batch(_batch.get(), _parent->_sender_id, true); + if (eos) { + recvr->remove_sender(_parent->_sender_id, _be_number); + } + } + _batch->reset(); + return Status::OK(); +} + +Status DataStreamSender::Channel::send_local_batch(RowBatch* batch, bool use_move) { + boost::shared_ptr recvr = + _parent->state()->exec_env()->stream_mgr()->find_recvr(_fragment_instance_id, + _dest_node_id); + if (recvr != nullptr) { + recvr->add_batch(batch, _parent->_sender_id, use_move); + } + return Status::OK(); +} + Status DataStreamSender::Channel::close_internal() { if (!_need_close) { return Status::OK(); @@ -470,17 +522,38 @@ Status DataStreamSender::send(RuntimeState* state, RowBatch* batch) { // Unpartition or _channel size if (_part_type == TPartitionType::UNPARTITIONED || _channels.size() == 1) { - RETURN_IF_ERROR(serialize_batch(batch, _current_pb_batch, _channels.size())); + int local_size = 0; for (auto channel : _channels) { - RETURN_IF_ERROR(channel->send_batch(_current_pb_batch)); + if (channel->is_local()) { + local_size++; + } + } + if (local_size == _channels.size()) { + // we don't have to serialize + for (auto channel : _channels) { + RETURN_IF_ERROR(channel->send_local_batch(batch, false)); + } + } else { + RETURN_IF_ERROR(serialize_batch(batch, _current_pb_batch, _channels.size())); + for (auto channel 
: _channels) { + if (channel->is_local()) { + RETURN_IF_ERROR(channel->send_local_batch(batch, false)); + } else { + RETURN_IF_ERROR(channel->send_batch(_current_pb_batch)); + } + } + _current_pb_batch = (_current_pb_batch == &_pb_batch1 ? &_pb_batch2 : &_pb_batch1); } - _current_pb_batch = (_current_pb_batch == &_pb_batch1 ? &_pb_batch2 : &_pb_batch1); } else if (_part_type == TPartitionType::RANDOM) { // Round-robin batches among channels. Wait for the current channel to finish its // rpc before overwriting its batch. Channel* current_channel = _channels[_current_channel_idx]; - RETURN_IF_ERROR(serialize_batch(batch, current_channel->pb_batch())); - RETURN_IF_ERROR(current_channel->send_batch(current_channel->pb_batch())); + if (current_channel->is_local()) { + RETURN_IF_ERROR(current_channel->send_local_batch(batch, false)); + } else { + RETURN_IF_ERROR(serialize_batch(batch, current_channel->pb_batch())); + RETURN_IF_ERROR(current_channel->send_batch(current_channel->pb_batch())); + } _current_channel_idx = (_current_channel_idx + 1) % _channels.size(); } else if (_part_type == TPartitionType::HASH_PARTITIONED) { // hash-partition batch's rows across channels @@ -640,6 +713,8 @@ Status DataStreamSender::compute_range_part_code(RuntimeState* state, TupleRow* Status DataStreamSender::close(RuntimeState* state, Status exec_status) { // TODO: only close channels that didn't have any errors // make all channels close parallel + if (_closed) return Status::OK(); + _closed = true; Status final_st = Status::OK(); for (int i = 0; i < _channels.size(); ++i) { Status st = _channels[i]->close(state); diff --git a/be/src/runtime/data_stream_sender.h b/be/src/runtime/data_stream_sender.h index 675ef23e81..7faf29b858 100644 --- a/be/src/runtime/data_stream_sender.h +++ b/be/src/runtime/data_stream_sender.h @@ -95,6 +95,8 @@ public: virtual RuntimeProfile* profile() { return _profile; } + RuntimeState* state() { return _state; } + private: class Channel; @@ -117,9 +119,6 @@ 
private: int _current_channel_idx; // index of current channel to send to if _random == true - // If true, this sender has been closed. Not valid to call Send() anymore. - bool _closed; - TPartitionType::type _part_type; bool _ignore_not_found;