diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp index 06211faf52..924db8a729 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp @@ -27,14 +27,16 @@ namespace doris::pipeline { MultiCastDataStreamerSourceOperatorBuilder::MultiCastDataStreamerSourceOperatorBuilder( - int32_t id, const int consumer_id, std::shared_ptr& data_streamer) + int32_t id, const int consumer_id, std::shared_ptr& data_streamer, + const TDataStreamSink& sink) : OperatorBuilderBase(id, "MultiCastDataStreamerSourceOperator"), _consumer_id(consumer_id), - _multi_cast_data_streamer(data_streamer) {}; + _multi_cast_data_streamer(data_streamer), + _t_data_stream_sink(sink) {} OperatorPtr MultiCastDataStreamerSourceOperatorBuilder::build_operator() { - return std::make_shared(this, _consumer_id, - _multi_cast_data_streamer); + return std::make_shared( + this, _consumer_id, _multi_cast_data_streamer, _t_data_stream_sink); } const RowDescriptor& MultiCastDataStreamerSourceOperatorBuilder::row_desc() { @@ -43,10 +45,38 @@ const RowDescriptor& MultiCastDataStreamerSourceOperatorBuilder::row_desc() { MultiCastDataStreamerSourceOperator::MultiCastDataStreamerSourceOperator( OperatorBuilderBase* operator_builder, const int consumer_id, - std::shared_ptr& data_streamer) + std::shared_ptr& data_streamer, const TDataStreamSink& sink) : OperatorBase(operator_builder), + vectorized::RuntimeFilterConsumer(sink.dest_node_id, sink.runtime_filters, + data_streamer->row_desc(), _conjuncts), _consumer_id(consumer_id), - _multi_cast_data_streamer(data_streamer) {}; + _multi_cast_data_streamer(data_streamer), + _t_data_stream_sink(sink) {} + +Status MultiCastDataStreamerSourceOperator::prepare(doris::RuntimeState* state) { + RETURN_IF_ERROR(vectorized::RuntimeFilterConsumer::init(state)); + _register_runtime_filter(); + if (_t_data_stream_sink.__isset.output_exprs) { + RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_data_stream_sink.output_exprs, + _output_expr_contexts)); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_expr_contexts, state, row_desc())); + } + + if (_t_data_stream_sink.__isset.conjuncts) { + RETURN_IF_ERROR( + vectorized::VExpr::create_expr_trees(_t_data_stream_sink.conjuncts, _conjuncts)); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_conjuncts, state, row_desc())); + } + return Status::OK(); +} + +Status MultiCastDataStreamerSourceOperator::open(doris::RuntimeState* state) { + return _acquire_runtime_filter(); +} + +bool MultiCastDataStreamerSourceOperator::runtime_filters_are_ready_or_timeout() { + return vectorized::RuntimeFilterConsumer::runtime_filters_are_ready_or_timeout(); +} bool MultiCastDataStreamerSourceOperator::can_read() { return _multi_cast_data_streamer->can_read(_consumer_id); @@ -55,7 +85,23 @@ bool MultiCastDataStreamerSourceOperator::can_read() { Status MultiCastDataStreamerSourceOperator::get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) { bool eos = false; - _multi_cast_data_streamer->pull(_consumer_id, block, &eos); + vectorized::Block tmp_block; + vectorized::Block* output_block = block; + if (!_output_expr_contexts.empty()) { + output_block = &tmp_block; + } + _multi_cast_data_streamer->pull(_consumer_id, output_block, &eos); + + if (!_conjuncts.empty()) { + RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, output_block, + output_block->columns())); + } + + if (!_output_expr_contexts.empty() && output_block->rows() > 0) { + RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs( + _output_expr_contexts, *output_block, block)); + materialize_block_inplace(*block); + } if (eos) { source_state = SourceState::FINISHED; } diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h index 15bd320b89..8c9cb99f82 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h @@ -22,6 +22,7 @@ #include "common/status.h" #include "operator.h" +#include "vec/exec/runtime_filter_consumer.h" namespace doris { class ExecNode; @@ -37,7 +38,8 @@ class MultiCastDataStreamer; class MultiCastDataStreamerSourceOperatorBuilder final : public OperatorBuilderBase { public: MultiCastDataStreamerSourceOperatorBuilder(int32_t id, const int consumer_id, - std::shared_ptr&); + std::shared_ptr&, + const TDataStreamSink&); bool is_source() const override { return true; } @@ -48,20 +50,25 @@ public: private: const int _consumer_id; std::shared_ptr _multi_cast_data_streamer; + TDataStreamSink _t_data_stream_sink; }; -class MultiCastDataStreamerSourceOperator final : public OperatorBase { +class MultiCastDataStreamerSourceOperator final : public OperatorBase, + public vectorized::RuntimeFilterConsumer { public: MultiCastDataStreamerSourceOperator(OperatorBuilderBase* operator_builder, const int consumer_id, - std::shared_ptr& data_streamer); + std::shared_ptr& data_streamer, + const TDataStreamSink& sink); Status get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) override; - Status prepare(RuntimeState* state) override { return Status::OK(); }; + Status prepare(RuntimeState* state) override; - Status open(RuntimeState* state) override { return Status::OK(); }; + Status open(RuntimeState* state) override; + + bool runtime_filters_are_ready_or_timeout() override; Status sink(RuntimeState* state, vectorized::Block* block, SourceState source_state) override { return Status::OK(); @@ -76,6 +83,10 @@ public: private: const int _consumer_id; std::shared_ptr _multi_cast_data_streamer; + TDataStreamSink _t_data_stream_sink; + + vectorized::VExprContextSPtrs _output_expr_contexts; + vectorized::VExprContextSPtrs _conjuncts; }; } // namespace pipeline diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 4aaaf7a7ec..0d9745255f 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -768,16 +768,25 @@ Status PipelineFragmentContext::_create_sink(int sender_id, const TDataSink& thr _multi_cast_stream_sink_senders.resize(sender_size); for (int i = 0; i < sender_size; ++i) { auto new_pipeline = add_pipeline(); + + auto row_desc = + !thrift_sink.multi_cast_stream_sink.sinks[i].output_exprs.empty() + ? RowDescriptor( + _runtime_state->desc_tbl(), + {thrift_sink.multi_cast_stream_sink.sinks[i].output_tuple_id}, + {false}) + : sink_->row_desc(); // 1. create the data stream sender sink _multi_cast_stream_sink_senders[i].reset(new vectorized::VDataStreamSender( - _runtime_state.get(), _runtime_state->obj_pool(), sender_id, sink_->row_desc(), + _runtime_state.get(), _runtime_state->obj_pool(), sender_id, row_desc, thrift_sink.multi_cast_stream_sink.sinks[i], thrift_sink.multi_cast_stream_sink.destinations[i], 16 * 1024, false)); // 2. create and set the source operator of multi_cast_data_stream_source for new pipeline OperatorBuilderPtr source_op = std::make_shared( - next_operator_builder_id(), i, multi_cast_data_streamer); + next_operator_builder_id(), i, multi_cast_data_streamer, + thrift_sink.multi_cast_stream_sink.sinks[i]); new_pipeline->add_operator(source_op); // 3. create and set sink operator of data stream sender for new pipeline diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index e2b4c525ac..14ee165684 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -132,7 +132,6 @@ Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc } IRuntimeFilter* filter; RETURN_IF_ERROR(IRuntimeFilter::create(_query_ctx, &_query_ctx->obj_pool, &desc, - &options, RuntimeFilterRole::CONSUMER, node_id, &filter, build_bf_exactly)); _consumer_map[key].emplace_back(node_id, filter); diff --git a/be/src/vec/exec/runtime_filter_consumer_node.cpp b/be/src/vec/exec/runtime_filter_consumer.cpp similarity index 76% rename from be/src/vec/exec/runtime_filter_consumer_node.cpp rename to be/src/vec/exec/runtime_filter_consumer.cpp index dd631ce66e..4f51d99ccc 100644 --- a/be/src/vec/exec/runtime_filter_consumer_node.cpp +++ b/be/src/vec/exec/runtime_filter_consumer.cpp @@ -15,22 +15,26 @@ // specific language governing permissions and limitations // under the License. -#include "vec/exec/runtime_filter_consumer_node.h" +#include "vec/exec/runtime_filter_consumer.h" namespace doris::vectorized { -RuntimeFilterConsumerNode::RuntimeFilterConsumerNode(ObjectPool* pool, const TPlanNode& tnode, - const DescriptorTbl& descs) - : ExecNode(pool, tnode, descs), _runtime_filter_descs(tnode.runtime_filters) {} +RuntimeFilterConsumer::RuntimeFilterConsumer(const int32_t filter_id, + const std::vector& runtime_filters, + const RowDescriptor& row_descriptor, + VExprContextSPtrs& conjuncts) + : _filter_id(filter_id), + _runtime_filter_descs(runtime_filters), + _row_descriptor_ref(row_descriptor), + _conjuncts_ref(conjuncts) {} -Status RuntimeFilterConsumerNode::init(const TPlanNode& tnode, RuntimeState* state) { - RETURN_IF_ERROR(ExecNode::init(tnode, state)); +Status RuntimeFilterConsumer::init(RuntimeState* state) { _state = state; RETURN_IF_ERROR(_register_runtime_filter()); return Status::OK(); } -Status RuntimeFilterConsumerNode::_register_runtime_filter() { +Status RuntimeFilterConsumer::_register_runtime_filter() { int filter_size = _runtime_filter_descs.size(); _runtime_filter_ctxs.reserve(filter_size); _runtime_filter_ready_flag.reserve(filter_size); @@ -43,14 +47,14 @@ Status RuntimeFilterConsumerNode::_register_runtime_filter() { // 1. All BE and FE has been upgraded (e.g. opt_remote_rf) // 2. This filter is bloom filter (only bloom filter should be used for merging) RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->register_consumer_filter( - filter_desc, _state->query_options(), id(), false)); + filter_desc, _state->query_options(), _filter_id, false)); RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->get_consume_filter( - filter_desc.filter_id, id(), &runtime_filter)); + filter_desc.filter_id, _filter_id, &runtime_filter)); } else { RETURN_IF_ERROR(_state->runtime_filter_mgr()->register_consumer_filter( - filter_desc, _state->query_options(), id(), false)); + filter_desc, _state->query_options(), _filter_id, false)); RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_consume_filter( - filter_desc.filter_id, id(), &runtime_filter)); + filter_desc.filter_id, _filter_id, &runtime_filter)); } _runtime_filter_ctxs.emplace_back(runtime_filter); _runtime_filter_ready_flag.emplace_back(false); @@ -58,7 +62,7 @@ Status RuntimeFilterConsumerNode::_register_runtime_filter() { return Status::OK(); } -bool RuntimeFilterConsumerNode::runtime_filters_are_ready_or_timeout() { +bool RuntimeFilterConsumer::runtime_filters_are_ready_or_timeout() { if (!_blocked_by_rf) { return true; } @@ -72,20 +76,19 @@ bool RuntimeFilterConsumerNode::runtime_filters_are_ready_or_timeout() { return true; } -Status RuntimeFilterConsumerNode::_acquire_runtime_filter(bool wait) { +Status RuntimeFilterConsumer::_acquire_runtime_filter() { SCOPED_TIMER(_acquire_runtime_filter_timer); VExprSPtrs vexprs; for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) { IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter; bool ready = runtime_filter->is_ready(); - if (!ready && wait) { + if (!ready) { ready = runtime_filter->await(); } if (ready && !_runtime_filter_ctxs[i].apply_mark) { RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(&vexprs)); _runtime_filter_ctxs[i].apply_mark = true; - } else if ((wait || !runtime_filter->is_ready_or_timeout()) && - runtime_filter->current_state() == RuntimeFilterState::NOT_READY && + } else if (runtime_filter->current_state() == RuntimeFilterState::NOT_READY && !_runtime_filter_ctxs[i].apply_mark) { _blocked_by_rf = true; } else if (!_runtime_filter_ctxs[i].apply_mark) { @@ -101,23 +104,23 @@ Status RuntimeFilterConsumerNode::_acquire_runtime_filter(bool wait) { return Status::OK(); } -Status RuntimeFilterConsumerNode::_append_rf_into_conjuncts(const VExprSPtrs& vexprs) { +Status RuntimeFilterConsumer::_append_rf_into_conjuncts(const VExprSPtrs& vexprs) { if (vexprs.empty()) { return Status::OK(); } for (auto& expr : vexprs) { VExprContextSPtr conjunct = VExprContext::create_shared(expr); - RETURN_IF_ERROR(conjunct->prepare(_state, _row_descriptor)); + RETURN_IF_ERROR(conjunct->prepare(_state, _row_descriptor_ref)); RETURN_IF_ERROR(conjunct->open(_state)); _rf_vexpr_set.insert(expr); - _conjuncts.emplace_back(conjunct); + _conjuncts_ref.emplace_back(conjunct); } return Status::OK(); } -Status RuntimeFilterConsumerNode::try_append_late_arrival_runtime_filter(int* arrived_rf_num) { +Status RuntimeFilterConsumer::try_append_late_arrival_runtime_filter(int* arrived_rf_num) { if (_is_all_rf_applied) { *arrived_rf_num = _runtime_filter_descs.size(); return Status::OK(); @@ -140,12 +143,12 @@ Status RuntimeFilterConsumerNode::try_append_late_arrival_runtime_filter(int* ar continue; } else if (_runtime_filter_ctxs[i].runtime_filter->is_ready()) { RETURN_IF_ERROR(_runtime_filter_ctxs[i].runtime_filter->get_prepared_exprs( - &exprs, _row_descriptor, _state)); + &exprs, _row_descriptor_ref, _state)); ++current_arrived_rf_num; _runtime_filter_ctxs[i].apply_mark = true; } } - // 2. Append unapplied runtime filters to vconjunct_ctx_ptr + // 2. Append unapplied runtime filters to _conjuncts if (!exprs.empty()) { RETURN_IF_ERROR(_append_rf_into_conjuncts(exprs)); } @@ -157,7 +160,7 @@ Status RuntimeFilterConsumerNode::try_append_late_arrival_runtime_filter(int* ar return Status::OK(); } -void RuntimeFilterConsumerNode::_prepare_rf_timer(RuntimeProfile* profile) { +void RuntimeFilterConsumer::_prepare_rf_timer(RuntimeProfile* profile) { _acquire_runtime_filter_timer = ADD_TIMER(profile, "AcquireRuntimeFilterTime"); } diff --git a/be/src/vec/exec/runtime_filter_consumer_node.h b/be/src/vec/exec/runtime_filter_consumer.h similarity index 81% rename from be/src/vec/exec/runtime_filter_consumer_node.h rename to be/src/vec/exec/runtime_filter_consumer.h index 518e0e865c..c938e8510b 100644 --- a/be/src/vec/exec/runtime_filter_consumer_node.h +++ b/be/src/vec/exec/runtime_filter_consumer.h @@ -22,14 +22,16 @@ namespace doris::vectorized { -class RuntimeFilterConsumerNode : public ExecNode { +class RuntimeFilterConsumer { public: - RuntimeFilterConsumerNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); - ~RuntimeFilterConsumerNode() override = default; + RuntimeFilterConsumer(const int32_t filter_id, + const std::vector& runtime_filters, + const RowDescriptor& row_descriptor, VExprContextSPtrs& conjuncts); + ~RuntimeFilterConsumer() = default; - Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override; + Status init(RuntimeState* state); - // Try append late arrived runtime filters. + // Try to append late arrived runtime filters. // Return num of filters which are applied already. Status try_append_late_arrival_runtime_filter(int* arrived_rf_num); @@ -39,7 +41,7 @@ protected: // Register and get all runtime filters at Init phase. Status _register_runtime_filter(); // Get all arrived runtime filters at Open phase. - Status _acquire_runtime_filter(bool wait = true); + Status _acquire_runtime_filter(); // Append late-arrival runtime filters to the vconjunct_ctx. Status _append_rf_into_conjuncts(const VExprSPtrs& vexprs); @@ -54,15 +56,23 @@ protected: IRuntimeFilter* runtime_filter; }; - RuntimeState* _state; - std::vector _runtime_filter_ctxs; - - std::vector _runtime_filter_descs; // Set to true if the runtime filter is ready. std::vector _runtime_filter_ready_flag; doris::Mutex _rf_locks; phmap::flat_hash_set _rf_vexpr_set; + +private: + RuntimeState* _state; + + int32_t _filter_id; + + std::vector _runtime_filter_descs; + + const RowDescriptor& _row_descriptor_ref; + + VExprContextSPtrs& _conjuncts_ref; + // True means all runtime filters are applied to scanners bool _is_all_rf_applied = true; bool _blocked_by_rf = false; diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index aaefef32ee..e9a14aac71 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -94,7 +94,9 @@ static bool ignore_cast(SlotDescriptor* slot, VExpr* expr) { } Status VScanNode::init(const TPlanNode& tnode, RuntimeState* state) { - RETURN_IF_ERROR(RuntimeFilterConsumerNode::init(tnode, state)); + RETURN_IF_ERROR(ExecNode::init(tnode, state)); + RETURN_IF_ERROR(RuntimeFilterConsumer::init(state)); + _state = state; _is_pipeline_scan = state->enable_pipeline_exec(); const TQueryOptions& query_options = state->query_options(); diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h index ee0dadefdc..481e3480dc 100644 --- a/be/src/vec/exec/scan/vscan_node.h +++ b/be/src/vec/exec/scan/vscan_node.h @@ -43,7 +43,7 @@ #include "runtime/runtime_state.h" #include "util/lock.h" #include "util/runtime_profile.h" -#include "vec/exec/runtime_filter_consumer_node.h" +#include "vec/exec/runtime_filter_consumer.h" #include "vec/exec/scan/scanner_context.h" #include "vec/exec/scan/vscanner.h" #include "vec/runtime/shared_scanner_controller.h" @@ -88,10 +88,12 @@ struct FilterPredicates { std::vector>> in_filters; }; -class VScanNode : public RuntimeFilterConsumerNode { +class VScanNode : public ExecNode, public RuntimeFilterConsumer { public: VScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) - : RuntimeFilterConsumerNode(pool, tnode, descs) { + : ExecNode(pool, tnode, descs), + RuntimeFilterConsumer(id(), tnode.runtime_filters, ExecNode::_row_descriptor, + ExecNode::_conjuncts) { if (!tnode.__isset.conjuncts || tnode.conjuncts.empty()) { // Which means the request could be fullfilled in a single segment iterator request. if (tnode.limit > 0 && tnode.limit < 1024) { @@ -304,6 +306,8 @@ protected: VExprContextSPtrs _stale_expr_ctxs; VExprContextSPtrs _common_expr_ctxs_push_down; + RuntimeState* _state; + // If sort info is set, push limit to each scanner; int64_t _limit_per_scanner = -1; diff --git a/be/src/vec/exec/vselect_node.cpp b/be/src/vec/exec/vselect_node.cpp index 626fd5ce96..ee1628cd19 100644 --- a/be/src/vec/exec/vselect_node.cpp +++ b/be/src/vec/exec/vselect_node.cpp @@ -37,34 +37,22 @@ class TPlanNode; namespace vectorized { VSelectNode::VSelectNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) - : RuntimeFilterConsumerNode(pool, tnode, descs), _child_eos(false) {} + : ExecNode(pool, tnode, descs), _child_eos(false) {} Status VSelectNode::init(const TPlanNode& tnode, RuntimeState* state) { - return RuntimeFilterConsumerNode::init(tnode, state); + return ExecNode::init(tnode, state); } Status VSelectNode::prepare(RuntimeState* state) { - return RuntimeFilterConsumerNode::prepare(state); + return ExecNode::prepare(state); } Status VSelectNode::open(RuntimeState* state) { - RETURN_IF_ERROR(RuntimeFilterConsumerNode::open(state)); + RETURN_IF_ERROR(ExecNode::open(state)); RETURN_IF_ERROR(child(0)->open(state)); return Status::OK(); } -Status VSelectNode::alloc_resource(RuntimeState* state) { - if (_opened) { - return Status::OK(); - } - - RETURN_IF_ERROR(RuntimeFilterConsumerNode::alloc_resource(state)); - RETURN_IF_ERROR(_acquire_runtime_filter()); - RETURN_IF_CANCELLED(state); - _opened = true; - return Status::OK(); -} - Status VSelectNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_CANCELLED(state); diff --git a/be/src/vec/exec/vselect_node.h b/be/src/vec/exec/vselect_node.h index 140009e4b3..d6783d7237 100644 --- a/be/src/vec/exec/vselect_node.h +++ b/be/src/vec/exec/vselect_node.h @@ -17,7 +17,7 @@ #pragma once #include "common/status.h" -#include "vec/exec/runtime_filter_consumer_node.h" +#include "exec/exec_node.h" namespace doris { class DescriptorTbl; @@ -28,7 +28,7 @@ class TPlanNode; namespace vectorized { class Block; -class VSelectNode final : public RuntimeFilterConsumerNode { +class VSelectNode final : public ExecNode { public: VSelectNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override; @@ -38,12 +38,9 @@ public: Status close(RuntimeState* state) override; Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) override; - Status alloc_resource(RuntimeState* state) override; - private: // true if last get_next() call on child signalled eos bool _child_eos; - bool _opened = false; }; } // namespace vectorized } // namespace doris \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 6aff68e6b4..650bcc43a9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -247,27 +247,21 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor distribute, PlanTranslatorContext context) { - PlanFragment childFragment = distribute.child().accept(this, context); + PlanFragment inputFragment = distribute.child().accept(this, context); // TODO: why need set streaming here? should remove this. - if (childFragment.getPlanRoot() instanceof AggregationNode + if (inputFragment.getPlanRoot() instanceof AggregationNode && distribute.child() instanceof PhysicalHashAggregate - && context.getFirstAggregateInFragment(childFragment) == distribute.child()) { + && context.getFirstAggregateInFragment(inputFragment) == distribute.child()) { PhysicalHashAggregate hashAggregate = (PhysicalHashAggregate) distribute.child(); if (hashAggregate.getAggPhase() == AggPhase.LOCAL && hashAggregate.getAggMode() == AggMode.INPUT_TO_BUFFER) { - AggregationNode aggregationNode = (AggregationNode) childFragment.getPlanRoot(); + AggregationNode aggregationNode = (AggregationNode) inputFragment.getPlanRoot(); aggregationNode.setUseStreamingPreagg(hashAggregate.isMaybeUsingStream()); } } - ExchangeNode exchangeNode = new ExchangeNode(context.nextPlanNodeId(), childFragment.getPlanRoot()); + ExchangeNode exchangeNode = new ExchangeNode(context.nextPlanNodeId(), inputFragment.getPlanRoot()); updateLegacyPlanIdToPhysicalPlan(exchangeNode, distribute); - exchangeNode.setNumInstances(childFragment.getPlanRoot().getNumInstances()); - if (distribute.getDistributionSpec() instanceof DistributionSpecGather) { - // gather to one instance - exchangeNode.setNumInstances(1); - } - List validOutputIds = distribute.getOutputExprIds(); if (distribute.child() instanceof PhysicalHashAggregate) { // we must add group by keys to output list, @@ -282,8 +276,28 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor cteProducer = context.getCteProduceMap().get(cteId); Preconditions.checkState(cteProducer != null, "invalid cteProducer"); - ExchangeNode exchangeNode = new ExchangeNode(context.nextPlanNodeId(), multiCastFragment.getPlanRoot()); - - DataStreamSink streamSink = new DataStreamSink(exchangeNode.getId()); - streamSink.setPartition(DataPartition.RANDOM); + // set datasink to multicast data sink but do not set target now + // target will be set when translate distribute + DataStreamSink streamSink = new DataStreamSink(); streamSink.setFragment(multiCastFragment); - multiCastDataSink.getDataStreamSinks().add(streamSink); multiCastDataSink.getDestinations().add(Lists.newArrayList()); - exchangeNode.setNumInstances(multiCastFragment.getPlanRoot().getNumInstances()); - - PlanFragment consumeFragment = new PlanFragment(context.nextFragmentId(), exchangeNode, - multiCastFragment.getDataPartition()); - - Map projectMap = Maps.newHashMap(); - projectMap.putAll(cteConsumer.getProducerToConsumerSlotMap()); - - List execList = new ArrayList<>(); - PlanNode inputPlanNode = consumeFragment.getPlanRoot(); - List cteProjects = cteProducer.getProjects(); - for (Slot slot : cteProjects) { - if (projectMap.containsKey(slot)) { - execList.add(projectMap.get(slot)); - } else { - throw new RuntimeException("could not find slot in cte producer consumer projectMap"); - } + // update expr to slot mapping + for (Slot producerSlot : cteProducer.getProjects()) { + Slot consumerSlot = cteConsumer.getProducerToConsumerSlotMap().get(producerSlot); + SlotRef slotRef = context.findSlotRef(producerSlot.getExprId()); + context.addExprIdSlotRefPair(consumerSlot.getExprId(), slotRef); } - - List slotList = execList - .stream() - .map(NamedExpression::toSlot) - .collect(Collectors.toList()); - - TupleDescriptor tupleDescriptor = generateTupleDesc(slotList, null, context); - - // update tuple list and tblTupleList - inputPlanNode.getTupleIds().clear(); - inputPlanNode.getTupleIds().add(tupleDescriptor.getId()); - inputPlanNode.getTblRefIds().clear(); - inputPlanNode.getTblRefIds().add(tupleDescriptor.getId()); - inputPlanNode.getNullableTupleIds().clear(); - inputPlanNode.getNullableTupleIds().add(tupleDescriptor.getId()); - - List execExprList = execList - .stream() - .map(e -> ExpressionTranslator.translate(e, context)) - .collect(Collectors.toList()); - - inputPlanNode.setProjectList(execExprList); - inputPlanNode.setOutputTupleDesc(tupleDescriptor); - - // update data partition - consumeFragment.setDataPartition(DataPartition.RANDOM); - - SelectNode projectNode = new SelectNode(context.nextPlanNodeId(), inputPlanNode); - consumeFragment.setPlanRoot(projectNode); - - multiCastFragment.getDestNodeList().add(exchangeNode); - consumeFragment.addChild(multiCastFragment); - context.getPlanFragments().add(consumeFragment); - - return consumeFragment; + return multiCastFragment; } @Override @@ -859,6 +825,17 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor ExpressionTranslator.translate(e, context)) + .forEach(dataStreamSink::addConjunct); + return inputFragment; + } + PlanNode planNode = inputFragment.getPlanRoot(); if (planNode instanceof ExchangeNode || planNode instanceof SortNode || planNode instanceof UnionNode) { // the three nodes don't support conjuncts, need create a SelectNode to filter data @@ -1397,19 +1374,31 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor) project.child(0).child(0)).setShouldTranslateOutput(false); } } + PlanFragment inputFragment = project.child(0).accept(this, context); + List execExprList = project.getProjects() .stream() .map(e -> ExpressionTranslator.translate(e, context)) .collect(Collectors.toList()); // TODO: fix the project alias of an aliased relation. - - PlanNode inputPlanNode = inputFragment.getPlanRoot(); List slotList = project.getProjects() .stream() .map(NamedExpression::toSlot) .collect(Collectors.toList()); + // process multicast sink + if (inputFragment instanceof MultiCastPlanFragment) { + MultiCastDataSink multiCastDataSink = (MultiCastDataSink) inputFragment.getSink(); + DataStreamSink dataStreamSink = multiCastDataSink.getDataStreamSinks().get( + multiCastDataSink.getDataStreamSinks().size() - 1); + TupleDescriptor tupleDescriptor = generateTupleDesc(slotList, null, context); + dataStreamSink.setProjections(execExprList); + dataStreamSink.setOutputTupleDesc(tupleDescriptor); + return inputFragment; + } + + PlanNode inputPlanNode = inputFragment.getPlanRoot(); List predicateList = inputPlanNode.getConjuncts(); Set requiredSlotIdSet = Sets.newHashSet(); for (Expr expr : execExprList) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java index 2695b67a93..158eaa67f2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java @@ -114,7 +114,7 @@ public class ChildOutputPropertyDeriver extends PlanVisitor { @Override public Boolean visit(Plan plan, Void context) { + // process must shuffle + for (int i = 0; i < children.size(); i++) { + DistributionSpec distributionSpec = childrenProperties.get(i).getDistributionSpec(); + if (distributionSpec instanceof DistributionSpecMustShuffle) { + updateChildEnforceAndCost(i, PhysicalProperties.EXECUTION_ANY); + } + } return true; } @Override public Boolean visitPhysicalHashAggregate(PhysicalHashAggregate agg, Void context) { + // forbid one phase agg on distribute if (agg.getAggMode() == AggMode.INPUT_TO_RESULT && children.get(0).getPlan() instanceof PhysicalDistribute) { // this means one stage gather agg, usually bad pattern return false; } + // process must shuffle + visit(agg, context); + // process agg + return true; + } + + @Override + public Boolean visitPhysicalFilter(PhysicalFilter filter, Void context) { + // do not process must shuffle return true; } @@ -93,6 +112,9 @@ public class ChildrenPropertiesRegulator extends PlanVisitor { Preconditions.checkArgument(children.size() == 2, "children.size() != 2"); Preconditions.checkArgument(childrenProperties.size() == 2); Preconditions.checkArgument(requiredProperties.size() == 2); + // process must shuffle + visit(hashJoin, context); + // process hash join DistributionSpec leftDistributionSpec = childrenProperties.get(0).getDistributionSpec(); DistributionSpec rightDistributionSpec = childrenProperties.get(1).getDistributionSpec(); @@ -229,6 +251,9 @@ public class ChildrenPropertiesRegulator extends PlanVisitor { Preconditions.checkArgument(children.size() == 2, String.format("children.size() is %d", children.size())); Preconditions.checkArgument(childrenProperties.size() == 2); Preconditions.checkArgument(requiredProperties.size() == 2); + // process must shuffle + visit(nestedLoopJoin, context); + // process nlj DistributionSpec rightDistributionSpec = childrenProperties.get(1).getDistributionSpec(); if (rightDistributionSpec instanceof DistributionSpecStorageGather) { updateChildEnforceAndCost(1, PhysicalProperties.GATHER); @@ -236,8 +261,17 @@ public class ChildrenPropertiesRegulator extends PlanVisitor { return true; } + @Override + public Boolean visitPhysicalProject(PhysicalProject project, Void context) { + // do not process must shuffle + return true; + } + @Override public Boolean visitPhysicalSetOperation(PhysicalSetOperation setOperation, Void context) { + // process must shuffle + visit(setOperation, context); + // process set operation if (children.isEmpty()) { return true; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecMustShuffle.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecMustShuffle.java new file mode 100644 index 0000000000..8f718fbb9d --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecMustShuffle.java @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.properties; + +/** + * present data must use after shuffle + */ +public class DistributionSpecMustShuffle extends DistributionSpec { + + public static final DistributionSpecMustShuffle INSTANCE = new DistributionSpecMustShuffle(); + + public DistributionSpecMustShuffle() { + super(); + } + + @Override + public boolean satisfy(DistributionSpec other) { + return other instanceof DistributionSpecAny; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java index 28bf347977..e3b5151af7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java @@ -44,6 +44,8 @@ public class PhysicalProperties { public static PhysicalProperties STORAGE_GATHER = new PhysicalProperties(DistributionSpecStorageGather.INSTANCE); + public static PhysicalProperties MUST_SHUFFLE = new PhysicalProperties(DistributionSpecMustShuffle.INSTANCE); + private final OrderSpec orderSpec; private final DistributionSpec distributionSpec; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java index 0f903d69d1..6a43694527 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java @@ -20,19 +20,38 @@ package org.apache.doris.planner; +import org.apache.doris.analysis.BitmapFilterPredicate; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.thrift.TDataSink; import org.apache.doris.thrift.TDataSinkType; import org.apache.doris.thrift.TDataStreamSink; import org.apache.doris.thrift.TExplainLevel; +import com.google.common.collect.Lists; +import org.springframework.util.CollectionUtils; + +import java.util.List; + /** * Data sink that forwards data to an exchange node. */ public class DataStreamSink extends DataSink { - private final PlanNodeId exchNodeId; + + private PlanNodeId exchNodeId; private DataPartition outputPartition; + protected TupleDescriptor outputTupleDesc; + + protected List projections; + + protected List conjuncts = Lists.newArrayList(); + + public DataStreamSink() { + + } + public DataStreamSink(PlanNodeId exchNodeId) { this.exchNodeId = exchNodeId; } @@ -42,23 +61,66 @@ public class DataStreamSink extends DataSink { return exchNodeId; } + public void setExchNodeId(PlanNodeId exchNodeId) { + this.exchNodeId = exchNodeId; + } + @Override public DataPartition getOutputPartition() { return outputPartition; } - public void setPartition(DataPartition partition) { - outputPartition = partition; + public void setOutputPartition(DataPartition outputPartition) { + this.outputPartition = outputPartition; + } + + public TupleDescriptor getOutputTupleDesc() { + return outputTupleDesc; + } + + public void setOutputTupleDesc(TupleDescriptor outputTupleDesc) { + this.outputTupleDesc = outputTupleDesc; + } + + public List getProjections() { + return projections; + } + + public void setProjections(List projections) { + this.projections = projections; + } + + public List getConjuncts() { + return conjuncts; + } + + public void setConjuncts(List conjuncts) { + this.conjuncts = conjuncts; + } + + public void addConjunct(Expr conjunct) { + this.conjuncts.add(conjunct); } @Override public String getExplainString(String prefix, TExplainLevel explainLevel) { StringBuilder strBuilder = new StringBuilder(); - strBuilder.append(prefix + "STREAM DATA SINK\n"); - strBuilder.append(prefix + " EXCHANGE ID: " + exchNodeId + "\n"); + strBuilder.append(prefix).append("STREAM DATA SINK\n"); + strBuilder.append(prefix).append(" EXCHANGE ID: ").append(exchNodeId); if (outputPartition != null) { - strBuilder.append(prefix + " " + outputPartition.getExplainString(explainLevel)); + strBuilder.append("\n").append(prefix).append(" ").append(outputPartition.getExplainString(explainLevel)); } + if (!conjuncts.isEmpty()) { + Expr expr = PlanNode.convertConjunctsToAndCompoundPredicate(conjuncts); + strBuilder.append(prefix).append(" CONJUNCTS: ").append(expr.toSql()).append("\n"); + } + if (!CollectionUtils.isEmpty(projections)) { + strBuilder.append(prefix).append(" PROJECTIONS: ") + .append(PlanNode.getExplainString(projections)).append("\n"); + strBuilder.append(prefix).append(" PROJECTION TUPLE: ").append(outputTupleDesc.getId()); + strBuilder.append("\n"); + } + return strBuilder.toString(); } @@ -67,6 +129,19 @@ public class DataStreamSink extends DataSink { TDataSink result = new TDataSink(TDataSinkType.DATA_STREAM_SINK); TDataStreamSink tStreamSink = new TDataStreamSink(exchNodeId.asInt(), outputPartition.toThrift()); + for (Expr e : conjuncts) { + if (!(e instanceof BitmapFilterPredicate)) { + tStreamSink.addToConjuncts(e.treeToThrift()); + } + } + if (projections != null) { + for (Expr expr : projections) { + tStreamSink.addToOutputExprs(expr.treeToThrift()); + } + } + if (outputTupleDesc != null) { + tStreamSink.setOutputTupleId(outputTupleDesc.getId().asInt()); + } result.setStreamSink(tStreamSink); return result; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java index 30bf9eb45d..6694a05219 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java @@ -108,15 +108,21 @@ public class ExchangeNode extends PlanNode { public final void computeTupleIds() { PlanNode inputNode = getChild(0); TupleDescriptor outputTupleDesc = inputNode.getOutputTupleDesc(); + updateTupleIds(outputTupleDesc); + } + + public void updateTupleIds(TupleDescriptor outputTupleDesc) { if (outputTupleDesc != null) { tupleIds.clear(); tupleIds.add(outputTupleDesc.getId()); + tblRefIds.add(outputTupleDesc.getId()); + nullableTupleIds.add(outputTupleDesc.getId()); } else { clearTupleIds(); tupleIds.addAll(getChild(0).getTupleIds()); + tblRefIds.addAll(getChild(0).getTblRefIds()); + nullableTupleIds.addAll(getChild(0).getNullableTupleIds()); } - tblRefIds.addAll(getChild(0).getTblRefIds()); - nullableTupleIds.addAll(getChild(0).getNullableTupleIds()); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/MultiCastPlanFragment.java b/fe/fe-core/src/main/java/org/apache/doris/planner/MultiCastPlanFragment.java index 0d5b54b269..e52bbb0557 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/MultiCastPlanFragment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/MultiCastPlanFragment.java @@ -35,10 +35,11 @@ public class MultiCastPlanFragment extends PlanFragment { this.children.addAll(planFragment.getChildren()); } - public List getDestNodeList() { - return destNodeList; + public void addToDest(ExchangeNode exchangeNode) { + destNodeList.add(exchangeNode); } + public List getDestFragmentList() { return destNodeList.stream().map(PlanNode::getFragment).collect(Collectors.toList()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java index 54903beae5..64ac4c3051 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java @@ -256,7 +256,7 @@ public class PlanFragment extends TreeNode { Preconditions.checkState(sink == null); // we're streaming to an exchange node DataStreamSink streamSink = new DataStreamSink(destNode.getId()); - streamSink.setPartition(outputPartition); + streamSink.setOutputPartition(outputPartition); streamSink.setFragment(this); sink = streamSink; } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java index ab17c0acec..cf5d14cb5e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java @@ -415,7 +415,7 @@ public abstract class PlanNode extends TreeNode implements PlanStats { } } - protected Expr convertConjunctsToAndCompoundPredicate(List conjuncts) { + public static Expr convertConjunctsToAndCompoundPredicate(List conjuncts) { List targetConjuncts = Lists.newArrayList(conjuncts); while (targetConjuncts.size() > 1) { List newTargetConjuncts = Lists.newArrayList(); @@ -824,7 +824,7 @@ public abstract class PlanNode extends TreeNode implements PlanStats { return output.toString(); } - protected String getExplainString(List exprs) { + public static String getExplainString(List exprs) { if (exprs == null) { return ""; } diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift index af5d4d26a3..c78a7900a9 100644 --- a/gensrc/thrift/DataSinks.thrift +++ b/gensrc/thrift/DataSinks.thrift @@ -153,6 +153,18 @@ struct TDataStreamSink { 2: required Partitions.TDataPartition output_partition 3: optional bool ignore_not_found + + // per-destination projections + 4: optional list output_exprs + + // project output tuple id + 5: optional Types.TTupleId output_tuple_id + + // per-destination filters + 6: optional list conjuncts + + // per-destination runtime filters + 7: optional list runtime_filters } struct TMultiCastDataStreamSink { diff --git a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query1.out b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query1.out index 16ab4a00e2..838ba573be 100644 --- a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query1.out +++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query1.out @@ -25,16 +25,17 @@ CteAnchor[cteId= ( CTEId#2=] ) --------------PhysicalProject ----------------hashJoin[LEFT_SEMI_JOIN](ctr1.ctr_store_sk = ctr2.ctr_store_sk)(cast(ctr_total_return as DOUBLE) > cast((avg(ctr_total_return) * 1.2) as DOUBLE)) ------------------hashJoin[INNER_JOIN](store.s_store_sk = ctr1.ctr_store_sk) ---------------------CteConsumer[cteId= ( CTEId#2=] ) +--------------------PhysicalDistribute +----------------------CteConsumer[cteId= ( CTEId#2=] ) --------------------PhysicalDistribute ----------------------PhysicalProject ------------------------filter((cast(s_state as VARCHAR(*)) = 'SD')) --------------------------PhysicalOlapScan[store] -------------------PhysicalDistribute ---------------------PhysicalProject -----------------------hashAgg[GLOBAL] -------------------------PhysicalDistribute ---------------------------hashAgg[LOCAL] +------------------PhysicalProject +--------------------hashAgg[GLOBAL] +----------------------PhysicalDistribute +------------------------hashAgg[LOCAL] +--------------------------PhysicalDistribute ----------------------------PhysicalProject ------------------------------CteConsumer[cteId= ( CTEId#2=] ) diff --git a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query2.out b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query2.out index e970e12f97..13e0f2f0e9 100644 --- a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query2.out +++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query2.out @@ -24,20 +24,22 @@ CteAnchor[cteId= ( CTEId#4=] ) ----------hashJoin[INNER_JOIN](expr_cast(d_week_seq1 as BIGINT) = expr_(d_week_seq2 - 53)) ------------PhysicalDistribute --------------PhysicalProject -----------------hashJoin[INNER_JOIN](date_dim.d_week_seq = d_week_seq1) -------------------PhysicalProject ---------------------CteConsumer[cteId= ( CTEId#4=] ) +----------------hashJoin[INNER_JOIN](date_dim.d_week_seq = d_week_seq2) ------------------PhysicalDistribute --------------------PhysicalProject -----------------------filter((date_dim.d_year = 1998)) -------------------------PhysicalOlapScan[date_dim] -------------PhysicalDistribute ---------------PhysicalProject -----------------hashJoin[INNER_JOIN](date_dim.d_week_seq = d_week_seq2) -------------------PhysicalProject ---------------------CteConsumer[cteId= ( CTEId#4=] ) +----------------------CteConsumer[cteId= ( CTEId#4=] ) ------------------PhysicalDistribute --------------------PhysicalProject ----------------------filter((date_dim.d_year = 1999)) ------------------------PhysicalOlapScan[date_dim] +------------PhysicalDistribute +--------------PhysicalProject +----------------hashJoin[INNER_JOIN](date_dim.d_week_seq = d_week_seq1) +------------------PhysicalDistribute +--------------------PhysicalProject +----------------------CteConsumer[cteId= ( CTEId#4=] ) +------------------PhysicalDistribute +--------------------PhysicalProject +----------------------filter((date_dim.d_year = 1998)) +------------------------PhysicalOlapScan[date_dim] diff --git a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query24.out b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query24.out index c887e96371..56bad7457e 100644 --- a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query24.out +++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query24.out @@ -39,15 +39,17 @@ CteAnchor[cteId= ( CTEId#0=] ) ------------hashAgg[GLOBAL] --------------PhysicalDistribute ----------------hashAgg[LOCAL] -------------------PhysicalProject ---------------------filter((cast(i_color as VARCHAR(*)) = 'beige')) -----------------------CteConsumer[cteId= ( CTEId#0=] ) +------------------PhysicalDistribute +--------------------PhysicalProject +----------------------filter((cast(i_color as VARCHAR(*)) = 'beige')) +------------------------CteConsumer[cteId= ( CTEId#0=] ) ------------PhysicalDistribute --------------PhysicalAssertNumRows ----------------PhysicalProject ------------------hashAgg[GLOBAL] --------------------PhysicalDistribute ----------------------hashAgg[LOCAL] -------------------------PhysicalProject ---------------------------CteConsumer[cteId= ( CTEId#0=] ) +------------------------PhysicalDistribute +--------------------------PhysicalProject +----------------------------CteConsumer[cteId= ( CTEId#0=] ) diff --git a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query30.out b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query30.out index ed1c8ba9cb..7338d341dc 100644 --- a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query30.out +++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query30.out @@ -40,6 +40,7 @@ CteAnchor[cteId= ( CTEId#2=] ) ----------------hashAgg[GLOBAL] ------------------PhysicalDistribute --------------------hashAgg[LOCAL] -----------------------PhysicalProject -------------------------CteConsumer[cteId= ( CTEId#2=] ) +----------------------PhysicalDistribute +------------------------PhysicalProject +--------------------------CteConsumer[cteId= ( CTEId#2=] ) diff --git a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query47.out b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query47.out index 5af21c50d8..49fc532871 100644 --- a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query47.out +++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query47.out @@ -35,13 +35,15 @@ CteAnchor[cteId= ( CTEId#0=] ) --------PhysicalTopN ----------PhysicalProject ------------hashJoin[INNER_JOIN](s_store_name = v1_lead.s_store_name)(v1.i_category = v1_lead.i_category)(v1.i_brand = v1_lead.i_brand)(v1.s_company_name = v1_lead.s_company_name)(v1.rn = expr_(rn - 1)) ---------------PhysicalProject -----------------CteConsumer[cteId= ( CTEId#0=] ) +--------------PhysicalDistribute +----------------PhysicalProject +------------------CteConsumer[cteId= ( CTEId#0=] ) --------------PhysicalDistribute ----------------PhysicalProject ------------------hashJoin[INNER_JOIN](s_store_name = v1_lag.s_store_name)(v1.i_category = v1_lag.i_category)(v1.i_brand = v1_lag.i_brand)(v1.s_company_name = v1_lag.s_company_name)(v1.rn = expr_(rn + 1)) ---------------------PhysicalProject -----------------------CteConsumer[cteId= ( CTEId#0=] ) +--------------------PhysicalDistribute +----------------------PhysicalProject +------------------------CteConsumer[cteId= ( CTEId#0=] ) --------------------PhysicalDistribute ----------------------PhysicalProject ------------------------filter((CASE WHEN (avg_monthly_sales > 0.0000) THEN (abs((cast(sum_sales as DOUBLE) - cast(avg_monthly_sales as DOUBLE))) / cast(avg_monthly_sales as DOUBLE)) ELSE NULL END > 0.1)(v2.d_year = 2001)(v2.avg_monthly_sales > 0.0000)) diff --git a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query57.out b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query57.out index 6138c0d491..1b3a0610a4 100644 --- a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query57.out +++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query57.out @@ -35,13 +35,15 @@ CteAnchor[cteId= ( CTEId#0=] ) --------PhysicalTopN ----------PhysicalProject ------------hashJoin[INNER_JOIN](i_brand = v1_lead.i_brand)(v1.i_category = v1_lead.i_category)(v1.cc_name = v1_lead.cc_name)(v1.rn = expr_(rn - 1)) ---------------PhysicalProject -----------------CteConsumer[cteId= ( CTEId#0=] ) +--------------PhysicalDistribute +----------------PhysicalProject +------------------CteConsumer[cteId= ( CTEId#0=] ) --------------PhysicalDistribute ----------------PhysicalProject ------------------hashJoin[INNER_JOIN](i_brand = v1_lag.i_brand)(v1.i_category = v1_lag.i_category)(v1.cc_name = v1_lag.cc_name)(v1.rn = expr_(rn + 1)) ---------------------PhysicalProject -----------------------CteConsumer[cteId= ( CTEId#0=] ) +--------------------PhysicalDistribute +----------------------PhysicalProject +------------------------CteConsumer[cteId= ( CTEId#0=] ) --------------------PhysicalDistribute ----------------------PhysicalProject ------------------------filter((CASE WHEN (avg_monthly_sales > 0.0000) THEN (abs((cast(sum_sales as DOUBLE) - cast(avg_monthly_sales as DOUBLE))) / cast(avg_monthly_sales as DOUBLE)) ELSE NULL END > 0.1)(v2.d_year = 1999)(v2.avg_monthly_sales > 0.0000)) diff --git a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query81.out b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query81.out index 44ad4b9321..bb72d81784 100644 --- a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query81.out +++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query81.out @@ -25,7 +25,8 @@ CteAnchor[cteId= ( CTEId#2=] ) --------PhysicalProject ----------hashJoin[LEFT_SEMI_JOIN](ctr1.ctr_state = ctr2.ctr_state)(cast(ctr_total_return as DOUBLE) > cast((avg(ctr_total_return) * 1.2) as DOUBLE)) ------------hashJoin[INNER_JOIN](ctr1.ctr_customer_sk = customer.c_customer_sk) ---------------CteConsumer[cteId= ( CTEId#2=] ) +--------------PhysicalDistribute +----------------CteConsumer[cteId= ( CTEId#2=] ) --------------PhysicalDistribute ----------------hashJoin[INNER_JOIN](customer_address.ca_address_sk = customer.c_current_addr_sk) ------------------PhysicalProject @@ -39,6 +40,7 @@ CteAnchor[cteId= ( CTEId#2=] ) ----------------hashAgg[GLOBAL] ------------------PhysicalDistribute --------------------hashAgg[LOCAL] -----------------------PhysicalProject -------------------------CteConsumer[cteId= ( CTEId#2=] ) +----------------------PhysicalDistribute +------------------------PhysicalProject +--------------------------CteConsumer[cteId= ( CTEId#2=] ) diff --git a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query95.out b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query95.out index d6ec3488d9..aa1194fe22 100644 --- a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query95.out +++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query95.out @@ -18,19 +18,19 @@ CteAnchor[cteId= ( CTEId#3=] ) ------------hashAgg[LOCAL] --------------PhysicalProject ----------------hashJoin[INNER_JOIN](ws1.ws_ship_date_sk = date_dim.d_date_sk) -------------------hashJoin[RIGHT_SEMI_JOIN](ws1.ws_order_number = web_returns.wr_order_number) +------------------hashJoin[RIGHT_SEMI_JOIN](ws1.ws_order_number = ws_wh.ws_order_number) --------------------PhysicalDistribute ----------------------PhysicalProject +------------------------CteConsumer[cteId= ( CTEId#3=] ) +--------------------hashJoin[RIGHT_SEMI_JOIN](ws1.ws_order_number = web_returns.wr_order_number) +----------------------PhysicalProject ------------------------hashJoin[INNER_JOIN](web_returns.wr_order_number = ws_wh.ws_order_number) ---------------------------PhysicalProject -----------------------------CteConsumer[cteId= ( CTEId#3=] ) +--------------------------PhysicalDistribute +----------------------------PhysicalProject +------------------------------CteConsumer[cteId= ( CTEId#3=] ) --------------------------PhysicalDistribute ----------------------------PhysicalProject ------------------------------PhysicalOlapScan[web_returns] ---------------------hashJoin[RIGHT_SEMI_JOIN](ws1.ws_order_number = ws_wh.ws_order_number) -----------------------PhysicalDistribute -------------------------PhysicalProject ---------------------------CteConsumer[cteId= ( CTEId#3=] ) ----------------------PhysicalDistribute ------------------------hashJoin[INNER_JOIN](ws1.ws_web_site_sk = web_site.web_site_sk) --------------------------hashJoin[INNER_JOIN](ws1.ws_ship_addr_sk = customer_address.ca_address_sk)