[enhancement](pipeline) support jdbc scan, jdbc sink, odbc scan and odbc sink node for pipeline. (#14986)

support jdbc scan, jdbc sink, odbc scan and odbc sink node for pipeline.
This commit is contained in:
luozenglin
2022-12-12 17:03:22 +08:00
committed by GitHub
parent 0a16f11435
commit affc57bee7
2 changed files with 17 additions and 14 deletions

View File

@ -18,31 +18,31 @@
#pragma once
#include "operator.h"
#include "vec/sink/vmysql_table_sink.h"
#include "vec/sink/vtable_sink.h"
namespace doris {
namespace pipeline {
class MysqlTableSinkOperatorBuilder final
: public DataSinkOperatorBuilder<vectorized::VMysqlTableSink> {
// used for VMysqlTableSink, VJdbcTableSink and VOdbcTableSink.
class TableSinkOperatorBuilder final : public DataSinkOperatorBuilder<vectorized::VTableSink> {
public:
MysqlTableSinkOperatorBuilder(int32_t id, DataSink* sink)
: DataSinkOperatorBuilder(id, "MysqlTableSinkOperator", sink) {};
TableSinkOperatorBuilder(int32_t id, DataSink* sink)
: DataSinkOperatorBuilder(id, "TableSinkOperator", sink) {};
OperatorPtr build_operator() override;
};
class MysqlTableSinkOperator final : public DataSinkOperator<MysqlTableSinkOperatorBuilder> {
class TableSinkOperator final : public DataSinkOperator<TableSinkOperatorBuilder> {
public:
MysqlTableSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink)
TableSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink)
: DataSinkOperator(operator_builder, sink) {};
bool can_write() override { return true; }
};
OperatorPtr MysqlTableSinkOperatorBuilder::build_operator() {
return std::make_shared<MysqlTableSinkOperator>(this, _sink);
OperatorPtr TableSinkOperatorBuilder::build_operator() {
return std::make_shared<TableSinkOperator>(this, _sink);
}
} // namespace pipeline

View File

@ -32,7 +32,6 @@
#include "exec/hashjoin_build_sink.h"
#include "exec/hashjoin_probe_operator.h"
#include "exec/mysql_scan_operator.h"
#include "exec/mysql_table_sink_operator.h"
#include "exec/repeat_operator.h"
#include "exec/result_sink_operator.h"
#include "exec/scan_node.h"
@ -46,6 +45,7 @@
#include "exec/sort_source_operator.h"
#include "exec/streaming_aggregation_sink_operator.h"
#include "exec/streaming_aggregation_source_operator.h"
#include "exec/table_sink_operator.h"
#include "gen_cpp/FrontendService.h"
#include "gen_cpp/HeartbeatService_types.h"
#include "pipeline/exec/assert_num_rows_operator.h"
@ -301,7 +301,9 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
auto node_type = node->type();
switch (node_type) {
// for source
case TPlanNodeType::OLAP_SCAN_NODE: {
case TPlanNodeType::OLAP_SCAN_NODE:
case TPlanNodeType::JDBC_SCAN_NODE:
case TPlanNodeType::ODBC_SCAN_NODE: {
OperatorBuilderPtr operator_t = std::make_shared<ScanOperatorBuilder>(
fragment_context->next_operator_builder_id(), node);
RETURN_IF_ERROR(cur_pipe->add_operator(operator_t));
@ -528,9 +530,10 @@ Status PipelineFragmentContext::_create_sink(const TDataSink& thrift_sink) {
_sink.get());
break;
}
case TDataSinkType::MYSQL_TABLE_SINK: {
sink_ = std::make_shared<MysqlTableSinkOperatorBuilder>(next_operator_builder_id(),
_sink.get());
case TDataSinkType::MYSQL_TABLE_SINK:
case TDataSinkType::JDBC_TABLE_SINK:
case TDataSinkType::ODBC_TABLE_SINK: {
sink_ = std::make_shared<TableSinkOperatorBuilder>(next_operator_builder_id(), _sink.get());
break;
}
default: