[feature](load-refactor) Step1: InsertStmt as facade layer and run S3/Broker Load (#19142)

This commit is contained in:
奕冷
2023-05-10 17:48:50 +08:00
committed by GitHub
parent d20b5f90d8
commit 894801f5ce
27 changed files with 1901 additions and 752 deletions

View File

@ -101,8 +101,7 @@ public class BrokerDesc extends StorageDesc implements Writable {
}
public static BrokerDesc createForStreamLoad() {
BrokerDesc brokerDesc = new BrokerDesc("", StorageType.STREAM, null);
return brokerDesc;
return new BrokerDesc("", StorageType.STREAM, null);
}
public boolean isMultiLoadBroker() {

View File

@ -0,0 +1,130 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.PrintableMap;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;
import java.util.List;
import java.util.Map;
/**
 * Load statement for BROKER LOAD / S3 LOAD: a facade-layer subclass of {@link InsertStmt}
 * that carries the parsed label, data descriptions and broker access info.
 */
public class BrokerLoadStmt extends InsertStmt {

    // One DataDescription per source file group in the LOAD statement.
    private final List<DataDescription> dataDescList;

    // Broker (or S3) access descriptor; must be non-null, checked in analyze().
    private final BrokerDesc brokerDesc;

    // Optional cluster name, only used when rendering toSql().
    private String cluster;

    public BrokerLoadStmt(LabelName label, List<DataDescription> dataDescList, BrokerDesc brokerDesc,
            Map<String, String> properties, String comments) {
        this.label = label;
        this.dataDescList = dataDescList;
        this.brokerDesc = brokerDesc;
        this.properties = properties;
        // Normalize null comments to "" so downstream consumers never see null.
        this.comments = (comments == null) ? "" : comments;
    }

    @Override
    public List<DataDescription> getDataDescList() {
        return dataDescList;
    }

    @Override
    public BrokerDesc getResourceDesc() {
        return brokerDesc;
    }

    @Override
    public LoadType getLoadType() {
        return LoadType.BROKER_LOAD;
    }

    @Override
    public void analyzeProperties() throws DdlException {
        // public check should be in base class
    }

    @Override
    public void analyze(Analyzer analyzer) throws UserException {
        super.analyze(analyzer);
        label.analyze(analyzer);
        // Throw AnalysisException directly. Passing it as the error-message object of
        // Preconditions.checkState would instead throw IllegalStateException whose message
        // is only the AnalysisException's toString().
        if (CollectionUtils.isEmpty(dataDescList)) {
            throw new AnalysisException("No data file in load statement.");
        }
        Preconditions.checkNotNull(brokerDesc, "No broker desc found.");
        // check data descriptions
        for (DataDescription dataDescription : dataDescList) {
            final String fullDbName = dataDescription.analyzeFullDbName(label.getDbName(), analyzer);
            dataDescription.analyze(fullDbName);
            // Same Preconditions misuse fixed here: surface the intended AnalysisException.
            if (dataDescription.isLoadFromTable()) {
                throw new AnalysisException("Load from table should use Spark Load");
            }
            Database db = analyzer.getEnv().getInternalCatalog().getDbOrAnalysisException(fullDbName);
            OlapTable table = db.getOlapTableOrAnalysisException(dataDescription.getTableName());
            dataDescription.checkKeyTypeForLoad(table);
            if (!brokerDesc.isMultiLoadBroker()) {
                for (int i = 0; i < dataDescription.getFilePaths().size(); i++) {
                    // Expand to the full storage location, then validate it against the
                    // storage type, writing the path back once instead of twice.
                    String location = brokerDesc.getFileLocation(dataDescription.getFilePaths().get(i));
                    dataDescription.getFilePaths().set(i,
                            ExportStmt.checkPath(location, brokerDesc.getStorageType()));
                }
            }
        }
    }

    @Override
    public boolean needAuditEncryption() {
        return true;
    }

    /**
     * Renders the statement back to LOAD LABEL ... SQL, including the optional
     * BY 'cluster' clause, broker desc and PROPERTIES map.
     */
    @Override
    public String toSql() {
        StringBuilder sb = new StringBuilder();
        sb.append("LOAD LABEL ").append(label.toSql()).append("\n");
        sb.append("(");
        Joiner.on(",\n").appendTo(sb, Lists.transform(dataDescList, DataDesc::toSql)).append(")");
        if (cluster != null) {
            sb.append("\nBY '");
            sb.append(cluster);
            sb.append("'");
        }
        if (brokerDesc != null) {
            sb.append("\n").append(brokerDesc.toSql());
        }
        if (properties != null && !properties.isEmpty()) {
            sb.append("\nPROPERTIES (");
            sb.append(new PrintableMap<>(properties, "=", true, false));
            sb.append(")");
        }
        return sb.toString();
    }
}

View File

@ -51,7 +51,7 @@ public class CreateTableAsSelectStmt extends DdlStmt {
this.createTableStmt = createTableStmt;
this.columnNames = columnNames;
this.queryStmt = queryStmt;
this.insertStmt = new InsertStmt(createTableStmt.getDbTbl(), queryStmt);
this.insertStmt = new NativeInsertStmt(createTableStmt.getDbTbl(), queryStmt);
}
/**

View File

@ -22,6 +22,7 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.FunctionSet;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
@ -81,7 +82,7 @@ import java.util.TreeSet;
* The transform after the keyword SET is the old way, which only supports hadoop functions.
* The old way of transform will be removed gradually.
*/
public class DataDescription {
public class DataDescription implements InsertStmt.DataDesc {
private static final Logger LOG = LogManager.getLogger(DataDescription.class);
// function isn't built-in function, hll_hash is not built-in function in hadoop load.
private static final List<String> HADOOP_SUPPORT_FUNCTION_NAMES = Arrays.asList(
@ -146,7 +147,7 @@ public class DataDescription {
private boolean isHadoopLoad = false;
private LoadTask.MergeType mergeType = LoadTask.MergeType.APPEND;
private final LoadTask.MergeType mergeType;
private final Expr deleteCondition;
private final Map<String, String> properties;
private boolean trimDoubleQuotes = false;
@ -1153,6 +1154,17 @@ public class DataDescription {
return sb.toString();
}
public void checkKeyTypeForLoad(OlapTable table) throws AnalysisException {
if (getMergeType() != LoadTask.MergeType.APPEND) {
if (table.getKeysType() != KeysType.UNIQUE_KEYS) {
throw new AnalysisException("load by MERGE or DELETE is only supported in unique tables.");
} else if (!table.hasDeleteSign()) {
throw new AnalysisException(
"load by MERGE or DELETE need to upgrade table to support batch delete.");
}
}
}
@Override
public String toString() {
return toSql();

View File

@ -167,7 +167,7 @@ public class DeleteStmt extends DdlStmt {
LimitElement.NO_LIMIT
);
insertStmt = new InsertStmt(
insertStmt = new NativeInsertStmt(
new InsertTarget(tableName, null),
null,
cols,

View File

@ -32,6 +32,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
// TODO(tsy): maybe better to rename as `LoadLabel`
// label name used to identify a load job
public class LabelName implements Writable {
private String dbName;

View File

@ -109,8 +109,8 @@ public class LoadStmt extends DdlStmt {
public static final String KEY_IN_PARAM_STRICT_MODE = "strict_mode";
public static final String KEY_IN_PARAM_TIMEZONE = "timezone";
public static final String KEY_IN_PARAM_EXEC_MEM_LIMIT = "exec_mem_limit";
public static final String KEY_IN_PARAM_JSONPATHS = "jsonpaths";
public static final String KEY_IN_PARAM_JSONROOT = "json_root";
public static final String KEY_IN_PARAM_JSONPATHS = "jsonpaths";
public static final String KEY_IN_PARAM_JSONROOT = "json_root";
public static final String KEY_IN_PARAM_STRIP_OUTER_ARRAY = "strip_outer_array";
public static final String KEY_IN_PARAM_FUZZY_PARSE = "fuzzy_parse";
public static final String KEY_IN_PARAM_NUM_AS_STRING = "num_as_string";
@ -273,6 +273,7 @@ public class LoadStmt extends DdlStmt {
return brokerDesc;
}
@Deprecated
public String getCluster() {
return cluster;
}
@ -285,6 +286,7 @@ public class LoadStmt extends DdlStmt {
return properties;
}
@Deprecated
public String getUser() {
return user;
}

View File

@ -0,0 +1,50 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.load.EtlJobType;
import com.google.common.base.Preconditions;
import java.util.EnumMap;
/**
 * Logical categories of load jobs exposed through the InsertStmt facade layer.
 */
public enum LoadType {
    UNKNOWN,
    NATIVE_INSERT,
    BROKER_LOAD,
    SPARK_LOAD,
    MYSQL_LOAD,
    ROUTINE_LOAD,
    STREAM_LOAD;

    // Mapping from the facade-level load type to the legacy EtlJobType used by the load manager.
    private static final EnumMap<LoadType, EtlJobType> ETL_TYPE_BY_LOAD_TYPE;

    static {
        ETL_TYPE_BY_LOAD_TYPE = new EnumMap<>(LoadType.class);
        ETL_TYPE_BY_LOAD_TYPE.put(NATIVE_INSERT, EtlJobType.INSERT);
        ETL_TYPE_BY_LOAD_TYPE.put(BROKER_LOAD, EtlJobType.BROKER);
        ETL_TYPE_BY_LOAD_TYPE.put(SPARK_LOAD, EtlJobType.SPARK);
        ETL_TYPE_BY_LOAD_TYPE.put(MYSQL_LOAD, EtlJobType.LOCAL_FILE);
        // TODO(tsy): add routine load and stream load
    }

    /**
     * Translates a LoadType into the legacy EtlJobType.
     * Throws NullPointerException for types that have no mapping yet
     * (UNKNOWN, ROUTINE_LOAD, STREAM_LOAD), same as the original checkNotNull behavior.
     */
    public static EtlJobType getEtlJobType(LoadType loadType) {
        return Preconditions.checkNotNull(ETL_TYPE_BY_LOAD_TYPE.get(loadType));
    }
}

View File

@ -0,0 +1,94 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/StatementBase.java
// and modified by Doris
package org.apache.doris.analysis;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
/**
 * Load statement for MySQL LOAD DATA [LOCAL] INFILE: a facade-layer subclass of
 * {@link InsertStmt}. Validates that server-side file paths stay under the
 * configured FE secure path.
 */
public class MysqlLoadStmt extends InsertStmt {

    // NOTE(review): never assigned in this class as shown — presumably populated by a
    // constructor or the parser elsewhere; confirm, otherwise analyze() would NPE.
    private DataDescription dataDescription;

    @Override
    public List<? extends DataDesc> getDataDescList() {
        return Collections.singletonList(dataDescription);
    }

    @Override
    public ResourceDesc getResourceDesc() {
        // mysql load does not have resource desc
        return null;
    }

    @Override
    public LoadType getLoadType() {
        return LoadType.MYSQL_LOAD;
    }

    @Override
    public void analyzeProperties() throws DdlException {
    }

    /**
     * Analyzes the single data description and, for server-local (non-client) files,
     * enforces that each path canonicalizes under Config.mysql_load_server_secure_path
     * and exists.
     *
     * @throws UserException on any analysis failure (AnalysisException is a subclass,
     *         so the redundant extra throws clause was dropped)
     */
    @Override
    public void analyze(Analyzer analyzer) throws UserException {
        super.analyze(analyzer);
        String fullDbName = dataDescription.analyzeFullDbName(label.getDbName(), analyzer);
        dataDescription.analyze(fullDbName);
        Database db = analyzer.getEnv().getInternalCatalog().getDbOrAnalysisException(fullDbName);
        OlapTable table = db.getOlapTableOrAnalysisException(dataDescription.getTableName());
        dataDescription.checkKeyTypeForLoad(table);
        if (!dataDescription.isClientLocal()) {
            List<String> filePaths = dataDescription.getFilePaths();
            // The config check does not depend on the path, so do it once before the loop
            // instead of once per path (guarded so an empty path list still passes, as before).
            if (!filePaths.isEmpty() && Config.mysql_load_server_secure_path.isEmpty()) {
                throw new AnalysisException("Load local data from fe local is not enabled. If you want to use it,"
                        + " plz set the `mysql_load_server_secure_path` for FE to be a right path.");
            }
            for (String path : filePaths) {
                File file = new File(path);
                try {
                    // Canonicalize to defeat ../ traversal before comparing with the secure root.
                    if (!(file.getCanonicalPath().startsWith(Config.mysql_load_server_secure_path))) {
                        throw new AnalysisException("Local file should be under the secure path of FE.");
                    }
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
                if (!file.exists()) {
                    throw new AnalysisException("File: " + path + " is not exists.");
                }
            }
        }
    }

    @Override
    public RedirectStatus getRedirectStatus() {
        // Executed on the FE that received the client connection; never forwarded.
        return RedirectStatus.NO_FORWARD;
    }
}

View File

@ -0,0 +1,829 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.alter.SchemaChangeHandler;
import org.apache.doris.catalog.BrokerTable;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.JdbcTable;
import org.apache.doris.catalog.MysqlTable;
import org.apache.doris.catalog.OdbcTable;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.external.JdbcExternalDatabase;
import org.apache.doris.catalog.external.JdbcExternalTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.JdbcExternalCatalog;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.planner.DataPartition;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.ExportSink;
import org.apache.doris.planner.OlapTableSink;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.rewrite.ExprRewriter;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TQueryOptions;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
import org.apache.doris.transaction.TransactionState.TxnCoordinator;
import org.apache.doris.transaction.TransactionState.TxnSourceType;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Insert into is performed to load data from the result of query stmt.
* <p>
* syntax:
* INSERT INTO table_name [partition_info] [col_list] [plan_hints] query_stmt
* <p>
* table_name: is the name of target table
* partition_info: PARTITION (p1,p2)
* the partition info of target table
* col_list: (c1,c2)
* the column list of target table
* plan_hints: [STREAMING,SHUFFLE_HINT]
* The streaming plan is used by both streaming and non-streaming insert stmt.
* The only difference is that non-streaming will record the load info in LoadManager and return label.
* User can check the load info by show load stmt.
*/
public class NativeInsertStmt extends InsertStmt {
private static final Logger LOG = LogManager.getLogger(InsertStmt.class);
private static final String SHUFFLE_HINT = "SHUFFLE";
private static final String NOSHUFFLE_HINT = "NOSHUFFLE";
private final TableName tblName;
private final PartitionNames targetPartitionNames;
// parsed from targetPartitionNames.
private List<Long> targetPartitionIds;
private final List<String> targetColumnNames;
private QueryStmt queryStmt;
private final List<String> planHints;
private Boolean isRepartition;
// set after parse all columns and expr in query statement
// this result expr in the order of target table's columns
private final List<Expr> resultExprs = Lists.newArrayList();
private final Map<String, Expr> exprByName = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
private Table targetTable;
private DatabaseIf db;
private long transactionId;
// we need a new TupleDesc for olap table.
private TupleDescriptor olapTuple;
private DataSink dataSink;
private DataPartition dataPartition;
private final List<Column> targetColumns = Lists.newArrayList();
/*
* InsertStmt may be analyzed twice, but transaction must be only begun once.
* So use a boolean to check if transaction already begun.
*/
private boolean isTransactionBegin = false;
private boolean isValuesOrConstantSelect;
public NativeInsertStmt(InsertTarget target, String label, List<String> cols, InsertSource source,
List<String> hints) {
this.tblName = target.getTblName();
this.targetPartitionNames = target.getPartitionNames();
this.label = new LabelName(null, label);
this.queryStmt = source.getQueryStmt();
this.planHints = hints;
this.targetColumnNames = cols;
this.isValuesOrConstantSelect = (queryStmt instanceof SelectStmt
&& ((SelectStmt) queryStmt).getTableRefs().isEmpty());
}
// Ctor for CreateTableAsSelectStmt: no explicit partitions, columns or hints —
// the target table is freshly created, so all columns are taken from the query.
public NativeInsertStmt(TableName name, QueryStmt queryStmt) {
    this.tblName = name;
    this.targetPartitionNames = null;
    this.targetColumnNames = null;
    this.queryStmt = queryStmt;
    this.planHints = null;
}
// True when the source is VALUES(...) or a constant SELECT (no table refs).
public boolean isValuesOrConstantSelect() {
    return isValuesOrConstantSelect;
}

// Target table; null until analyze() resolves it, unless preset via setTargetTable().
public Table getTargetTable() {
    return targetTable;
}

// Presets the target table so analyze() skips name resolution.
public void setTargetTable(Table targetTable) {
    this.targetTable = targetTable;
}

// Transaction id begun in analyze(); 0 before a transaction is started.
public long getTransactionId() {
    return this.transactionId;
}

// Repartition flag parsed from plan hints (SHUFFLE/NOSHUFFLE); null when no hint given.
public Boolean isRepartition() {
    return isRepartition;
}

public String getDbName() {
    return tblName.getDb();
}

public String getTbl() {
    return tblName.getTbl();
}
/**
 * Collects every table this statement touches into {@code tableMap}: first the
 * tables of the source query, then the resolved and privilege-checked target table.
 *
 * @param analyzer          analyzer used for name resolution
 * @param tableMap          out-param, keyed by table id
 * @param parentViewNameSet views already expanded, to break view cycles
 * @throws AnalysisException if the db/table does not exist, the catalog is a
 *         disallowed external catalog, or the user lacks LOAD privilege
 */
public void getTables(Analyzer analyzer, Map<Long, TableIf> tableMap, Set<String> parentViewNameSet)
        throws AnalysisException {
    // get dbs of statement
    queryStmt.getTables(analyzer, false, tableMap, parentViewNameSet);
    tblName.analyze(analyzer);
    // disallow external catalog except JdbcExternalCatalog
    if (analyzer.getEnv().getCurrentCatalog() instanceof ExternalCatalog
            && !(analyzer.getEnv().getCurrentCatalog() instanceof JdbcExternalCatalog)) {
        Util.prohibitExternalCatalog(tblName.getCtl(), this.getClass().getSimpleName());
    }
    String dbName = tblName.getDb();
    String tableName = tblName.getTbl();
    // check exist; local renamed so it no longer shadows the instance field `db`,
    // and the already-extracted dbName/tableName locals are used consistently
    DatabaseIf targetDb = analyzer.getEnv().getCatalogMgr().getCatalog(tblName.getCtl())
            .getDbOrAnalysisException(dbName);
    TableIf table = targetDb.getTableOrAnalysisException(tableName);
    // check access
    if (!Env.getCurrentEnv().getAccessManager()
            .checkTblPriv(ConnectContext.get(), dbName, tableName, PrivPredicate.LOAD)) {
        ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
                ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(),
                dbName + ": " + tableName);
    }
    tableMap.put(table.getId(), table);
}
public QueryStmt getQueryStmt() {
    return queryStmt;
}

// Replaces the source query, e.g. after rewrite passes.
public void setQueryStmt(QueryStmt queryStmt) {
    this.queryStmt = queryStmt;
}

// EXPLAIN-ness is carried by the source query, not this statement.
public boolean isExplain() {
    return queryStmt.isExplain();
}

// Label name only; analyze() generates one ("insert_<queryId>") when absent.
public String getLabel() {
    return label.getLabelName();
}

// Sink created by createDataSink() during analyze(); null before that.
public DataSink getDataSink() {
    return dataSink;
}

// Target database, resolved during analyze(); null before that.
public DatabaseIf getDbObj() {
    return db;
}

// Guards against beginning the transaction twice when analyze() runs twice.
public boolean isTransactionBegin() {
    return isTransactionBegin;
}
/**
 * Full analysis: resolves the target table, checks LOAD privilege, analyzes
 * partitions/subquery/hints, then (outside txn model) creates the data sink,
 * generates a label if absent, begins the transaction and initializes the
 * OlapTableSink. Statement order here is load-bearing — the sink must exist
 * before the transaction id is known, and label generation must precede
 * beginTransaction.
 */
@Override
public void analyze(Analyzer analyzer) throws UserException {
    super.analyze(analyzer);
    if (targetTable == null) {
        tblName.analyze(analyzer);
        // disallow external catalog except JdbcExternalCatalog
        if (analyzer.getEnv().getCurrentCatalog() instanceof ExternalCatalog
                && !(analyzer.getEnv().getCurrentCatalog() instanceof JdbcExternalCatalog)) {
            Util.prohibitExternalCatalog(tblName.getCtl(), this.getClass().getSimpleName());
        }
    }
    // Check privilege
    if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(), tblName.getDb(),
            tblName.getTbl(), PrivPredicate.LOAD)) {
        ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
                ConnectContext.get().getQualifiedUser(),
                ConnectContext.get().getRemoteIP(), tblName.getDb() + ": " + tblName.getTbl());
    }
    // check partition
    if (targetPartitionNames != null) {
        targetPartitionNames.analyze(analyzer);
    }
    // resolve the target table, then analyze the source query and plan hints
    analyzeTargetTable(analyzer);
    analyzeSubquery(analyzer);
    analyzePlanHints();
    // In explicit-transaction mode the txn is managed by the session, not here.
    if (analyzer.getContext().isTxnModel()) {
        return;
    }
    // create data sink
    createDataSink();
    db = analyzer.getEnv().getCatalogMgr().getCatalog(tblName.getCtl()).getDbOrAnalysisException(tblName.getDb());
    // create label and begin transaction
    long timeoutSecond = ConnectContext.get().getExecTimeout();
    if (label == null || Strings.isNullOrEmpty(label.getLabelName())) {
        // Auto-label from the query id; '-' replaced since labels disallow it.
        label = new LabelName(db.getFullName(),
                "insert_" + DebugUtil.printId(analyzer.getContext().queryId()).replace("-", "_"));
    }
    if (!isExplain() && !isTransactionBegin) {
        if (targetTable instanceof OlapTable) {
            LoadJobSourceType sourceType = LoadJobSourceType.INSERT_STREAMING;
            transactionId = Env.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(),
                    Lists.newArrayList(targetTable.getId()), label.getLabelName(),
                    new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
                    sourceType, timeoutSecond);
        }
        // NOTE(review): set even for non-OlapTable targets where no txn was begun —
        // looks intentional (prevents retry on re-analysis) but confirm.
        isTransactionBegin = true;
    }
    // init data sink
    if (!isExplain() && targetTable instanceof OlapTable) {
        OlapTableSink sink = (OlapTableSink) dataSink;
        TUniqueId loadId = analyzer.getContext().queryId();
        int sendBatchParallelism = analyzer.getContext().getSessionVariable().getSendBatchParallelism();
        sink.init(loadId, transactionId, db.getId(), timeoutSecond, sendBatchParallelism, false);
    }
}
/**
 * Resolves {@code targetTable} (unless preset) and validates it per table type:
 * OLAP tables get partition resolution and a tuple descriptor over the full schema;
 * MySQL/ODBC/JDBC and broker tables must not carry a PARTITION clause; broker
 * tables must additionally be writable; anything else is rejected.
 */
private void analyzeTargetTable(Analyzer analyzer) throws AnalysisException {
    // Get table (local renamed from `db` to avoid shadowing the instance field)
    if (targetTable == null) {
        DatabaseIf targetDb = analyzer.getEnv().getCatalogMgr()
                .getCatalog(tblName.getCtl()).getDbOrAnalysisException(tblName.getDb());
        if (targetDb instanceof Database) {
            targetTable = (Table) targetDb.getTableOrAnalysisException(tblName.getTbl());
        } else if (targetDb instanceof JdbcExternalDatabase) {
            JdbcExternalTable jdbcTable = (JdbcExternalTable) targetDb.getTableOrAnalysisException(tblName.getTbl());
            targetTable = jdbcTable.getJdbcTable();
        } else {
            throw new AnalysisException("Not support insert target table.");
        }
    }
    if (targetTable instanceof OlapTable) {
        OlapTable olapTable = (OlapTable) targetTable;
        // partition
        if (targetPartitionNames != null) {
            targetPartitionIds = Lists.newArrayList();
            if (olapTable.getPartitionInfo().getType() == PartitionType.UNPARTITIONED) {
                ErrorReport.reportAnalysisException(ErrorCode.ERR_PARTITION_CLAUSE_NO_ALLOWED);
            }
            for (String partName : targetPartitionNames.getPartitionNames()) {
                Partition part = olapTable.getPartition(partName, targetPartitionNames.isTemp());
                if (part == null) {
                    ErrorReport.reportAnalysisException(
                            ErrorCode.ERR_UNKNOWN_PARTITION, partName, targetTable.getName());
                }
                targetPartitionIds.add(part.getId());
            }
        }
        // need a descriptor: one slot per full-schema column (includes shadow/MV columns)
        DescriptorTable descTable = analyzer.getDescTbl();
        olapTuple = descTable.createTupleDescriptor();
        for (Column col : olapTable.getFullSchema()) {
            SlotDescriptor slotDesc = descTable.addSlotDescriptor(olapTuple);
            slotDesc.setIsMaterialized(true);
            slotDesc.setType(col.getType());
            slotDesc.setColumn(col);
            slotDesc.setIsNullable(col.isAllowNull());
        }
    } else if (targetTable instanceof MysqlTable || targetTable instanceof OdbcTable
            || targetTable instanceof JdbcTable) {
        if (targetPartitionNames != null) {
            ErrorReport.reportAnalysisException(ErrorCode.ERR_PARTITION_CLAUSE_NO_ALLOWED);
        }
    } else if (targetTable instanceof BrokerTable) {
        if (targetPartitionNames != null) {
            ErrorReport.reportAnalysisException(ErrorCode.ERR_PARTITION_CLAUSE_NO_ALLOWED);
        }
        BrokerTable brokerTable = (BrokerTable) targetTable;
        if (!brokerTable.isWritable()) {
            // Message fixed: leading space so it doesn't run into the table name, and "a dir".
            throw new AnalysisException("table " + brokerTable.getName()
                    + " is not writable. path should be a dir");
        }
    } else {
        ErrorReport.reportAnalysisException(
                ErrorCode.ERR_NON_INSERTABLE_TABLE, targetTable.getName(), targetTable.getType());
    }
}
// Verifies every target-table column not mentioned in the statement can still be
// filled implicitly: it must have a default value or be nullable; otherwise report
// ERR_COL_NOT_MENTIONED.
private void checkColumnCoverage(Set<String> mentionedCols, List<Column> baseColumns)
        throws AnalysisException {
    for (Column col : baseColumns) {
        boolean coveredExplicitly = mentionedCols.contains(col.getName());
        boolean coveredImplicitly = col.getDefaultValue() != null || col.isAllowNull();
        if (!coveredExplicitly && !coveredImplicitly) {
            ErrorReport.reportAnalysisException(ErrorCode.ERR_COL_NOT_MENTIONED, col.getName());
        }
    }
}
private void analyzeSubquery(Analyzer analyzer) throws UserException {
// Analyze columns mentioned in the statement.
Set<String> mentionedColumns = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
if (targetColumnNames == null) {
// the mentioned columns are columns which are visible to user, so here we use
// getBaseSchema(), not getFullSchema()
for (Column col : targetTable.getBaseSchema(false)) {
mentionedColumns.add(col.getName());
targetColumns.add(col);
}
} else {
for (String colName : targetColumnNames) {
Column col = targetTable.getColumn(colName);
if (col == null) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_BAD_FIELD_ERROR, colName, targetTable.getName());
}
if (!mentionedColumns.add(colName)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_FIELD_SPECIFIED_TWICE, colName);
}
targetColumns.add(col);
}
// hll column mush in mentionedColumns
for (Column col : targetTable.getBaseSchema()) {
if (col.getType().isObjectStored() && !mentionedColumns.contains(col.getName())) {
throw new AnalysisException(" object-stored column " + col.getName()
+ " mush in insert into columns");
}
}
}
/*
* When doing schema change, there may be some shadow columns. we should add
* them to the end of targetColumns. And use 'origColIdxsForExtendCols' to save
* the index of column in 'targetColumns' which the shadow column related to.
* eg: origin targetColumns: (A,B,C), shadow column: __doris_shadow_B after
* processing, targetColumns: (A, B, C, __doris_shadow_B), and
* origColIdxsForExtendCols has 1 element: "1", which is the index of column B
* in targetColumns.
*
* Rule A: If the column which the shadow column related to is not mentioned,
* then do not add the shadow column to targetColumns. They will be filled by
* null or default value when loading.
*
* When table have materialized view, there may be some materialized view columns.
* we should add them to the end of targetColumns.
* eg: origin targetColumns: (A,B,C), shadow column: mv_bitmap_union_C
* after processing, targetColumns: (A, B, C, mv_bitmap_union_C), and
* origColIdx2MVColumn has 1 element: "2, mv_bitmap_union_C"
* will be used in as a mapping from queryStmt.getResultExprs() to targetColumns define expr
*/
List<Pair<Integer, Column>> origColIdxsForExtendCols = Lists.newArrayList();
for (Column column : targetTable.getFullSchema()) {
if (column.isNameWithPrefix(SchemaChangeHandler.SHADOW_NAME_PREFIX)) {
String origName = Column.removeNamePrefix(column.getName());
for (int i = 0; i < targetColumns.size(); i++) {
if (targetColumns.get(i).nameEquals(origName, false)) {
// Rule A
origColIdxsForExtendCols.add(Pair.of(i, null));
targetColumns.add(column);
break;
}
}
}
if (column.isNameWithPrefix(CreateMaterializedViewStmt.MATERIALIZED_VIEW_NAME_PREFIX)
|| column.isNameWithPrefix(CreateMaterializedViewStmt.MATERIALIZED_VIEW_AGGREGATE_NAME_PREFIX)) {
List<SlotRef> refColumns = column.getRefColumns();
if (refColumns == null) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_BAD_FIELD_ERROR,
column.getName(), targetTable.getName());
}
for (SlotRef refColumn : refColumns) {
String origName = refColumn.getColumnName();
for (int originColumnIdx = 0; originColumnIdx < targetColumns.size(); originColumnIdx++) {
if (targetColumns.get(originColumnIdx).nameEquals(origName, false)) {
origColIdxsForExtendCols.add(Pair.of(originColumnIdx, column));
targetColumns.add(column);
break;
}
}
}
}
}
// parse query statement
queryStmt.setFromInsert(true);
queryStmt.analyze(analyzer);
// check if size of select item equal with columns mentioned in statement
if (mentionedColumns.size() != queryStmt.getResultExprs().size()) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_WRONG_VALUE_COUNT);
}
// Check if all columns mentioned is enough
checkColumnCoverage(mentionedColumns, targetTable.getBaseSchema());
Map<String, Expr> slotToIndex = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
List<Column> baseColumns = targetTable.getBaseSchema();
int size = Math.min(baseColumns.size(), queryStmt.getResultExprs().size());
for (int i = 0; i < size; i++) {
slotToIndex.put(baseColumns.get(i).getName(), queryStmt.getResultExprs().get(i));
}
// handle VALUES() or SELECT constant list
if (isValuesOrConstantSelect) {
SelectStmt selectStmt = (SelectStmt) queryStmt;
if (selectStmt.getValueList() != null) {
// INSERT INTO VALUES(...)
List<ArrayList<Expr>> rows = selectStmt.getValueList().getRows();
for (int rowIdx = 0; rowIdx < rows.size(); ++rowIdx) {
analyzeRow(analyzer, targetColumns, rows, rowIdx, origColIdxsForExtendCols, slotToIndex);
}
// clear these 2 structures, rebuild them using VALUES exprs
selectStmt.getResultExprs().clear();
selectStmt.getBaseTblResultExprs().clear();
for (int i = 0; i < selectStmt.getValueList().getFirstRow().size(); ++i) {
selectStmt.getResultExprs().add(selectStmt.getValueList().getFirstRow().get(i));
selectStmt.getBaseTblResultExprs().add(selectStmt.getValueList().getFirstRow().get(i));
}
} else {
// INSERT INTO SELECT 1,2,3 ...
List<ArrayList<Expr>> rows = Lists.newArrayList();
// ATTN: must copy the `selectStmt.getResultExprs()`, otherwise the following
// `selectStmt.getResultExprs().clear();` will clear the `rows` too, causing
// error.
rows.add(Lists.newArrayList(selectStmt.getResultExprs()));
analyzeRow(analyzer, targetColumns, rows, 0, origColIdxsForExtendCols, slotToIndex);
// rows may be changed in analyzeRow(), so rebuild the result exprs
selectStmt.getResultExprs().clear();
for (Expr expr : rows.get(0)) {
selectStmt.getResultExprs().add(expr);
}
}
} else {
// INSERT INTO SELECT ... FROM tbl
if (!origColIdxsForExtendCols.isEmpty()) {
// extend the result expr by duplicating the related exprs
for (Pair<Integer, Column> entry : origColIdxsForExtendCols) {
if (entry.second == null) {
queryStmt.getResultExprs().add(queryStmt.getResultExprs().get(entry.first));
} else {
//substitute define expr slot with select statement result expr
ExprSubstitutionMap smap = new ExprSubstitutionMap();
List<SlotRef> columns = entry.second.getRefColumns();
for (SlotRef slot : columns) {
smap.getLhs().add(slot);
smap.getRhs().add(slotToIndex.get(slot.getColumnName()));
}
Expr e = Expr.substituteList(Lists.newArrayList(entry.second.getDefineExpr()),
smap, analyzer, false).get(0);
queryStmt.getResultExprs().add(e);
}
}
}
// check compatibility
for (int i = 0; i < targetColumns.size(); ++i) {
Column column = targetColumns.get(i);
Expr expr = queryStmt.getResultExprs().get(i);
queryStmt.getResultExprs().set(i, expr.checkTypeCompatibility(column.getType()));
}
}
// expand colLabels in QueryStmt
if (!origColIdxsForExtendCols.isEmpty()) {
if (queryStmt.getResultExprs().size() != queryStmt.getBaseTblResultExprs().size()) {
for (Pair<Integer, Column> entry : origColIdxsForExtendCols) {
if (entry.second == null) {
queryStmt.getBaseTblResultExprs().add(queryStmt.getBaseTblResultExprs().get(entry.first));
} else {
//substitute define expr slot with select statement result expr
ExprSubstitutionMap smap = new ExprSubstitutionMap();
List<SlotRef> columns = entry.second.getRefColumns();
for (SlotRef slot : columns) {
smap.getLhs().add(slot);
smap.getRhs().add(slotToIndex.get(slot.getColumnName()));
}
Expr e = Expr.substituteList(Lists.newArrayList(entry.second.getDefineExpr()),
smap, analyzer, false).get(0);
queryStmt.getBaseTblResultExprs().add(e);
}
}
}
if (queryStmt.getResultExprs().size() != queryStmt.getColLabels().size()) {
for (Pair<Integer, Column> entry : origColIdxsForExtendCols) {
queryStmt.getColLabels().add(queryStmt.getColLabels().get(entry.first));
}
}
}
if (LOG.isDebugEnabled()) {
for (Expr expr : queryStmt.getResultExprs()) {
LOG.debug("final result expr: {}, {}", expr, System.identityHashCode(expr));
}
for (Expr expr : queryStmt.getBaseTblResultExprs()) {
LOG.debug("final base table result expr: {}, {}", expr, System.identityHashCode(expr));
}
for (String colLabel : queryStmt.getColLabels()) {
LOG.debug("final col label: {}", colLabel);
}
}
}
/**
 * Analyzes a single VALUES row: checks its arity, extends it with exprs for
 * system-added (shadow) columns, then analyzes and casts every expr to the
 * corresponding target column type. The extended/cast row is written back
 * into {@code rows} at {@code rowIdx}.
 */
private void analyzeRow(Analyzer analyzer, List<Column> targetColumns, List<ArrayList<Expr>> rows,
        int rowIdx, List<Pair<Integer, Column>> origColIdxsForExtendCols, Map<String, Expr> slotToIndex)
        throws AnalysisException {
    // targetColumns also holds shadow columns added by the system, which the
    // user never supplies; subtract them before comparing the arity.
    ArrayList<Expr> row = rows.get(rowIdx);
    if (row.size() != targetColumns.size() - origColIdxsForExtendCols.size()) {
        throw new AnalysisException("Column count doesn't match value count at row " + (rowIdx + 1));
    }
    if (!origColIdxsForExtendCols.isEmpty()) {
        // Extend the row with one expr per shadow column, e.g. a row
        // (expr1, expr2, expr3) with targetColumns (A, B, C, __doris_shadow_b)
        // becomes (expr1, expr2, expr3, expr2).
        ArrayList<Expr> extendedRow = Lists.newArrayList(row);
        for (Pair<Integer, Column> entry : origColIdxsForExtendCols) {
            if (entry == null) {
                continue;
            }
            if (entry.second == null) {
                // plain shadow column: duplicate the origin column's expr
                extendedRow.add(extendedRow.get(entry.first));
            } else {
                // column with a define expr: substitute its slots with the
                // already-collected row exprs before appending
                ExprSubstitutionMap smap = new ExprSubstitutionMap();
                for (SlotRef slot : entry.second.getRefColumns()) {
                    smap.getLhs().add(slot);
                    smap.getRhs().add(slotToIndex.get(slot.getColumnName()));
                }
                extendedRow.add(Expr.substituteList(Lists.newArrayList(entry.second.getDefineExpr()),
                        smap, analyzer, false).get(0));
            }
        }
        row = extendedRow;
        rows.set(rowIdx, row);
    }
    // Analyze each expr and coerce it to its target column's type.
    for (int i = 0; i < row.size(); ++i) {
        Column col = targetColumns.get(i);
        Expr expr = row.get(i);
        if (expr instanceof DefaultValueExpr) {
            if (col.getDefaultValue() == null) {
                throw new AnalysisException("Column has no default value, column="
                        + col.getName());
            }
            expr = new StringLiteral(col.getDefaultValue());
        }
        expr.analyze(analyzer);
        row.set(i, expr.checkTypeCompatibility(col.getType()));
    }
}
/**
 * Validates the plan hints attached to this insert.
 * Only [SHUFFLE] and [NOSHUFFLE] are recognized, both require a partitioned
 * target table, and they must not contradict each other.
 */
private void analyzePlanHints() throws AnalysisException {
    if (planHints == null) {
        return;
    }
    for (String hint : planHints) {
        boolean shuffle = SHUFFLE_HINT.equalsIgnoreCase(hint);
        boolean noShuffle = NOSHUFFLE_HINT.equalsIgnoreCase(hint);
        if (!shuffle && !noShuffle) {
            ErrorReport.reportAnalysisException(ErrorCode.ERR_UNKNOWN_PLAN_HINT, hint);
        }
        // both hints are only meaningful on a partitioned table
        if (!targetTable.isPartitioned()) {
            ErrorReport.reportAnalysisException(ErrorCode.ERR_INSERT_HINT_NOT_SUPPORT);
        }
        // a previously seen hint must not contradict this one
        if (isRepartition != null && isRepartition != shuffle) {
            ErrorReport.reportAnalysisException(ErrorCode.ERR_PLAN_HINT_CONFILT, hint);
        }
        isRepartition = shuffle;
    }
}
/**
 * Builds {@code resultExprs} in full-schema column order.
 * Exprs provided by the statement are cast to their target column types;
 * columns not mentioned get the mapped sequence expr, a NULL literal, or
 * their default value, depending on the column definition.
 */
public void prepareExpressions() throws UserException {
    // Cast each select expr to its target column type and index it by column name.
    List<Expr> selectList = Expr.cloneList(queryStmt.getResultExprs());
    for (int i = 0; i < targetColumns.size(); ++i) {
        Column col = targetColumns.get(i);
        Expr casted = selectList.get(i).checkTypeCompatibility(col.getType());
        selectList.set(i, casted);
        exprByName.put(col.getName(), casted);
    }
    // Re-emit the exprs following the full-schema order, synthesizing values
    // for columns the statement did not mention.
    List<Pair<String, Expr>> resultExprByName = Lists.newArrayList();
    for (Column col : targetTable.getFullSchema()) {
        String colName = col.getName();
        if (exprByName.containsKey(colName)) {
            resultExprByName.add(Pair.of(colName, exprByName.get(colName)));
            continue;
        }
        if (targetTable instanceof OlapTable && ((OlapTable) targetTable).hasSequenceCol()
                && colName.equals(Column.SEQUENCE_COL)
                && ((OlapTable) targetTable).getSequenceMapCol() != null) {
            // hidden sequence column: reuse the expr of the column it maps to, if present
            String sequenceMapCol = ((OlapTable) targetTable).getSequenceMapCol();
            for (Pair<String, Expr> prev : resultExprByName) {
                if (prev.key().equals(sequenceMapCol)) {
                    resultExprByName.add(Pair.of(Column.SEQUENCE_COL, prev.value()));
                    break;
                }
            }
        } else if (col.getDefaultValue() == null) {
            // unspecified column without a default: insert NULL
            resultExprByName.add(Pair.of(colName, NullLiteral.create(col.getType())));
        } else if (col.getDefaultValueExprDef() != null) {
            // default declared as an expression
            resultExprByName.add(Pair.of(colName, col.getDefaultValueExpr()));
        } else {
            // literal default value, cast to the column type
            resultExprByName.add(Pair.of(colName,
                    new StringLiteral(col.getDefaultValue()).checkTypeCompatibility(col.getType())));
        }
    }
    for (Pair<String, Expr> entry : resultExprByName) {
        resultExprs.add(entry.value());
    }
}
/**
 * Lazily creates the data sink (and its output partition) for the target
 * table; subsequent calls return the cached sink.
 */
private DataSink createDataSink() throws AnalysisException {
    if (dataSink != null) {
        // already built by an earlier call; reuse it
        return dataSink;
    }
    if (targetTable instanceof OlapTable) {
        boolean singleReplica = analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert();
        dataSink = new OlapTableSink((OlapTable) targetTable, olapTuple, targetPartitionIds, singleReplica);
        dataPartition = dataSink.getOutputPartition();
    } else if (targetTable instanceof BrokerTable) {
        // TODO(lingbin): think use which one if have more than one path
        BrokerTable brokerTable = (BrokerTable) targetTable;
        BrokerDesc brokerDesc = new BrokerDesc(brokerTable.getBrokerName(), brokerTable.getBrokerProperties());
        dataSink = new ExportSink(brokerTable.getWritablePath(), brokerTable.getColumnSeparator(),
                brokerTable.getLineDelimiter(), brokerDesc);
        dataPartition = dataSink.getOutputPartition();
    } else {
        dataSink = DataSink.createDataSink(targetTable);
        dataPartition = DataPartition.UNPARTITIONED;
    }
    return dataSink;
}
/**
 * Finishes sink preparation for a real (non-EXPLAIN) insert into an OLAP
 * table and registers the table's indexes on the running transaction.
 */
public void complete() throws UserException {
    if (isExplain() || !(targetTable instanceof OlapTable)) {
        return;
    }
    ((OlapTableSink) dataSink).complete();
    // attach the table indexes to the transaction state
    TransactionState txnState = Env.getCurrentGlobalTransactionMgr()
            .getTransactionState(db.getId(), transactionId);
    if (txnState == null) {
        throw new DdlException("txn does not exist: " + transactionId);
    }
    txnState.addTableIndexes((OlapTable) targetTable);
}
/**
 * Returns the output data partition decided when the data sink was created.
 */
public DataPartition getDataPartition() {
return dataPartition;
}
@Override
public List<? extends DataDesc> getDataDescList() {
// Native INSERT carries its data in the query statement, not in data
// descriptions; only the external-load facades implement this.
throw new UnsupportedOperationException("only invoked for external load currently");
}
@Override
public ResourceDesc getResourceDesc() {
// Native INSERT has no external resource; only the external-load facades
// implement this.
throw new UnsupportedOperationException("only invoked for external load currently");
}
@Override
public LoadType getLoadType() {
// native INSERT is executed directly rather than handed to the load manager
return LoadType.NATIVE_INSERT;
}
@Override
public NativeInsertStmt getNativeInsertStmt() {
// this class is itself the native insert implementation
return this;
}
@Override
public void rewriteExprs(ExprRewriter rewriter) throws AnalysisException {
// delegate expr rewriting to the source query; only valid once analyzed
Preconditions.checkState(isAnalyzed());
queryStmt.rewriteExprs(rewriter);
}
@Override
public void foldConstant(ExprRewriter rewriter, TQueryOptions tQueryOptions) throws AnalysisException {
// delegate constant folding to the source query; only valid once analyzed
Preconditions.checkState(isAnalyzed());
queryStmt.foldConstant(rewriter, tQueryOptions);
}
@Override
public List<Expr> getResultExprs() {
// exprs in full-schema column order, populated by prepareExpressions()
return resultExprs;
}
/**
 * Clears all analysis artifacts so the statement can be analyzed again.
 */
@Override
public void reset() {
    super.reset();
    if (targetPartitionIds != null) {
        targetPartitionIds.clear();
    }
    queryStmt.reset();
    // drop everything derived during analysis
    resultExprs.clear();
    exprByName.clear();
    targetColumns.clear();
    dataSink = null;
    dataPartition = null;
}
/**
 * EXPLAIN runs locally; a real insert is forwarded to master with sync.
 */
@Override
public RedirectStatus getRedirectStatus() {
    return isExplain() ? RedirectStatus.NO_FORWARD : RedirectStatus.FORWARD_WITH_SYNC;
}
@Override
public String toSql() {
// NOTE(review): deliberately returns null instead of reconstructing the SQL;
// callers presumably fall back to the original statement text - confirm
// before changing this to a real serialization.
return null;
}
}

