[Feature](vectorized) support jdbc sink for insert into data to table (#12534)

This commit is contained in:
zhangstar333
2022-09-15 11:08:41 +08:00
committed by GitHub
parent 33f5a86e69
commit 22a8d35999
32 changed files with 1151 additions and 491 deletions

View File

@ -20,6 +20,7 @@ package org.apache.doris.analysis;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.JdbcTable;
import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.MysqlTable;
import org.apache.doris.catalog.OdbcTable;
@ -201,6 +202,13 @@ public class DescribeStmt extends ShowStmt {
odbcTable.getOdbcDriver(),
odbcTable.getOdbcTableTypeName());
totalRows.add(row);
} else if (table.getType() == TableType.JDBC) {
isOlapTable = false;
JdbcTable jdbcTable = (JdbcTable) table;
List<String> row = Arrays.asList(jdbcTable.getJdbcUrl(), jdbcTable.getJdbcUser(),
jdbcTable.getJdbcPasswd(), jdbcTable.getDriverClass(), jdbcTable.getDriverUrl(),
jdbcTable.getExternalTableName(), jdbcTable.getResourceName(), jdbcTable.getJdbcTypeName());
totalRows.add(row);
} else if (table.getType() == TableType.MYSQL) {
isOlapTable = false;
MysqlTable mysqlTable = (MysqlTable) table;

View File

@ -207,6 +207,7 @@ public class ExportStmt extends StatementBase {
switch (tblType) {
case MYSQL:
case ODBC:
case JDBC:
case OLAP:
break;
case BROKER:

View File

@ -23,6 +23,7 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.JdbcTable;
import org.apache.doris.catalog.MysqlTable;
import org.apache.doris.catalog.OdbcTable;
import org.apache.doris.catalog.OlapTable;
@ -355,7 +356,8 @@ public class InsertStmt extends DdlStmt {
}
// will use it during create load job
indexIdToSchemaHash = olapTable.getIndexIdToSchemaHash();
} else if (targetTable instanceof MysqlTable || targetTable instanceof OdbcTable) {
} else if (targetTable instanceof MysqlTable || targetTable instanceof OdbcTable
|| targetTable instanceof JdbcTable) {
if (targetPartitionNames != null) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_PARTITION_CLAUSE_NO_ALLOWED);
}

View File

@ -55,13 +55,13 @@ import java.util.Map;
* DROP RESOURCE "jdbc_mysql";
*/
public class JdbcResource extends Resource {
public static final String URL = "jdbc_url";
public static final String USER = "user";
public static final String PASSWORD = "password";
public static final String DRIVER_CLASS = "driver_class";
public static final String DRIVER_URL = "driver_url";
public static final String TYPE = "type";
public static final String CHECK_SUM = "checksum";
private static final String URL = "jdbc_url";
private static final String USER = "user";
private static final String PASSWORD = "password";
private static final String DRIVER_CLASS = "driver_class";
private static final String DRIVER_URL = "driver_url";
private static final String TYPE = "type";
private static final String CHECK_SUM = "checksum";
private static final Logger LOG = LogManager.getLogger(JdbcResource.class);
// timeout for both connection and read. 10 seconds is long enough.
private static final int HTTP_TIMEOUT_MS = 10000;

View File

@ -28,6 +28,7 @@ import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -46,13 +47,24 @@ public class JdbcTable extends Table {
private static final String TABLE = "table";
private static final String RESOURCE = "resource";
private static final String TABLE_TYPE = "table_type";
// TODO: We may have to support various types of databases like ODBC
// map now jdbc external table Doris support now
private static final String URL = "jdbc_url";
private static final String USER = "user";
private static final String PASSWORD = "password";
private static final String DRIVER_CLASS = "driver_class";
private static final String DRIVER_URL = "driver_url";
private static final String CHECK_SUM = "checksum";
private static Map<String, TOdbcTableType> TABLE_TYPE_MAP;
private String resourceName;
private String externalTableName;
private String jdbcTypeName;
private String jdbcUrl;
private String jdbcUser;
private String jdbcPasswd;
private String driverClass;
private String driverUrl;
private String checkSum;
static {
Map<String, TOdbcTableType> tempMap = new HashMap<>();
tempMap.put("mysql", TOdbcTableType.MYSQL);
@ -72,18 +84,49 @@ public class JdbcTable extends Table {
validate(properties);
}
public String getCheckSum() {
return checkSum;
}
public String getExternalTableName() {
return externalTableName;
}
public String getJdbcTypeName() {
return jdbcTypeName;
}
public String getJdbcUrl() {
return jdbcUrl;
}
public String getJdbcUser() {
return jdbcUser;
}
public String getJdbcPasswd() {
return jdbcPasswd;
}
public String getDriverClass() {
return driverClass;
}
public String getDriverUrl() {
return driverUrl;
}
@Override
public TTableDescriptor toThrift() {
TJdbcTable tJdbcTable = new TJdbcTable();
JdbcResource jdbcResource = (JdbcResource) (Env.getCurrentEnv().getResourceMgr().getResource(resourceName));
tJdbcTable.setJdbcUrl(jdbcResource.getProperty(JdbcResource.URL));
tJdbcTable.setJdbcUser(jdbcResource.getProperty(JdbcResource.USER));
tJdbcTable.setJdbcPassword(jdbcResource.getProperty(JdbcResource.PASSWORD));
tJdbcTable.setJdbcUrl(jdbcUrl);
tJdbcTable.setJdbcUser(jdbcUser);
tJdbcTable.setJdbcPassword(jdbcPasswd);
tJdbcTable.setJdbcTableName(externalTableName);
tJdbcTable.setJdbcDriverClass(jdbcResource.getProperty(JdbcResource.DRIVER_CLASS));
tJdbcTable.setJdbcDriverUrl(jdbcResource.getProperty(JdbcResource.DRIVER_URL));
tJdbcTable.setJdbcResourceName(jdbcResource.getName());
tJdbcTable.setJdbcDriverChecksum(jdbcResource.getProperty(JdbcResource.CHECK_SUM));
tJdbcTable.setJdbcDriverClass(driverClass);
tJdbcTable.setJdbcDriverUrl(driverUrl);
tJdbcTable.setJdbcResourceName(resourceName);
tJdbcTable.setJdbcDriverChecksum(checkSum);
TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.JDBC_TABLE, fullSchema.size(), 0,
getName(), "");
@ -94,17 +137,50 @@ public class JdbcTable extends Table {
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
Text.writeString(out, externalTableName);
Text.writeString(out, resourceName);
Text.writeString(out, jdbcTypeName);
Map<String, String> serializeMap = Maps.newHashMap();
serializeMap.put(TABLE, externalTableName);
serializeMap.put(RESOURCE, resourceName);
serializeMap.put(TABLE_TYPE, jdbcTypeName);
serializeMap.put(URL, jdbcUrl);
serializeMap.put(USER, jdbcUser);
serializeMap.put(PASSWORD, jdbcPasswd);
serializeMap.put(DRIVER_CLASS, driverClass);
serializeMap.put(DRIVER_URL, driverUrl);
serializeMap.put(CHECK_SUM, checkSum);
int size = (int) serializeMap.values().stream().filter(v -> {
return v != null;
}).count();
out.writeInt(size);
for (Map.Entry<String, String> kv : serializeMap.entrySet()) {
if (kv.getValue() != null) {
Text.writeString(out, kv.getKey());
Text.writeString(out, kv.getValue());
}
}
}
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
externalTableName = Text.readString(in);
resourceName = Text.readString(in);
jdbcTypeName = Text.readString(in);
int size = in.readInt();
Map<String, String> serializeMap = Maps.newHashMap();
for (int i = 0; i < size; i++) {
String key = Text.readString(in);
String value = Text.readString(in);
serializeMap.put(key, value);
}
externalTableName = serializeMap.get(TABLE);
resourceName = serializeMap.get(RESOURCE);
jdbcTypeName = serializeMap.get(TABLE_TYPE);
jdbcUrl = serializeMap.get(URL);
jdbcUser = serializeMap.get(USER);
jdbcPasswd = serializeMap.get(PASSWORD);
driverClass = serializeMap.get(DRIVER_CLASS);
driverUrl = serializeMap.get(DRIVER_URL);
checkSum = serializeMap.get(CHECK_SUM);
}
public String getResourceName() {
@ -125,18 +201,17 @@ public class JdbcTable extends Table {
@Override
public String getSignature(int signatureVersion) {
JdbcResource jdbcResource = (JdbcResource) (Env.getCurrentEnv().getResourceMgr().getResource(resourceName));
StringBuilder sb = new StringBuilder(signatureVersion);
sb.append(name);
sb.append(type);
sb.append(resourceName);
sb.append(externalTableName);
sb.append(jdbcTypeName);
sb.append(jdbcResource.getProperty(JdbcResource.URL));
sb.append(jdbcResource.getProperty(JdbcResource.USER));
sb.append(jdbcResource.getProperty(JdbcResource.PASSWORD));
sb.append(jdbcResource.getProperty(JdbcResource.DRIVER_CLASS));
sb.append(jdbcResource.getProperty(JdbcResource.DRIVER_URL));
sb.append(jdbcUrl);
sb.append(jdbcUser);
sb.append(jdbcPasswd);
sb.append(driverClass);
sb.append(driverUrl);
sb.append(checkSum);
String md5 = DigestUtils.md5Hex(sb.toString());
LOG.debug("get signature of odbc table {}: {}. signature string: {}", name, md5, sb.toString());
@ -181,5 +256,13 @@ public class JdbcTable extends Table {
if (resource.getType() != ResourceType.JDBC) {
throw new DdlException("resource [" + resourceName + "] is not jdbc resource");
}
JdbcResource jdbcResource = (JdbcResource) resource;
jdbcUrl = jdbcResource.getProperty(URL);
jdbcUser = jdbcResource.getProperty(USER);
jdbcPasswd = jdbcResource.getProperty(PASSWORD);
driverClass = jdbcResource.getProperty(DRIVER_CLASS);
driverUrl = jdbcResource.getProperty(DRIVER_URL);
checkSum = jdbcResource.getProperty(CHECK_SUM);
}
}

View File

@ -38,6 +38,7 @@ import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.JdbcTable;
import org.apache.doris.catalog.MysqlTable;
import org.apache.doris.catalog.OdbcTable;
import org.apache.doris.catalog.PrimitiveType;
@ -55,6 +56,7 @@ import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.planner.DataPartition;
import org.apache.doris.planner.ExportSink;
import org.apache.doris.planner.JdbcScanNode;
import org.apache.doris.planner.MysqlScanNode;
import org.apache.doris.planner.OdbcScanNode;
import org.apache.doris.planner.OlapScanNode;
@ -417,6 +419,9 @@ public class ExportJob implements Writable {
case MYSQL:
scanNode = new MysqlScanNode(new PlanNodeId(0), exportTupleDesc, (MysqlTable) this.exportTable);
break;
case JDBC:
scanNode = new JdbcScanNode(new PlanNodeId(0), exportTupleDesc, (JdbcTable) this.exportTable);
break;
default:
break;
}
@ -446,6 +451,7 @@ public class ExportJob implements Writable {
new PlanFragmentId(nextId.getAndIncrement()), scanNode, DataPartition.RANDOM);
break;
case ODBC:
case JDBC:
case MYSQL:
fragment = new PlanFragment(
new PlanFragmentId(nextId.getAndIncrement()), scanNode, DataPartition.UNPARTITIONED);

View File

@ -20,6 +20,7 @@
package org.apache.doris.planner;
import org.apache.doris.catalog.JdbcTable;
import org.apache.doris.catalog.MysqlTable;
import org.apache.doris.catalog.OdbcTable;
import org.apache.doris.catalog.Table;
@ -65,6 +66,8 @@ public abstract class DataSink {
return new MysqlTableSink((MysqlTable) table);
} else if (table instanceof OdbcTable) {
return new OdbcTableSink((OdbcTable) table);
} else if (table instanceof JdbcTable) {
return new JdbcTableSink((JdbcTable) table);
} else {
throw new AnalysisException("Unknown table type " + table.getType());
}

View File

@ -271,7 +271,7 @@ public class DistributedPlanner {
* TODO: hbase scans are range-partitioned on the row key
*/
private PlanFragment createScanFragment(PlanNode node) throws UserException {
if (node instanceof MysqlScanNode || node instanceof OdbcScanNode) {
if (node instanceof MysqlScanNode || node instanceof OdbcScanNode || node instanceof JdbcScanNode) {
return new PlanFragment(ctx.getNextFragmentId(), node, DataPartition.UNPARTITIONED);
} else if (node instanceof SchemaScanNode) {
return new PlanFragment(ctx.getNextFragmentId(), node, DataPartition.RANDOM);

View File

@ -0,0 +1,102 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.planner;
import org.apache.doris.catalog.JdbcTable;
import org.apache.doris.catalog.OdbcTable;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TDataSink;
import org.apache.doris.thrift.TDataSinkType;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TJdbcTable;
import org.apache.doris.thrift.TJdbcTableSink;
import org.apache.doris.thrift.TOdbcTableType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class JdbcTableSink extends DataSink {
private static final Logger LOG = LogManager.getLogger(JdbcTableSink.class);
private final String resourceName;
private final String externalTableName;
private final String dorisTableName;
private final String jdbcUrl;
private final String jdbcUser;
private final String jdbcPasswd;
private final String driverClass;
private final String driverUrl;
private final String checkSum;
private final TOdbcTableType jdbcType;
private final boolean useTransaction;
public JdbcTableSink(JdbcTable jdbcTable) {
resourceName = jdbcTable.getResourceName();
jdbcType = jdbcTable.getJdbcTableType();
externalTableName = OdbcTable.databaseProperName(jdbcType, jdbcTable.getExternalTableName());
useTransaction = ConnectContext.get().getSessionVariable().isEnableOdbcTransaction();
jdbcUrl = jdbcTable.getJdbcUrl();
jdbcUser = jdbcTable.getJdbcUser();
jdbcPasswd = jdbcTable.getJdbcPasswd();
driverClass = jdbcTable.getDriverClass();
driverUrl = jdbcTable.getDriverUrl();
checkSum = jdbcTable.getCheckSum();
dorisTableName = jdbcTable.getName();
}
@Override
public String getExplainString(String prefix, TExplainLevel explainLevel) {
StringBuilder strBuilder = new StringBuilder();
strBuilder.append(prefix + "JDBC TABLE SINK:\n");
strBuilder.append(prefix + "TABLENAME IN DORIS: ").append(dorisTableName).append("\n");
strBuilder.append(prefix + "TABLE TYPE: ").append(jdbcType.toString()).append("\n");
strBuilder.append(prefix + "TABLENAME OF EXTERNAL TABLE: ").append(externalTableName).append("\n");
strBuilder.append(prefix + "EnableTransaction: ").append(useTransaction ? "true" : "false").append("\n");
return strBuilder.toString();
}
@Override
protected TDataSink toThrift() {
TDataSink tDataSink = new TDataSink(TDataSinkType.JDBC_TABLE_SINK);
TJdbcTableSink jdbcTableSink = new TJdbcTableSink();
TJdbcTable jdbcTable = new TJdbcTable();
jdbcTableSink.setJdbcTable(jdbcTable);
jdbcTableSink.jdbc_table.setJdbcUrl(jdbcUrl);
jdbcTableSink.jdbc_table.setJdbcUser(jdbcUser);
jdbcTableSink.jdbc_table.setJdbcPassword(jdbcPasswd);
jdbcTableSink.jdbc_table.setJdbcTableName(externalTableName);
jdbcTableSink.jdbc_table.setJdbcDriverUrl(driverUrl);
jdbcTableSink.jdbc_table.setJdbcDriverClass(driverClass);
jdbcTableSink.jdbc_table.setJdbcDriverChecksum(checkSum);
jdbcTableSink.jdbc_table.setJdbcResourceName(resourceName);
jdbcTableSink.setUseTransaction(useTransaction);
tDataSink.setJdbcTableSink(jdbcTableSink);
return tDataSink;
}
@Override
public PlanNodeId getExchNodeId() {
return null;
}
@Override
public DataPartition getOutputPartition() {
return null;
}
}