[Bug](pipeline) make sure sink is not blocked before try close (#22765)
make sure sink is not blocked before try close
This commit is contained in:
@ -273,15 +273,12 @@ public:
|
||||
|
||||
Status sink(RuntimeState* state, vectorized::Block* in_block,
|
||||
SourceState source_state) override {
|
||||
if (in_block->rows() > 0) {
|
||||
auto st = _sink->send(state, in_block, source_state == SourceState::FINISHED);
|
||||
// TODO: improvement: if sink returned END_OF_FILE, pipeline task can be finished
|
||||
if (st.template is<ErrorCode::END_OF_FILE>()) {
|
||||
return Status::OK();
|
||||
}
|
||||
return st;
|
||||
auto st = _sink->send(state, in_block, source_state == SourceState::FINISHED);
|
||||
// TODO: improvement: if sink returned END_OF_FILE, pipeline task can be finished
|
||||
if (st.template is<ErrorCode::END_OF_FILE>()) {
|
||||
return Status::OK();
|
||||
}
|
||||
return Status::OK();
|
||||
return st;
|
||||
}
|
||||
|
||||
Status try_close(RuntimeState* state) override {
|
||||
|
||||
@ -111,7 +111,7 @@ Status Channel::send_current_block(bool eos) {
|
||||
}
|
||||
SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
|
||||
if (eos) {
|
||||
RETURN_IF_ERROR(_serializer.serialize_block(_ch_cur_pb_block, 1, true));
|
||||
RETURN_IF_ERROR(_serializer.serialize_block(_ch_cur_pb_block, 1));
|
||||
}
|
||||
RETURN_IF_ERROR(send_block(_ch_cur_pb_block, eos));
|
||||
ch_roll_pb_block();
|
||||
@ -196,14 +196,14 @@ Status Channel::send_block(PBlock* block, bool eos) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status Channel::add_rows(Block* block, const std::vector<int>& rows) {
|
||||
Status Channel::add_rows(Block* block, const std::vector<int>& rows, bool eos) {
|
||||
if (_fragment_instance_id.lo == -1) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
bool serialized = false;
|
||||
RETURN_IF_ERROR(_serializer.next_serialized_block(block, _ch_cur_pb_block, 1, &serialized,
|
||||
&rows, true));
|
||||
RETURN_IF_ERROR(
|
||||
_serializer.next_serialized_block(block, _ch_cur_pb_block, 1, &serialized, eos, &rows));
|
||||
if (serialized) {
|
||||
RETURN_IF_ERROR(send_current_block(false));
|
||||
}
|
||||
@ -493,52 +493,72 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
|
||||
}
|
||||
|
||||
if (_part_type == TPartitionType::UNPARTITIONED || _channels.size() == 1) {
|
||||
#ifndef BROADCAST_ALL_CHANNELS
|
||||
#define BROADCAST_ALL_CHANNELS(PBLOCK, PBLOCK_TO_SEND, POST_PROCESS) \
|
||||
{ \
|
||||
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); \
|
||||
bool serialized = false; \
|
||||
RETURN_IF_ERROR(_serializer.next_serialized_block(block, PBLOCK, _channels.size(), \
|
||||
&serialized, nullptr, false)); \
|
||||
if (serialized) { \
|
||||
Status status; \
|
||||
Block merged_block = _serializer.get_block()->to_block(); \
|
||||
for (auto channel : _channels) { \
|
||||
if (!channel->is_receiver_eof()) { \
|
||||
if (channel->is_local()) { \
|
||||
status = channel->send_local_block(&merged_block); \
|
||||
} else { \
|
||||
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); \
|
||||
status = channel->send_block(PBLOCK_TO_SEND, false); \
|
||||
} \
|
||||
HANDLE_CHANNEL_STATUS(state, channel, status); \
|
||||
} \
|
||||
} \
|
||||
merged_block.clear_column_data(); \
|
||||
_serializer.get_block()->set_muatable_columns(merged_block.mutate_columns()); \
|
||||
POST_PROCESS; \
|
||||
} \
|
||||
}
|
||||
#endif
|
||||
// 1. serialize depends on it is not local exchange
|
||||
// 2. send block
|
||||
// 3. rollover block
|
||||
if (_only_local_exchange) {
|
||||
Status status;
|
||||
for (auto channel : _channels) {
|
||||
if (!channel->is_receiver_eof()) {
|
||||
status = channel->send_local_block(block);
|
||||
HANDLE_CHANNEL_STATUS(state, channel, status);
|
||||
if (!block->empty()) {
|
||||
Status status;
|
||||
for (auto channel : _channels) {
|
||||
if (!channel->is_receiver_eof()) {
|
||||
status = channel->send_local_block(block);
|
||||
HANDLE_CHANNEL_STATUS(state, channel, status);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (_enable_pipeline_exec) {
|
||||
BroadcastPBlockHolder* block_holder = nullptr;
|
||||
RETURN_IF_ERROR(_get_next_available_buffer(&block_holder));
|
||||
BROADCAST_ALL_CHANNELS(block_holder->get_block(), block_holder, );
|
||||
{
|
||||
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
|
||||
bool serialized = false;
|
||||
RETURN_IF_ERROR(_serializer.next_serialized_block(
|
||||
block, block_holder->get_block(), _channels.size(), &serialized, eos));
|
||||
if (serialized) {
|
||||
auto cur_block = _serializer.get_block()->to_block();
|
||||
if (!cur_block.empty()) {
|
||||
_serializer.serialize_block(&cur_block, block_holder->get_block(),
|
||||
_channels.size());
|
||||
} else {
|
||||
block_holder->get_block()->Clear();
|
||||
}
|
||||
Status status;
|
||||
for (auto channel : _channels) {
|
||||
if (!channel->is_receiver_eof()) {
|
||||
if (channel->is_local()) {
|
||||
status = channel->send_local_block(block);
|
||||
} else {
|
||||
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
|
||||
status = channel->send_block(block_holder, eos);
|
||||
}
|
||||
HANDLE_CHANNEL_STATUS(state, channel, status);
|
||||
}
|
||||
}
|
||||
cur_block.clear_column_data();
|
||||
_serializer.get_block()->set_muatable_columns(cur_block.mutate_columns());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
BROADCAST_ALL_CHANNELS(_cur_pb_block, _cur_pb_block, _roll_pb_block());
|
||||
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
|
||||
bool serialized = false;
|
||||
RETURN_IF_ERROR(_serializer.next_serialized_block(
|
||||
block, _cur_pb_block, _channels.size(), &serialized, false));
|
||||
if (serialized) {
|
||||
Status status;
|
||||
for (auto channel : _channels) {
|
||||
if (!channel->is_receiver_eof()) {
|
||||
if (channel->is_local()) {
|
||||
status = channel->send_local_block(block);
|
||||
} else {
|
||||
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
|
||||
status = channel->send_block(_cur_pb_block, false);
|
||||
}
|
||||
HANDLE_CHANNEL_STATUS(state, channel, status);
|
||||
}
|
||||
}
|
||||
_roll_pb_block();
|
||||
}
|
||||
}
|
||||
#undef BROADCAST_ALL_CHANNELS
|
||||
} else if (_part_type == TPartitionType::RANDOM) {
|
||||
// 1. select channel
|
||||
Channel* current_channel = _channels[_current_channel_idx];
|
||||
@ -550,7 +570,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
|
||||
} else {
|
||||
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
|
||||
RETURN_IF_ERROR(
|
||||
_serializer.serialize_block(block, current_channel->ch_cur_pb_block(), 1));
|
||||
_serializer.serialize_block(block, current_channel->ch_cur_pb_block()));
|
||||
auto status = current_channel->send_block(current_channel->ch_cur_pb_block(), eos);
|
||||
HANDLE_CHANNEL_STATUS(state, current_channel, status);
|
||||
current_channel->ch_roll_pb_block();
|
||||
@ -565,10 +585,6 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
|
||||
|
||||
int result_size = _partition_expr_ctxs.size();
|
||||
int result[result_size];
|
||||
{
|
||||
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
|
||||
RETURN_IF_ERROR(get_partition_column_result(block, result));
|
||||
}
|
||||
|
||||
// vectorized calculate hash
|
||||
int rows = block->rows();
|
||||
@ -576,44 +592,53 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
|
||||
std::vector<uint64_t> hash_vals(rows);
|
||||
auto* __restrict hashes = hash_vals.data();
|
||||
|
||||
// TODO: after we support new shuffle hash method, should simple the code
|
||||
if (rows > 0) {
|
||||
{
|
||||
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
|
||||
RETURN_IF_ERROR(get_partition_column_result(block, result));
|
||||
}
|
||||
// TODO: after we support new shuffle hash method, should simple the code
|
||||
if (_part_type == TPartitionType::HASH_PARTITIONED) {
|
||||
SCOPED_TIMER(_split_block_hash_compute_timer);
|
||||
// result[j] means column index, i means rows index, here to calculate the xxhash value
|
||||
for (int j = 0; j < result_size; ++j) {
|
||||
// complex type most not implement get_data_at() method which column_const will call
|
||||
unpack_if_const(block->get_by_position(result[j]).column)
|
||||
.first->update_hashes_with_value(hashes);
|
||||
}
|
||||
|
||||
for (int i = 0; i < rows; i++) {
|
||||
hashes[i] = hashes[i] % element_size;
|
||||
}
|
||||
|
||||
{
|
||||
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
|
||||
Block::erase_useless_column(block, column_to_keep);
|
||||
}
|
||||
} else {
|
||||
for (int j = 0; j < result_size; ++j) {
|
||||
// complex type most not implement get_data_at() method which column_const will call
|
||||
unpack_if_const(block->get_by_position(result[j]).column)
|
||||
.first->update_crcs_with_value(
|
||||
hash_vals, _partition_expr_ctxs[j]->root()->type().type);
|
||||
}
|
||||
element_size = _channel_shared_ptrs.size();
|
||||
for (int i = 0; i < rows; i++) {
|
||||
hashes[i] = hashes[i] % element_size;
|
||||
}
|
||||
|
||||
{
|
||||
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
|
||||
Block::erase_useless_column(block, column_to_keep);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (_part_type == TPartitionType::HASH_PARTITIONED) {
|
||||
SCOPED_TIMER(_split_block_hash_compute_timer);
|
||||
// result[j] means column index, i means rows index, here to calculate the xxhash value
|
||||
for (int j = 0; j < result_size; ++j) {
|
||||
// complex type most not implement get_data_at() method which column_const will call
|
||||
unpack_if_const(block->get_by_position(result[j]).column)
|
||||
.first->update_hashes_with_value(hashes);
|
||||
}
|
||||
|
||||
for (int i = 0; i < rows; i++) {
|
||||
hashes[i] = hashes[i] % element_size;
|
||||
}
|
||||
|
||||
{
|
||||
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
|
||||
Block::erase_useless_column(block, column_to_keep);
|
||||
}
|
||||
|
||||
RETURN_IF_ERROR(channel_add_rows(state, _channels, element_size, hashes, rows, block));
|
||||
RETURN_IF_ERROR(channel_add_rows(state, _channels, element_size, hashes, rows, block,
|
||||
_enable_pipeline_exec ? eos : false));
|
||||
} else {
|
||||
for (int j = 0; j < result_size; ++j) {
|
||||
// complex type most not implement get_data_at() method which column_const will call
|
||||
unpack_if_const(block->get_by_position(result[j]).column)
|
||||
.first->update_crcs_with_value(
|
||||
hash_vals, _partition_expr_ctxs[j]->root()->type().type);
|
||||
}
|
||||
element_size = _channel_shared_ptrs.size();
|
||||
for (int i = 0; i < rows; i++) {
|
||||
hashes[i] = hashes[i] % element_size;
|
||||
}
|
||||
|
||||
{
|
||||
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
|
||||
Block::erase_useless_column(block, column_to_keep);
|
||||
}
|
||||
RETURN_IF_ERROR(channel_add_rows(state, _channel_shared_ptrs, element_size, hashes,
|
||||
rows, block));
|
||||
rows, block, _enable_pipeline_exec ? eos : false));
|
||||
}
|
||||
} else {
|
||||
// Range partition
|
||||
@ -624,28 +649,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
|
||||
}
|
||||
|
||||
Status VDataStreamSender::try_close(RuntimeState* state, Status exec_status) {
|
||||
if (_serializer.get_block() && _serializer.get_block()->rows() > 0) {
|
||||
BroadcastPBlockHolder* block_holder = nullptr;
|
||||
RETURN_IF_ERROR(_get_next_available_buffer(&block_holder));
|
||||
{
|
||||
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
|
||||
Block block = _serializer.get_block()->to_block();
|
||||
RETURN_IF_ERROR(_serializer.serialize_block(&block, block_holder->get_block(),
|
||||
_channels.size()));
|
||||
Status status;
|
||||
for (auto channel : _channels) {
|
||||
if (!channel->is_receiver_eof()) {
|
||||
if (channel->is_local()) {
|
||||
status = channel->send_local_block(&block);
|
||||
} else {
|
||||
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
|
||||
status = channel->send_block(block_holder, false);
|
||||
}
|
||||
HANDLE_CHANNEL_STATUS(state, channel, status);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
DCHECK(_serializer.get_block() == nullptr || _serializer.get_block()->rows() == 0);
|
||||
Status final_st = Status::OK();
|
||||
for (int i = 0; i < _channels.size(); ++i) {
|
||||
Status st = _channels[i]->close(state);
|
||||
@ -710,8 +714,8 @@ BlockSerializer::BlockSerializer(VDataStreamSender* parent, bool is_local)
|
||||
: _parent(parent), _is_local(is_local), _batch_size(parent->state()->batch_size()) {}
|
||||
|
||||
Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest, int num_receivers,
|
||||
bool* serialized, const std::vector<int>* rows,
|
||||
bool clear_after_serialize) {
|
||||
bool* serialized, bool eos,
|
||||
const std::vector<int>* rows) {
|
||||
if (_mutable_block == nullptr) {
|
||||
SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
|
||||
_mutable_block = MutableBlock::create_unique(block->clone_empty());
|
||||
@ -720,18 +724,20 @@ Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest, int nu
|
||||
{
|
||||
SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
|
||||
if (rows) {
|
||||
SCOPED_TIMER(_parent->_split_block_distribute_by_channel_timer);
|
||||
const int* begin = &(*rows)[0];
|
||||
_mutable_block->add_rows(block, begin, begin + rows->size());
|
||||
} else {
|
||||
if (rows->size() > 0) {
|
||||
SCOPED_TIMER(_parent->_split_block_distribute_by_channel_timer);
|
||||
const int* begin = &(*rows)[0];
|
||||
_mutable_block->add_rows(block, begin, begin + rows->size());
|
||||
}
|
||||
} else if (!block->empty()) {
|
||||
SCOPED_TIMER(_parent->_merge_block_timer);
|
||||
RETURN_IF_ERROR(_mutable_block->merge(*block));
|
||||
}
|
||||
}
|
||||
|
||||
if (_mutable_block->rows() >= _batch_size) {
|
||||
if (_mutable_block->rows() >= _batch_size || eos) {
|
||||
if (!_is_local) {
|
||||
RETURN_IF_ERROR(serialize_block(dest, num_receivers, clear_after_serialize));
|
||||
RETURN_IF_ERROR(serialize_block(dest, num_receivers));
|
||||
}
|
||||
*serialized = true;
|
||||
return Status::OK();
|
||||
@ -740,21 +746,18 @@ Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest, int nu
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status BlockSerializer::serialize_block(PBlock* dest, int num_receivers,
|
||||
bool clear_after_serialize) {
|
||||
Status BlockSerializer::serialize_block(PBlock* dest, int num_receivers) {
|
||||
if (_mutable_block && _mutable_block->rows() > 0) {
|
||||
auto block = _mutable_block->to_block();
|
||||
RETURN_IF_ERROR(serialize_block(&block, dest, num_receivers));
|
||||
if (clear_after_serialize) {
|
||||
block.clear_column_data();
|
||||
}
|
||||
block.clear_column_data();
|
||||
_mutable_block->set_muatable_columns(block.mutate_columns());
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status BlockSerializer::serialize_block(const Block* src, PBlock* dest, int num_receivers) {
|
||||
Status BlockSerializer::serialize_block(Block* src, PBlock* dest, int num_receivers) {
|
||||
{
|
||||
SCOPED_TIMER(_parent->_serialize_batch_timer);
|
||||
dest->Clear();
|
||||
|
||||
@ -71,11 +71,11 @@ class VDataStreamSender;
|
||||
|
||||
class BlockSerializer {
|
||||
public:
|
||||
BlockSerializer(VDataStreamSender* parent, bool is_local = false);
|
||||
BlockSerializer(VDataStreamSender* parent, bool is_local = true);
|
||||
Status next_serialized_block(Block* src, PBlock* dest, int num_receivers, bool* serialized,
|
||||
const std::vector<int>* rows, bool clear_after_serialize);
|
||||
Status serialize_block(PBlock* dest, int num_receivers, bool clear_after_serialize);
|
||||
Status serialize_block(const Block* src, PBlock* dest, int num_receivers);
|
||||
bool eos, const std::vector<int>* rows = nullptr);
|
||||
Status serialize_block(PBlock* dest, int num_receivers = 1);
|
||||
Status serialize_block(Block* src, PBlock* dest, int num_receivers = 1);
|
||||
|
||||
MutableBlock* get_block() const { return _mutable_block.get(); }
|
||||
|
||||
@ -141,7 +141,7 @@ protected:
|
||||
|
||||
template <typename Channels>
|
||||
Status channel_add_rows(RuntimeState* state, Channels& channels, int num_channels,
|
||||
const uint64_t* channel_ids, int rows, Block* block);
|
||||
const uint64_t* channel_ids, int rows, Block* block, bool eos);
|
||||
|
||||
template <typename ChannelPtrType>
|
||||
void _handle_eof_channel(RuntimeState* state, ChannelPtrType channel, Status st);
|
||||
@ -264,7 +264,7 @@ public:
|
||||
return Status::InternalError("Send BroadcastPBlockHolder is not allowed!");
|
||||
}
|
||||
|
||||
virtual Status add_rows(Block* block, const std::vector<int>& row);
|
||||
virtual Status add_rows(Block* block, const std::vector<int>& row, bool eos);
|
||||
|
||||
virtual Status send_current_block(bool eos);
|
||||
|
||||
@ -397,7 +397,7 @@ protected:
|
||||
template <typename Channels>
|
||||
Status VDataStreamSender::channel_add_rows(RuntimeState* state, Channels& channels,
|
||||
int num_channels, const uint64_t* __restrict channel_ids,
|
||||
int rows, Block* block) {
|
||||
int rows, Block* block, bool eos) {
|
||||
std::vector<int> channel2rows[num_channels];
|
||||
|
||||
for (int i = 0; i < rows; i++) {
|
||||
@ -406,8 +406,8 @@ Status VDataStreamSender::channel_add_rows(RuntimeState* state, Channels& channe
|
||||
|
||||
Status status;
|
||||
for (int i = 0; i < num_channels; ++i) {
|
||||
if (!channels[i]->is_receiver_eof() && !channel2rows[i].empty()) {
|
||||
status = channels[i]->add_rows(block, channel2rows[i]);
|
||||
if (!channels[i]->is_receiver_eof() && (!channel2rows[i].empty() || eos)) {
|
||||
status = channels[i]->add_rows(block, channel2rows[i], eos);
|
||||
HANDLE_CHANNEL_STATUS(state, channels[i], status);
|
||||
}
|
||||
}
|
||||
@ -479,17 +479,17 @@ public:
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status add_rows(Block* block, const std::vector<int>& rows) override {
|
||||
Status add_rows(Block* block, const std::vector<int>& rows, bool eos) override {
|
||||
if (_fragment_instance_id.lo == -1) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
bool serialized = false;
|
||||
_pblock = std::make_unique<PBlock>();
|
||||
RETURN_IF_ERROR(_serializer.next_serialized_block(block, _pblock.get(), 1, &serialized,
|
||||
&rows, true));
|
||||
RETURN_IF_ERROR(_serializer.next_serialized_block(block, _pblock.get(), 1, &serialized, eos,
|
||||
&rows));
|
||||
if (serialized) {
|
||||
RETURN_IF_ERROR(send_current_block(false));
|
||||
RETURN_IF_ERROR(send_current_block(eos));
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
@ -503,7 +503,7 @@ public:
|
||||
SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
|
||||
if (eos) {
|
||||
_pblock = std::make_unique<PBlock>();
|
||||
RETURN_IF_ERROR(_serializer.serialize_block(_pblock.get(), 1, true));
|
||||
RETURN_IF_ERROR(_serializer.serialize_block(_pblock.get(), 1));
|
||||
}
|
||||
RETURN_IF_ERROR(send_block(_pblock.release(), eos));
|
||||
return Status::OK();
|
||||
|
||||
Reference in New Issue
Block a user