View File

@ -40,10 +40,15 @@ import java.util.Map;
public class ResourceDesc {
protected String name;
protected Map<String, String> properties;
/**
* TODO(tsy): transfer to LoadType
*/
protected EtlJobType etlJobType;
protected LoadType loadType;
// Only used for recovery
private ResourceDesc() {
public ResourceDesc() {
}
public ResourceDesc(String name, Map<String, String> properties) {

View File

@ -0,0 +1,101 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/StatementBase.java
// and modified by Doris
package org.apache.doris.analysis;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
 * InsertStmt facade for SPARK LOAD (load refactor step 1).
 * Wraps exactly one {@link DataDescription} plus the spark {@link ResourceDesc}
 * and validates them during analysis.
 */
public class SparkLoadStmt extends InsertStmt {

    private final DataDescription dataDescription;
    private final ResourceDesc resourceDesc;

    public SparkLoadStmt(LabelName label, List<DataDescription> dataDescList, ResourceDesc resourceDesc,
            Map<String, String> properties, String comments) {
        this.label = label;
        Preconditions.checkState(dataDescList.size() == 1,
                "spark load could only have one desc");
        this.dataDescription = dataDescList.get(0);
        this.resourceDesc = resourceDesc;
        this.properties = properties;
        this.comments = comments;
    }

    @Override
    public List<? extends DataDesc> getDataDescList() {
        return Collections.singletonList(dataDescription);
    }

    @Override
    public ResourceDesc getResourceDesc() {
        return resourceDesc;
    }

    @Override
    public LoadType getLoadType() {
        return LoadType.SPARK_LOAD;
    }

    @Override
    public void analyzeProperties() throws DdlException {
        // properties are validated later by the load job; nothing to do here
    }

    /**
     * Validates label, data description, resource and the caller's USAGE
     * privilege on the spark resource.
     *
     * @throws AnalysisException if the data description or resource is missing,
     *         invalid, or the user lacks the USAGE privilege
     */
    @Override
    public void analyze(Analyzer analyzer) throws UserException {
        super.analyze(analyzer);
        label.analyze(analyzer);
        // BUGFIX: Preconditions.checkNotNull(x, new AnalysisException(...)) used the
        // exception object as the NPE *message*, so on null it threw a
        // NullPointerException instead of the intended AnalysisException.
        // Throw the intended exception explicitly.
        if (dataDescription == null) {
            throw new AnalysisException("No data file in load statement.");
        }
        if (resourceDesc == null) {
            throw new AnalysisException("Resource desc not found");
        }
        String fullDbName = dataDescription.analyzeFullDbName(label.getDbName(), analyzer);
        dataDescription.analyze(fullDbName);
        resourceDesc.analyze();
        Database db = analyzer.getEnv().getInternalCatalog().getDbOrAnalysisException(fullDbName);
        OlapTable table = db.getOlapTableOrAnalysisException(dataDescription.getTableName());
        dataDescription.checkKeyTypeForLoad(table);
        // check resource usage privilege
        if (!Env.getCurrentEnv().getAccessManager().checkResourcePriv(ConnectContext.get(),
                resourceDesc.getName(),
                PrivPredicate.USAGE)) {
            throw new AnalysisException("USAGE denied to user '" + ConnectContext.get().getQualifiedUser()
                    + "'@'" + ConnectContext.get().getRemoteIP()
                    + "' for resource '" + resourceDesc.getName() + "'");
        }
    }

    @Override
    public String toSql() {
        return super.toSql();
    }
}

View File

@ -29,12 +29,12 @@ import java.util.Map;
* |
* The broker's StorageBackend.StorageType desc
*/
public class StorageDesc {
protected String name;
protected StorageBackend.StorageType storageType;
protected Map<String, String> properties;
public class StorageDesc extends ResourceDesc {
public StorageDesc() {}
protected StorageBackend.StorageType storageType;
public StorageDesc() {
}
public StorageDesc(String name, StorageBackend.StorageType storageType, Map<String, String> properties) {
this.name = name;

View File

@ -0,0 +1,61 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.qe.ConnectContext;
import com.google.common.base.Preconditions;
import java.util.List;
import java.util.Map;
/**
* Used for load refactor, as an adapter for original load stmt, will proxy to insert stmt or original load stmt, chosen
* by configuration
*/
/**
 * Adapter for the load refactor: proxies to either the new InsertStmt-based
 * load statement or the original LoadStmt, chosen by the session variable
 * enable_unified_load.
 */
public class UnifiedLoadStmt extends DdlStmt {

    private final StatementBase proxyStmt;

    public UnifiedLoadStmt(LabelName label, List<DataDescription> dataDescriptions,
            BrokerDesc brokerDesc, String cluster, Map<String, String> properties, String comment,
            LoadType loadType) {
        proxyStmt = buildProxyStmt(label, dataDescriptions, brokerDesc, cluster, properties, comment, loadType);
    }

    // Chooses the concrete statement backing this facade.
    private static StatementBase buildProxyStmt(LabelName label, List<DataDescription> dataDescriptions,
            BrokerDesc brokerDesc, String cluster, Map<String, String> properties, String comment,
            LoadType loadType) {
        final ConnectContext connectContext = ConnectContext.get();
        boolean unifiedLoad = connectContext != null && connectContext.getSessionVariable().isEnableUnifiedLoad();
        if (!unifiedLoad) {
            return new LoadStmt(label, dataDescriptions, brokerDesc, cluster, properties, comment);
        }
        switch (loadType) {
            case BROKER_LOAD:
                return new BrokerLoadStmt(label, dataDescriptions, brokerDesc, properties, comment);
            case MYSQL_LOAD:
            default:
                throw new IllegalStateException("does not support load type: " + loadType);
        }
    }

    /** Propagates statement context from the facade to the proxied statement. */
    public void init() {
        Preconditions.checkNotNull(proxyStmt, "impossible state, proxy stmt should be not null");
        proxyStmt.setOrigStmt(getOrigStmt());
        proxyStmt.setUserInfo(getUserInfo());
    }

    public StatementBase getProxyStmt() {
        return proxyStmt;
    }
}

View File

@ -119,7 +119,7 @@ public class UpdateStmt extends DdlStmt {
LimitElement.NO_LIMIT
);
insertStmt = new InsertStmt(
insertStmt = new NativeInsertStmt(
new InsertTarget(tableName, null),
null,
cols,

View File

@ -150,6 +150,7 @@ import org.apache.doris.load.loadv2.LoadEtlChecker;
import org.apache.doris.load.loadv2.LoadJobScheduler;
import org.apache.doris.load.loadv2.LoadLoadingChecker;
import org.apache.doris.load.loadv2.LoadManager;
import org.apache.doris.load.loadv2.LoadManagerAdapter;
import org.apache.doris.load.loadv2.ProgressManager;
import org.apache.doris.load.routineload.RoutineLoadManager;
import org.apache.doris.load.routineload.RoutineLoadScheduler;
@ -439,6 +440,11 @@ public class Env {
private StatisticsCleaner statisticsCleaner;
/**
* TODO(tsy): to be removed after load refactor
*/
private final LoadManagerAdapter loadManagerAdapter;
private StatisticsAutoAnalyzer statisticsAutoAnalyzer;
public List<Frontend> getFrontends(FrontendNodeType nodeType) {
@ -645,6 +651,7 @@ public class Env {
}
this.globalFunctionMgr = new GlobalFunctionMgr();
this.resourceGroupMgr = new ResourceGroupMgr();
this.loadManagerAdapter = new LoadManagerAdapter();
}
public static void destroyCheckpoint() {
@ -1038,7 +1045,8 @@ public class Env {
clusterId = storage.getClusterID();
token = storage.getToken();
try {
URL idURL = new URL("http://" + NetUtils.getHostPortInAccessibleFormat(rightHelperNode.getIp(), Config.http_port) + "/check");
URL idURL = new URL("http://" + NetUtils.getHostPortInAccessibleFormat(rightHelperNode.getIp(),
Config.http_port) + "/check");
HttpURLConnection conn = null;
conn = (HttpURLConnection) idURL.openConnection();
conn.setConnectTimeout(2 * 1000);
@ -1110,7 +1118,8 @@ public class Env {
try {
// For upgrade compatibility, the host parameter name remains the same
// and the new hostname parameter is added
URL url = new URL("http://" + NetUtils.getHostPortInAccessibleFormat(helperNode.getIp(), Config.http_port)
URL url = new URL(
"http://" + NetUtils.getHostPortInAccessibleFormat(helperNode.getIp(), Config.http_port)
+ "/role?host=" + selfNode.getIp() + "&hostname=" + selfNode.getHostName()
+ "&port=" + selfNode.getPort());
HttpURLConnection conn = null;
@ -5261,6 +5270,10 @@ public class Env {
return statisticsCleaner;
}
public LoadManagerAdapter getLoadManagerAdapter() {
return loadManagerAdapter;
}
public StatisticsAutoAnalyzer getStatisticsAutoAnalyzer() {
return statisticsAutoAnalyzer;
}

View File

@ -19,6 +19,7 @@ package org.apache.doris.load.loadv2;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.DataDescription;
import org.apache.doris.analysis.InsertStmt;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
@ -371,4 +372,37 @@ public abstract class BulkLoadJob extends LoadJob {
}
return null;
}
// ---------------- for load stmt ----------------
/**
 * Builds a bulk load job (broker or spark) from the unified-load InsertStmt
 * facade: resolves the target database, instantiates the matching job type,
 * then applies comment, properties and data source info.
 */
public static BulkLoadJob fromInsertStmt(InsertStmt insertStmt) throws DdlException {
    // resolve the target database from the load label
    String dbName = insertStmt.getLoadLabel().getDbName();
    Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
    try {
        BulkLoadJob bulkLoadJob;
        switch (insertStmt.getLoadType()) {
            case BROKER_LOAD:
                bulkLoadJob = new BrokerLoadJob(db.getId(), insertStmt.getLoadLabel().getLabelName(),
                        (BrokerDesc) insertStmt.getResourceDesc(),
                        insertStmt.getOrigStmt(), insertStmt.getUserInfo());
                break;
            case SPARK_LOAD:
                bulkLoadJob = new SparkLoadJob(db.getId(), insertStmt.getLoadLabel().getLabelName(),
                        insertStmt.getResourceDesc(),
                        insertStmt.getOrigStmt(), insertStmt.getUserInfo());
                break;
            default:
                throw new DdlException("Unknown load job type.");
        }
        bulkLoadJob.setComment(insertStmt.getComments());
        bulkLoadJob.setJobProperties(insertStmt.getProperties());
        // TODO(tsy): use generic and change the param in checkAndSetDataSourceInfo
        bulkLoadJob.checkAndSetDataSourceInfo(db, (List<DataDescription>) insertStmt.getDataDescList());
        return bulkLoadJob;
    } catch (MetaNotFoundException e) {
        throw new DdlException(e.getMessage());
    }
}
}

View File

@ -17,9 +17,11 @@
package org.apache.doris.load.loadv2;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.CancelLoadStmt;
import org.apache.doris.analysis.CleanLabelStmt;
import org.apache.doris.analysis.CompoundPredicate.Operator;
import org.apache.doris.analysis.InsertStmt;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Database;
@ -109,7 +111,7 @@ public class LoadManager implements Writable {
public long createLoadJobFromStmt(LoadStmt stmt) throws DdlException {
Database database = checkDb(stmt.getLabel().getDbName());
long dbId = database.getId();
LoadJob loadJob = null;
LoadJob loadJob;
writeLock();
try {
if (stmt.getBrokerDesc() != null && stmt.getBrokerDesc().isMultiLoadBroker()) {
@ -877,4 +879,43 @@ public class LoadManager implements Writable {
}
}
}
// ------------------------ for load refactor ------------------------
/**
 * Creates a bulk load job (broker/spark) from the unified-load InsertStmt
 * facade, persists it to the edit log, and submits it to the scheduler.
 * Counterpart of createLoadJobFromStmt(LoadStmt) during the load refactor.
 *
 * @return the id of the newly created load job
 */
public long createLoadJobFromStmt(InsertStmt insertStmt) throws DdlException {
    Database database = checkDb(insertStmt.getLoadLabel().getDbName());
    long dbId = database.getId();
    String labelName = insertStmt.getLoadLabel().getLabelName();
    BrokerDesc brokerDesc = (BrokerDesc) insertStmt.getResourceDesc();
    LoadJob loadJob;
    writeLock();
    try {
        if (brokerDesc != null && brokerDesc.isMultiLoadBroker()) {
            // multi-load: the label must already exist as an uncommitted label
            if (!Env.getCurrentEnv().getLoadInstance()
                    .isUncommittedLabel(dbId, labelName)) {
                throw new DdlException("label: " + labelName + " not found!");
            }
        } else {
            checkLabelUsed(dbId, labelName);
            if (brokerDesc == null && insertStmt.getResourceDesc() == null) {
                throw new DdlException("LoadManager only support the broker and spark load.");
            }
            // back-pressure: reject when too many jobs are already waiting
            if (unprotectedGetUnfinishedJobNum() >= Config.desired_max_waiting_jobs) {
                throw new DdlException(
                        "There are more than " + Config.desired_max_waiting_jobs
                                + " unfinished load jobs, please retry later. "
                                + "You can use `SHOW LOAD` to view submitted jobs");
            }
        }
        loadJob = BulkLoadJob.fromInsertStmt(insertStmt);
        createLoadJob(loadJob);
    } finally {
        writeUnlock();
    }
    Env.getCurrentEnv().getEditLog().logCreateLoadJob(loadJob);
    // The job must be submitted after the edit log write; this guarantees the
    // load job has not been changed before it is persisted.
    loadJobScheduler.submitJob(loadJob);
    return loadJob.getId();
}
}

View File

@ -0,0 +1,55 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.loadv2;
import org.apache.doris.analysis.InsertStmt;
import org.apache.doris.analysis.LoadType;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.DdlException;
import org.apache.doris.qe.StmtExecutor;
/**
 * Temporary adapter for the load refactor: routes unified external loads coming
 * from {@link StmtExecutor#handleExternalInsertStmt()} to the load manager.
 * <p>
 * TODO(tsy): remove after the job-manager system for loads is unified
 */
public class LoadManagerAdapter {

    /**
     * Dispatches the unified-load InsertStmt to the matching load manager
     * entry point based on its load type.
     *
     * @throws DdlException on an unrecognized load type
     */
    public void startLoadFromInsertStmt(InsertStmt insertStmt) throws DdlException {
        final LoadManager loadManager = Env.getCurrentEnv().getLoadManager();
        final LoadType loadType = insertStmt.getLoadType();
        switch (loadType) {
            case BROKER_LOAD:
                loadManager.createLoadJobFromStmt(insertStmt);
                break;
            case MYSQL_LOAD:
            case ROUTINE_LOAD:
            case STREAM_LOAD:
                // TODO: implement these load types on the unified path
                break;
            default:
                throw new DdlException("unsupported load type:" + loadType);
        }
    }
}

View File

@ -245,7 +245,7 @@ public class OriginalPlanner extends Planner {
rootFragment = distributedPlanner.createInsertFragment(rootFragment, insertStmt, fragments);
rootFragment.setSink(insertStmt.getDataSink());
insertStmt.complete();
ArrayList<Expr> exprs = ((InsertStmt) statement).getResultExprs();
List<Expr> exprs = statement.getResultExprs();
List<Expr> resExprs = Expr.substituteList(
exprs, rootFragment.getPlanRoot().getOutputSmap(), analyzer, true);
rootFragment.setOutputExprs(resExprs);

View File

@ -313,6 +313,11 @@ public class SessionVariable implements Serializable, Writable {
// Split size for ExternalFileScanNode. Default value 0 means use the block size of HDFS/S3.
public static final String FILE_SPLIT_SIZE = "file_split_size";
/**
* use insert stmt as the unified backend for all loads
*/
public static final String ENABLE_UNIFIED_LOAD = "enable_unified_load";
public static final String ENABLE_PARQUET_LAZY_MAT = "enable_parquet_lazy_materialization";
public static final String ENABLE_ORC_LAZY_MAT = "enable_orc_lazy_materialization";
@ -800,7 +805,7 @@ public class SessionVariable implements Serializable, Writable {
public static final int MIN_EXTERNAL_AGG_PARTITION_BITS = 4;
public static final int MAX_EXTERNAL_AGG_PARTITION_BITS = 8;
@VariableMgr.VarAttr(name = EXTERNAL_AGG_PARTITION_BITS,
checker = "checkExternalAggPartitionBits", fuzzy = true)
checker = "checkExternalAggPartitionBits", fuzzy = true)
public int externalAggPartitionBits = 8; // means that the hash table will be partitioned into 256 blocks.
// Whether enable two phase read optimization
@ -856,6 +861,12 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = FILE_SPLIT_SIZE, needForward = true)
public long fileSplitSize = 0;
/**
* determine should we enable unified load (use insert stmt as the backend for all load)
*/
@VariableMgr.VarAttr(name = ENABLE_UNIFIED_LOAD, needForward = true)
public boolean enableUnifiedLoad = false;
@VariableMgr.VarAttr(
name = ENABLE_PARQUET_LAZY_MAT,
description = {"控制 parquet reader 是否启用延迟物化技术。默认为 true。",
@ -2086,4 +2097,8 @@ public class SessionVariable implements Serializable, Writable {
}
return num;
}
public boolean isEnableUnifiedLoad() {
return enableUnifiedLoad;
}
}

View File

@ -34,6 +34,7 @@ import org.apache.doris.analysis.InsertStmt;
import org.apache.doris.analysis.KillStmt;
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.LoadType;
import org.apache.doris.analysis.LockTablesStmt;
import org.apache.doris.analysis.NullLiteral;
import org.apache.doris.analysis.OutFileClause;
@ -58,6 +59,7 @@ import org.apache.doris.analysis.TransactionBeginStmt;
import org.apache.doris.analysis.TransactionCommitStmt;
import org.apache.doris.analysis.TransactionRollbackStmt;
import org.apache.doris.analysis.TransactionStmt;
import org.apache.doris.analysis.UnifiedLoadStmt;
import org.apache.doris.analysis.UnlockTablesStmt;
import org.apache.doris.analysis.UnsupportedStmt;
import org.apache.doris.analysis.UpdateStmt;
@ -94,6 +96,7 @@ import org.apache.doris.common.util.Util;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.LoadJobRowResult;
import org.apache.doris.load.loadv2.LoadManager;
import org.apache.doris.load.loadv2.LoadManagerAdapter;
import org.apache.doris.mysql.MysqlChannel;
import org.apache.doris.mysql.MysqlCommand;
import org.apache.doris.mysql.MysqlEofPacket;
@ -572,6 +575,18 @@ public class StmtExecutor {
if (parsedStmt instanceof QueryStmt) {
context.getState().setIsQuery(true);
}
if (parsedStmt instanceof UnifiedLoadStmt) {
// glue code for unified load
final UnifiedLoadStmt unifiedLoadStmt = (UnifiedLoadStmt) parsedStmt;
unifiedLoadStmt.init();
final StatementBase proxyStmt = unifiedLoadStmt.getProxyStmt();
parsedStmt = proxyStmt;
if (!(proxyStmt instanceof LoadStmt)) {
Preconditions.checkState(
parsedStmt instanceof InsertStmt && ((InsertStmt) parsedStmt).needLoadManager(),
new IllegalStateException("enable_unified_load=true, should be external insert stmt"));
}
}
try {
if (context.isTxnModel() && !(parsedStmt instanceof InsertStmt)
@ -584,7 +599,7 @@ public class StmtExecutor {
if (!context.isTxnModel()) {
Span queryAnalysisSpan =
context.getTracer().spanBuilder("query analysis").setParent(Context.current()).startSpan();
try (Scope scope = queryAnalysisSpan.makeCurrent()) {
try (Scope ignored = queryAnalysisSpan.makeCurrent()) {
// analyze this query
analyze(context.getSessionVariable().toThrift());
} catch (Exception e) {
@ -639,15 +654,22 @@ public class StmtExecutor {
} else if (parsedStmt instanceof CreateTableAsSelectStmt) {
handleCtasStmt();
} else if (parsedStmt instanceof InsertStmt) { // Must ahead of DdlStmt because InsertStmt is its subclass
try {
if (!((InsertStmt) parsedStmt).getQueryStmt().isExplain()) {
profileType = ProfileType.LOAD;
InsertStmt insertStmt = (InsertStmt) parsedStmt;
if (insertStmt.needLoadManager()) {
// TODO(tsy): will eventually try to handle native insert and external insert together
// add a branch for external load
handleExternalInsertStmt();
} else {
try {
if (!insertStmt.getQueryStmt().isExplain()) {
profileType = ProfileType.LOAD;
}
handleInsertStmt();
} catch (Throwable t) {
LOG.warn("handle insert stmt fail: {}", t.getMessage());
// the transaction of this insert may already begin, we will abort it at outer finally block.
throw t;
}
handleInsertStmt();
} catch (Throwable t) {
LOG.warn("handle insert stmt fail: {}", t.getMessage());
// the transaction of this insert may already begin, we will abort it at outer finally block.
throw t;
}
} else if (parsedStmt instanceof LoadStmt) {
handleLoadStmt();
@ -698,7 +720,8 @@ public class StmtExecutor {
InsertStmt insertStmt = (InsertStmt) parsedStmt;
// The transaction of an insert operation begin at analyze phase.
// So we should abort the transaction at this finally block if it encounters exception.
if (insertStmt.isTransactionBegin() && context.getState().getStateType() == MysqlStateType.ERR) {
if (!insertStmt.needLoadManager() && insertStmt.isTransactionBegin()
&& context.getState().getStateType() == MysqlStateType.ERR) {
try {
String errMsg = Strings.emptyToNull(context.getState().getErrorMessage());
Env.getCurrentGlobalTransactionMgr().abortTransaction(
@ -827,7 +850,7 @@ public class StmtExecutor {
}
if (parsedStmt instanceof QueryStmt
|| parsedStmt instanceof InsertStmt
|| (parsedStmt instanceof InsertStmt && !((InsertStmt) parsedStmt).needLoadManager())
|| parsedStmt instanceof CreateTableAsSelectStmt) {
if (Config.enable_resource_group && context.sessionVariable.enablePipelineEngine()) {
analyzer.setResourceGroups(analyzer.getEnv().getResourceGroupMgr()
@ -1483,9 +1506,10 @@ public class StmtExecutor {
private int executeForTxn(InsertStmt insertStmt)
throws UserException, TException, InterruptedException, ExecutionException, TimeoutException {
if (context.isTxnIniting()) { // first time, begin txn
beginTxn(insertStmt.getDb(), insertStmt.getTbl());
beginTxn(insertStmt.getDbName(),
insertStmt.getTbl());
}
if (!context.getTxnEntry().getTxnConf().getDb().equals(insertStmt.getDb())
if (!context.getTxnEntry().getTxnConf().getDb().equals(insertStmt.getDbName())
|| !context.getTxnEntry().getTxnConf().getTbl().equals(insertStmt.getTbl())) {
throw new TException("Only one table can be inserted in one transaction.");
}
@ -1585,8 +1609,8 @@ public class StmtExecutor {
if (context.getMysqlChannel() != null) {
context.getMysqlChannel().reset();
}
// create plan
InsertStmt insertStmt = (InsertStmt) parsedStmt;
// create plan
if (insertStmt.getQueryStmt().hasOutFileClause()) {
throw new DdlException("Not support OUTFILE clause in INSERT statement");
}
@ -1728,7 +1752,8 @@ public class StmtExecutor {
txnId = insertStmt.getTransactionId();
try {
context.getEnv().getLoadManager()
.recordFinishedLoadJob(label, txnId, insertStmt.getDb(), insertStmt.getTargetTable().getId(),
.recordFinishedLoadJob(label, txnId, insertStmt.getDbName(),
insertStmt.getTargetTable().getId(),
EtlJobType.INSERT, createTime, throwable == null ? "" : throwable.getMessage(),
coord.getTrackingUrl(), insertStmt.getUserInfo());
} catch (MetaNotFoundException e) {
@ -1754,12 +1779,34 @@ public class StmtExecutor {
// set insert result in connection context,
// so that user can use `show insert result` to get info of the last insert operation.
context.setOrUpdateInsertResult(txnId, label, insertStmt.getDb(), insertStmt.getTbl(),
context.setOrUpdateInsertResult(txnId, label, insertStmt.getDbName(), insertStmt.getTbl(),
txnStatus, loadedRows, filteredRows);
// update it, so that user can get loaded rows in fe.audit.log
context.updateReturnRows((int) loadedRows);
}
// Handles an INSERT statement that is backed by an external load job (e.g. S3/Broker load)
// rather than an in-place transaction: resolves the load type from the parsed InsertStmt and
// hands the job off to the LoadManagerAdapter. On success the connection state is set to OK;
// on failure the MySQL error state is set so the client sees what happened.
private void handleExternalInsertStmt() {
    // TODO(tsy): load refactor, handle external load here
    try {
        InsertStmt insertStmt = (InsertStmt) parsedStmt;
        LoadType loadType = insertStmt.getLoadType();
        if (loadType == LoadType.UNKNOWN) {
            throw new DdlException("Unknown load job type");
        }
        LoadManagerAdapter loadManagerAdapter = context.getEnv().getLoadManagerAdapter();
        loadManagerAdapter.startLoadFromInsertStmt(insertStmt);
        context.getState().setOk();
    } catch (UserException e) {
        // Expected user-level failure: return the message so the client knows what happened.
        // NOTE(review): message said "DDL statement" but this path handles INSERT/load statements.
        LOG.debug("INSERT statement({}) process failed.", originStmt.originStmt, e);
        context.getState().setError(e.getMysqlErrorCode(), e.getMessage());
    } catch (Exception e) {
        // Unexpected failure — likely an internal bug; log at WARN with the full stack trace.
        // Use parameterized logging for consistency with the debug log above.
        LOG.warn("INSERT statement({}) process failed.", originStmt.originStmt, e);
        context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + e.getMessage());
    }
}
private void handleUnsupportedStmt() {
context.getMysqlChannel().reset();
// do nothing