diff --git a/be/src/pipeline/exec/mysql_table_sink_operator.h b/be/src/pipeline/exec/table_sink_operator.h similarity index 64% rename from be/src/pipeline/exec/mysql_table_sink_operator.h rename to be/src/pipeline/exec/table_sink_operator.h index 8b6392b5d5..2e7b13033c 100644 --- a/be/src/pipeline/exec/mysql_table_sink_operator.h +++ b/be/src/pipeline/exec/table_sink_operator.h @@ -18,31 +18,31 @@ #pragma once #include "operator.h" -#include "vec/sink/vmysql_table_sink.h" +#include "vec/sink/vtable_sink.h" namespace doris { namespace pipeline { -class MysqlTableSinkOperatorBuilder final - : public DataSinkOperatorBuilder { +// used for VMysqlTableSink, VJdbcTableSink and VOdbcTableSink. +class TableSinkOperatorBuilder final : public DataSinkOperatorBuilder { public: - MysqlTableSinkOperatorBuilder(int32_t id, DataSink* sink) - : DataSinkOperatorBuilder(id, "MysqlTableSinkOperator", sink) {}; + TableSinkOperatorBuilder(int32_t id, DataSink* sink) + : DataSinkOperatorBuilder(id, "TableSinkOperator", sink) {}; OperatorPtr build_operator() override; }; -class MysqlTableSinkOperator final : public DataSinkOperator { +class TableSinkOperator final : public DataSinkOperator { public: - MysqlTableSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink) + TableSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink) : DataSinkOperator(operator_builder, sink) {}; bool can_write() override { return true; } }; -OperatorPtr MysqlTableSinkOperatorBuilder::build_operator() { - return std::make_shared(this, _sink); +OperatorPtr TableSinkOperatorBuilder::build_operator() { + return std::make_shared(this, _sink); } } // namespace pipeline diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index f88fd6b931..a034b2a9a0 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -32,7 +32,6 @@ #include "exec/hashjoin_build_sink.h" #include "exec/hashjoin_probe_operator.h" #include "exec/mysql_scan_operator.h" -#include "exec/mysql_table_sink_operator.h" #include "exec/repeat_operator.h" #include "exec/result_sink_operator.h" #include "exec/scan_node.h" @@ -46,6 +45,7 @@ #include "exec/sort_source_operator.h" #include "exec/streaming_aggregation_sink_operator.h" #include "exec/streaming_aggregation_source_operator.h" +#include "exec/table_sink_operator.h" #include "gen_cpp/FrontendService.h" #include "gen_cpp/HeartbeatService_types.h" #include "pipeline/exec/assert_num_rows_operator.h" @@ -301,7 +301,9 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur auto node_type = node->type(); switch (node_type) { // for source - case TPlanNodeType::OLAP_SCAN_NODE: { + case TPlanNodeType::OLAP_SCAN_NODE: + case TPlanNodeType::JDBC_SCAN_NODE: + case TPlanNodeType::ODBC_SCAN_NODE: { OperatorBuilderPtr operator_t = std::make_shared( fragment_context->next_operator_builder_id(), node); RETURN_IF_ERROR(cur_pipe->add_operator(operator_t)); @@ -528,9 +530,10 @@ Status PipelineFragmentContext::_create_sink(const TDataSink& thrift_sink) { _sink.get()); break; } - case TDataSinkType::MYSQL_TABLE_SINK: { - sink_ = std::make_shared(next_operator_builder_id(), - _sink.get()); + case TDataSinkType::MYSQL_TABLE_SINK: + case TDataSinkType::JDBC_TABLE_SINK: + case TDataSinkType::ODBC_TABLE_SINK: { + sink_ = std::make_shared(next_operator_builder_id(), _sink.get()); break; } default: