From abc9de07b3c953ae2be45635f089b8705d595b63 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Sun, 13 Aug 2023 13:20:48 +0800 Subject: [PATCH] [Bug](pipeline) make sure sink is not blocked before try close (#22765) make sure sink is not blocked before try close --- be/src/pipeline/exec/operator.h | 13 +- be/src/vec/sink/vdata_stream_sender.cpp | 233 ++++++++++++------------ be/src/vec/sink/vdata_stream_sender.h | 28 +-- 3 files changed, 137 insertions(+), 137 deletions(-) diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index acf55cb7bc..55335c093a 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -273,15 +273,12 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override { - if (in_block->rows() > 0) { - auto st = _sink->send(state, in_block, source_state == SourceState::FINISHED); - // TODO: improvement: if sink returned END_OF_FILE, pipeline task can be finished - if (st.template is()) { - return Status::OK(); - } - return st; + auto st = _sink->send(state, in_block, source_state == SourceState::FINISHED); + // TODO: improvement: if sink returned END_OF_FILE, pipeline task can be finished + if (st.template is()) { + return Status::OK(); } - return Status::OK(); + return st; } Status try_close(RuntimeState* state) override { diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index e8f2d6dbe8..9f7c34a6c3 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -111,7 +111,7 @@ Status Channel::send_current_block(bool eos) { } SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get()); if (eos) { - RETURN_IF_ERROR(_serializer.serialize_block(_ch_cur_pb_block, 1, true)); + RETURN_IF_ERROR(_serializer.serialize_block(_ch_cur_pb_block, 1)); } RETURN_IF_ERROR(send_block(_ch_cur_pb_block, eos)); ch_roll_pb_block(); @@ -196,14 +196,14 @@ Status Channel::send_block(PBlock* block, bool eos) { return Status::OK(); } -Status Channel::add_rows(Block* block, const std::vector& rows) { +Status Channel::add_rows(Block* block, const std::vector& rows, bool eos) { if (_fragment_instance_id.lo == -1) { return Status::OK(); } bool serialized = false; - RETURN_IF_ERROR(_serializer.next_serialized_block(block, _ch_cur_pb_block, 1, &serialized, - &rows, true)); + RETURN_IF_ERROR( + _serializer.next_serialized_block(block, _ch_cur_pb_block, 1, &serialized, eos, &rows)); if (serialized) { RETURN_IF_ERROR(send_current_block(false)); } @@ -493,52 +493,72 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) { } if (_part_type == TPartitionType::UNPARTITIONED || _channels.size() == 1) { -#ifndef BROADCAST_ALL_CHANNELS -#define BROADCAST_ALL_CHANNELS(PBLOCK, PBLOCK_TO_SEND, POST_PROCESS) \ - { \ - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); \ - bool serialized = false; \ - RETURN_IF_ERROR(_serializer.next_serialized_block(block, PBLOCK, _channels.size(), \ - &serialized, nullptr, false)); \ - if (serialized) { \ - Status status; \ - Block merged_block = _serializer.get_block()->to_block(); \ - for (auto channel : _channels) { \ - if (!channel->is_receiver_eof()) { \ - if (channel->is_local()) { \ - status = channel->send_local_block(&merged_block); \ - } else { \ - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); \ - status = channel->send_block(PBLOCK_TO_SEND, false); \ - } \ - HANDLE_CHANNEL_STATUS(state, channel, status); \ - } \ - } \ - merged_block.clear_column_data(); \ - _serializer.get_block()->set_muatable_columns(merged_block.mutate_columns()); \ - POST_PROCESS; \ - } \ - } -#endif // 1. serialize depends on it is not local exchange // 2. send block // 3. rollover block if (_only_local_exchange) { - Status status; - for (auto channel : _channels) { - if (!channel->is_receiver_eof()) { - status = channel->send_local_block(block); - HANDLE_CHANNEL_STATUS(state, channel, status); + if (!block->empty()) { + Status status; + for (auto channel : _channels) { + if (!channel->is_receiver_eof()) { + status = channel->send_local_block(block); + HANDLE_CHANNEL_STATUS(state, channel, status); + } } } } else if (_enable_pipeline_exec) { BroadcastPBlockHolder* block_holder = nullptr; RETURN_IF_ERROR(_get_next_available_buffer(&block_holder)); - BROADCAST_ALL_CHANNELS(block_holder->get_block(), block_holder, ); + { + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + bool serialized = false; + RETURN_IF_ERROR(_serializer.next_serialized_block( + block, block_holder->get_block(), _channels.size(), &serialized, eos)); + if (serialized) { + auto cur_block = _serializer.get_block()->to_block(); + if (!cur_block.empty()) { + _serializer.serialize_block(&cur_block, block_holder->get_block(), + _channels.size()); + } else { + block_holder->get_block()->Clear(); + } + Status status; + for (auto channel : _channels) { + if (!channel->is_receiver_eof()) { + if (channel->is_local()) { + status = channel->send_local_block(block); + } else { + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + status = channel->send_block(block_holder, eos); + } + HANDLE_CHANNEL_STATUS(state, channel, status); + } + } + cur_block.clear_column_data(); + _serializer.get_block()->set_muatable_columns(cur_block.mutate_columns()); + } + } } else { - BROADCAST_ALL_CHANNELS(_cur_pb_block, _cur_pb_block, _roll_pb_block()); + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + bool serialized = false; + RETURN_IF_ERROR(_serializer.next_serialized_block( + block, _cur_pb_block, _channels.size(), &serialized, false)); + if (serialized) { + Status status; + for (auto channel : _channels) { + if (!channel->is_receiver_eof()) { + if (channel->is_local()) { + status = channel->send_local_block(block); + } else { + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + status = channel->send_block(_cur_pb_block, false); + } + HANDLE_CHANNEL_STATUS(state, channel, status); + } + } + _roll_pb_block(); + } } -#undef BROADCAST_ALL_CHANNELS } else if (_part_type == TPartitionType::RANDOM) { // 1. select channel Channel* current_channel = _channels[_current_channel_idx]; @@ -550,7 +570,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) { } else { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); RETURN_IF_ERROR( - _serializer.serialize_block(block, current_channel->ch_cur_pb_block(), 1)); + _serializer.serialize_block(block, current_channel->ch_cur_pb_block())); auto status = current_channel->send_block(current_channel->ch_cur_pb_block(), eos); HANDLE_CHANNEL_STATUS(state, current_channel, status); current_channel->ch_roll_pb_block(); @@ -565,10 +585,6 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) { int result_size = _partition_expr_ctxs.size(); int result[result_size]; - { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); - RETURN_IF_ERROR(get_partition_column_result(block, result)); - } // vectorized calculate hash int rows = block->rows(); @@ -576,44 +592,53 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) { std::vector hash_vals(rows); auto* __restrict hashes = hash_vals.data(); - // TODO: after we support new shuffle hash method, should simple the code + if (rows > 0) { + { + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + RETURN_IF_ERROR(get_partition_column_result(block, result)); + } + // TODO: after we support new shuffle hash method, should simple the code + if (_part_type == TPartitionType::HASH_PARTITIONED) { + SCOPED_TIMER(_split_block_hash_compute_timer); + // result[j] means column index, i means rows index, here to calculate the xxhash value + for (int j = 0; j < result_size; ++j) { + // complex type most not implement get_data_at() method which column_const will call + unpack_if_const(block->get_by_position(result[j]).column) + .first->update_hashes_with_value(hashes); + } + + for (int i = 0; i < rows; i++) { + hashes[i] = hashes[i] % element_size; + } + + { + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + Block::erase_useless_column(block, column_to_keep); + } + } else { + for (int j = 0; j < result_size; ++j) { + // complex type most not implement get_data_at() method which column_const will call + unpack_if_const(block->get_by_position(result[j]).column) + .first->update_crcs_with_value( + hash_vals, _partition_expr_ctxs[j]->root()->type().type); + } + element_size = _channel_shared_ptrs.size(); + for (int i = 0; i < rows; i++) { + hashes[i] = hashes[i] % element_size; + } + + { + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + Block::erase_useless_column(block, column_to_keep); + } + } + } if (_part_type == TPartitionType::HASH_PARTITIONED) { - SCOPED_TIMER(_split_block_hash_compute_timer); - // result[j] means column index, i means rows index, here to calculate the xxhash value - for (int j = 0; j < result_size; ++j) { - // complex type most not implement get_data_at() method which column_const will call - unpack_if_const(block->get_by_position(result[j]).column) - .first->update_hashes_with_value(hashes); - } - - for (int i = 0; i < rows; i++) { - hashes[i] = hashes[i] % element_size; - } - - { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); - Block::erase_useless_column(block, column_to_keep); - } - - RETURN_IF_ERROR(channel_add_rows(state, _channels, element_size, hashes, rows, block)); + RETURN_IF_ERROR(channel_add_rows(state, _channels, element_size, hashes, rows, block, + _enable_pipeline_exec ? eos : false)); } else { - for (int j = 0; j < result_size; ++j) { - // complex type most not implement get_data_at() method which column_const will call - unpack_if_const(block->get_by_position(result[j]).column) - .first->update_crcs_with_value( - hash_vals, _partition_expr_ctxs[j]->root()->type().type); - } - element_size = _channel_shared_ptrs.size(); - for (int i = 0; i < rows; i++) { - hashes[i] = hashes[i] % element_size; - } - - { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); - Block::erase_useless_column(block, column_to_keep); - } RETURN_IF_ERROR(channel_add_rows(state, _channel_shared_ptrs, element_size, hashes, - rows, block)); + rows, block, _enable_pipeline_exec ? eos : false)); } } else { // Range partition @@ -624,28 +649,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) { } Status VDataStreamSender::try_close(RuntimeState* state, Status exec_status) { - if (_serializer.get_block() && _serializer.get_block()->rows() > 0) { - BroadcastPBlockHolder* block_holder = nullptr; - RETURN_IF_ERROR(_get_next_available_buffer(&block_holder)); - { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); - Block block = _serializer.get_block()->to_block(); - RETURN_IF_ERROR(_serializer.serialize_block(&block, block_holder->get_block(), - _channels.size())); - Status status; - for (auto channel : _channels) { - if (!channel->is_receiver_eof()) { - if (channel->is_local()) { - status = channel->send_local_block(&block); - } else { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); - status = channel->send_block(block_holder, false); - } - HANDLE_CHANNEL_STATUS(state, channel, status); - } - } - } - } + DCHECK(_serializer.get_block() == nullptr || _serializer.get_block()->rows() == 0); Status final_st = Status::OK(); for (int i = 0; i < _channels.size(); ++i) { Status st = _channels[i]->close(state); @@ -710,8 +714,8 @@ BlockSerializer::BlockSerializer(VDataStreamSender* parent, bool is_local) : _parent(parent), _is_local(is_local), _batch_size(parent->state()->batch_size()) {} Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest, int num_receivers, - bool* serialized, const std::vector* rows, - bool clear_after_serialize) { + bool* serialized, bool eos, + const std::vector* rows) { if (_mutable_block == nullptr) { SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get()); _mutable_block = MutableBlock::create_unique(block->clone_empty()); @@ -720,18 +724,20 @@ Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest, int nu { SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get()); if (rows) { - SCOPED_TIMER(_parent->_split_block_distribute_by_channel_timer); - const int* begin = &(*rows)[0]; - _mutable_block->add_rows(block, begin, begin + rows->size()); - } else { + if (rows->size() > 0) { + SCOPED_TIMER(_parent->_split_block_distribute_by_channel_timer); + const int* begin = &(*rows)[0]; + _mutable_block->add_rows(block, begin, begin + rows->size()); + } + } else if (!block->empty()) { SCOPED_TIMER(_parent->_merge_block_timer); RETURN_IF_ERROR(_mutable_block->merge(*block)); } } - if (_mutable_block->rows() >= _batch_size) { + if (_mutable_block->rows() >= _batch_size || eos) { if (!_is_local) { - RETURN_IF_ERROR(serialize_block(dest, num_receivers, clear_after_serialize)); + RETURN_IF_ERROR(serialize_block(dest, num_receivers)); } *serialized = true; return Status::OK(); @@ -740,21 +746,18 @@ Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest, int nu return Status::OK(); } -Status BlockSerializer::serialize_block(PBlock* dest, int num_receivers, - bool clear_after_serialize) { +Status BlockSerializer::serialize_block(PBlock* dest, int num_receivers) { if (_mutable_block && _mutable_block->rows() > 0) { auto block = _mutable_block->to_block(); RETURN_IF_ERROR(serialize_block(&block, dest, num_receivers)); - if (clear_after_serialize) { - block.clear_column_data(); - } + block.clear_column_data(); _mutable_block->set_muatable_columns(block.mutate_columns()); } return Status::OK(); } -Status BlockSerializer::serialize_block(const Block* src, PBlock* dest, int num_receivers) { +Status BlockSerializer::serialize_block(Block* src, PBlock* dest, int num_receivers) { { SCOPED_TIMER(_parent->_serialize_batch_timer); dest->Clear(); diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 2649e077d3..503a73798e 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -71,11 +71,11 @@ class VDataStreamSender; class BlockSerializer { public: - BlockSerializer(VDataStreamSender* parent, bool is_local = false); + BlockSerializer(VDataStreamSender* parent, bool is_local = true); Status next_serialized_block(Block* src, PBlock* dest, int num_receivers, bool* serialized, - const std::vector* rows, bool clear_after_serialize); - Status serialize_block(PBlock* dest, int num_receivers, bool clear_after_serialize); - Status serialize_block(const Block* src, PBlock* dest, int num_receivers); + bool eos, const std::vector* rows = nullptr); + Status serialize_block(PBlock* dest, int num_receivers = 1); + Status serialize_block(Block* src, PBlock* dest, int num_receivers = 1); MutableBlock* get_block() const { return _mutable_block.get(); } @@ -141,7 +141,7 @@ protected: template Status channel_add_rows(RuntimeState* state, Channels& channels, int num_channels, - const uint64_t* channel_ids, int rows, Block* block); + const uint64_t* channel_ids, int rows, Block* block, bool eos); template void _handle_eof_channel(RuntimeState* state, ChannelPtrType channel, Status st); @@ -264,7 +264,7 @@ public: return Status::InternalError("Send BroadcastPBlockHolder is not allowed!"); } - virtual Status add_rows(Block* block, const std::vector& row); + virtual Status add_rows(Block* block, const std::vector& row, bool eos); virtual Status send_current_block(bool eos); @@ -397,7 +397,7 @@ protected: template Status VDataStreamSender::channel_add_rows(RuntimeState* state, Channels& channels, int num_channels, const uint64_t* __restrict channel_ids, - int rows, Block* block) { + int rows, Block* block, bool eos) { std::vector channel2rows[num_channels]; for (int i = 0; i < rows; i++) { @@ -406,8 +406,8 @@ Status VDataStreamSender::channel_add_rows(RuntimeState* state, Channels& channe Status status; for (int i = 0; i < num_channels; ++i) { - if (!channels[i]->is_receiver_eof() && !channel2rows[i].empty()) { - status = channels[i]->add_rows(block, channel2rows[i]); + if (!channels[i]->is_receiver_eof() && (!channel2rows[i].empty() || eos)) { + status = channels[i]->add_rows(block, channel2rows[i], eos); HANDLE_CHANNEL_STATUS(state, channels[i], status); } } @@ -479,17 +479,17 @@ public: return Status::OK(); } - Status add_rows(Block* block, const std::vector& rows) override { + Status add_rows(Block* block, const std::vector& rows, bool eos) override { if (_fragment_instance_id.lo == -1) { return Status::OK(); } bool serialized = false; _pblock = std::make_unique(); - RETURN_IF_ERROR(_serializer.next_serialized_block(block, _pblock.get(), 1, &serialized, - &rows, true)); + RETURN_IF_ERROR(_serializer.next_serialized_block(block, _pblock.get(), 1, &serialized, eos, + &rows)); if (serialized) { - RETURN_IF_ERROR(send_current_block(false)); + RETURN_IF_ERROR(send_current_block(eos)); } return Status::OK(); @@ -503,7 +503,7 @@ public: SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get()); if (eos) { _pblock = std::make_unique(); - RETURN_IF_ERROR(_serializer.serialize_block(_pblock.get(), 1, true)); + RETURN_IF_ERROR(_serializer.serialize_block(_pblock.get(), 1)); } RETURN_IF_ERROR(send_block(_pblock.release(), eos)); return Status::OK();