From 8d17eea22b0be861b7c464e53c348fa2ecce88dc Mon Sep 17 00:00:00 2001 From: luozenglin <37725793+luozenglin@users.noreply.github.com> Date: Fri, 9 Dec 2022 16:25:29 +0800 Subject: [PATCH] [pipeline](mysqlscan) support mysql scan node (#14949) --- be/src/pipeline/CMakeLists.txt | 1 + be/src/pipeline/exec/mysql_scan_operator.cpp | 38 +++++++++++++++ be/src/pipeline/exec/mysql_scan_operator.h | 48 +++++++++++++++++++ be/src/pipeline/exec/operator.h | 1 + be/src/pipeline/pipeline_fragment_context.cpp | 7 +++ 5 files changed, 95 insertions(+) create mode 100644 be/src/pipeline/exec/mysql_scan_operator.cpp create mode 100644 be/src/pipeline/exec/mysql_scan_operator.h diff --git a/be/src/pipeline/CMakeLists.txt b/be/src/pipeline/CMakeLists.txt index 659b357870..a9cadd12b0 100644 --- a/be/src/pipeline/CMakeLists.txt +++ b/be/src/pipeline/CMakeLists.txt @@ -28,6 +28,7 @@ set(PIPELINE_FILES task_scheduler.cpp exec/operator.cpp exec/scan_operator.cpp + exec/mysql_scan_operator.cpp exec/datagen_operator.cpp exec/empty_set_operator.cpp exec/exchange_source_operator.cpp diff --git a/be/src/pipeline/exec/mysql_scan_operator.cpp b/be/src/pipeline/exec/mysql_scan_operator.cpp new file mode 100644 index 0000000000..575d272c33 --- /dev/null +++ b/be/src/pipeline/exec/mysql_scan_operator.cpp @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "mysql_scan_operator.h" + +#include "vec/exec/vmysql_scan_node.h" + +namespace doris::pipeline { + +OPERATOR_CODE_GENERATOR(MysqlScanOperator, Operator) + +Status MysqlScanOperator::open(RuntimeState* state) { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + RETURN_IF_ERROR(Operator::open(state)); + return _node->open(state); +} + +Status MysqlScanOperator::close(RuntimeState* state) { + RETURN_IF_ERROR(Operator::close(state)); + _node->close(state); + return Status::OK(); +} + +} // namespace doris::pipeline diff --git a/be/src/pipeline/exec/mysql_scan_operator.h b/be/src/pipeline/exec/mysql_scan_operator.h new file mode 100644 index 0000000000..8363077790 --- /dev/null +++ b/be/src/pipeline/exec/mysql_scan_operator.h @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "operator.h" + +namespace doris::vectorized { +class VMysqlScanNode; +} // namespace doris::vectorized + +namespace doris::pipeline { + +class MysqlScanOperatorBuilder : public OperatorBuilder { +public: + MysqlScanOperatorBuilder(int32_t id, ExecNode* exec_node); + bool is_source() const override { return true; } + OperatorPtr build_operator() override; +}; + +class MysqlScanOperator : public Operator { +public: + MysqlScanOperator(OperatorBuilderBase* operator_builder, ExecNode* mysql_scan_node); + + bool can_read() override { return true; }; + + Status open(RuntimeState* state) override; + + Status close(RuntimeState* state) override; +}; + +} // namespace doris::pipeline diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index a17e8dd1f4..d3d7e18c8b 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -24,6 +24,7 @@ #include "runtime/runtime_state.h" #include "vec/core/block.h" #include "vec/exec/vdata_gen_scan_node.h" +#include "vec/exec/vmysql_scan_node.h" #define OPERATOR_CODE_GENERATOR(NAME, SUBCLASS) \ NAME##Builder::NAME##Builder(int32_t id, ExecNode* exec_node) \ diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index f2deb1375a..921ec10403 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -29,6 +29,7 @@ #include "exec/exchange_source_operator.h" #include "exec/hashjoin_build_sink.h" #include "exec/hashjoin_probe_operator.h" +#include "exec/mysql_scan_operator.h" #include "exec/repeat_operator.h" #include "exec/result_sink_operator.h" #include "exec/scan_node.h" @@ -294,6 +295,12 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur RETURN_IF_ERROR(cur_pipe->add_operator(operator_t)); break; } + case TPlanNodeType::MYSQL_SCAN_NODE: { + OperatorBuilderPtr operator_t = + std::make_shared(next_operator_builder_id(), node); + RETURN_IF_ERROR(cur_pipe->add_operator(operator_t)); + break; + } case TPlanNodeType::EXCHANGE_NODE: { OperatorBuilderPtr operator_t = std::make_shared(next_operator_builder_id(), node);