From df2b1905594be71f2dd987b7f20369eec6ccd97f Mon Sep 17 00:00:00 2001 From: Gabriel Date: Wed, 7 Dec 2022 08:47:35 +0800 Subject: [PATCH] [pipeline](dategen) Support datagen node (#14835) --- be/src/pipeline/CMakeLists.txt | 1 + be/src/pipeline/exec/datagen_operator.cpp | 38 +++++++++++++++ be/src/pipeline/exec/datagen_operator.h | 48 +++++++++++++++++++ be/src/pipeline/exec/operator.h | 1 + be/src/pipeline/pipeline_fragment_context.cpp | 7 +++ 5 files changed, 95 insertions(+) create mode 100644 be/src/pipeline/exec/datagen_operator.cpp create mode 100644 be/src/pipeline/exec/datagen_operator.h diff --git a/be/src/pipeline/CMakeLists.txt b/be/src/pipeline/CMakeLists.txt index 436d4e70f2..c3a06c9f1f 100644 --- a/be/src/pipeline/CMakeLists.txt +++ b/be/src/pipeline/CMakeLists.txt @@ -28,6 +28,7 @@ set(PIPELINE_FILES task_scheduler.cpp exec/operator.cpp exec/scan_operator.cpp + exec/datagen_operator.cpp exec/empty_set_operator.cpp exec/exchange_source_operator.cpp exec/exchange_sink_operator.cpp diff --git a/be/src/pipeline/exec/datagen_operator.cpp b/be/src/pipeline/exec/datagen_operator.cpp new file mode 100644 index 0000000000..729096f803 --- /dev/null +++ b/be/src/pipeline/exec/datagen_operator.cpp @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "datagen_operator.h" + +#include "vec/exec/vdata_gen_scan_node.h" + +namespace doris::pipeline { + +OPERATOR_CODE_GENERATOR(DataGenOperator, Operator) + +Status DataGenOperator::open(RuntimeState* state) { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + RETURN_IF_ERROR(Operator::open(state)); + return _node->open(state); +} + +Status DataGenOperator::close(RuntimeState* state) { + RETURN_IF_ERROR(Operator::close(state)); + _node->close(state); + return Status::OK(); +} + +} // namespace doris::pipeline diff --git a/be/src/pipeline/exec/datagen_operator.h b/be/src/pipeline/exec/datagen_operator.h new file mode 100644 index 0000000000..9d418a0284 --- /dev/null +++ b/be/src/pipeline/exec/datagen_operator.h @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "operator.h" + +namespace doris::vectorized { +class VDataGenFunctionScanNode; +} // namespace doris::vectorized + +namespace doris::pipeline { + +class DataGenOperatorBuilder : public OperatorBuilder { +public: + DataGenOperatorBuilder(int32_t id, ExecNode* exec_node); + bool is_source() const override { return true; } + OperatorPtr build_operator() override; +}; + +class DataGenOperator : public Operator { +public: + DataGenOperator(OperatorBuilderBase* operator_builder, ExecNode* datagen_node); + + bool can_read() override { return true; }; + + Status open(RuntimeState* state) override; + + Status close(RuntimeState* state) override; +}; + +} // namespace doris::pipeline \ No newline at end of file diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index d821103aa1..461848d54a 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -23,6 +23,7 @@ #include "exec/exec_node.h" #include "runtime/runtime_state.h" #include "vec/core/block.h" +#include "vec/exec/vdata_gen_scan_node.h" #define OPERATOR_CODE_GENERATOR(NAME, SUBCLASS) \ NAME##Builder::NAME##Builder(int32_t id, ExecNode* exec_node) \ diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 0c2793c444..75cd7dbc3e 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -23,6 +23,7 @@ #include "exec/aggregation_sink_operator.h" #include "exec/aggregation_source_operator.h" #include "exec/data_sink.h" +#include "exec/datagen_operator.h" #include "exec/empty_set_operator.h" #include "exec/exchange_sink_operator.h" #include "exec/exchange_source_operator.h" @@ -298,6 +299,12 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur RETURN_IF_ERROR(cur_pipe->add_operator(operator_t)); break; } + case TPlanNodeType::DATA_GEN_SCAN_NODE: { + OperatorBuilderPtr operator_t = + std::make_shared(next_operator_builder_id(), node); + RETURN_IF_ERROR(cur_pipe->add_operator(operator_t)); + break; + } case TPlanNodeType::AGGREGATION_NODE: { auto* agg_node = assert_cast(node); auto new_pipe = add_pipeline();