## Proposed changes BP #35886
This commit is contained in:
@ -96,7 +96,7 @@ Status MultiCastDataStreamerSourceOperator::get_block(RuntimeState* state, vecto
|
||||
if (!_output_expr_contexts.empty()) {
|
||||
output_block = &tmp_block;
|
||||
}
|
||||
_multi_cast_data_streamer->pull(_consumer_id, output_block, &eos);
|
||||
RETURN_IF_ERROR(_multi_cast_data_streamer->pull(_consumer_id, output_block, &eos));
|
||||
|
||||
if (!_conjuncts.empty()) {
|
||||
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, output_block,
|
||||
@ -185,7 +185,8 @@ Status MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state,
|
||||
if (!local_state._output_expr_contexts.empty()) {
|
||||
output_block = &tmp_block;
|
||||
}
|
||||
local_state._shared_state->multi_cast_data_streamer.pull(_consumer_id, output_block, eos);
|
||||
RETURN_IF_ERROR(local_state._shared_state->multi_cast_data_streamer.pull(_consumer_id,
|
||||
output_block, eos));
|
||||
|
||||
if (!local_state._conjuncts.empty()) {
|
||||
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, output_block,
|
||||
|
||||
@ -29,7 +29,7 @@ MultiCastBlock::MultiCastBlock(vectorized::Block* block, int used_count, size_t
|
||||
block->clear();
|
||||
}
|
||||
|
||||
void MultiCastDataStreamer::pull(int sender_idx, doris::vectorized::Block* block, bool* eos) {
|
||||
Status MultiCastDataStreamer::pull(int sender_idx, doris::vectorized::Block* block, bool* eos) {
|
||||
std::lock_guard l(_mutex);
|
||||
auto& pos_to_pull = _sender_pos_to_read[sender_idx];
|
||||
if (pos_to_pull != _multi_cast_blocks.end()) {
|
||||
@ -42,7 +42,7 @@ void MultiCastDataStreamer::pull(int sender_idx, doris::vectorized::Block* block
|
||||
_multi_cast_blocks.pop_front();
|
||||
} else {
|
||||
pos_to_pull->_block->create_same_struct_block(0)->swap(*block);
|
||||
(void)vectorized::MutableBlock(block).merge(*pos_to_pull->_block);
|
||||
RETURN_IF_ERROR(vectorized::MutableBlock(block).merge(*pos_to_pull->_block));
|
||||
pos_to_pull->_used_count--;
|
||||
pos_to_pull++;
|
||||
}
|
||||
@ -51,6 +51,7 @@ void MultiCastDataStreamer::pull(int sender_idx, doris::vectorized::Block* block
|
||||
if (pos_to_pull == _multi_cast_blocks.end()) {
|
||||
_block_reading(sender_idx);
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void MultiCastDataStreamer::close_sender(int sender_idx) {
|
||||
|
||||
@ -50,7 +50,7 @@ public:
|
||||
|
||||
~MultiCastDataStreamer() = default;
|
||||
|
||||
void pull(int sender_idx, vectorized::Block* block, bool* eos);
|
||||
Status pull(int sender_idx, vectorized::Block* block, bool* eos);
|
||||
|
||||
void close_sender(int sender_idx);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user