diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index 30c4d9d77a..55b301c074 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -788,8 +788,7 @@ Status AggSinkOperatorX::open(RuntimeState* state) { return Status::OK(); } -Status AggSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in_block, - SourceState source_state) { +Status AggSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in_block, bool eos) { auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); @@ -799,7 +798,7 @@ Status AggSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in_ RETURN_IF_ERROR(local_state.try_spill_disk()); local_state._executor->update_memusage(&local_state); } - if (source_state == SourceState::FINISHED) { + if (eos) { if (local_state._shared_state->spill_context.has_data) { static_cast(local_state.try_spill_disk(true)); RETURN_IF_ERROR(local_state._shared_state->spill_context.prepare_for_reading()); diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index 03f8dea22a..8077d78d99 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -334,8 +334,7 @@ public: Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; - Status sink(RuntimeState* state, vectorized::Block* in_block, - SourceState source_state) override; + Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; DataDistribution required_data_distribution() const override { if (_probe_expr_ctxs.empty()) { diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp index 5d27c2d33a..15ff10ba3d 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/aggregation_source_operator.cpp @@ -86,17 +86,16 @@ Status AggLocalState::_destroy_agg_status(vectorized::AggregateDataPtr data) { } Status AggLocalState::_serialize_with_serialized_key_result(RuntimeState* state, - vectorized::Block* block, - SourceState& source_state) { + vectorized::Block* block, bool* eos) { if (_shared_state->spill_context.has_data) { - return _serialize_with_serialized_key_result_with_spilt_data(state, block, source_state); + return _serialize_with_serialized_key_result_with_spilt_data(state, block, eos); } else { - return _serialize_with_serialized_key_result_non_spill(state, block, source_state); + return _serialize_with_serialized_key_result_non_spill(state, block, eos); } } Status AggLocalState::_serialize_with_serialized_key_result_with_spilt_data( - RuntimeState* state, vectorized::Block* block, SourceState& source_state) { + RuntimeState* state, vectorized::Block* block, bool* eos) { CHECK(!_shared_state->spill_context.stream_ids.empty()); CHECK(_shared_state->spill_partition_helper != nullptr) << "_spill_partition_helper should not be null"; @@ -112,14 +111,12 @@ Status AggLocalState::_serialize_with_serialized_key_result_with_spilt_data( _shared_state->aggregate_data_container->init_once(); } - RETURN_IF_ERROR(_serialize_with_serialized_key_result_non_spill(state, block, source_state)); - if (source_state == SourceState::FINISHED) { - source_state = _shared_state->spill_context.read_cursor == - _shared_state->spill_partition_helper->partition_count - ? SourceState::FINISHED - : SourceState::DEPEND_ON_SOURCE; + RETURN_IF_ERROR(_serialize_with_serialized_key_result_non_spill(state, block, eos)); + if (*eos) { + *eos = _shared_state->spill_context.read_cursor == + _shared_state->spill_partition_helper->partition_count; } - CHECK(!block->empty() || source_state == SourceState::FINISHED); + CHECK(!block->empty() || *eos); return Status::OK(); } @@ -153,7 +150,7 @@ Status AggLocalState::_reset_hash_table() { Status AggLocalState::_serialize_with_serialized_key_result_non_spill(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) { + bool* eos) { SCOPED_TIMER(_serialize_result_timer); auto& shared_state = *_shared_state; int key_size = _shared_state->probe_expr_ctxs.size(); @@ -218,10 +215,10 @@ Status AggLocalState::_serialize_with_serialized_key_result_non_spill(RuntimeSta agg_method.hash_table->template get_null_key_data< vectorized::AggregateDataPtr>(); ++num_rows; - source_state = SourceState::FINISHED; + *eos = true; } } else { - source_state = SourceState::FINISHED; + *eos = true; } } @@ -265,16 +262,16 @@ Status AggLocalState::_serialize_with_serialized_key_result_non_spill(RuntimeSta } Status AggLocalState::_get_with_serialized_key_result(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) { + bool* eos) { if (_shared_state->spill_context.has_data) { - return _get_result_with_spilt_data(state, block, source_state); + return _get_result_with_spilt_data(state, block, eos); } else { - return _get_result_with_serialized_key_non_spill(state, block, source_state); + return _get_result_with_serialized_key_non_spill(state, block, eos); } } Status AggLocalState::_get_result_with_spilt_data(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) { + bool* eos) { CHECK(!_shared_state->spill_context.stream_ids.empty()); CHECK(_shared_state->spill_partition_helper != nullptr) << "_spill_partition_helper should not be null"; @@ -290,14 +287,12 @@ Status AggLocalState::_get_result_with_spilt_data(RuntimeState* state, vectorize _shared_state->aggregate_data_container->init_once(); } - RETURN_IF_ERROR(_get_result_with_serialized_key_non_spill(state, block, source_state)); - if (source_state == SourceState::FINISHED) { - source_state = _shared_state->spill_context.read_cursor == - _shared_state->spill_partition_helper->partition_count - ? SourceState::FINISHED - : SourceState::DEPEND_ON_SOURCE; + RETURN_IF_ERROR(_get_result_with_serialized_key_non_spill(state, block, eos)); + if (*eos) { + *eos = _shared_state->spill_context.read_cursor == + _shared_state->spill_partition_helper->partition_count; } - CHECK(!block->empty() || source_state == SourceState::FINISHED); + CHECK(!block->empty() || *eos); return Status::OK(); } @@ -324,7 +319,7 @@ Status AggLocalState::_merge_spilt_data() { Status AggLocalState::_get_result_with_serialized_key_non_spill(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) { + bool* eos) { auto& shared_state = *_shared_state; // non-nullable column(id in `_make_nullable_keys`) will be converted to nullable. bool mem_reuse = shared_state.make_nullable_keys.empty() && block->mem_reuse(); @@ -402,10 +397,10 @@ Status AggLocalState::_get_result_with_serialized_key_non_spill(RuntimeState* st shared_state.aggregate_evaluators[i]->insert_result_info( mapped + shared_state.offsets_of_aggregate_states[i], value_columns[i].get()); - source_state = SourceState::FINISHED; + *eos = true; } } else { - source_state = SourceState::FINISHED; + *eos = true; } } }, @@ -428,14 +423,14 @@ Status AggLocalState::_get_result_with_serialized_key_non_spill(RuntimeState* st } Status AggLocalState::_serialize_without_key(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) { + bool* eos) { auto& shared_state = *_shared_state; // 1. `child(0)->rows_returned() == 0` mean not data from child // in level two aggregation node should return NULL result // level one aggregation node set `eos = true` return directly SCOPED_TIMER(_serialize_result_timer); if (UNLIKELY(_shared_state->input_num_rows == 0)) { - source_state = SourceState::FINISHED; + *eos = true; return Status::OK(); } block->clear(); @@ -468,12 +463,12 @@ Status AggLocalState::_serialize_without_key(RuntimeState* state, vectorized::Bl } block->set_columns(std::move(value_columns)); - source_state = SourceState::FINISHED; + *eos = true; return Status::OK(); } Status AggLocalState::_get_without_key_result(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) { + bool* eos) { auto& shared_state = *_shared_state; DCHECK(_agg_data->without_key != nullptr); block->clear(); @@ -521,7 +516,7 @@ Status AggLocalState::_get_without_key_result(RuntimeState* state, vectorized::B } block->set_columns(std::move(columns)); - source_state = SourceState::FINISHED; + *eos = true; return Status::OK(); } @@ -531,15 +526,14 @@ AggSourceOperatorX::AggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, _needs_finalize(tnode.agg_node.need_finalize), _without_key(tnode.agg_node.grouping_exprs.empty()) {} -Status AggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) { +Status AggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) { auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); - RETURN_IF_ERROR(local_state._executor.get_result(state, block, source_state)); + RETURN_IF_ERROR(local_state._executor.get_result(state, block, eos)); local_state.make_nullable_output_key(block); // dispose the having clause, should not be execute in prestreaming agg RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, block, block->columns())); - local_state.reached_limit(block, source_state); + local_state.reached_limit(block, eos); return Status::OK(); } diff --git a/be/src/pipeline/exec/aggregation_source_operator.h b/be/src/pipeline/exec/aggregation_source_operator.h index 08265dd881..077ac8f7db 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.h +++ b/be/src/pipeline/exec/aggregation_source_operator.h @@ -65,25 +65,21 @@ public: protected: friend class AggSourceOperatorX; - Status _get_without_key_result(RuntimeState* state, vectorized::Block* block, - SourceState& source_state); - Status _serialize_without_key(RuntimeState* state, vectorized::Block* block, - SourceState& source_state); + Status _get_without_key_result(RuntimeState* state, vectorized::Block* block, bool* eos); + Status _serialize_without_key(RuntimeState* state, vectorized::Block* block, bool* eos); Status _get_with_serialized_key_result(RuntimeState* state, vectorized::Block* block, - SourceState& source_state); + bool* eos); Status _serialize_with_serialized_key_result(RuntimeState* state, vectorized::Block* block, - SourceState& source_state); + bool* eos); Status _get_result_with_serialized_key_non_spill(RuntimeState* state, vectorized::Block* block, - SourceState& source_state); - Status _get_result_with_spilt_data(RuntimeState* state, vectorized::Block* block, - SourceState& source_state); + bool* eos); + Status _get_result_with_spilt_data(RuntimeState* state, vectorized::Block* block, bool* eos); Status _serialize_with_serialized_key_result_non_spill(RuntimeState* state, - vectorized::Block* block, - SourceState& source_state); + vectorized::Block* block, bool* eos); Status _serialize_with_serialized_key_result_with_spilt_data(RuntimeState* state, vectorized::Block* block, - SourceState& source_state); + bool* eos); Status _destroy_agg_status(vectorized::AggregateDataPtr data); Status _reset_hash_table(); Status _merge_spilt_data(); @@ -105,8 +101,8 @@ protected: RuntimeProfile::Counter* _serialize_data_timer = nullptr; RuntimeProfile::Counter* _hash_table_size_counter = nullptr; - using vectorized_get_result = std::function; + using vectorized_get_result = + std::function; struct executor { vectorized_get_result get_result; @@ -124,8 +120,7 @@ public: const DescriptorTbl& descs); ~AggSourceOperatorX() = default; - Status get_block(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) override; + Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override; bool is_source() const override { return true; } diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp index 0c00ab36d0..2468f956ac 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.cpp +++ b/be/src/pipeline/exec/analytic_sink_operator.cpp @@ -255,11 +255,11 @@ Status AnalyticSinkOperatorX::open(RuntimeState* state) { } Status AnalyticSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* input_block, - SourceState source_state) { + bool eos) { auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)input_block->rows()); - local_state._shared_state->input_eos = source_state == SourceState::FINISHED; + local_state._shared_state->input_eos = eos; if (local_state._shared_state->input_eos && input_block->rows() == 0) { local_state._dependency->set_ready_to_read(); local_state._dependency->block(); diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h index 587fc2c990..e04259a0fc 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.h +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -96,8 +96,7 @@ public: Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; - Status sink(RuntimeState* state, vectorized::Block* in_block, - SourceState source_state) override; + Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; DataDistribution required_data_distribution() const override { if (_partition_by_eq_expr_ctxs.empty()) { return {ExchangeType::PASSTHROUGH}; diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp b/be/src/pipeline/exec/analytic_source_operator.cpp index 49f03fd493..2f0f827d08 100644 --- a/be/src/pipeline/exec/analytic_source_operator.cpp +++ b/be/src/pipeline/exec/analytic_source_operator.cpp @@ -516,13 +516,13 @@ Status AnalyticSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state } Status AnalyticSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) { + bool* eos) { auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); if (local_state._shared_state->input_eos && (local_state._output_block_index == local_state._shared_state->input_blocks.size() || local_state._shared_state->input_total_rows == 0)) { - source_state = SourceState::FINISHED; + *eos = true; return Status::OK(); } @@ -548,7 +548,7 @@ Status AnalyticSourceOperatorX::get_block(RuntimeState* state, vectorized::Block RETURN_IF_ERROR(local_state.output_current_block(block)); RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, block, block->columns())); - local_state.reached_limit(block, source_state); + local_state.reached_limit(block, eos); return Status::OK(); } diff --git a/be/src/pipeline/exec/analytic_source_operator.h b/be/src/pipeline/exec/analytic_source_operator.h index b13ae6aab2..e98a50186e 100644 --- a/be/src/pipeline/exec/analytic_source_operator.h +++ b/be/src/pipeline/exec/analytic_source_operator.h @@ -135,8 +135,7 @@ public: AnalyticSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, const DescriptorTbl& descs); - Status get_block(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) override; + Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override; bool is_source() const override { return true; } diff --git a/be/src/pipeline/exec/assert_num_rows_operator.cpp b/be/src/pipeline/exec/assert_num_rows_operator.cpp index d08f60cce7..c1db78e59b 100644 --- a/be/src/pipeline/exec/assert_num_rows_operator.cpp +++ b/be/src/pipeline/exec/assert_num_rows_operator.cpp @@ -38,7 +38,7 @@ AssertNumRowsOperatorX::AssertNumRowsOperatorX(ObjectPool* pool, const TPlanNode } Status AssertNumRowsOperatorX::pull(doris::RuntimeState* state, vectorized::Block* block, - SourceState& source_state) { + bool* eos) { auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); local_state.add_num_rows_returned(block->rows()); diff --git a/be/src/pipeline/exec/assert_num_rows_operator.h b/be/src/pipeline/exec/assert_num_rows_operator.h index 278bc5ab45..f3495b5bf9 100644 --- a/be/src/pipeline/exec/assert_num_rows_operator.h +++ b/be/src/pipeline/exec/assert_num_rows_operator.h @@ -53,7 +53,7 @@ public: AssertNumRowsOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, const DescriptorTbl& descs); - Status pull(RuntimeState* state, vectorized::Block* block, SourceState& source_state) override; + Status pull(RuntimeState* state, vectorized::Block* block, bool* eos) override; [[nodiscard]] bool is_source() const override { return false; } diff --git a/be/src/pipeline/exec/datagen_operator.cpp b/be/src/pipeline/exec/datagen_operator.cpp index 418068ef9d..6c1310acf5 100644 --- a/be/src/pipeline/exec/datagen_operator.cpp +++ b/be/src/pipeline/exec/datagen_operator.cpp @@ -74,19 +74,16 @@ Status DataGenSourceOperatorX::prepare(RuntimeState* state) { return Status::OK(); } -Status DataGenSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) { +Status DataGenSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) { if (state == nullptr || block == nullptr) { return Status::InternalError("input is NULL pointer"); } RETURN_IF_CANCELLED(state); auto& local_state = get_local_state(state); - bool eos = false; - Status res = local_state._table_func->get_next(state, block, &eos); - source_state = eos ? SourceState::FINISHED : source_state; + Status res = local_state._table_func->get_next(state, block, eos); RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, block, block->columns())); - local_state.reached_limit(block, source_state); + local_state.reached_limit(block, eos); return res; } diff --git a/be/src/pipeline/exec/datagen_operator.h b/be/src/pipeline/exec/datagen_operator.h index f34dfc61df..af8eda179d 100644 --- a/be/src/pipeline/exec/datagen_operator.h +++ b/be/src/pipeline/exec/datagen_operator.h @@ -73,8 +73,7 @@ public: Status init(const TPlanNode& tnode, RuntimeState* state) override; Status prepare(RuntimeState* state) override; - Status get_block(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) override; + Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override; [[nodiscard]] bool is_source() const override { return true; } diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp index 66e2c333ac..0ab0b81009 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp @@ -39,7 +39,6 @@ DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState* sta _agg_data(std::make_unique()), _agg_profile_arena(std::make_unique()), _child_block(vectorized::Block::create_unique()), - _child_source_state(SourceState::DEPEND_ON_SOURCE), _aggregated_block(vectorized::Block::create_unique()) {} Status DistinctStreamingAggLocalState::init(RuntimeState* state, LocalStateInfo& info) { @@ -331,7 +330,7 @@ Status DistinctStreamingAggOperatorX::open(RuntimeState* state) { } Status DistinctStreamingAggOperatorX::push(RuntimeState* state, vectorized::Block* in_block, - SourceState source_state) const { + bool eos) const { auto& local_state = get_local_state(state); local_state._input_num_rows += in_block->rows(); Status ret = Status::OK(); @@ -351,8 +350,7 @@ Status DistinctStreamingAggOperatorX::push(RuntimeState* state, vectorized::Bloc } // reach limit or source finish - if ((UNLIKELY(source_state == SourceState::FINISHED)) || - (_limit != -1 && local_state._output_distinct_rows >= _limit)) { + if ((UNLIKELY(eos)) || (_limit != -1 && local_state._output_distinct_rows >= _limit)) { local_state._output_distinct_rows += local_state._aggregated_block->rows(); return Status::OK(); // need given finish signal } @@ -360,7 +358,7 @@ Status DistinctStreamingAggOperatorX::push(RuntimeState* state, vectorized::Bloc } Status DistinctStreamingAggOperatorX::pull(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) const { + bool* eos) const { auto& local_state = get_local_state(state); if (!local_state._aggregated_block->empty()) { block->swap(*local_state._aggregated_block); @@ -374,19 +372,13 @@ Status DistinctStreamingAggOperatorX::pull(RuntimeState* state, vectorized::Bloc vectorized::VExprContext::filter_block(_conjuncts, block, block->columns())); } - if (UNLIKELY(local_state._child_source_state == SourceState::FINISHED || - (_limit != -1 && local_state._output_distinct_rows >= _limit))) { - source_state = SourceState::FINISHED; - } else { - source_state = SourceState::DEPEND_ON_SOURCE; - } + *eos = local_state._child_eos || (_limit != -1 && local_state._output_distinct_rows >= _limit); return Status::OK(); } bool DistinctStreamingAggOperatorX::need_more_input_data(RuntimeState* state) const { auto& local_state = get_local_state(state); - return local_state._aggregated_block->empty() && - local_state._child_source_state != SourceState::FINISHED && + return local_state._aggregated_block->empty() && !local_state._child_eos && (_limit == -1 || local_state._output_distinct_rows < _limit); } diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h index b35c3ed7e1..41f8a74b65 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h @@ -71,7 +71,7 @@ private: vectorized::VExprContextSPtrs _probe_expr_ctxs; std::unique_ptr _agg_profile_arena = nullptr; std::unique_ptr _child_block = nullptr; - SourceState _child_source_state; + bool _child_eos = false; std::unique_ptr _aggregated_block = nullptr; RuntimeProfile::Counter* _build_timer = nullptr; @@ -89,10 +89,8 @@ public: Status init(const TPlanNode& tnode, RuntimeState* state) override; Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; - Status pull(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) const override; - Status push(RuntimeState* state, vectorized::Block* input_block, - SourceState source_state) const override; + Status pull(RuntimeState* state, vectorized::Block* block, bool* eos) const override; + Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) const override; bool need_more_input_data(RuntimeState* state) const override; DataDistribution required_data_distribution() const override { diff --git a/be/src/pipeline/exec/empty_set_operator.cpp b/be/src/pipeline/exec/empty_set_operator.cpp index 9c44ce24fe..02dc802580 100644 --- a/be/src/pipeline/exec/empty_set_operator.cpp +++ b/be/src/pipeline/exec/empty_set_operator.cpp @@ -26,8 +26,8 @@ namespace doris::pipeline { OPERATOR_CODE_GENERATOR(EmptySetSourceOperator, SourceOperator) Status EmptySetSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) { - source_state = SourceState::FINISHED; + bool* eos) { + *eos = true; return Status::OK(); } diff --git a/be/src/pipeline/exec/empty_set_operator.h b/be/src/pipeline/exec/empty_set_operator.h index cd456265fa..b65139fb98 100644 --- a/be/src/pipeline/exec/empty_set_operator.h +++ b/be/src/pipeline/exec/empty_set_operator.h @@ -58,8 +58,7 @@ public: const DescriptorTbl& descs) : OperatorX(pool, tnode, operator_id, descs) {} - Status get_block(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) override; + Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override; [[nodiscard]] bool is_source() const override { return true; } }; diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 9f9a829da1..a43cc07b92 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -300,8 +300,7 @@ void ExchangeSinkOperatorX::_handle_eof_channel(RuntimeState* state, ChannelPtrT static_cast(channel->close(state, Status::OK())); } -Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block, - SourceState source_state) { +Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block, bool eos) { auto& local_state = get_local_state(state); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)block->rows()); COUNTER_UPDATE(local_state.rows_sent_counter(), (int64_t)block->rows()); @@ -340,7 +339,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block bool serialized = false; RETURN_IF_ERROR(local_state._serializer.next_serialized_block( block, block_holder->get_block(), local_state.channels.size(), &serialized, - source_state == SourceState::FINISHED)); + eos)); if (serialized) { auto cur_block = local_state._serializer.get_block()->to_block(); if (!cur_block.empty()) { @@ -357,8 +356,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block status = channel->send_local_block(&cur_block); } else { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); - status = channel->send_broadcast_block( - block_holder, source_state == SourceState::FINISHED); + status = channel->send_broadcast_block(block_holder, eos); } HANDLE_CHANNEL_STATUS(state, channel, status); } @@ -382,8 +380,8 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); RETURN_IF_ERROR(local_state._serializer.serialize_block( block, current_channel->ch_cur_pb_block())); - auto status = current_channel->send_remote_block( - current_channel->ch_cur_pb_block(), source_state == SourceState::FINISHED); + auto status = + current_channel->send_remote_block(current_channel->ch_cur_pb_block(), eos); HANDLE_CHANNEL_STATUS(state, current_channel, status); current_channel->ch_roll_pb_block(); } @@ -399,15 +397,13 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block local_state._partitioner->do_partitioning(state, block, _mem_tracker.get())); } if (_part_type == TPartitionType::HASH_PARTITIONED) { - RETURN_IF_ERROR(channel_add_rows(state, local_state.channels, - local_state._partition_count, - (uint32_t*)local_state._partitioner->get_channel_ids(), - rows, block, source_state == SourceState::FINISHED)); + RETURN_IF_ERROR(channel_add_rows( + state, local_state.channels, local_state._partition_count, + (uint32_t*)local_state._partitioner->get_channel_ids(), rows, block, eos)); } else { - RETURN_IF_ERROR(channel_add_rows(state, local_state.channel_shared_ptrs, - local_state._partition_count, - (uint32_t*)local_state._partitioner->get_channel_ids(), - rows, block, source_state == SourceState::FINISHED)); + RETURN_IF_ERROR(channel_add_rows( + state, local_state.channel_shared_ptrs, local_state._partition_count, + (uint32_t*)local_state._partitioner->get_channel_ids(), rows, block, eos)); } } else { // Range partition @@ -416,7 +412,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block } Status final_st = Status::OK(); - if (source_state == SourceState::FINISHED) { + if (eos) { local_state._serializer.reset_block(); for (int i = 0; i < local_state.channels.size(); ++i) { Status st = local_state.channels[i]->close(state, Status::OK()); diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 9c258c1527..43919236a8 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -194,8 +194,7 @@ public: Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; - Status sink(RuntimeState* state, vectorized::Block* in_block, - SourceState source_state) override; + Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; Status serialize_block(ExchangeSinkLocalState& stete, vectorized::Block* src, PBlock* dest, int num_receivers = 1); diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp index 22f171269d..2c519319dd 100644 --- a/be/src/pipeline/exec/exchange_source_operator.cpp +++ b/be/src/pipeline/exec/exchange_source_operator.cpp @@ -150,7 +150,7 @@ Status ExchangeSourceOperatorX::open(RuntimeState* state) { } Status ExchangeSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) { + bool* eos) { auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); if (_is_merging && !local_state.is_ready) { @@ -160,13 +160,12 @@ Status ExchangeSourceOperatorX::get_block(RuntimeState* state, vectorized::Block local_state.is_ready = true; return Status::OK(); } - bool eos = false; - auto status = local_state.stream_recvr->get_next(block, &eos); + auto status = local_state.stream_recvr->get_next(block, eos); RETURN_IF_ERROR(doris::vectorized::VExprContext::filter_block(local_state.conjuncts(), block, block->columns())); // In vsortrunmerger, it will set eos=true, and block not empty // so that eos==true, could not make sure that block not have valid data - if (!eos || block->rows() > 0) { + if (!*eos || block->rows() > 0) { if (!_is_merging) { if (local_state.num_rows_skipped + block->rows() < _offset) { local_state.num_rows_skipped += block->rows(); @@ -180,7 +179,7 @@ Status ExchangeSourceOperatorX::get_block(RuntimeState* state, vectorized::Block if (local_state.num_rows_returned() + block->rows() < _limit) { local_state.add_num_rows_returned(block->rows()); } else { - eos = true; + *eos = true; auto limit = _limit - local_state.num_rows_returned(); block->set_num_rows(limit); local_state.set_num_rows_returned(_limit); @@ -188,9 +187,6 @@ Status ExchangeSourceOperatorX::get_block(RuntimeState* state, vectorized::Block COUNTER_SET(local_state.rows_returned_counter(), local_state.num_rows_returned()); COUNTER_UPDATE(local_state.blocks_returned_counter(), 1); } - if (eos) { - source_state = SourceState::FINISHED; - } return status; } diff --git a/be/src/pipeline/exec/exchange_source_operator.h b/be/src/pipeline/exec/exchange_source_operator.h index 1ee68b90af..1b106f2e25 100644 --- a/be/src/pipeline/exec/exchange_source_operator.h +++ b/be/src/pipeline/exec/exchange_source_operator.h @@ -80,8 +80,7 @@ public: Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; - Status get_block(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) override; + Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override; std::string debug_string(int indentation_level = 0) const override; diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 0c3689b15b..0a6ed9f35a 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -427,7 +427,7 @@ Status HashJoinBuildSinkOperatorX::open(RuntimeState* state) { } Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block, - SourceState source_state) { + bool eos) { auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); @@ -469,7 +469,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* } } - if (local_state._should_build_hash_table && source_state == SourceState::FINISHED) { + if (local_state._should_build_hash_table && eos) { DCHECK(!local_state._build_side_mutable_block.empty()); local_state._shared_state->build_block = std::make_shared( local_state._build_side_mutable_block.to_block()); @@ -560,7 +560,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* } } - if (source_state == SourceState::FINISHED) { + if (eos) { local_state.init_short_circuit_for_probe(); // Since the comparison of null values is meaningless, null aware left anti/semi join should not output null // when the build side is not empty. diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index 080097d8bc..5812c0861a 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -135,8 +135,7 @@ public: Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; - Status sink(RuntimeState* state, vectorized::Block* in_block, - SourceState source_state) override; + Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; bool should_dry_run(RuntimeState* state) override { return _is_broadcast_join && !state->get_sink_local_state() diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp index a0c4df528a..e41c4e7144 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp +++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp @@ -232,13 +232,13 @@ HashJoinProbeOperatorX::HashJoinProbeOperatorX(ObjectPool* pool, const TPlanNode : std::vector {}) {} Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Block* output_block, - SourceState& source_state) const { + bool* eos) const { auto& local_state = get_local_state(state); local_state.init_for_probe(state); SCOPED_TIMER(local_state._probe_timer); if (local_state._shared_state->short_circuit_for_probe) { // If we use a short-circuit strategy, should return empty block directly. - source_state = SourceState::FINISHED; + *eos = true; return Status::OK(); } @@ -249,9 +249,7 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc // If we use a short-circuit strategy, should return block directly by add additional null data. auto block_rows = local_state._probe_block.rows(); if (local_state._probe_eos && block_rows == 0) { - if (local_state._probe_eos) { - source_state = SourceState::FINISHED; - } + *eos = local_state._probe_eos; return Status::OK(); } @@ -287,7 +285,7 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc /// No need to check the block size in `_filter_data_and_build_output` because here dose not /// increase the output rows count(just same as `_probe_block`'s rows count). - RETURN_IF_ERROR(local_state.filter_data_and_build_output(state, output_block, source_state, + RETURN_IF_ERROR(local_state.filter_data_and_build_output(state, output_block, eos, &temp_block, false)); temp_block.clear(); local_state._probe_block.clear_column_data(_child_x->row_desc().num_materialized_slots()); @@ -341,10 +339,8 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc if constexpr (!std::is_same_v) { using HashTableCtxType = std::decay_t; if constexpr (!std::is_same_v) { - bool eos = false; st = process_hashtable_ctx.process_data_in_hashtable( - arg, mutable_join_block, &temp_block, &eos, _is_mark_join); - source_state = eos ? SourceState::FINISHED : source_state; + arg, mutable_join_block, &temp_block, eos, _is_mark_join); } else { st = Status::InternalError("uninited hash table"); } @@ -355,7 +351,7 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc *local_state._shared_state->hash_table_variants, *local_state._process_hashtable_ctx_variants); } else { - source_state = SourceState::FINISHED; + *eos = true; return Status::OK(); } } else { @@ -365,8 +361,8 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc return st; } - RETURN_IF_ERROR(local_state.filter_data_and_build_output(state, output_block, source_state, - &temp_block)); + RETURN_IF_ERROR( + local_state.filter_data_and_build_output(state, output_block, eos, &temp_block)); // Here make _join_block release the columns' ptr local_state._join_block.set_columns(local_state._join_block.clone_empty_columns()); mutable_join_block.clear(); @@ -417,7 +413,7 @@ std::vector HashJoinProbeLocalState::_convert_block_to_null(vectorized Status HashJoinProbeLocalState::filter_data_and_build_output(RuntimeState* state, vectorized::Block* output_block, - SourceState& source_state, + bool* eos, vectorized::Block* temp_block, bool check_rows_count) { auto& p = _parent->cast(); @@ -436,7 +432,7 @@ Status HashJoinProbeLocalState::filter_data_and_build_output(RuntimeState* state RETURN_IF_ERROR(_build_output_block(temp_block, output_block, false)); _reset_tuple_is_null_column(); - reached_limit(output_block, source_state); + reached_limit(output_block, eos); return Status::OK(); } @@ -468,10 +464,10 @@ Status HashJoinProbeOperatorX::_do_evaluate(vectorized::Block& block, } Status HashJoinProbeOperatorX::push(RuntimeState* state, vectorized::Block* input_block, - SourceState source_state) const { + bool eos) const { auto& local_state = get_local_state(state); local_state.prepare_for_next(); - local_state._probe_eos = source_state == SourceState::FINISHED; + local_state._probe_eos = eos; if (input_block->rows() > 0) { COUNTER_UPDATE(local_state._probe_rows_counter, input_block->rows()); int probe_expr_ctxs_sz = local_state._probe_expr_ctxs.size(); diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h index 43964442b0..af2a255d61 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.h +++ b/be/src/pipeline/exec/hashjoin_probe_operator.h @@ -80,7 +80,7 @@ public: void add_tuple_is_null_column(vectorized::Block* block) override; void init_for_probe(RuntimeState* state); Status filter_data_and_build_output(RuntimeState* state, vectorized::Block* output_block, - SourceState& source_state, vectorized::Block* temp_block, + bool* eos, vectorized::Block* temp_block, bool check_rows_count = true); bool have_other_join_conjunct() const; @@ -154,10 +154,9 @@ public: Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; - Status push(RuntimeState* state, vectorized::Block* input_block, - SourceState source_state) const override; + Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) const override; Status pull(doris::RuntimeState* state, vectorized::Block* output_block, - SourceState& source_state) const override; + bool* eos) const override; bool need_more_input_data(RuntimeState* state) const override; DataDistribution required_data_distribution() const override { diff --git a/be/src/pipeline/exec/jdbc_table_sink_operator.cpp b/be/src/pipeline/exec/jdbc_table_sink_operator.cpp index fc90af3591..6d71c09cf1 100644 --- a/be/src/pipeline/exec/jdbc_table_sink_operator.cpp +++ b/be/src/pipeline/exec/jdbc_table_sink_operator.cpp @@ -55,11 +55,10 @@ Status JdbcTableSinkOperatorX::open(RuntimeState* state) { return Status::OK(); } -Status JdbcTableSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block, - SourceState source_state) { +Status JdbcTableSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block, bool eos) { auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); - RETURN_IF_ERROR(local_state.sink(state, block, source_state)); + RETURN_IF_ERROR(local_state.sink(state, block, eos)); return Status::OK(); } diff --git a/be/src/pipeline/exec/jdbc_table_sink_operator.h b/be/src/pipeline/exec/jdbc_table_sink_operator.h index 33018f69da..ebe64cf2ec 100644 --- a/be/src/pipeline/exec/jdbc_table_sink_operator.h +++ b/be/src/pipeline/exec/jdbc_table_sink_operator.h @@ -52,8 +52,7 @@ public: Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; - Status sink(RuntimeState* state, vectorized::Block* in_block, - SourceState source_state) override; + Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; private: friend class JdbcTableSinkLocalState; diff --git a/be/src/pipeline/exec/join_probe_operator.h b/be/src/pipeline/exec/join_probe_operator.h index eb2136d74b..e9dfc773eb 100644 --- a/be/src/pipeline/exec/join_probe_operator.h +++ b/be/src/pipeline/exec/join_probe_operator.h @@ -39,9 +39,7 @@ protected: template friend class StatefulOperatorX; JoinProbeLocalState(RuntimeState* state, OperatorXBase* parent) - : Base(state, parent), - _child_block(vectorized::Block::create_unique()), - _child_source_state(SourceState::DEPEND_ON_SOURCE) {} + : Base(state, parent), _child_block(vectorized::Block::create_unique()) {} ~JoinProbeLocalState() override = default; void _construct_mutable_join_block(); Status _build_output_block(vectorized::Block* origin_block, vectorized::Block* output_block, @@ -61,7 +59,7 @@ protected: RuntimeProfile::Counter* _build_output_block_timer = nullptr; std::unique_ptr _child_block = nullptr; - SourceState _child_source_state; + bool _child_eos = false; }; template diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.h b/be/src/pipeline/exec/multi_cast_data_stream_sink.h index d3159d5b46..d9a29dfa0d 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_sink.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.h @@ -73,15 +73,14 @@ public: _sink(sink) {} ~MultiCastDataStreamSinkOperatorX() override = default; - Status sink(RuntimeState* state, vectorized::Block* in_block, - SourceState source_state) override { + Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override { auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); - if (in_block->rows() > 0 || source_state == SourceState::FINISHED) { + if (in_block->rows() > 0 || eos) { COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); - auto st = local_state._shared_state->multi_cast_data_streamer.push( - state, in_block, source_state == SourceState::FINISHED); + auto st = + local_state._shared_state->multi_cast_data_streamer.push(state, in_block, eos); // TODO: improvement: if sink returned END_OF_FILE, pipeline task can be finished if (st.template is()) { return Status::OK(); diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp index 2ba0c2bd0c..049905c5cc 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp @@ -152,18 +152,16 @@ Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState } Status MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state, - vectorized::Block* block, - SourceState& source_state) { + vectorized::Block* block, bool* eos) { //auto& local_state = get_local_state(state); auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); - bool eos = false; vectorized::Block tmp_block; vectorized::Block* output_block = block; if (!local_state._output_expr_contexts.empty()) { output_block = &tmp_block; } - local_state._shared_state->multi_cast_data_streamer.pull(_consumer_id, output_block, &eos); + local_state._shared_state->multi_cast_data_streamer.pull(_consumer_id, output_block, eos); if (!local_state._conjuncts.empty()) { RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, output_block, @@ -176,9 +174,6 @@ Status MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state, vectorized::materialize_block_inplace(*block); } COUNTER_UPDATE(local_state._rows_returned_counter, block->rows()); - if (eos) { - source_state = SourceState::FINISHED; - } return Status::OK(); } diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h index 9f44790f4e..4deab7a5ac 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h @@ -162,8 +162,7 @@ public: return Status::OK(); } - Status get_block(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) override; + Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override; bool is_source() const override { return true; } diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp index a31c8cd491..fec2edc71b 100644 --- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp @@ -87,7 +87,7 @@ Status NestedLoopJoinBuildSinkOperatorX::open(RuntimeState* state) { } Status NestedLoopJoinBuildSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* block, - SourceState source_state) { + bool eos) { auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)block->rows()); @@ -104,7 +104,7 @@ Status NestedLoopJoinBuildSinkOperatorX::sink(doris::RuntimeState* state, vector } } - if (source_state == SourceState::FINISHED) { + if (eos) { COUNTER_UPDATE(local_state._build_rows_counter, local_state._build_rows); vectorized::RuntimeFilterBuild rf_ctx(&local_state); RETURN_IF_ERROR(rf_ctx(state)); diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.h b/be/src/pipeline/exec/nested_loop_join_build_operator.h index 14694889e1..801d4ff88e 100644 --- a/be/src/pipeline/exec/nested_loop_join_build_operator.h +++ b/be/src/pipeline/exec/nested_loop_join_build_operator.h @@ -90,8 +90,7 @@ public: Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; - Status sink(RuntimeState* state, vectorized::Block* in_block, - SourceState source_state) override; + Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; DataDistribution required_data_distribution() const override { if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp index 04cdbded05..9080883586 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp @@ -479,7 +479,7 @@ bool NestedLoopJoinProbeOperatorX::need_more_input_data(RuntimeState* state) con } Status NestedLoopJoinProbeOperatorX::push(doris::RuntimeState* state, vectorized::Block* block, - SourceState source_state) const { + bool eos) const { auto& local_state = get_local_state(state); COUNTER_UPDATE(local_state._probe_rows_counter, block->rows()); local_state._cur_probe_row_visited_flags.resize(block->rows()); @@ -487,7 +487,7 @@ Status NestedLoopJoinProbeOperatorX::push(doris::RuntimeState* state, vectorized local_state._cur_probe_row_visited_flags.end(), 0); local_state._left_block_pos = 0; local_state._need_more_input_data = false; - local_state._shared_state->left_side_eos = source_state == SourceState::FINISHED; + local_state._shared_state->left_side_eos = eos; if (!_is_output_left_side_only) { auto func = [&](auto&& join_op_variants, auto set_build_side_flag, @@ -505,21 +505,18 @@ Status NestedLoopJoinProbeOperatorX::push(doris::RuntimeState* state, vectorized } Status NestedLoopJoinProbeOperatorX::pull(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) const { + bool* eos) const { auto& local_state = get_local_state(state); if (_is_output_left_side_only) { RETURN_IF_ERROR(local_state._build_output_block(local_state._child_block.get(), block)); - source_state = - local_state._shared_state->left_side_eos ? SourceState::FINISHED : source_state; + *eos = local_state._shared_state->left_side_eos; local_state._need_more_input_data = !local_state._shared_state->left_side_eos; } else { - source_state = ((_match_all_build || _is_right_semi_anti) - ? local_state._output_null_idx_build_side == - local_state._shared_state->build_blocks.size() && - local_state._matched_rows_done - : local_state._matched_rows_done) - ? SourceState::FINISHED - : source_state; + *eos = ((_match_all_build || _is_right_semi_anti) + ? local_state._output_null_idx_build_side == + local_state._shared_state->build_blocks.size() && + local_state._matched_rows_done + : local_state._matched_rows_done); { vectorized::Block tmp_block = local_state._join_block; @@ -538,7 +535,7 @@ Status NestedLoopJoinProbeOperatorX::pull(RuntimeState* state, vectorized::Block } local_state._join_block.clear_column_data(); - if (!(source_state == SourceState::FINISHED) and !local_state._need_more_input_data) { + if (!(*eos) and !local_state._need_more_input_data) { auto func = [&](auto&& join_op_variants, auto set_build_side_flag, auto set_probe_side_flag) { return local_state @@ -554,7 +551,7 @@ Status NestedLoopJoinProbeOperatorX::pull(RuntimeState* state, vectorized::Block } } - local_state.reached_limit(block, source_state); + local_state.reached_limit(block, eos); return Status::OK(); } diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.h b/be/src/pipeline/exec/nested_loop_join_probe_operator.h index f1ba439bf1..770289f397 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.h +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.h @@ -211,10 +211,9 @@ public: Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; - Status push(RuntimeState* state, vectorized::Block* input_block, - SourceState source_state) const override; + Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) const override; Status pull(doris::RuntimeState* state, vectorized::Block* output_block, - SourceState& source_state) const override; + bool* eos) const override; const RowDescriptor& intermediate_row_desc() const override { return _old_version_flag ? _row_descriptor : *_intermediate_row_desc; } diff --git a/be/src/pipeline/exec/olap_table_sink_operator.h b/be/src/pipeline/exec/olap_table_sink_operator.h index 6707ddd86e..c688660e26 100644 --- a/be/src/pipeline/exec/olap_table_sink_operator.h +++ b/be/src/pipeline/exec/olap_table_sink_operator.h @@ -91,12 +91,11 @@ public: RETURN_IF_ERROR(Base::open(state)); return vectorized::VExpr::open(_output_vexpr_ctxs, state); } - Status sink(RuntimeState* state, vectorized::Block* in_block, - SourceState source_state) override { + Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override { auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); - return local_state.sink(state, in_block, source_state); + return local_state.sink(state, in_block, eos); } private: diff --git a/be/src/pipeline/exec/olap_table_sink_v2_operator.h b/be/src/pipeline/exec/olap_table_sink_v2_operator.h index f1b5d6e906..595009cfc9 100644 --- a/be/src/pipeline/exec/olap_table_sink_v2_operator.h +++ b/be/src/pipeline/exec/olap_table_sink_v2_operator.h @@ -93,12 +93,11 @@ public: return vectorized::VExpr::open(_output_vexpr_ctxs, state); } - Status sink(RuntimeState* state, vectorized::Block* in_block, - SourceState source_state) override { + Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override { auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); - return local_state.sink(state, in_block, source_state); + return local_state.sink(state, in_block, eos); } private: diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp b/be/src/pipeline/exec/partition_sort_sink_operator.cpp index 03aa3d06f6..7c2bf51ad5 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp @@ -96,7 +96,7 @@ Status PartitionSortSinkOperatorX::open(RuntimeState* state) { } Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block* input_block, - SourceState source_state) { + bool eos) { auto& local_state = get_local_state(state); auto current_rows = input_block->rows(); SCOPED_TIMER(local_state.exec_time_counter()); @@ -122,15 +122,14 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block* local_state._dependency->set_ready_to_read(); } } else { - RETURN_IF_ERROR(_split_block_by_partition(input_block, local_state, - source_state == SourceState::FINISHED)); + RETURN_IF_ERROR(_split_block_by_partition(input_block, local_state, eos)); RETURN_IF_CANCELLED(state); input_block->clear_column_data(); } } } - if (source_state == SourceState::FINISHED) { + if (eos) { //seems could free for hashtable local_state._agg_arena_pool.reset(nullptr); local_state._partitioned_data.reset(nullptr); diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h b/be/src/pipeline/exec/partition_sort_sink_operator.h index 9dbb215b78..b39001d472 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.h +++ b/be/src/pipeline/exec/partition_sort_sink_operator.h @@ -96,8 +96,7 @@ public: Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; - Status sink(RuntimeState* state, vectorized::Block* in_block, - SourceState source_state) override; + Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; DataDistribution required_data_distribution() const override { if (_topn_phase == TPartTopNPhase::TWO_PHASE_GLOBAL) { return DataSinkOperatorX::required_data_distribution(); diff --git a/be/src/pipeline/exec/partition_sort_source_operator.cpp b/be/src/pipeline/exec/partition_sort_source_operator.cpp index b69a5d1873..166cd84fc4 100644 --- a/be/src/pipeline/exec/partition_sort_source_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_source_operator.cpp @@ -38,7 +38,7 @@ Status PartitionSortSourceLocalState::init(RuntimeState* state, LocalStateInfo& } Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* output_block, - SourceState& source_state) { + bool* eos) { RETURN_IF_CANCELLED(state); auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); @@ -74,10 +74,9 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized:: output_block->columns())); { std::lock_guard lock(local_state._shared_state->buffer_mutex); - if (local_state._shared_state->blocks_buffer.empty() && - local_state._sort_idx >= local_state._shared_state->partition_sorts.size()) { - source_state = SourceState::FINISHED; - } + + *eos = local_state._shared_state->blocks_buffer.empty() && + local_state._sort_idx >= local_state._shared_state->partition_sorts.size(); } return Status::OK(); } diff --git a/be/src/pipeline/exec/partition_sort_source_operator.h b/be/src/pipeline/exec/partition_sort_source_operator.h index 3fb544316a..9d810db203 100644 --- a/be/src/pipeline/exec/partition_sort_source_operator.h +++ b/be/src/pipeline/exec/partition_sort_source_operator.h @@ -73,8 +73,7 @@ public: const DescriptorTbl& descs) : OperatorX(pool, tnode, operator_id, descs) {} - Status get_block(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) override; + Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override; bool is_source() const override { return true; } diff --git a/be/src/pipeline/exec/repeat_operator.cpp b/be/src/pipeline/exec/repeat_operator.cpp index d1f3dc7ccd..13f7a9e8d0 100644 --- a/be/src/pipeline/exec/repeat_operator.cpp +++ b/be/src/pipeline/exec/repeat_operator.cpp @@ -45,8 +45,6 @@ Status RepeatOperator::close(doris::RuntimeState* state) { RepeatLocalState::RepeatLocalState(RuntimeState* state, OperatorXBase* parent) : Base(state, parent), _child_block(vectorized::Block::create_unique()), - _child_source_state(SourceState::DEPEND_ON_SOURCE), - _child_eos(false), _repeat_id_idx(0) {} Status RepeatLocalState::init(RuntimeState* state, LocalStateInfo& info) { @@ -179,10 +177,9 @@ Status RepeatLocalState::get_repeated_block(vectorized::Block* child_block, int return Status::OK(); } -Status RepeatOperatorX::push(RuntimeState* state, vectorized::Block* input_block, - SourceState source_state) const { +Status RepeatOperatorX::push(RuntimeState* state, vectorized::Block* input_block, bool eos) const { auto& local_state = get_local_state(state); - local_state._child_eos = source_state == SourceState::FINISHED; + local_state._child_eos = eos; auto& _intermediate_block = local_state._intermediate_block; auto& _expr_ctxs = local_state._expr_ctxs; DCHECK(!_intermediate_block || _intermediate_block->rows() == 0); @@ -207,7 +204,7 @@ Status RepeatOperatorX::push(RuntimeState* state, vectorized::Block* input_block } Status RepeatOperatorX::pull(doris::RuntimeState* state, vectorized::Block* output_block, - SourceState& source_state) const { + bool* eos) const { auto& local_state = get_local_state(state); auto& _repeat_id_idx = local_state._repeat_id_idx; auto& _child_block = *local_state._child_block; @@ -235,10 +232,8 @@ Status RepeatOperatorX::pull(doris::RuntimeState* state, vectorized::Block* outp } RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, output_block, output_block->columns())); - if (_child_eos && _child_block.rows() == 0) { - source_state = SourceState::FINISHED; - } - local_state.reached_limit(output_block, source_state); + *eos = _child_eos && _child_block.rows() == 0; + local_state.reached_limit(output_block, eos); COUNTER_SET(local_state._rows_returned_counter, local_state._num_rows_returned); return Status::OK(); } diff --git a/be/src/pipeline/exec/repeat_operator.h b/be/src/pipeline/exec/repeat_operator.h index 768424722a..2822921029 100644 --- a/be/src/pipeline/exec/repeat_operator.h +++ b/be/src/pipeline/exec/repeat_operator.h @@ -64,8 +64,7 @@ private: template friend class StatefulOperatorX; std::unique_ptr _child_block; - SourceState _child_source_state; - bool _child_eos; + bool _child_eos = false; int _repeat_id_idx; std::unique_ptr _intermediate_block; vectorized::VExprContextSPtrs _expr_ctxs; @@ -82,10 +81,8 @@ public: Status open(RuntimeState* state) override; bool need_more_input_data(RuntimeState* state) const override; - Status pull(RuntimeState* state, vectorized::Block* output_block, - SourceState& source_state) const override; - Status push(RuntimeState* state, vectorized::Block* input_block, - SourceState source_state) const override; + Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) const override; + Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) const override; private: friend class RepeatLocalState; diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp b/be/src/pipeline/exec/result_file_sink_operator.cpp index 71a2d539c9..88646ba4db 100644 --- a/be/src/pipeline/exec/result_file_sink_operator.cpp +++ b/be/src/pipeline/exec/result_file_sink_operator.cpp @@ -262,12 +262,11 @@ void ResultFileSinkLocalState::_handle_eof_channel(RuntimeState* state, ChannelP static_cast(channel->close(state, Status::OK())); } -Status ResultFileSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block, - SourceState source_state) { +Status ResultFileSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block, bool eos) { auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); - return local_state.sink(state, in_block, source_state); + return local_state.sink(state, in_block, eos); } } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/result_file_sink_operator.h b/be/src/pipeline/exec/result_file_sink_operator.h index e196401991..31b4b26206 100644 --- a/be/src/pipeline/exec/result_file_sink_operator.h +++ b/be/src/pipeline/exec/result_file_sink_operator.h @@ -100,8 +100,7 @@ public: Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; - Status sink(RuntimeState* state, vectorized::Block* in_block, - SourceState source_state) override; + Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; private: friend class ResultFileSinkLocalState; diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp index 7eb7cd7ef8..11a208e139 100644 --- a/be/src/pipeline/exec/result_sink_operator.cpp +++ b/be/src/pipeline/exec/result_sink_operator.cpp @@ -126,8 +126,7 @@ Status ResultSinkOperatorX::open(RuntimeState* state) { return vectorized::VExpr::open(_output_vexpr_ctxs, state); } -Status ResultSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block, - SourceState source_state) { +Status ResultSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block, bool eos) { auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_sent_counter(), (int64_t)block->rows()); diff --git a/be/src/pipeline/exec/result_sink_operator.h b/be/src/pipeline/exec/result_sink_operator.h index 15c9f2820b..d01be2272f 100644 --- a/be/src/pipeline/exec/result_sink_operator.h +++ b/be/src/pipeline/exec/result_sink_operator.h @@ -76,8 +76,7 @@ public: Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; - Status sink(RuntimeState* state, vectorized::Block* in_block, - SourceState source_state) override; + Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; private: friend class ResultSinkLocalState; diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 119f5e42a6..08c58d4180 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -1433,7 +1433,7 @@ Status ScanLocalState::close(RuntimeState* state) { template Status ScanOperatorX::get_block(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) { + bool* eos) { auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); // in inverted index apply logic, in order to optimize query performance, @@ -1457,16 +1457,14 @@ Status ScanOperatorX::get_block(RuntimeState* state, vectorized: } if (local_state._eos) { - source_state = SourceState::FINISHED; + *eos = true; return Status::OK(); } - bool eos = false; - RETURN_IF_ERROR(local_state._scanner_ctx->get_block_from_queue(state, block, &eos, 0)); + RETURN_IF_ERROR(local_state._scanner_ctx->get_block_from_queue(state, block, eos, 0)); - local_state.reached_limit(block, source_state); - if (eos || source_state == SourceState::FINISHED) { - source_state = SourceState::FINISHED; + local_state.reached_limit(block, eos); + if (*eos) { // reach limit, stop the scanners. local_state._scanner_ctx->stop_scanners(state); } diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index 101fd3047d..0b1b2f46da 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -373,11 +373,10 @@ public: Status init(const TPlanNode& tnode, RuntimeState* state) override; Status prepare(RuntimeState* state) override { return OperatorXBase::prepare(state); } Status open(RuntimeState* state) override; - Status get_block(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) override; + Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override; Status get_block_after_projects(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) override { - return get_block(state, block, source_state); + bool* eos) override { + return get_block(state, block, eos); } [[nodiscard]] bool is_source() const override { return true; } diff --git a/be/src/pipeline/exec/schema_scan_operator.cpp b/be/src/pipeline/exec/schema_scan_operator.cpp index da16453c90..7b0b2f0ff5 100644 --- a/be/src/pipeline/exec/schema_scan_operator.cpp +++ b/be/src/pipeline/exec/schema_scan_operator.cpp @@ -209,8 +209,7 @@ Status SchemaScanOperatorX::prepare(RuntimeState* state) { return Status::OK(); } -Status SchemaScanOperatorX::get_block(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) { +Status SchemaScanOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) { auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); RETURN_IF_CANCELLED(state); @@ -244,7 +243,7 @@ Status SchemaScanOperatorX::get_block(RuntimeState* state, vectorized::Block* bl RETURN_IF_ERROR(local_state._schema_scanner->get_next_block(&src_block, &schema_eos)); if (schema_eos) { - source_state = SourceState::FINISHED; + *eos = true; break; } @@ -267,9 +266,9 @@ Status SchemaScanOperatorX::get_block(RuntimeState* state, vectorized::Block* bl local_state._conjuncts, block, _dest_tuple_desc->slots().size())); src_block.clear(); } - } while (block->rows() == 0 && source_state != SourceState::FINISHED); + } while (block->rows() == 0 && !*eos); - local_state.reached_limit(block, source_state); + local_state.reached_limit(block, eos); return Status::OK(); } diff --git a/be/src/pipeline/exec/schema_scan_operator.h b/be/src/pipeline/exec/schema_scan_operator.h index 2e00a2cbd4..bd336132ef 100644 --- a/be/src/pipeline/exec/schema_scan_operator.h +++ b/be/src/pipeline/exec/schema_scan_operator.h @@ -79,8 +79,7 @@ public: Status init(const TPlanNode& tnode, RuntimeState* state) override; Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; - Status get_block(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) override; + Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override; [[nodiscard]] bool is_source() const override { return true; } diff --git a/be/src/pipeline/exec/select_operator.h b/be/src/pipeline/exec/select_operator.h index 218c8147d3..4fd929e323 100644 --- a/be/src/pipeline/exec/select_operator.h +++ b/be/src/pipeline/exec/select_operator.h @@ -59,13 +59,13 @@ public: const DescriptorTbl& descs) : StreamingOperatorX(pool, tnode, operator_id, descs) {} - Status pull(RuntimeState* state, vectorized::Block* block, SourceState& source_state) override { + Status pull(RuntimeState* state, vectorized::Block* block, bool* eos) override { auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, block, block->columns())); - local_state.reached_limit(block, source_state); + local_state.reached_limit(block, eos); return Status::OK(); } diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp b/be/src/pipeline/exec/set_probe_sink_operator.cpp index b746b5bf46..2d7bcd42c2 100644 --- a/be/src/pipeline/exec/set_probe_sink_operator.cpp +++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp @@ -105,7 +105,7 @@ Status SetProbeSinkOperatorX::open(RuntimeState* state) { template Status SetProbeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block, - SourceState source_state) { + bool eos) { RETURN_IF_CANCELLED(state); auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); @@ -129,7 +129,7 @@ Status SetProbeSinkOperatorX::sink(RuntimeState* state, vectorized *local_state._shared_state->hash_table_variants)); } - if (source_state == SourceState::FINISHED) { + if (eos) { _finalize_probe(local_state); } return Status::OK(); diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h b/be/src/pipeline/exec/set_probe_sink_operator.h index a7b06267b8..2e8500cd45 100644 --- a/be/src/pipeline/exec/set_probe_sink_operator.h +++ b/be/src/pipeline/exec/set_probe_sink_operator.h @@ -125,8 +125,7 @@ public: Status open(RuntimeState* state) override; - Status sink(RuntimeState* state, vectorized::Block* in_block, - SourceState source_state) override; + Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; DataDistribution required_data_distribution() const override { return _is_colocate ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); diff --git a/be/src/pipeline/exec/set_sink_operator.cpp b/be/src/pipeline/exec/set_sink_operator.cpp index 639e333560..9080bb2450 100644 --- a/be/src/pipeline/exec/set_sink_operator.cpp +++ b/be/src/pipeline/exec/set_sink_operator.cpp @@ -52,7 +52,7 @@ template class SetSinkOperator; template Status SetSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block, - SourceState source_state) { + bool eos) { constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL; RETURN_IF_CANCELLED(state); auto& local_state = get_local_state(state); @@ -72,13 +72,12 @@ Status SetSinkOperatorX::sink(RuntimeState* state, vectorized::Blo } } - if (source_state == SourceState::FINISHED || - local_state._mutable_block.allocated_bytes() >= BUILD_BLOCK_MAX_SIZE) { + if (eos || local_state._mutable_block.allocated_bytes() >= BUILD_BLOCK_MAX_SIZE) { build_block = local_state._mutable_block.to_block(); RETURN_IF_ERROR(_process_build_block(local_state, build_block, state)); local_state._mutable_block.clear(); - if (source_state == SourceState::FINISHED) { + if (eos) { if constexpr (is_intersect) { valid_element_in_hash_tbl = 0; } else { diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h index a0885c1882..63b2b89380 100644 --- a/be/src/pipeline/exec/set_sink_operator.h +++ b/be/src/pipeline/exec/set_sink_operator.h @@ -115,8 +115,7 @@ public: Status open(RuntimeState* state) override; - Status sink(RuntimeState* state, vectorized::Block* in_block, - SourceState source_state) override; + Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; DataDistribution required_data_distribution() const override { return _is_colocate ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); diff --git a/be/src/pipeline/exec/set_source_operator.cpp b/be/src/pipeline/exec/set_source_operator.cpp index db681c7e66..954ca28dc9 100644 --- a/be/src/pipeline/exec/set_source_operator.cpp +++ b/be/src/pipeline/exec/set_source_operator.cpp @@ -92,7 +92,7 @@ Status SetSourceLocalState::open(RuntimeState* state) { template Status SetSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) { + bool* eos) { RETURN_IF_CANCELLED(state); auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); @@ -101,8 +101,8 @@ Status SetSourceOperatorX::get_block(RuntimeState* state, vectoriz [&](auto&& arg) -> Status { using HashTableCtxType = std::decay_t; if constexpr (!std::is_same_v) { - return _get_data_in_hashtable( - local_state, arg, block, state->batch_size(), source_state); + return _get_data_in_hashtable(local_state, arg, block, + state->batch_size(), eos); } else { LOG(FATAL) << "FATAL: uninited hash table"; } @@ -111,7 +111,7 @@ Status SetSourceOperatorX::get_block(RuntimeState* state, vectoriz RETURN_IF_ERROR(st); RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, block, block->columns())); - local_state.reached_limit(block, source_state); + local_state.reached_limit(block, eos); return Status::OK(); } @@ -135,7 +135,7 @@ template template Status SetSourceOperatorX::_get_data_in_hashtable( SetSourceLocalState& local_state, HashTableContext& hash_table_ctx, - vectorized::Block* output_block, const int batch_size, SourceState& source_state) { + vectorized::Block* output_block, const int batch_size, bool* eos) { int left_col_len = local_state._left_table_data_types.size(); hash_table_ctx.init_iterator(); auto& iter = hash_table_ctx.iterator; @@ -155,9 +155,7 @@ Status SetSourceOperatorX::_get_data_in_hashtable( } } - if (iter == hash_table_ctx.hash_table->end()) { - source_state = SourceState::FINISHED; - } + *eos = iter == hash_table_ctx.hash_table->end(); if (!output_block->mem_reuse()) { for (int i = 0; i < left_col_len; ++i) { output_block->insert( diff --git a/be/src/pipeline/exec/set_source_operator.h b/be/src/pipeline/exec/set_source_operator.h index 90f09cbf97..d7026f015c 100644 --- a/be/src/pipeline/exec/set_source_operator.h +++ b/be/src/pipeline/exec/set_source_operator.h @@ -90,8 +90,7 @@ public: [[nodiscard]] bool is_source() const override { return true; } - Status get_block(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) override; + Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override; private: friend class SetSourceLocalState; @@ -102,7 +101,7 @@ private: template Status _get_data_in_hashtable(SetSourceLocalState& local_state, HashTableContext& hash_table_ctx, vectorized::Block* output_block, - const int batch_size, SourceState& source_state); + const int batch_size, bool* eos); void _add_result_columns(SetSourceLocalState& local_state, vectorized::RowRefListWithFlags& value, int& block_size); diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp b/be/src/pipeline/exec/sort_sink_operator.cpp index 4df1549990..16d7e6fba0 100644 --- a/be/src/pipeline/exec/sort_sink_operator.cpp +++ b/be/src/pipeline/exec/sort_sink_operator.cpp @@ -143,8 +143,7 @@ Status SortSinkOperatorX::open(RuntimeState* state) { return _vsort_exec_exprs.open(state); } -Status SortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in_block, - SourceState source_state) { +Status SortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in_block, bool eos) { auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); @@ -174,7 +173,7 @@ Status SortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in } } - if (source_state == SourceState::FINISHED) { + if (eos) { RETURN_IF_ERROR(local_state._shared_state->sorter->prepare_for_read()); local_state._dependency->set_ready_to_read(); } diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h index 5502f436d3..c737e82e1e 100644 --- a/be/src/pipeline/exec/sort_sink_operator.h +++ b/be/src/pipeline/exec/sort_sink_operator.h @@ -83,8 +83,7 @@ public: Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; - Status sink(RuntimeState* state, vectorized::Block* in_block, - SourceState source_state) override; + Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; DataDistribution required_data_distribution() const override { if (_is_analytic_sort) { return _is_colocate diff --git a/be/src/pipeline/exec/sort_source_operator.cpp b/be/src/pipeline/exec/sort_source_operator.cpp index 2b07cb17e0..af1dc66bc4 100644 --- a/be/src/pipeline/exec/sort_source_operator.cpp +++ b/be/src/pipeline/exec/sort_source_operator.cpp @@ -32,17 +32,12 @@ SortSourceOperatorX::SortSourceOperatorX(ObjectPool* pool, const TPlanNode& tnod const DescriptorTbl& descs) : OperatorX(pool, tnode, operator_id, descs) {} -Status SortSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) { +Status SortSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) { auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); - bool eos = false; RETURN_IF_ERROR_OR_CATCH_EXCEPTION( - local_state._shared_state->sorter->get_next(state, block, &eos)); - if (eos) { - source_state = SourceState::FINISHED; - } - local_state.reached_limit(block, source_state); + local_state._shared_state->sorter->get_next(state, block, eos)); + local_state.reached_limit(block, eos); return Status::OK(); } diff --git a/be/src/pipeline/exec/sort_source_operator.h b/be/src/pipeline/exec/sort_source_operator.h index 303762b41c..39436ac26d 100644 --- a/be/src/pipeline/exec/sort_source_operator.h +++ b/be/src/pipeline/exec/sort_source_operator.h @@ -60,8 +60,7 @@ class SortSourceOperatorX final : public OperatorX { public: SortSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, const DescriptorTbl& descs); - Status get_block(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) override; + Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override; bool is_source() const override { return true; } diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_operator.cpp index f9ba600e3c..ab9a19ed6a 100644 --- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp @@ -78,7 +78,6 @@ StreamingAggLocalState::StreamingAggLocalState(RuntimeState* state, OperatorXBas _agg_data(std::make_unique()), _agg_profile_arena(std::make_unique()), _child_block(vectorized::Block::create_unique()), - _child_source_state(SourceState::DEPEND_ON_SOURCE), _pre_aggregated_block(vectorized::Block::create_unique()) {} Status StreamingAggLocalState::init(RuntimeState* state, LocalStateInfo& info) { @@ -760,7 +759,7 @@ Status StreamingAggLocalState::_create_agg_status(vectorized::AggregateDataPtr d Status StreamingAggLocalState::_get_with_serialized_key_result(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) { + bool* eos) { auto& p = _parent->cast(); // non-nullable column(id in `_make_nullable_keys`) will be converted to nullable. bool mem_reuse = p._make_nullable_keys.empty() && block->mem_reuse(); @@ -838,10 +837,10 @@ Status StreamingAggLocalState::_get_with_serialized_key_result(RuntimeState* sta _aggregate_evaluators[i]->insert_result_info( mapped + p._offsets_of_aggregate_states[i], value_columns[i].get()); - source_state = SourceState::FINISHED; + *eos = true; } } else { - source_state = SourceState::FINISHED; + *eos = true; } } }, @@ -864,13 +863,13 @@ Status StreamingAggLocalState::_get_with_serialized_key_result(RuntimeState* sta } Status StreamingAggLocalState::_serialize_without_key(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) { + bool* eos) { // 1. `child(0)->rows_returned() == 0` mean not data from child // in level two aggregation node should return NULL result // level one aggregation node set `eos = true` return directly SCOPED_TIMER(_serialize_result_timer); if (UNLIKELY(_input_num_rows == 0)) { - source_state = SourceState::FINISHED; + *eos = true; return Status::OK(); } block->clear(); @@ -903,13 +902,13 @@ Status StreamingAggLocalState::_serialize_without_key(RuntimeState* state, vecto } block->set_columns(std::move(value_columns)); - source_state = SourceState::FINISHED; + *eos = true; return Status::OK(); } Status StreamingAggLocalState::_serialize_with_serialized_key_result(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) { + bool* eos) { SCOPED_TIMER(_serialize_result_timer); auto& p = _parent->cast(); int key_size = _probe_expr_ctxs.size(); @@ -972,10 +971,10 @@ Status StreamingAggLocalState::_serialize_with_serialized_key_result(RuntimeStat _values[num_rows] = agg_method.hash_table->template get_null_key_data< vectorized::AggregateDataPtr>(); ++num_rows; - source_state = SourceState::FINISHED; + *eos = true; } } else { - source_state = SourceState::FINISHED; + *eos = true; } } @@ -1026,8 +1025,7 @@ void StreamingAggLocalState::make_nullable_output_key(vectorized::Block* block) } Status StreamingAggLocalState::_get_without_key_result(RuntimeState* state, - vectorized::Block* block, - SourceState& source_state) { + vectorized::Block* block, bool* eos) { DCHECK(_agg_data->without_key != nullptr); block->clear(); @@ -1074,7 +1072,7 @@ Status StreamingAggLocalState::_get_without_key_result(RuntimeState* state, } block->set_columns(std::move(columns)); - source_state = SourceState::FINISHED; + *eos = true; return Status::OK(); } @@ -1264,26 +1262,24 @@ Status StreamingAggLocalState::close(RuntimeState* state) { return Base::close(state); } -Status StreamingAggOperatorX::pull(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) const { +Status StreamingAggOperatorX::pull(RuntimeState* state, vectorized::Block* block, bool* eos) const { auto& local_state = get_local_state(state); if (!local_state._pre_aggregated_block->empty()) { local_state._pre_aggregated_block->swap(*block); } else { - RETURN_IF_ERROR( - local_state._executor->get_result(&local_state, state, block, source_state)); + RETURN_IF_ERROR(local_state._executor->get_result(&local_state, state, block, eos)); local_state.make_nullable_output_key(block); // dispose the having clause, should not be execute in prestreaming agg RETURN_IF_ERROR( vectorized::VExprContext::filter_block(_conjuncts, block, block->columns())); } - local_state.reached_limit(block, source_state); + local_state.reached_limit(block, eos); return Status::OK(); } Status StreamingAggOperatorX::push(RuntimeState* state, vectorized::Block* in_block, - SourceState source_state) const { + bool eos) const { auto& local_state = get_local_state(state); local_state._input_num_rows += in_block->rows(); if (in_block->rows() > 0) { @@ -1295,8 +1291,7 @@ Status StreamingAggOperatorX::push(RuntimeState* state, vectorized::Block* in_bl bool StreamingAggOperatorX::need_more_input_data(RuntimeState* state) const { auto& local_state = get_local_state(state); - return local_state._pre_aggregated_block->empty() && - local_state._child_source_state != SourceState::FINISHED; + return local_state._pre_aggregated_block->empty() && !local_state._child_eos; } } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.h b/be/src/pipeline/exec/streaming_aggregation_operator.h index 9ee8d0244a..f1d95cd54e 100644 --- a/be/src/pipeline/exec/streaming_aggregation_operator.h +++ b/be/src/pipeline/exec/streaming_aggregation_operator.h @@ -63,14 +63,12 @@ private: Status _merge_with_serialized_key(vectorized::Block* block); void _update_memusage_with_serialized_key(); void _init_hash_method(const vectorized::VExprContextSPtrs& probe_exprs); - Status _get_without_key_result(RuntimeState* state, vectorized::Block* block, - SourceState& source_state); - Status _serialize_without_key(RuntimeState* state, vectorized::Block* block, - SourceState& source_state); + Status _get_without_key_result(RuntimeState* state, vectorized::Block* block, bool* eos); + Status _serialize_without_key(RuntimeState* state, vectorized::Block* block, bool* eos); Status _get_with_serialized_key_result(RuntimeState* state, vectorized::Block* block, - SourceState& source_state); + bool* eos); Status _serialize_with_serialized_key_result(RuntimeState* state, vectorized::Block* block, - SourceState& source_state); + bool* eos); template Status _merge_with_serialized_key_helper(vectorized::Block* block); @@ -126,25 +124,24 @@ private: virtual Status execute(StreamingAggLocalState* local_state, vectorized::Block* block) = 0; virtual void update_memusage(StreamingAggLocalState* local_state) = 0; virtual Status get_result(StreamingAggLocalState* local_state, RuntimeState* state, - vectorized::Block* block, SourceState& source_state) = 0; + vectorized::Block* block, bool* eos) = 0; virtual ~ExecutorBase() = default; }; template struct Executor final : public ExecutorBase { Status get_result(StreamingAggLocalState* local_state, RuntimeState* state, - vectorized::Block* block, SourceState& source_state) override { + vectorized::Block* block, bool* eos) override { if constexpr (WithoutKey) { if constexpr (NeedFinalize) { - return local_state->_get_without_key_result(state, block, source_state); + return local_state->_get_without_key_result(state, block, eos); } else { - return local_state->_serialize_without_key(state, block, source_state); + return local_state->_serialize_without_key(state, block, eos); } } else { if constexpr (NeedFinalize) { - return local_state->_get_with_serialized_key_result(state, block, source_state); + return local_state->_get_with_serialized_key_result(state, block, eos); } else { - return local_state->_serialize_with_serialized_key_result(state, block, - source_state); + return local_state->_serialize_with_serialized_key_result(state, block, eos); } } } @@ -182,7 +179,7 @@ private: }; MemoryRecord _mem_usage_record; std::unique_ptr _child_block = nullptr; - SourceState _child_source_state; + bool _child_eos = false; std::unique_ptr _pre_aggregated_block = nullptr; std::vector _values; }; @@ -195,10 +192,8 @@ public: Status init(const TPlanNode& tnode, RuntimeState* state) override; Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; - Status pull(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) const override; - Status push(RuntimeState* state, vectorized::Block* input_block, - SourceState source_state) const override; + Status pull(RuntimeState* state, vectorized::Block* block, bool* eos) const override; + Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) const override; bool need_more_input_data(RuntimeState* state) const override; private: diff --git a/be/src/pipeline/exec/table_function_operator.cpp b/be/src/pipeline/exec/table_function_operator.cpp index 960fccff2b..bd4bc3f90d 100644 --- a/be/src/pipeline/exec/table_function_operator.cpp +++ b/be/src/pipeline/exec/table_function_operator.cpp @@ -144,8 +144,7 @@ bool TableFunctionLocalState::_is_inner_and_empty() { } Status TableFunctionLocalState::get_expanded_block(RuntimeState* state, - vectorized::Block* output_block, - SourceState& source_state) { + vectorized::Block* output_block, bool* eos) { auto& p = _parent->cast(); vectorized::MutableBlock m_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block( output_block, p._output_slots); @@ -211,9 +210,7 @@ Status TableFunctionLocalState::get_expanded_block(RuntimeState* state, RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, output_block, output_block->columns())); - if (_child_source_state == SourceState::FINISHED && _cur_child_offset == -1) { - source_state = SourceState::FINISHED; - } + *eos = _child_eos && _cur_child_offset == -1; return Status::OK(); } diff --git a/be/src/pipeline/exec/table_function_operator.h b/be/src/pipeline/exec/table_function_operator.h index c8f64a447a..3379a8f5b4 100644 --- a/be/src/pipeline/exec/table_function_operator.h +++ b/be/src/pipeline/exec/table_function_operator.h @@ -57,8 +57,7 @@ public: Status init(RuntimeState* state, LocalStateInfo& info) override; void process_next_child_row(); - Status get_expanded_block(RuntimeState* state, vectorized::Block* output_block, - SourceState& source_state); + Status get_expanded_block(RuntimeState* state, vectorized::Block* output_block, bool* eos); private: friend class TableFunctionOperatorX; @@ -78,7 +77,7 @@ private: int64_t _cur_child_offset = 0; std::unique_ptr _child_block; int _current_row_insert_times = 0; - SourceState _child_source_state; + bool _child_eos = false; }; class TableFunctionOperatorX final : public StatefulOperatorX { @@ -92,16 +91,14 @@ public: bool need_more_input_data(RuntimeState* state) const override { auto& local_state = state->get_local_state(operator_id())->cast(); - return !local_state._child_block->rows() && - local_state._child_source_state != SourceState::FINISHED; + return !local_state._child_block->rows() && !local_state._child_eos; } DataDistribution required_data_distribution() const override { return {ExchangeType::PASSTHROUGH}; } - Status push(RuntimeState* state, vectorized::Block* input_block, - SourceState source_state) const override { + Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) const override { auto& local_state = get_local_state(state); if (input_block->rows() == 0) { return Status::OK(); @@ -114,11 +111,10 @@ public: return Status::OK(); } - Status pull(RuntimeState* state, vectorized::Block* output_block, - SourceState& source_state) const override { + Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) const override { auto& local_state = get_local_state(state); - RETURN_IF_ERROR(local_state.get_expanded_block(state, output_block, source_state)); - local_state.reached_limit(output_block, source_state); + RETURN_IF_ERROR(local_state.get_expanded_block(state, output_block, eos)); + local_state.reached_limit(output_block, eos); return Status::OK(); } diff --git a/be/src/pipeline/exec/union_sink_operator.cpp b/be/src/pipeline/exec/union_sink_operator.cpp index 1ce3ef5c21..ce1195f042 100644 --- a/be/src/pipeline/exec/union_sink_operator.cpp +++ b/be/src/pipeline/exec/union_sink_operator.cpp @@ -143,8 +143,7 @@ Status UnionSinkOperatorX::open(RuntimeState* state) { return Status::OK(); } -Status UnionSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block, - SourceState source_state) { +Status UnionSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block, bool eos) { auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); @@ -167,7 +166,7 @@ Status UnionSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block _cur_child_id, _get_first_materialized_child_idx(), children_count()); } - if (UNLIKELY(source_state == SourceState::FINISHED)) { + if (UNLIKELY(eos)) { //if _cur_child_id eos, need check to push block //Now here can't check _output_block rows, even it's row==0, also need push block //because maybe sink is eos and queue have none data, if not push block diff --git a/be/src/pipeline/exec/union_sink_operator.h b/be/src/pipeline/exec/union_sink_operator.h index fe1f26f45b..5fd670f0ef 100644 --- a/be/src/pipeline/exec/union_sink_operator.h +++ b/be/src/pipeline/exec/union_sink_operator.h @@ -109,8 +109,7 @@ public: Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; - Status sink(RuntimeState* state, vectorized::Block* in_block, - SourceState source_state) override; + Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; private: int _get_first_materialized_child_idx() const { return _first_materialized_child_idx; } diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp index 208e67895b..f2f4ca82e4 100644 --- a/be/src/pipeline/exec/union_source_operator.cpp +++ b/be/src/pipeline/exec/union_source_operator.cpp @@ -153,8 +153,7 @@ std::string UnionSourceLocalState::debug_string(int indentation_level) const { return fmt::to_string(debug_string_buffer); } -Status UnionSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) { +Status UnionSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) { auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); if (local_state._need_read_for_const_expr) { @@ -174,19 +173,11 @@ Status UnionSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* b output_block->clear_column_data(_row_descriptor.num_materialized_slots()); local_state._shared_state->data_queue.push_free_block(std::move(output_block), child_idx); } - local_state.reached_limit(block, source_state); + local_state.reached_limit(block, eos); //have executing const expr, queue have no data anymore, and child could be closed - if (_child_size == 0 && !local_state._need_read_for_const_expr) { - source_state = SourceState::FINISHED; - } else if (_has_data(state)) { - source_state = SourceState::MORE_DATA; - } else if (local_state._shared_state->data_queue.is_all_finish()) { - // Here, check the value of `_has_data(state)` again after `data_queue.is_all_finish()` is TRUE - // as there may be one or more blocks when `data_queue.is_all_finish()` is TRUE. - source_state = _has_data(state) ? SourceState::MORE_DATA : SourceState::FINISHED; - } else { - source_state = SourceState::DEPEND_ON_SOURCE; - } + *eos = (_child_size == 0 && !local_state._need_read_for_const_expr) || + (local_state._shared_state->data_queue.is_all_finish() && !_has_data(state)); + return Status::OK(); } diff --git a/be/src/pipeline/exec/union_source_operator.h b/be/src/pipeline/exec/union_source_operator.h index b58d0e442d..69e81bcd4f 100644 --- a/be/src/pipeline/exec/union_source_operator.h +++ b/be/src/pipeline/exec/union_source_operator.h @@ -96,8 +96,7 @@ public: const DescriptorTbl& descs) : Base(pool, tnode, operator_id, descs), _child_size(tnode.num_children) {}; ~UnionSourceOperatorX() override = default; - Status get_block(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) override; + Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override; bool is_source() const override { return true; } @@ -133,7 +132,7 @@ public: [[nodiscard]] int get_child_count() const { return _child_size; } private: - bool _has_data(RuntimeState* state) { + bool _has_data(RuntimeState* state) const { auto& local_state = state->get_local_state(operator_id())->cast(); if (_child_size == 0) { return local_state._need_read_for_const_expr; diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp index 7c8c18be61..0b5d7a2cc2 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp @@ -51,13 +51,13 @@ std::string LocalExchangeSinkLocalState::debug_string(int indentation_level) con } Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block, - SourceState source_state) { + bool eos) { auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); - RETURN_IF_ERROR(local_state._exchanger->sink(state, in_block, source_state, local_state)); + RETURN_IF_ERROR(local_state._exchanger->sink(state, in_block, eos, local_state)); - if (source_state == SourceState::FINISHED) { + if (eos) { local_state._shared_state->sub_running_sink_operators(); } diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h index 2c7fe73c87..debfd113f3 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h @@ -140,8 +140,7 @@ public: return Status::OK(); } - Status sink(RuntimeState* state, vectorized::Block* in_block, - SourceState source_state) override; + Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; private: friend class LocalExchangeSinkLocalState; diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp index b53fead5c5..568871835c 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp @@ -51,11 +51,11 @@ std::string LocalExchangeSourceLocalState::debug_string(int indentation_level) c } Status LocalExchangeSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) { + bool* eos) { auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); - RETURN_IF_ERROR(local_state._exchanger->get_block(state, block, source_state, local_state)); - local_state.reached_limit(block, source_state); + RETURN_IF_ERROR(local_state._exchanger->get_block(state, block, eos, local_state)); + local_state.reached_limit(block, eos); return Status::OK(); } diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h index 1a9614de35..f0d7309c21 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h @@ -69,8 +69,7 @@ public: RowDescriptor& row_descriptor() override { return _child_x->row_descriptor(); } const RowDescriptor& row_desc() const override { return _child_x->row_desc(); } - Status get_block(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) override; + Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override; bool is_source() const override { return true; } diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp index 1f680fdd1d..da395fefdd 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp @@ -22,8 +22,8 @@ namespace doris::pipeline { -Status ShuffleExchanger::sink(RuntimeState* state, vectorized::Block* in_block, - SourceState source_state, LocalExchangeSinkLocalState& local_state) { +Status ShuffleExchanger::sink(RuntimeState* state, vectorized::Block* in_block, bool eos, + LocalExchangeSinkLocalState& local_state) { { SCOPED_TIMER(local_state._compute_hash_value_timer); RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, in_block, @@ -33,14 +33,13 @@ Status ShuffleExchanger::sink(RuntimeState* state, vectorized::Block* in_block, SCOPED_TIMER(local_state._distribute_timer); RETURN_IF_ERROR(_split_rows(state, (const uint32_t*)local_state._partitioner->get_channel_ids(), - in_block, source_state, local_state)); + in_block, eos, local_state)); } return Status::OK(); } -Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block, - SourceState& source_state, +Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos, LocalExchangeSourceLocalState& local_state) { PartitionedBlock partitioned_block; std::unique_ptr mutable_block = nullptr; @@ -67,7 +66,7 @@ Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block get_data(block); } else { COUNTER_UPDATE(local_state._get_block_failed_counter, 1); - source_state = SourceState::FINISHED; + *eos = true; } } else if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) { SCOPED_TIMER(local_state._copy_data_timer); @@ -82,7 +81,7 @@ Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block } Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids, - vectorized::Block* block, SourceState source_state, + vectorized::Block* block, bool eos, LocalExchangeSinkLocalState& local_state) { auto& data_queue = _data_queue; const auto rows = block->rows(); @@ -169,8 +168,7 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest return Status::OK(); } -Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_block, - SourceState source_state, +Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_block, bool eos, LocalExchangeSinkLocalState& local_state) { vectorized::Block new_block; if (!_free_blocks.try_dequeue(new_block)) { @@ -185,8 +183,7 @@ Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_blo return Status::OK(); } -Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* block, - SourceState& source_state, +Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos, LocalExchangeSourceLocalState& local_state) { vectorized::Block next_block; if (_running_sink_operators == 0) { @@ -197,7 +194,7 @@ Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* b block->allocated_bytes()); } else { COUNTER_UPDATE(local_state._get_block_failed_counter, 1); - source_state = SourceState::FINISHED; + *eos = true; } } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { block->swap(next_block); @@ -210,8 +207,7 @@ Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* b return Status::OK(); } -Status PassToOneExchanger::sink(RuntimeState* state, vectorized::Block* in_block, - SourceState source_state, +Status PassToOneExchanger::sink(RuntimeState* state, vectorized::Block* in_block, bool eos, LocalExchangeSinkLocalState& local_state) { vectorized::Block new_block(in_block->clone_empty()); new_block.swap(*in_block); @@ -221,11 +217,10 @@ Status PassToOneExchanger::sink(RuntimeState* state, vectorized::Block* in_block return Status::OK(); } -Status PassToOneExchanger::get_block(RuntimeState* state, vectorized::Block* block, - SourceState& source_state, +Status PassToOneExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos, LocalExchangeSourceLocalState& local_state) { if (local_state._channel_id != 0) { - source_state = SourceState::FINISHED; + *eos = true; return Status::OK(); } vectorized::Block next_block; @@ -234,7 +229,7 @@ Status PassToOneExchanger::get_block(RuntimeState* state, vectorized::Block* blo *block = std::move(next_block); } else { COUNTER_UPDATE(local_state._get_block_failed_counter, 1); - source_state = SourceState::FINISHED; + *eos = true; } } else if (_data_queue[0].try_dequeue(next_block)) { *block = std::move(next_block); @@ -245,8 +240,7 @@ Status PassToOneExchanger::get_block(RuntimeState* state, vectorized::Block* blo return Status::OK(); } -Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block, - SourceState source_state, +Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block, bool eos, LocalExchangeSinkLocalState& local_state) { for (size_t i = 0; i < _num_partitions; i++) { auto mutable_block = vectorized::MutableBlock::create_unique(in_block->clone_empty()); @@ -258,8 +252,7 @@ Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block return Status::OK(); } -Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* block, - SourceState& source_state, +Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos, LocalExchangeSourceLocalState& local_state) { vectorized::Block next_block; if (_running_sink_operators == 0) { @@ -267,7 +260,7 @@ Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* blo *block = std::move(next_block); } else { COUNTER_UPDATE(local_state._get_block_failed_counter, 1); - source_state = SourceState::FINISHED; + *eos = true; } } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { *block = std::move(next_block); @@ -279,8 +272,7 @@ Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* blo } Status AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state, - vectorized::Block* in_block, - SourceState source_state, + vectorized::Block* in_block, bool eos, LocalExchangeSinkLocalState& local_state) { vectorized::Block new_block; if (!_free_blocks.try_dequeue(new_block)) { @@ -296,7 +288,7 @@ Status AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state, } Status AdaptivePassthroughExchanger::_shuffle_sink(RuntimeState* state, vectorized::Block* block, - SourceState source_state, + bool eos, LocalExchangeSinkLocalState& local_state) { std::vector channel_ids; const auto num_rows = block->rows(); @@ -312,12 +304,12 @@ Status AdaptivePassthroughExchanger::_shuffle_sink(RuntimeState* state, vectoriz std::iota(channel_ids.begin() + i, channel_ids.end(), 0); } } - return _split_rows(state, channel_ids.data(), block, source_state, local_state); + return _split_rows(state, channel_ids.data(), block, eos, local_state); } Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids, - vectorized::Block* block, SourceState source_state, + vectorized::Block* block, bool eos, LocalExchangeSinkLocalState& local_state) { auto& data_queue = _data_queue; const auto rows = block->rows(); @@ -354,20 +346,19 @@ Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state, } Status AdaptivePassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_block, - SourceState source_state, - LocalExchangeSinkLocalState& local_state) { + bool eos, LocalExchangeSinkLocalState& local_state) { if (_is_pass_through) { - return _passthrough_sink(state, in_block, source_state, local_state); + return _passthrough_sink(state, in_block, eos, local_state); } else { if (_total_block++ > _num_partitions) { _is_pass_through = true; } - return _shuffle_sink(state, in_block, source_state, local_state); + return _shuffle_sink(state, in_block, eos, local_state); } } Status AdaptivePassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* block, - SourceState& source_state, + bool* eos, LocalExchangeSourceLocalState& local_state) { vectorized::Block next_block; if (_running_sink_operators == 0) { @@ -378,7 +369,7 @@ Status AdaptivePassthroughExchanger::get_block(RuntimeState* state, vectorized:: block->allocated_bytes()); } else { COUNTER_UPDATE(local_state._get_block_failed_counter, 1); - source_state = SourceState::FINISHED; + *eos = true; } } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { block->swap(next_block); diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h index a27a592f8c..f2e3ae1a4c 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h @@ -39,10 +39,9 @@ public: _num_senders(running_sink_operators), _num_sources(num_sources) {} virtual ~Exchanger() = default; - virtual Status get_block(RuntimeState* state, vectorized::Block* block, - SourceState& source_state, + virtual Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, LocalExchangeSourceLocalState& local_state) = 0; - virtual Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state, + virtual Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, LocalExchangeSinkLocalState& local_state) = 0; virtual ExchangeType get_type() const = 0; @@ -88,10 +87,10 @@ public: _data_queue.resize(num_partitions); } ~ShuffleExchanger() override = default; - Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state, + Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, LocalExchangeSinkLocalState& local_state) override; - Status get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state, + Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, LocalExchangeSourceLocalState& local_state) override; ExchangeType get_type() const override { return ExchangeType::HASH_SHUFFLE; } @@ -103,7 +102,7 @@ protected: _data_queue.resize(num_partitions); } Status _split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids, - vectorized::Block* block, SourceState source_state, + vectorized::Block* block, bool eos, LocalExchangeSinkLocalState& local_state); std::vector> _data_queue; @@ -129,10 +128,10 @@ public: _data_queue.resize(num_partitions); } ~PassthroughExchanger() override = default; - Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state, + Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, LocalExchangeSinkLocalState& local_state) override; - Status get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state, + Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, LocalExchangeSourceLocalState& local_state) override; ExchangeType get_type() const override { return ExchangeType::PASSTHROUGH; } @@ -148,10 +147,10 @@ public: _data_queue.resize(num_partitions); } ~PassToOneExchanger() override = default; - Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state, + Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, LocalExchangeSinkLocalState& local_state) override; - Status get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state, + Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, LocalExchangeSourceLocalState& local_state) override; ExchangeType get_type() const override { return ExchangeType::PASS_TO_ONE; } @@ -167,10 +166,10 @@ public: _data_queue.resize(num_partitions); } ~BroadcastExchanger() override = default; - Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state, + Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, LocalExchangeSinkLocalState& local_state) override; - Status get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state, + Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, LocalExchangeSourceLocalState& local_state) override; ExchangeType get_type() const override { return ExchangeType::BROADCAST; } @@ -187,20 +186,20 @@ public: : Exchanger(running_sink_operators, num_partitions) { _data_queue.resize(num_partitions); } - Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state, + Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, LocalExchangeSinkLocalState& local_state) override; - Status get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state, + Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, LocalExchangeSourceLocalState& local_state) override; ExchangeType get_type() const override { return ExchangeType::ADAPTIVE_PASSTHROUGH; } private: - Status _passthrough_sink(RuntimeState* state, vectorized::Block* in_block, - SourceState source_state, LocalExchangeSinkLocalState& local_state); - Status _shuffle_sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state, + Status _passthrough_sink(RuntimeState* state, vectorized::Block* in_block, bool eos, + LocalExchangeSinkLocalState& local_state); + Status _shuffle_sink(RuntimeState* state, vectorized::Block* in_block, bool eos, LocalExchangeSinkLocalState& local_state); Status _split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids, - vectorized::Block* block, SourceState source_state, + vectorized::Block* block, bool eos, LocalExchangeSinkLocalState& local_state); std::vector> _data_queue; diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index 19cda4d51a..433e4b4865 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -197,26 +197,26 @@ Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* ori } Status OperatorXBase::get_block_after_projects(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) { + bool* eos) { auto local_state = state->get_local_state(operator_id()); if (_output_row_descriptor) { local_state->clear_origin_block(); - auto status = get_block(state, &local_state->_origin_block, source_state); + auto status = get_block(state, &local_state->_origin_block, eos); if (UNLIKELY(!status.ok())) return status; return do_projections(state, &local_state->_origin_block, block); } local_state->_peak_memory_usage_counter->set(local_state->_mem_tracker->peak_consumption()); - return get_block(state, block, source_state); + return get_block(state, block, eos); } bool PipelineXLocalStateBase::reached_limit() const { return _parent->_limit != -1 && _num_rows_returned >= _parent->_limit; } -void PipelineXLocalStateBase::reached_limit(vectorized::Block* block, SourceState& source_state) { +void PipelineXLocalStateBase::reached_limit(vectorized::Block* block, bool* eos) { if (_parent->_limit != -1 and _num_rows_returned + block->rows() >= _parent->_limit) { block->set_num_rows(_parent->_limit - _num_rows_returned); - source_state = SourceState::FINISHED; + *eos = true; } if (auto rows = block->rows()) { @@ -489,42 +489,38 @@ Status PipelineXSinkLocalState::close(RuntimeState* state, Status e template Status StreamingOperatorX::get_block(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) { - RETURN_IF_ERROR(OperatorX::_child_x->get_block_after_projects(state, block, - source_state)); - return pull(state, block, source_state); + bool* eos) { + RETURN_IF_ERROR( + OperatorX::_child_x->get_block_after_projects(state, block, eos)); + return pull(state, block, eos); } template Status StatefulOperatorX::get_block(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) { + bool* eos) { auto& local_state = get_local_state(state); if (need_more_input_data(state)) { local_state._child_block->clear_column_data(); RETURN_IF_ERROR(OperatorX::_child_x->get_block_after_projects( - state, local_state._child_block.get(), local_state._child_source_state)); - source_state = local_state._child_source_state; - if (local_state._child_block->rows() == 0 && - local_state._child_source_state != SourceState::FINISHED) { + state, local_state._child_block.get(), &local_state._child_eos)); + *eos = local_state._child_eos; + if (local_state._child_block->rows() == 0 && !local_state._child_eos) { return Status::OK(); } { SCOPED_TIMER(local_state.exec_time_counter()); - RETURN_IF_ERROR( - push(state, local_state._child_block.get(), local_state._child_source_state)); + RETURN_IF_ERROR(push(state, local_state._child_block.get(), local_state._child_eos)); } } if (!need_more_input_data(state)) { SCOPED_TIMER(local_state.exec_time_counter()); - SourceState new_state = SourceState::DEPEND_ON_SOURCE; - RETURN_IF_ERROR(pull(state, block, new_state)); - if (new_state == SourceState::FINISHED) { - source_state = SourceState::FINISHED; + bool new_eos = false; + RETURN_IF_ERROR(pull(state, block, &new_eos)); + if (new_eos) { + *eos = true; } else if (!need_more_input_data(state)) { - source_state = SourceState::MORE_DATA; - } else if (source_state == SourceState::MORE_DATA) { - source_state = local_state._child_source_state; + *eos = false; } } return Status::OK(); @@ -561,8 +557,8 @@ Status AsyncWriterSink::open(RuntimeState* state) { template requires(std::is_base_of_v) Status AsyncWriterSink::sink(RuntimeState* state, vectorized::Block* block, - SourceState source_state) { - return _writer->sink(block, source_state == SourceState::FINISHED); + bool eos) { + return _writer->sink(block, eos); } template diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index 387fddef05..5d1268e50c 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -83,7 +83,7 @@ public: void clear_origin_block(); [[nodiscard]] bool reached_limit() const; - void reached_limit(vectorized::Block* block, SourceState& source_state); + void reached_limit(vectorized::Block* block, bool* eos); RuntimeProfile* profile() { return _runtime_profile.get(); } MemTracker* mem_tracker() { return _mem_tracker.get(); } @@ -191,6 +191,9 @@ public: ? DataDistribution(ExchangeType::PASSTHROUGH) : DataDistribution(ExchangeType::NOOP); } + [[nodiscard]] virtual bool need_data_from_children(RuntimeState* state) const { + return is_source() ? true : _child_x == nullptr || _child_x->need_data_from_children(state); + } [[nodiscard]] virtual bool ignore_data_distribution() const { return _child_x ? _child_x->ignore_data_distribution() : _ignore_data_distribution; } @@ -203,6 +206,15 @@ public: Status open(RuntimeState* state) override; + Status get_block(RuntimeState* state, vectorized::Block* block, + SourceState& source_state) override { + LOG(FATAL) << "should not be called in pipelineX"; + return Status::OK(); + } + + [[nodiscard]] virtual Status get_block(RuntimeState* state, vectorized::Block* block, + bool* eos) = 0; + [[nodiscard]] bool can_terminate_early() override { return false; } [[nodiscard]] virtual bool can_terminate_early(RuntimeState* state) { return false; } @@ -285,8 +297,7 @@ public: [[nodiscard]] bool is_source() const override { return false; } [[nodiscard]] virtual Status get_block_after_projects(RuntimeState* state, - vectorized::Block* block, - SourceState& source_state); + vectorized::Block* block, bool* eos); /// Only use in vectorized exec engine try to do projections to trans _row_desc -> _output_row_desc Status do_projections(RuntimeState* state, vectorized::Block* origin_block, @@ -477,6 +488,13 @@ public: Status prepare(RuntimeState* state) override { return Status::OK(); } Status open(RuntimeState* state) override { return Status::OK(); } + Status sink(RuntimeState* state, vectorized::Block* block, SourceState source_state) override { + LOG(FATAL) << "should not reach here!"; + return Status::OK(); + } + + [[nodiscard]] virtual Status sink(RuntimeState* state, vectorized::Block* block, bool eos) = 0; + [[nodiscard]] virtual Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) = 0; @@ -640,11 +658,9 @@ public: : OperatorX(pool, tnode, operator_id, descs) {} virtual ~StreamingOperatorX() = default; - Status get_block(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) override; + Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override; - virtual Status pull(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) = 0; + virtual Status pull(RuntimeState* state, vectorized::Block* block, bool* eos) = 0; }; /** @@ -665,14 +681,22 @@ public: using OperatorX::get_local_state; - [[nodiscard]] Status get_block(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) final; + [[nodiscard]] Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) final; [[nodiscard]] virtual Status pull(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) const = 0; + bool* eos) const = 0; [[nodiscard]] virtual Status push(RuntimeState* state, vectorized::Block* input_block, - SourceState source_state) const = 0; + bool eos) const = 0; + [[nodiscard]] virtual bool need_more_input_data(RuntimeState* state) const = 0; + + bool need_data_from_children(RuntimeState* state) const override { + if (need_more_input_data(state)) { + return OperatorX::_child_x->need_data_from_children(state); + } else { + return false; + } + } }; template @@ -691,7 +715,7 @@ public: Status open(RuntimeState* state) override; - Status sink(RuntimeState* state, vectorized::Block* block, SourceState source_state); + Status sink(RuntimeState* state, vectorized::Block* block, bool eos); Dependency* dependency() override { return _async_writer_dependency.get(); } Status close(RuntimeState* state, Status exec_status) override; diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index d4117661a7..7c0e81c994 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -229,8 +229,8 @@ Status PipelineXTask::execute(bool* eos) { cpu_qs->add_cpu_nanos(delta_cpu_time); } }}; - // The status must be runnable *eos = false; + // The status must be runnable if (!_opened) { { SCOPED_RAW_TIMER(&time_spent); @@ -257,7 +257,7 @@ Status PipelineXTask::execute(bool* eos) { Status status = Status::OK(); set_begin_execute_time(); while (!_fragment_context->is_canceled()) { - if (_data_state != SourceState::MORE_DATA && !source_can_read()) { + if (_root->need_data_from_children(_state) && !source_can_read()) { set_state(PipelineTaskState::BLOCKED_FOR_SOURCE); break; } @@ -269,7 +269,6 @@ Status PipelineXTask::execute(bool* eos) { COUNTER_UPDATE(_yield_counts, 1); break; } - // TODO llj: Pipeline entity should_yield SCOPED_RAW_TIMER(&time_spent); _block->clear_column_data(_root->row_desc().num_materialized_slots()); auto* block = _block.get(); @@ -278,15 +277,14 @@ Status PipelineXTask::execute(bool* eos) { if (!_dry_run) { SCOPED_TIMER(_get_block_timer); _get_block_counter->update(1); - RETURN_IF_ERROR(_root->get_block_after_projects(_state, block, _data_state)); + RETURN_IF_ERROR(_root->get_block_after_projects(_state, block, eos)); } else { - _data_state = SourceState::FINISHED; + *eos = true; } - *eos = _data_state == SourceState::FINISHED; if (_block->rows() != 0 || *eos) { SCOPED_TIMER(_sink_timer); - status = _sink->sink(_state, block, _data_state); + status = _sink->sink(_state, block, *eos); if (!status.is()) { RETURN_IF_ERROR(status); } @@ -350,9 +348,9 @@ std::string PipelineXTask::debug_string() { print_id(_state->fragment_instance_id())); fmt::format_to(debug_string_buffer, - "PipelineTask[this = {}, state = {}, data state = {}, dry run = {}, elapse time " + "PipelineTask[this = {}, state = {}, dry run = {}, elapse time " "= {}ns], block dependency = {}, is running = {}\noperators: ", - (void*)this, get_state_name(_cur_state), (int)_data_state, _dry_run, + (void*)this, get_state_name(_cur_state), _dry_run, MonotonicNanos() - _fragment_context->create_time(), _blocked_dep ? _blocked_dep->debug_string() : "NULL", is_running()); for (size_t i = 0; i < _operators.size(); i++) {