diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index ea9fd9c3f6..6d0fc113f2 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -33,6 +33,7 @@ import org.apache.doris.analysis.AlterDatabaseQuotaStmt.QuotaType; import org.apache.doris.analysis.SetOperationStmt.Qualifier; import org.apache.doris.analysis.SetOperationStmt.Operation; import org.apache.doris.analysis.SetOperationStmt.SetOperand; +import org.apache.doris.analysis.LoadType; import org.apache.doris.catalog.AccessPrivilege; import org.apache.doris.catalog.AggregateType; import org.apache.doris.catalog.KeysType; @@ -2259,7 +2260,7 @@ load_stmt ::= opt_properties:properties opt_comment:comment {: - RESULT = new LoadStmt(label, dataDescList, broker, system, properties, comment); + RESULT = new UnifiedLoadStmt(label, dataDescList, broker, system, properties, comment, LoadType.BROKER_LOAD); :} | KW_LOAD KW_LABEL job_label:label LPAREN data_desc_list:dataDescList RPAREN @@ -4479,7 +4480,7 @@ use_stmt ::= insert_stmt ::= KW_INSERT KW_INTO insert_target:target opt_with_label:label opt_col_list:cols opt_plan_hints:hints insert_source:source {: - RESULT = new InsertStmt(target, label, cols, source, hints); + RESULT = new NativeInsertStmt(target, label, cols, source, hints); :} // TODO(zc) add default value for SQL-2003 // | KW_INSERT KW_INTO insert_target:target KW_DEFAULT KW_VALUES @@ -4510,6 +4511,7 @@ insert_source ::= :} ; + // update stmt update_stmt ::= KW_UPDATE table_name:tbl opt_table_alias:alias set_clause:setClause opt_from_clause:fromClause where_clause:whereClause diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java index 8df6f1ddc0..7b8f9c77be 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java @@ -101,8 +101,7 @@ public class BrokerDesc extends StorageDesc implements Writable { } public static BrokerDesc createForStreamLoad() { - BrokerDesc brokerDesc = new BrokerDesc("", StorageType.STREAM, null); - return brokerDesc; + return new BrokerDesc("", StorageType.STREAM, null); } public boolean isMultiLoadBroker() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerLoadStmt.java new file mode 100644 index 0000000000..26632e2ac6 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerLoadStmt.java @@ -0,0 +1,130 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.PrintableMap; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.commons.collections.CollectionUtils; + +import java.util.List; +import java.util.Map; + +public class BrokerLoadStmt extends InsertStmt { + + private final List dataDescList; + + private final BrokerDesc brokerDesc; + + private String cluster; + + public BrokerLoadStmt(LabelName label, List dataDescList, BrokerDesc brokerDesc, + Map properties, String comments) { + this.label = label; + this.dataDescList = dataDescList; + this.brokerDesc = brokerDesc; + this.properties = properties; + if (comments != null) { + this.comments = comments; + } else { + this.comments = ""; + } + } + + @Override + public List getDataDescList() { + return dataDescList; + } + + @Override + public BrokerDesc getResourceDesc() { + return brokerDesc; + } + + @Override + public LoadType getLoadType() { + return LoadType.BROKER_LOAD; + } + + @Override + public void analyzeProperties() throws DdlException { + // public check should be in base class + } + + @Override + public void analyze(Analyzer analyzer) throws UserException { + super.analyze(analyzer); + label.analyze(analyzer); + Preconditions.checkState(!CollectionUtils.isEmpty(dataDescList), + new AnalysisException("No data file in load statement.")); + Preconditions.checkNotNull(brokerDesc, "No broker desc found."); + // check data descriptions + for (DataDescription dataDescription : dataDescList) { + final String fullDbName = dataDescription.analyzeFullDbName(label.getDbName(), analyzer); + dataDescription.analyze(fullDbName); + Preconditions.checkState(!dataDescription.isLoadFromTable(), + new AnalysisException("Load from table should use Spark Load")); + Database db = analyzer.getEnv().getInternalCatalog().getDbOrAnalysisException(fullDbName); + OlapTable table = db.getOlapTableOrAnalysisException(dataDescription.getTableName()); + dataDescription.checkKeyTypeForLoad(table); + if (!brokerDesc.isMultiLoadBroker()) { + for (int i = 0; i < dataDescription.getFilePaths().size(); i++) { + String location = brokerDesc.getFileLocation(dataDescription.getFilePaths().get(i)); + dataDescription.getFilePaths().set(i, location); + dataDescription.getFilePaths().set(i, + ExportStmt.checkPath(dataDescription.getFilePaths().get(i), brokerDesc.getStorageType())); + } + } + } + } + + @Override + public boolean needAuditEncryption() { + return true; + } + + @Override + public String toSql() { + StringBuilder sb = new StringBuilder(); + sb.append("LOAD LABEL ").append(label.toSql()).append("\n"); + sb.append("("); + Joiner.on(",\n").appendTo(sb, Lists.transform(dataDescList, DataDesc::toSql)).append(")"); + if (cluster != null) { + sb.append("\nBY '"); + sb.append(cluster); + sb.append("'"); + } + if (brokerDesc != null) { + sb.append("\n").append(brokerDesc.toSql()); + } + + if (properties != null && !properties.isEmpty()) { + sb.append("\nPROPERTIES ("); + sb.append(new PrintableMap<>(properties, "=", true, false)); + sb.append(")"); + } + return sb.toString(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableAsSelectStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableAsSelectStmt.java index 45bdb4bdbb..88dc3735fa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableAsSelectStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableAsSelectStmt.java @@ -51,7 +51,7 @@ public class CreateTableAsSelectStmt extends DdlStmt { this.createTableStmt = createTableStmt; this.columnNames = columnNames; this.queryStmt = queryStmt; - this.insertStmt = new InsertStmt(createTableStmt.getDbTbl(), queryStmt); + this.insertStmt = new NativeInsertStmt(createTableStmt.getDbTbl(), queryStmt); } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java index 80a1c33d07..fadb889921 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java @@ -22,6 +22,7 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.FunctionSet; +import org.apache.doris.catalog.KeysType; import org.apache.doris.catalog.OlapTable; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.AnalysisException; @@ -81,7 +82,7 @@ import java.util.TreeSet; * The transform after the keyword named SET is the old ways which only supports the hadoop function. * It old way of transform will be removed gradually. It */ -public class DataDescription { +public class DataDescription implements InsertStmt.DataDesc { private static final Logger LOG = LogManager.getLogger(DataDescription.class); // function isn't built-in function, hll_hash is not built-in function in hadoop load. private static final List HADOOP_SUPPORT_FUNCTION_NAMES = Arrays.asList( @@ -146,7 +147,7 @@ public class DataDescription { private boolean isHadoopLoad = false; - private LoadTask.MergeType mergeType = LoadTask.MergeType.APPEND; + private final LoadTask.MergeType mergeType; private final Expr deleteCondition; private final Map properties; private boolean trimDoubleQuotes = false; @@ -1153,6 +1154,17 @@ public class DataDescription { return sb.toString(); } + public void checkKeyTypeForLoad(OlapTable table) throws AnalysisException { + if (getMergeType() != LoadTask.MergeType.APPEND) { + if (table.getKeysType() != KeysType.UNIQUE_KEYS) { + throw new AnalysisException("load by MERGE or DELETE is only supported in unique tables."); + } else if (!table.hasDeleteSign()) { + throw new AnalysisException( + "load by MERGE or DELETE need to upgrade table to support batch delete."); + } + } + } + @Override public String toString() { return toSql(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DeleteStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DeleteStmt.java index 8dadd20058..67aded7e08 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DeleteStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DeleteStmt.java @@ -167,7 +167,7 @@ public class DeleteStmt extends DdlStmt { LimitElement.NO_LIMIT ); - insertStmt = new InsertStmt( + insertStmt = new NativeInsertStmt( new InsertTarget(tableName, null), null, cols, diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java index 4ca2435fb2..c71bb822fc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java @@ -17,808 +17,313 @@ package org.apache.doris.analysis; -import org.apache.doris.alter.SchemaChangeHandler; -import org.apache.doris.catalog.BrokerTable; -import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.Database; import org.apache.doris.catalog.DatabaseIf; -import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.JdbcTable; -import org.apache.doris.catalog.MysqlTable; -import org.apache.doris.catalog.OdbcTable; -import org.apache.doris.catalog.OlapTable; -import org.apache.doris.catalog.Partition; -import org.apache.doris.catalog.PartitionType; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf; -import org.apache.doris.catalog.external.JdbcExternalDatabase; -import org.apache.doris.catalog.external.JdbcExternalTable; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; -import org.apache.doris.common.ErrorCode; -import org.apache.doris.common.ErrorReport; -import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; -import org.apache.doris.common.util.DebugUtil; -import org.apache.doris.common.util.Util; -import org.apache.doris.datasource.ExternalCatalog; -import org.apache.doris.datasource.JdbcExternalCatalog; -import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.common.util.TimeUtils; import org.apache.doris.planner.DataPartition; import org.apache.doris.planner.DataSink; -import org.apache.doris.planner.ExportSink; -import org.apache.doris.planner.OlapTableSink; -import org.apache.doris.qe.ConnectContext; -import org.apache.doris.rewrite.ExprRewriter; -import org.apache.doris.service.FrontendOptions; -import org.apache.doris.thrift.TQueryOptions; -import org.apache.doris.thrift.TUniqueId; -import org.apache.doris.transaction.TransactionState; -import org.apache.doris.transaction.TransactionState.LoadJobSourceType; -import org.apache.doris.transaction.TransactionState.TxnCoordinator; -import org.apache.doris.transaction.TransactionState.TxnSourceType; -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; +import com.google.common.base.Function; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMap.Builder; +import org.apache.commons.collections.MapUtils; -import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; -import java.util.stream.Collectors; /** - * Insert into is performed to load data from the result of query stmt. - * - * syntax: - * INSERT INTO table_name [partition_info] [col_list] [plan_hints] query_stmt - * - * table_name: is the name of target table - * partition_info: PARTITION (p1,p2) - * the partition info of target table - * col_list: (c1,c2) - * the column list of target table - * plan_hints: [STREAMING,SHUFFLE_HINT] - * The streaming plan is used by both streaming and non-streaming insert stmt. - * The only difference is that non-streaming will record the load info in LoadManager and return label. - * User can check the load info by show load stmt. + * This is the unified abstract stmt for all load kinds of load in {@link LoadType} + * All contents of native InsertStmt is moved to {@link NativeInsertStmt} + * Currently this abstract class keep the native insert methods for compatibility, and will eventually be moved + * to {@link NativeInsertStmt} */ -public class InsertStmt extends DdlStmt { - private static final Logger LOG = LogManager.getLogger(InsertStmt.class); +public abstract class InsertStmt extends DdlStmt { - public static final String SHUFFLE_HINT = "SHUFFLE"; - public static final String NOSHUFFLE_HINT = "NOSHUFFLE"; - public static final String STREAMING = "STREAMING"; + public static class Properties { - private final TableName tblName; - private final PartitionNames targetPartitionNames; - // parsed from targetPartitionNames. - private List targetPartitionIds; - private final List targetColumnNames; - private QueryStmt queryStmt; - private final List planHints; - private Boolean isRepartition; - private boolean isStreaming = false; - private String label = null; + public static final String EXEC_MEM_LIMIT = "exec_mem_limit"; - private Map indexIdToSchemaHash = null; + public static final String TIMEZONE = "timezone"; - // set after parse all columns and expr in query statement - // this result expr in the order of target table's columns - private ArrayList resultExprs = Lists.newArrayList(); + public static final String STRICT_MODE = "strict_mode"; - private Map exprByName = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); + public static final String MAX_FILTER_RATIO = "max_filter_ratio"; - private Table targetTable; + public static final String SEND_BATCH_PARALLELISM = "send_batch_parallelism"; - private DatabaseIf db; - private long transactionId; + public static final String TIMEOUT_PROPERTY = "timeout"; - // we need a new TupleDesc for olap table. - private TupleDescriptor olapTuple; + public static final String LOAD_TO_SINGLE_TABLET = "load_to_single_tablet"; - private DataSink dataSink; - private DataPartition dataPartition; + public static final String TRIM_DOUBLE_QUOTES = "trim_double_quotes"; - private List targetColumns = Lists.newArrayList(); + public static final String SKIP_LINES = "skip_lines"; - /* - * InsertStmt may be analyzed twice, but transaction must be only begun once. - * So use a boolean to check if transaction already begun. + public static final String LINE_DELIMITER = "line_delimiter"; + + public static final String COLUMN_SEPARATOR = "column_separator"; + + // ------------------------ just for routine load ------------------------ + + public static final String MAX_BATCH_SIZE = "max_batch_size"; + + public static final String MAX_BATCH_ROWS = "max_batch_rows"; + + public static final String MAX_BATCH_INTERVAL = "max_batch_interval"; + + // ----------------------------------------------------------------------- + + // TODO: to be discovered in developing + + } + + protected LabelName label; + + protected Map properties; + + protected List targetTables; + + protected String comments; + + /** + * TODO: change Function to Util.getXXXPropertyOrDefault() */ - private boolean isTransactionBegin = false; + public static final ImmutableMap> PROPERTIES_MAP = + new Builder>() + .put(Properties.EXEC_MEM_LIMIT, (Function) Long::valueOf) + .put(Properties.TIMEOUT_PROPERTY, (Function) Long::valueOf) + .put(Properties.TIMEZONE, (Function) input -> input) + .put(Properties.LOAD_TO_SINGLE_TABLET, + (Function) Boolean::parseBoolean) + .put(Properties.MAX_FILTER_RATIO, (Function) Double::parseDouble) + .put(Properties.SEND_BATCH_PARALLELISM, + (Function) Integer::parseInt) + .put(Properties.STRICT_MODE, (Function) Boolean::parseBoolean) + .put(Properties.TRIM_DOUBLE_QUOTES, (Function) Boolean::parseBoolean) + .put(Properties.SKIP_LINES, (Function) Integer::valueOf) + .put(Properties.LINE_DELIMITER, (Function) String::valueOf) + .put(Properties.COLUMN_SEPARATOR, (Function) String::valueOf) + .put(Properties.MAX_BATCH_SIZE, (Function) Long::parseLong) + .put(Properties.MAX_BATCH_ROWS, (Function) Integer::valueOf) + .put(Properties.MAX_BATCH_INTERVAL, (Function) Long::valueOf) + .build(); - private boolean isValuesOrConstantSelect = false; + // ---------------------------- for old insert stmt ---------------------------- public boolean isValuesOrConstantSelect() { - return isValuesOrConstantSelect; - } - - public InsertStmt(InsertTarget target, String label, List cols, InsertSource source, List hints) { - this.tblName = target.getTblName(); - this.targetPartitionNames = target.getPartitionNames(); - this.label = label; - this.queryStmt = source.getQueryStmt(); - this.planHints = hints; - this.targetColumnNames = cols; - - this.isValuesOrConstantSelect = (queryStmt instanceof SelectStmt - && ((SelectStmt) queryStmt).getTableRefs().isEmpty()); - } - - // Ctor for CreateTableAsSelectStmt - public InsertStmt(TableName name, QueryStmt queryStmt) { - this.tblName = name; - this.targetPartitionNames = null; - this.targetColumnNames = null; - this.queryStmt = queryStmt; - this.planHints = null; - } - - public TupleDescriptor getOlapTuple() { - return olapTuple; + throw new UnsupportedOperationException("only invoked in NativeInsertStmt"); } public Table getTargetTable() { - return targetTable; + throw new UnsupportedOperationException("only invoked in NativeInsertStmt"); } public void setTargetTable(Table targetTable) { - this.targetTable = targetTable; - } - - public Map getIndexIdToSchemaHash() { - return this.indexIdToSchemaHash; + throw new UnsupportedOperationException("only invoked in NativeInsertStmt"); } public long getTransactionId() { - return this.transactionId; + throw new UnsupportedOperationException("only invoked in NativeInsertStmt"); } public Boolean isRepartition() { - return isRepartition; + throw new UnsupportedOperationException("only invoked in NativeInsertStmt"); } - public String getDb() { - return tblName.getDb(); + public String getDbName() { + throw new UnsupportedOperationException("only invoked in NativeInsertStmt"); } public String getTbl() { - return tblName.getTbl(); + throw new UnsupportedOperationException("only invoked in NativeInsertStmt"); } public void getTables(Analyzer analyzer, Map tableMap, Set parentViewNameSet) throws AnalysisException { - // get dbs of statement - queryStmt.getTables(analyzer, false, tableMap, parentViewNameSet); - tblName.analyze(analyzer); - // disallow external catalog except JdbcExternalCatalog - if (analyzer.getEnv().getCurrentCatalog() instanceof ExternalCatalog - && !(analyzer.getEnv().getCurrentCatalog() instanceof JdbcExternalCatalog)) { - Util.prohibitExternalCatalog(tblName.getCtl(), this.getClass().getSimpleName()); - } - String dbName = tblName.getDb(); - String tableName = tblName.getTbl(); - // check exist - DatabaseIf db = analyzer.getEnv().getCatalogMgr().getCatalog(tblName.getCtl()).getDbOrAnalysisException(dbName); - TableIf table = db.getTableOrAnalysisException(tblName.getTbl()); - - // check access - if (!Env.getCurrentEnv().getAccessManager() - .checkTblPriv(ConnectContext.get(), dbName, tableName, PrivPredicate.LOAD)) { - ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD", - ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(), - dbName + ": " + tableName); - } - - tableMap.put(table.getId(), table); + throw new UnsupportedOperationException("only invoked in NativeInsertStmt"); } public QueryStmt getQueryStmt() { - return queryStmt; + throw new UnsupportedOperationException("only invoked in NativeInsertStmt"); } public void setQueryStmt(QueryStmt queryStmt) { - this.queryStmt = queryStmt; - } - - @Override - public void foldConstant(ExprRewriter rewriter, TQueryOptions tQueryOptions) throws AnalysisException { - Preconditions.checkState(isAnalyzed()); - queryStmt.foldConstant(rewriter, tQueryOptions); - } - - @Override - public void rewriteExprs(ExprRewriter rewriter) throws AnalysisException { - Preconditions.checkState(isAnalyzed()); - queryStmt.rewriteExprs(rewriter); - } - - @Override - public boolean isExplain() { - return queryStmt.isExplain(); - } - - public boolean isStreaming() { - return isStreaming; + throw new UnsupportedOperationException("only invoked in NativeInsertStmt"); } public String getLabel() { - return label; + throw new UnsupportedOperationException("only invoked in NativeInsertStmt"); } public DataSink getDataSink() { - return dataSink; + throw new UnsupportedOperationException("only invoked in NativeInsertStmt"); } public DatabaseIf getDbObj() { - return db; + throw new UnsupportedOperationException("only invoked in NativeInsertStmt"); } public boolean isTransactionBegin() { - return isTransactionBegin; + throw new UnsupportedOperationException("only invoked in NativeInsertStmt"); + } + + public void prepareExpressions() throws UserException { + throw new UnsupportedOperationException("only invoked in NativeInsertStmt"); + } + + public void complete() throws UserException { + throw new UnsupportedOperationException("only invoked in NativeInsertStmt"); + } + + public DataPartition getDataPartition() { + throw new UnsupportedOperationException("only invoked in NativeInsertStmt"); + } + + // --------------------------------------------------------------------------- + + // ------------------------- for unified insert stmt ------------------------- + + public boolean needLoadManager() { + return getLoadType() != LoadType.NATIVE_INSERT && getLoadType() != LoadType.UNKNOWN; + } + + public LabelName getLoadLabel() { + return label; + } + + /** + * for multi-tables load, we need have several target tbl + * + * @return all target table names + */ + public List
getTargetTableList() { + return targetTables; + } + + public abstract List getDataDescList(); + + public abstract ResourceDesc getResourceDesc(); + + public abstract LoadType getLoadType(); + + public Map getProperties() { + return properties; + } + + public String getComments() { + return comments; } @Override public void analyze(Analyzer analyzer) throws UserException { super.analyze(analyzer); + analyzeProperties(); + } - if (targetTable == null) { - tblName.analyze(analyzer); - // disallow external catalog except JdbcExternalCatalog - if (analyzer.getEnv().getCurrentCatalog() instanceof ExternalCatalog - && !(analyzer.getEnv().getCurrentCatalog() instanceof JdbcExternalCatalog)) { - Util.prohibitExternalCatalog(tblName.getCtl(), this.getClass().getSimpleName()); - } - } + protected void analyzeProperties() throws DdlException { + checkProperties(); + } - // Check privilege - if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(), tblName.getDb(), - tblName.getTbl(), PrivPredicate.LOAD)) { - ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD", - ConnectContext.get().getQualifiedUser(), - ConnectContext.get().getRemoteIP(), tblName.getDb() + ": " + tblName.getTbl()); - } - - // check partition - if (targetPartitionNames != null) { - targetPartitionNames.analyze(analyzer); - } - - // set target table and - analyzeTargetTable(analyzer); - - analyzeSubquery(analyzer); - - analyzePlanHints(analyzer); - - if (analyzer.getContext().isTxnModel()) { + /** + * TODO(tsy): find a shorter way to check props + */ + private void checkProperties() throws DdlException { + if (MapUtils.isEmpty(properties)) { return; } - // create data sink - createDataSink(); - - db = analyzer.getEnv().getCatalogMgr().getCatalog(tblName.getCtl()).getDbOrAnalysisException(tblName.getDb()); - // create label and begin transaction - long timeoutSecond = ConnectContext.get().getExecTimeout(); - if (Strings.isNullOrEmpty(label)) { - label = "insert_" + DebugUtil.printId(analyzer.getContext().queryId()).replace("-", "_"); - } - if (!isExplain() && !isTransactionBegin) { - if (targetTable instanceof OlapTable) { - LoadJobSourceType sourceType = LoadJobSourceType.INSERT_STREAMING; - transactionId = Env.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(), - Lists.newArrayList(targetTable.getId()), label, - new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), - sourceType, timeoutSecond); - } - isTransactionBegin = true; - } - - // init data sink - if (!isExplain() && targetTable instanceof OlapTable) { - OlapTableSink sink = (OlapTableSink) dataSink; - TUniqueId loadId = analyzer.getContext().queryId(); - int sendBatchParallelism = analyzer.getContext().getSessionVariable().getSendBatchParallelism(); - sink.init(loadId, transactionId, db.getId(), timeoutSecond, sendBatchParallelism, false); - } - } - - private void analyzeTargetTable(Analyzer analyzer) throws AnalysisException { - // Get table - if (targetTable == null) { - DatabaseIf db = analyzer.getEnv().getCatalogMgr() - .getCatalog(tblName.getCtl()).getDbOrAnalysisException(tblName.getDb()); - if (db instanceof Database) { - targetTable = (Table) db.getTableOrAnalysisException(tblName.getTbl()); - } else if (db instanceof JdbcExternalDatabase) { - JdbcExternalTable jdbcTable = (JdbcExternalTable) db.getTableOrAnalysisException(tblName.getTbl()); - targetTable = jdbcTable.getJdbcTable(); - } else { - throw new AnalysisException("Not support insert target table."); + for (Entry entry : properties.entrySet()) { + if (!InsertStmt.PROPERTIES_MAP.containsKey(entry.getKey())) { + throw new DdlException(entry.getKey() + " is invalid property"); } } - if (targetTable instanceof OlapTable) { - OlapTable olapTable = (OlapTable) targetTable; - - // partition - if (targetPartitionNames != null) { - targetPartitionIds = Lists.newArrayList(); - if (olapTable.getPartitionInfo().getType() == PartitionType.UNPARTITIONED) { - ErrorReport.reportAnalysisException(ErrorCode.ERR_PARTITION_CLAUSE_NO_ALLOWED); - } - for (String partName : targetPartitionNames.getPartitionNames()) { - Partition part = olapTable.getPartition(partName, targetPartitionNames.isTemp()); - if (part == null) { - ErrorReport.reportAnalysisException( - ErrorCode.ERR_UNKNOWN_PARTITION, partName, targetTable.getName()); - } - targetPartitionIds.add(part.getId()); + // exec mem + final String execMemProperty = properties.get(InsertStmt.Properties.EXEC_MEM_LIMIT); + if (execMemProperty != null) { + try { + final long execMem = Long.parseLong(execMemProperty); + if (execMem <= 0) { + throw new DdlException(InsertStmt.Properties.EXEC_MEM_LIMIT + " must be greater than 0"); } + } catch (NumberFormatException e) { + throw new DdlException(InsertStmt.Properties.EXEC_MEM_LIMIT + " is not a number."); } - // need a descriptor - DescriptorTable descTable = analyzer.getDescTbl(); - olapTuple = descTable.createTupleDescriptor(); - for (Column col : olapTable.getFullSchema()) { - SlotDescriptor slotDesc = descTable.addSlotDescriptor(olapTuple); - slotDesc.setIsMaterialized(true); - slotDesc.setType(col.getType()); - slotDesc.setColumn(col); - slotDesc.setIsNullable(col.isAllowNull()); - } - // will use it during create load job - indexIdToSchemaHash = olapTable.getIndexIdToSchemaHash(); - } else if (targetTable instanceof MysqlTable || targetTable instanceof OdbcTable - || targetTable instanceof JdbcTable) { - if (targetPartitionNames != null) { - ErrorReport.reportAnalysisException(ErrorCode.ERR_PARTITION_CLAUSE_NO_ALLOWED); - } - } else if (targetTable instanceof BrokerTable) { - if (targetPartitionNames != null) { - ErrorReport.reportAnalysisException(ErrorCode.ERR_PARTITION_CLAUSE_NO_ALLOWED); - } - - BrokerTable brokerTable = (BrokerTable) targetTable; - if (!brokerTable.isWritable()) { - throw new AnalysisException("table " + brokerTable.getName() - + "is not writable. path should be an dir"); - } - - } else { - ErrorReport.reportAnalysisException( - ErrorCode.ERR_NON_INSERTABLE_TABLE, targetTable.getName(), targetTable.getType()); } - } - private void checkColumnCoverage(Set mentionedCols, List baseColumns) - throws AnalysisException { - - // check columns of target table - for (Column col : baseColumns) { - if (mentionedCols.contains(col.getName())) { - continue; + // timeout + final String timeoutLimitProperty = properties.get(InsertStmt.Properties.TIMEOUT_PROPERTY); + if (timeoutLimitProperty != null) { + try { + final int timeoutLimit = Integer.parseInt(timeoutLimitProperty); + if (timeoutLimit < 0) { + throw new DdlException(InsertStmt.Properties.TIMEOUT_PROPERTY + " must be greater than 0"); + } + } catch (NumberFormatException e) { + throw new DdlException(InsertStmt.Properties.TIMEOUT_PROPERTY + " is not a number."); } - if (col.getDefaultValue() == null && !col.isAllowNull()) { - ErrorReport.reportAnalysisException(ErrorCode.ERR_COL_NOT_MENTIONED, col.getName()); + } + + // max filter ratio + final String maxFilterRadioProperty = properties.get(Properties.MAX_FILTER_RATIO); + if (maxFilterRadioProperty != null) { + try { + double maxFilterRatio = Double.parseDouble(maxFilterRadioProperty); + if (maxFilterRatio < 0.0 || maxFilterRatio > 1.0) { + throw new DdlException(Properties.MAX_FILTER_RATIO + " must between 0.0 and 1.0."); + } + } catch (NumberFormatException e) { + throw new DdlException(Properties.MAX_FILTER_RATIO + " is not a number."); + } + } + + // strict mode + final String strictModeProperty = properties.get(Properties.STRICT_MODE); + if (strictModeProperty != null) { + if (!strictModeProperty.equalsIgnoreCase("true") + && !strictModeProperty.equalsIgnoreCase("false")) { + throw new DdlException(Properties.STRICT_MODE + " is not a boolean"); + } + } + + // time zone + final String timezone = properties.get(Properties.TIMEZONE); + if (timezone != null) { + properties.put(Properties.TIMEZONE, TimeUtils.checkTimeZoneValidAndStandardize( + properties.getOrDefault(LoadStmt.TIMEZONE, TimeUtils.DEFAULT_TIME_ZONE))); + } + + // send batch parallelism + final String sendBatchParallelism = properties.get(Properties.SEND_BATCH_PARALLELISM); + if (sendBatchParallelism != null) { + try { + final int sendBatchParallelismValue = Integer.parseInt(sendBatchParallelism); + if (sendBatchParallelismValue < 1) { + throw new DdlException(Properties.SEND_BATCH_PARALLELISM + " must be greater than 0"); + } + } catch (NumberFormatException e) { + throw new DdlException(Properties.SEND_BATCH_PARALLELISM + " is not a number."); } } } - private void analyzeSubquery(Analyzer analyzer) throws UserException { - // Analyze columns mentioned in the statement. - Set mentionedColumns = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER); - if (targetColumnNames == null) { - // the mentioned columns are columns which are visible to user, so here we use - // getBaseSchema(), not getFullSchema() - for (Column col : targetTable.getBaseSchema(false)) { - mentionedColumns.add(col.getName()); - targetColumns.add(col); - } - } else { - for (String colName : targetColumnNames) { - Column col = targetTable.getColumn(colName); - if (col == null) { - ErrorReport.reportAnalysisException(ErrorCode.ERR_BAD_FIELD_ERROR, colName, targetTable.getName()); - } - if (!mentionedColumns.add(colName)) { - ErrorReport.reportAnalysisException(ErrorCode.ERR_FIELD_SPECIFIED_TWICE, colName); - } - targetColumns.add(col); - } - // hll column mush in mentionedColumns - for (Column col : targetTable.getBaseSchema()) { - if (col.getType().isObjectStored() && !mentionedColumns.contains(col.getName())) { - throw new AnalysisException(" object-stored column " + col.getName() - + " mush in insert into columns"); - } - } - } - - /* - * When doing schema change, there may be some shadow columns. we should add - * them to the end of targetColumns. And use 'origColIdxsForExtendCols' to save - * the index of column in 'targetColumns' which the shadow column related to. - * eg: origin targetColumns: (A,B,C), shadow column: __doris_shadow_B after - * processing, targetColumns: (A, B, C, __doris_shadow_B), and - * origColIdxsForExtendCols has 1 element: "1", which is the index of column B - * in targetColumns. - * - * Rule A: If the column which the shadow column related to is not mentioned, - * then do not add the shadow column to targetColumns. They will be filled by - * null or default value when loading. - * - * When table have materialized view, there may be some materialized view columns. - * we should add them to the end of targetColumns. - * eg: origin targetColumns: (A,B,C), shadow column: mv_bitmap_union_C - * after processing, targetColumns: (A, B, C, mv_bitmap_union_C), and - * origColIdx2MVColumn has 1 element: "2, mv_bitmap_union_C" - * will be used in as a mapping from queryStmt.getResultExprs() to targetColumns define expr - */ - List> origColIdxsForExtendCols = Lists.newArrayList(); - for (Column column : targetTable.getFullSchema()) { - if (column.isNameWithPrefix(SchemaChangeHandler.SHADOW_NAME_PREFIX)) { - String origName = Column.removeNamePrefix(column.getName()); - for (int i = 0; i < targetColumns.size(); i++) { - if (targetColumns.get(i).nameEquals(origName, false)) { - // Rule A - origColIdxsForExtendCols.add(Pair.of(i, null)); - targetColumns.add(column); - break; - } - } - } - if (column.isNameWithPrefix(CreateMaterializedViewStmt.MATERIALIZED_VIEW_NAME_PREFIX) - || column.isNameWithPrefix(CreateMaterializedViewStmt.MATERIALIZED_VIEW_AGGREGATE_NAME_PREFIX)) { - List refColumns = column.getRefColumns(); - if (refColumns == null) { - ErrorReport.reportAnalysisException(ErrorCode.ERR_BAD_FIELD_ERROR, - column.getName(), targetTable.getName()); - } - for (SlotRef refColumn : refColumns) { - String origName = refColumn.getColumnName(); - for (int originColumnIdx = 0; originColumnIdx < targetColumns.size(); originColumnIdx++) { - if (targetColumns.get(originColumnIdx).nameEquals(origName, false)) { - origColIdxsForExtendCols.add(Pair.of(originColumnIdx, column)); - targetColumns.add(column); - break; - } - } - } - } - } - - // parse query statement - queryStmt.setFromInsert(true); - queryStmt.analyze(analyzer); - - // check if size of select item equal with columns mentioned in statement - if (mentionedColumns.size() != queryStmt.getResultExprs().size()) { - ErrorReport.reportAnalysisException(ErrorCode.ERR_WRONG_VALUE_COUNT); - } - - // Check if all columns mentioned is enough - checkColumnCoverage(mentionedColumns, targetTable.getBaseSchema()); - - Map slotToIndex = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); - List baseColumns = targetTable.getBaseSchema(); - int size = Math.min(baseColumns.size(), queryStmt.getResultExprs().size()); - for (int i = 0; i < size; i++) { - slotToIndex.put(baseColumns.get(i).getName(), queryStmt.getResultExprs().get(i)); - } - - // handle VALUES() or SELECT constant list - if (isValuesOrConstantSelect) { - SelectStmt selectStmt = (SelectStmt) queryStmt; - if (selectStmt.getValueList() != null) { - // INSERT INTO VALUES(...) - List> rows = selectStmt.getValueList().getRows(); - for (int rowIdx = 0; rowIdx < rows.size(); ++rowIdx) { - analyzeRow(analyzer, targetColumns, rows, rowIdx, origColIdxsForExtendCols, slotToIndex); - } - - // clear these 2 structures, rebuild them using VALUES exprs - selectStmt.getResultExprs().clear(); - selectStmt.getBaseTblResultExprs().clear(); - - for (int i = 0; i < selectStmt.getValueList().getFirstRow().size(); ++i) { - selectStmt.getResultExprs().add(selectStmt.getValueList().getFirstRow().get(i)); - selectStmt.getBaseTblResultExprs().add(selectStmt.getValueList().getFirstRow().get(i)); - } - } else { - // INSERT INTO SELECT 1,2,3 ... - List> rows = Lists.newArrayList(); - // ATTN: must copy the `selectStmt.getResultExprs()`, otherwise the following - // `selectStmt.getResultExprs().clear();` will clear the `rows` too, causing - // error. - rows.add(Lists.newArrayList(selectStmt.getResultExprs())); - analyzeRow(analyzer, targetColumns, rows, 0, origColIdxsForExtendCols, slotToIndex); - // rows may be changed in analyzeRow(), so rebuild the result exprs - selectStmt.getResultExprs().clear(); - for (Expr expr : rows.get(0)) { - selectStmt.getResultExprs().add(expr); - } - } - isStreaming = true; - } else { - // INSERT INTO SELECT ... FROM tbl - if (!origColIdxsForExtendCols.isEmpty()) { - // extend the result expr by duplicating the related exprs - for (Pair entry : origColIdxsForExtendCols) { - if (entry.second == null) { - queryStmt.getResultExprs().add(queryStmt.getResultExprs().get(entry.first)); - } else { - //substitute define expr slot with select statement result expr - ExprSubstitutionMap smap = new ExprSubstitutionMap(); - List columns = entry.second.getRefColumns(); - for (SlotRef slot : columns) { - smap.getLhs().add(slot); - smap.getRhs().add(slotToIndex.get(slot.getColumnName())); - } - Expr e = Expr.substituteList(Lists.newArrayList(entry.second.getDefineExpr()), - smap, analyzer, false).get(0); - queryStmt.getResultExprs().add(e); - } - } - } - - // check compatibility - for (int i = 0; i < targetColumns.size(); ++i) { - Column column = targetColumns.get(i); - Expr expr = queryStmt.getResultExprs().get(i); - queryStmt.getResultExprs().set(i, expr.checkTypeCompatibility(column.getType())); - } - } - - // expand colLabels in QueryStmt - if (!origColIdxsForExtendCols.isEmpty()) { - if (queryStmt.getResultExprs().size() != queryStmt.getBaseTblResultExprs().size()) { - for (Pair entry : origColIdxsForExtendCols) { - if (entry.second == null) { - queryStmt.getBaseTblResultExprs().add(queryStmt.getBaseTblResultExprs().get(entry.first)); - } else { - //substitute define expr slot with select statement result expr - ExprSubstitutionMap smap = new ExprSubstitutionMap(); - List columns = entry.second.getRefColumns(); - for (SlotRef slot : columns) { - smap.getLhs().add(slot); - smap.getRhs().add(slotToIndex.get(slot.getColumnName())); - } - Expr e = Expr.substituteList(Lists.newArrayList(entry.second.getDefineExpr()), - smap, analyzer, false).get(0); - queryStmt.getBaseTblResultExprs().add(e); - } - } - } - - if (queryStmt.getResultExprs().size() != queryStmt.getColLabels().size()) { - for (Pair entry : origColIdxsForExtendCols) { - queryStmt.getColLabels().add(queryStmt.getColLabels().get(entry.first)); - } - } - } - - if (LOG.isDebugEnabled()) { - for (Expr expr : queryStmt.getResultExprs()) { - LOG.debug("final result expr: {}, {}", expr, System.identityHashCode(expr)); - } - for (Expr expr : queryStmt.getBaseTblResultExprs()) { - LOG.debug("final base table result expr: {}, {}", expr, System.identityHashCode(expr)); - } - for (String colLabel : queryStmt.getColLabels()) { - LOG.debug("final col label: {}", colLabel); - } - } + public NativeInsertStmt getNativeInsertStmt() { + throw new UnsupportedOperationException("only invoked in NativeInsertStmt"); } - private void analyzeRow(Analyzer analyzer, List targetColumns, List> rows, - int rowIdx, List> origColIdxsForExtendCols, Map slotToIndex) - throws AnalysisException { - // 1. check number of fields if equal with first row - // targetColumns contains some shadow columns, which is added by system, - // so we should minus this - if (rows.get(rowIdx).size() != targetColumns.size() - origColIdxsForExtendCols.size()) { - throw new AnalysisException("Column count doesn't match value count at row " + (rowIdx + 1)); - } + /** + * TODO: unify the data_desc + * the unique entrance for data_desc + */ + interface DataDesc { - ArrayList row = rows.get(rowIdx); - if (!origColIdxsForExtendCols.isEmpty()) { - /** - * we should extend the row for shadow columns. - * eg: - * the origin row has exprs: (expr1, expr2, expr3), and targetColumns is (A, B, C, __doris_shadow_b) - * after processing, extentedRow is (expr1, expr2, expr3, expr2) - */ - ArrayList extentedRow = Lists.newArrayList(); - extentedRow.addAll(row); + String toSql(); - for (Pair entry : origColIdxsForExtendCols) { - if (entry != null) { - if (entry.second == null) { - extentedRow.add(extentedRow.get(entry.first)); - } else { - ExprSubstitutionMap smap = new ExprSubstitutionMap(); - List columns = entry.second.getRefColumns(); - for (SlotRef slot : columns) { - smap.getLhs().add(slot); - smap.getRhs().add(slotToIndex.get(slot.getColumnName())); - } - extentedRow.add(Expr.substituteList(Lists.newArrayList(entry.second.getDefineExpr()), - smap, analyzer, false).get(0)); - } - } - } - - row = extentedRow; - rows.set(rowIdx, row); - } - // check the compatibility of expr in row and column in targetColumns - for (int i = 0; i < row.size(); ++i) { - Expr expr = row.get(i); - Column col = targetColumns.get(i); - - if (expr instanceof DefaultValueExpr) { - if (targetColumns.get(i).getDefaultValue() == null) { - throw new AnalysisException("Column has no default value, column=" - + targetColumns.get(i).getName()); - } - expr = new StringLiteral(targetColumns.get(i).getDefaultValue()); - } - - expr.analyze(analyzer); - - row.set(i, expr.checkTypeCompatibility(col.getType())); - } - } - - private void analyzePlanHints(Analyzer analyzer) throws AnalysisException { - if (planHints == null) { - return; - } - for (String hint : planHints) { - if (SHUFFLE_HINT.equalsIgnoreCase(hint)) { - if (!targetTable.isPartitioned()) { - ErrorReport.reportAnalysisException(ErrorCode.ERR_INSERT_HINT_NOT_SUPPORT); - } - if (isRepartition != null && !isRepartition) { - ErrorReport.reportAnalysisException(ErrorCode.ERR_PLAN_HINT_CONFILT, hint); - } - isRepartition = Boolean.TRUE; - } else if (NOSHUFFLE_HINT.equalsIgnoreCase(hint)) { - if (!targetTable.isPartitioned()) { - ErrorReport.reportAnalysisException(ErrorCode.ERR_INSERT_HINT_NOT_SUPPORT); - } - if (isRepartition != null && isRepartition) { - ErrorReport.reportAnalysisException(ErrorCode.ERR_PLAN_HINT_CONFILT, hint); - } - isRepartition = Boolean.FALSE; - } else if (STREAMING.equalsIgnoreCase(hint)) { - isStreaming = true; - } else { - ErrorReport.reportAnalysisException(ErrorCode.ERR_UNKNOWN_PLAN_HINT, hint); - } - } - } - - public void prepareExpressions() throws UserException { - List selectList = Expr.cloneList(queryStmt.getResultExprs()); - // check type compatibility - int numCols = targetColumns.size(); - for (int i = 0; i < numCols; ++i) { - Column col = targetColumns.get(i); - Expr expr = selectList.get(i).checkTypeCompatibility(col.getType()); - selectList.set(i, expr); - exprByName.put(col.getName(), expr); - } - List> resultExprByName = Lists.newArrayList(); - // reorder resultExprs in table column order - for (Column col : targetTable.getFullSchema()) { - if (exprByName.containsKey(col.getName())) { - resultExprByName.add(Pair.of(col.getName(), exprByName.get(col.getName()))); - } else { - // process sequence col, map sequence column to other column - if (targetTable instanceof OlapTable && ((OlapTable) targetTable).hasSequenceCol() - && col.getName().equals(Column.SEQUENCE_COL) - && ((OlapTable) targetTable).getSequenceMapCol() != null) { - if (resultExprByName.stream().map(Pair::key) - .anyMatch(key -> key.equals(((OlapTable) targetTable).getSequenceMapCol()))) { - resultExprByName.add(Pair.of(Column.SEQUENCE_COL, - resultExprByName.stream() - .filter(p -> p.key().equals(((OlapTable) targetTable).getSequenceMapCol())) - .map(Pair::value).findFirst().orElse(null))); - } - } else if (col.getDefaultValue() == null) { - resultExprByName.add(Pair.of(col.getName(), NullLiteral.create(col.getType()))); - } else { - if (col.getDefaultValueExprDef() != null) { - resultExprByName.add(Pair.of(col.getName(), col.getDefaultValueExpr())); - } else { - StringLiteral defaultValueExpr; - defaultValueExpr = new StringLiteral(col.getDefaultValue()); - resultExprByName.add(Pair.of(col.getName(), - defaultValueExpr.checkTypeCompatibility(col.getType()))); - } - } - } - } - resultExprs.addAll(resultExprByName.stream().map(Pair::value).collect(Collectors.toList())); - } - - private DataSink createDataSink() throws AnalysisException { - if (dataSink != null) { - return dataSink; - } - if (targetTable instanceof OlapTable) { - dataSink = new OlapTableSink((OlapTable) targetTable, olapTuple, targetPartitionIds, - analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert()); - dataPartition = dataSink.getOutputPartition(); - } else if (targetTable instanceof BrokerTable) { - BrokerTable table = (BrokerTable) targetTable; - // TODO(lingbin): think use which one if have more than one path - // Map brokerProperties = Maps.newHashMap(); - // BrokerDesc brokerDesc = new BrokerDesc("test_broker", brokerProperties); - BrokerDesc brokerDesc = new BrokerDesc(table.getBrokerName(), table.getBrokerProperties()); - dataSink = new ExportSink( - table.getWritablePath(), - table.getColumnSeparator(), - table.getLineDelimiter(), - brokerDesc); - dataPartition = dataSink.getOutputPartition(); - } else { - dataSink = DataSink.createDataSink(targetTable); - dataPartition = DataPartition.UNPARTITIONED; - } - return dataSink; - } - - public void complete() throws UserException { - if (!isExplain() && targetTable instanceof OlapTable) { - ((OlapTableSink) dataSink).complete(); - // add table indexes to transaction state - TransactionState txnState = Env.getCurrentGlobalTransactionMgr() - .getTransactionState(db.getId(), transactionId); - if (txnState == null) { - throw new DdlException("txn does not exist: " + transactionId); - } - txnState.addTableIndexes((OlapTable) targetTable); - } - } - - @Override - public ArrayList getResultExprs() { - return resultExprs; - } - - public DataPartition getDataPartition() { - return dataPartition; - } - - @Override - public void reset() { - super.reset(); - if (targetPartitionIds != null) { - targetPartitionIds.clear(); - } - queryStmt.reset(); - resultExprs.clear(); - exprByName.clear(); - dataSink = null; - dataPartition = null; - targetColumns.clear(); - } - - @Override - public RedirectStatus getRedirectStatus() { - if (isExplain()) { - return RedirectStatus.NO_FORWARD; - } else { - return RedirectStatus.FORWARD_WITH_SYNC; - } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LabelName.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/LabelName.java index 84480f8544..555a73f6cc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LabelName.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LabelName.java @@ -32,6 +32,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +// TODO(tsy): maybe better to rename as `LoadLabel` // label name used to identify a load job public class LabelName implements Writable { private String dbName; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java index b3cfce06cf..2abdc1dc6b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java @@ -109,8 +109,8 @@ public class LoadStmt extends DdlStmt { public static final String KEY_IN_PARAM_STRICT_MODE = "strict_mode"; public static final String KEY_IN_PARAM_TIMEZONE = "timezone"; public static final String KEY_IN_PARAM_EXEC_MEM_LIMIT = "exec_mem_limit"; - public static final String KEY_IN_PARAM_JSONPATHS = "jsonpaths"; - public static final String KEY_IN_PARAM_JSONROOT = "json_root"; + public static final String KEY_IN_PARAM_JSONPATHS = "jsonpaths"; + public static final String KEY_IN_PARAM_JSONROOT = "json_root"; public static final String KEY_IN_PARAM_STRIP_OUTER_ARRAY = "strip_outer_array"; public static final String KEY_IN_PARAM_FUZZY_PARSE = "fuzzy_parse"; public static final String KEY_IN_PARAM_NUM_AS_STRING = "num_as_string"; @@ -273,6 +273,7 @@ public class LoadStmt extends DdlStmt { return brokerDesc; } + @Deprecated public String getCluster() { return cluster; } @@ -285,6 +286,7 @@ public class LoadStmt extends DdlStmt { return properties; } + @Deprecated public String getUser() { return user; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadType.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadType.java new file mode 100644 index 0000000000..dc93a97d75 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadType.java @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.load.EtlJobType; + +import com.google.common.base.Preconditions; + +import java.util.EnumMap; + +public enum LoadType { + + UNKNOWN, + NATIVE_INSERT, + BROKER_LOAD, + SPARK_LOAD, + MYSQL_LOAD, + ROUTINE_LOAD, + STREAM_LOAD; + + private static final EnumMap LOAD_TYPE_TO_ETL_TYPE = new EnumMap<>(LoadType.class); + + static { + LOAD_TYPE_TO_ETL_TYPE.put(NATIVE_INSERT, EtlJobType.INSERT); + LOAD_TYPE_TO_ETL_TYPE.put(BROKER_LOAD, EtlJobType.BROKER); + LOAD_TYPE_TO_ETL_TYPE.put(SPARK_LOAD, EtlJobType.SPARK); + LOAD_TYPE_TO_ETL_TYPE.put(MYSQL_LOAD, EtlJobType.LOCAL_FILE); + // TODO(tsy): add routine load and stream load + } + + public static EtlJobType getEtlJobType(LoadType loadType) { + return Preconditions.checkNotNull(LOAD_TYPE_TO_ETL_TYPE.get(loadType)); + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/MysqlLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/MysqlLoadStmt.java new file mode 100644 index 0000000000..51c362bc91 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/MysqlLoadStmt.java @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/StatementBase.java +// and modified by Doris + +package org.apache.doris.analysis; + +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.UserException; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +public class MysqlLoadStmt extends InsertStmt { + + private DataDescription dataDescription; + + @Override + public List getDataDescList() { + return Collections.singletonList(dataDescription); + } + + @Override + public ResourceDesc getResourceDesc() { + // mysql load does not have resource desc + return null; + } + + @Override + public LoadType getLoadType() { + return LoadType.MYSQL_LOAD; + } + + @Override + public void analyzeProperties() throws DdlException { + + } + + @Override + public void analyze(Analyzer analyzer) throws AnalysisException, UserException { + super.analyze(analyzer); + String fullDbName = dataDescription.analyzeFullDbName(label.getDbName(), analyzer); + dataDescription.analyze(fullDbName); + Database db = analyzer.getEnv().getInternalCatalog().getDbOrAnalysisException(fullDbName); + OlapTable table = db.getOlapTableOrAnalysisException(dataDescription.getTableName()); + dataDescription.checkKeyTypeForLoad(table); + if (!dataDescription.isClientLocal()) { + for (String path : dataDescription.getFilePaths()) { + if (Config.mysql_load_server_secure_path.isEmpty()) { + throw new AnalysisException("Load local data from fe local is not enabled. If you want to use it," + + " plz set the `mysql_load_server_secure_path` for FE to be a right path."); + } else { + File file = new File(path); + try { + if (!(file.getCanonicalPath().startsWith(Config.mysql_load_server_secure_path))) { + throw new AnalysisException("Local file should be under the secure path of FE."); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + if (!file.exists()) { + throw new AnalysisException("File: " + path + " is not exists."); + } + } + } + } + } + + @Override + public RedirectStatus getRedirectStatus() { + return RedirectStatus.NO_FORWARD; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java new file mode 100644 index 0000000000..035c3e0bb2 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java @@ -0,0 +1,829 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.alter.SchemaChangeHandler; +import org.apache.doris.catalog.BrokerTable; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.DatabaseIf; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.JdbcTable; +import org.apache.doris.catalog.MysqlTable; +import org.apache.doris.catalog.OdbcTable; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.PartitionType; +import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.catalog.external.JdbcExternalDatabase; +import org.apache.doris.catalog.external.JdbcExternalTable; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.Pair; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.common.util.Util; +import org.apache.doris.datasource.ExternalCatalog; +import org.apache.doris.datasource.JdbcExternalCatalog; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.planner.DataPartition; +import org.apache.doris.planner.DataSink; +import org.apache.doris.planner.ExportSink; +import org.apache.doris.planner.OlapTableSink; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.rewrite.ExprRewriter; +import org.apache.doris.service.FrontendOptions; +import org.apache.doris.thrift.TQueryOptions; +import org.apache.doris.thrift.TUniqueId; +import org.apache.doris.transaction.TransactionState; +import org.apache.doris.transaction.TransactionState.LoadJobSourceType; +import org.apache.doris.transaction.TransactionState.TxnCoordinator; +import org.apache.doris.transaction.TransactionState.TxnSourceType; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Insert into is performed to load data from the result of query stmt. + *

+ * syntax: + * INSERT INTO table_name [partition_info] [col_list] [plan_hints] query_stmt + *

+ * table_name: is the name of target table + * partition_info: PARTITION (p1,p2) + * the partition info of target table + * col_list: (c1,c2) + * the column list of target table + * plan_hints: [STREAMING,SHUFFLE_HINT] + * The streaming plan is used by both streaming and non-streaming insert stmt. + * The only difference is that non-streaming will record the load info in LoadManager and return label. + * User can check the load info by show load stmt. + */ +public class NativeInsertStmt extends InsertStmt { + + private static final Logger LOG = LogManager.getLogger(InsertStmt.class); + + private static final String SHUFFLE_HINT = "SHUFFLE"; + private static final String NOSHUFFLE_HINT = "NOSHUFFLE"; + + private final TableName tblName; + private final PartitionNames targetPartitionNames; + // parsed from targetPartitionNames. + private List targetPartitionIds; + private final List targetColumnNames; + private QueryStmt queryStmt; + private final List planHints; + private Boolean isRepartition; + + // set after parse all columns and expr in query statement + // this result expr in the order of target table's columns + private final List resultExprs = Lists.newArrayList(); + + private final Map exprByName = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); + + private Table targetTable; + + private DatabaseIf db; + private long transactionId; + + // we need a new TupleDesc for olap table. + private TupleDescriptor olapTuple; + + private DataSink dataSink; + private DataPartition dataPartition; + + private final List targetColumns = Lists.newArrayList(); + + /* + * InsertStmt may be analyzed twice, but transaction must be only begun once. + * So use a boolean to check if transaction already begun. + */ + private boolean isTransactionBegin = false; + + private boolean isValuesOrConstantSelect; + + + public NativeInsertStmt(InsertTarget target, String label, List cols, InsertSource source, + List hints) { + this.tblName = target.getTblName(); + this.targetPartitionNames = target.getPartitionNames(); + this.label = new LabelName(null, label); + this.queryStmt = source.getQueryStmt(); + this.planHints = hints; + this.targetColumnNames = cols; + this.isValuesOrConstantSelect = (queryStmt instanceof SelectStmt + && ((SelectStmt) queryStmt).getTableRefs().isEmpty()); + } + + // Ctor for CreateTableAsSelectStmt + public NativeInsertStmt(TableName name, QueryStmt queryStmt) { + this.tblName = name; + this.targetPartitionNames = null; + this.targetColumnNames = null; + this.queryStmt = queryStmt; + this.planHints = null; + } + + public boolean isValuesOrConstantSelect() { + return isValuesOrConstantSelect; + } + + public Table getTargetTable() { + return targetTable; + } + + public void setTargetTable(Table targetTable) { + this.targetTable = targetTable; + } + + public long getTransactionId() { + return this.transactionId; + } + + public Boolean isRepartition() { + return isRepartition; + } + + public String getDbName() { + return tblName.getDb(); + } + + public String getTbl() { + return tblName.getTbl(); + } + + public void getTables(Analyzer analyzer, Map tableMap, Set parentViewNameSet) + throws AnalysisException { + // get dbs of statement + queryStmt.getTables(analyzer, false, tableMap, parentViewNameSet); + tblName.analyze(analyzer); + // disallow external catalog except JdbcExternalCatalog + if (analyzer.getEnv().getCurrentCatalog() instanceof ExternalCatalog + && !(analyzer.getEnv().getCurrentCatalog() instanceof JdbcExternalCatalog)) { + Util.prohibitExternalCatalog(tblName.getCtl(), this.getClass().getSimpleName()); + } + String dbName = tblName.getDb(); + String tableName = tblName.getTbl(); + // check exist + DatabaseIf db = analyzer.getEnv().getCatalogMgr().getCatalog(tblName.getCtl()).getDbOrAnalysisException(dbName); + TableIf table = db.getTableOrAnalysisException(tblName.getTbl()); + + // check access + if (!Env.getCurrentEnv().getAccessManager() + .checkTblPriv(ConnectContext.get(), dbName, tableName, PrivPredicate.LOAD)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD", + ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(), + dbName + ": " + tableName); + } + + tableMap.put(table.getId(), table); + } + + public QueryStmt getQueryStmt() { + return queryStmt; + } + + public void setQueryStmt(QueryStmt queryStmt) { + this.queryStmt = queryStmt; + } + + public boolean isExplain() { + return queryStmt.isExplain(); + } + + public String getLabel() { + return label.getLabelName(); + } + + public DataSink getDataSink() { + return dataSink; + } + + public DatabaseIf getDbObj() { + return db; + } + + public boolean isTransactionBegin() { + return isTransactionBegin; + } + + @Override + public void analyze(Analyzer analyzer) throws UserException { + super.analyze(analyzer); + + if (targetTable == null) { + tblName.analyze(analyzer); + // disallow external catalog except JdbcExternalCatalog + if (analyzer.getEnv().getCurrentCatalog() instanceof ExternalCatalog + && !(analyzer.getEnv().getCurrentCatalog() instanceof JdbcExternalCatalog)) { + Util.prohibitExternalCatalog(tblName.getCtl(), this.getClass().getSimpleName()); + } + } + + // Check privilege + if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(), tblName.getDb(), + tblName.getTbl(), PrivPredicate.LOAD)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD", + ConnectContext.get().getQualifiedUser(), + ConnectContext.get().getRemoteIP(), tblName.getDb() + ": " + tblName.getTbl()); + } + + // check partition + if (targetPartitionNames != null) { + targetPartitionNames.analyze(analyzer); + } + + // set target table and + analyzeTargetTable(analyzer); + + analyzeSubquery(analyzer); + + analyzePlanHints(); + + if (analyzer.getContext().isTxnModel()) { + return; + } + + // create data sink + createDataSink(); + + db = analyzer.getEnv().getCatalogMgr().getCatalog(tblName.getCtl()).getDbOrAnalysisException(tblName.getDb()); + // create label and begin transaction + long timeoutSecond = ConnectContext.get().getExecTimeout(); + if (label == null || Strings.isNullOrEmpty(label.getLabelName())) { + label = new LabelName(db.getFullName(), + "insert_" + DebugUtil.printId(analyzer.getContext().queryId()).replace("-", "_")); + } + if (!isExplain() && !isTransactionBegin) { + if (targetTable instanceof OlapTable) { + LoadJobSourceType sourceType = LoadJobSourceType.INSERT_STREAMING; + transactionId = Env.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(), + Lists.newArrayList(targetTable.getId()), label.getLabelName(), + new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), + sourceType, timeoutSecond); + } + isTransactionBegin = true; + } + + // init data sink + if (!isExplain() && targetTable instanceof OlapTable) { + OlapTableSink sink = (OlapTableSink) dataSink; + TUniqueId loadId = analyzer.getContext().queryId(); + int sendBatchParallelism = analyzer.getContext().getSessionVariable().getSendBatchParallelism(); + sink.init(loadId, transactionId, db.getId(), timeoutSecond, sendBatchParallelism, false); + } + } + + private void analyzeTargetTable(Analyzer analyzer) throws AnalysisException { + // Get table + if (targetTable == null) { + DatabaseIf db = analyzer.getEnv().getCatalogMgr() + .getCatalog(tblName.getCtl()).getDbOrAnalysisException(tblName.getDb()); + if (db instanceof Database) { + targetTable = (Table) db.getTableOrAnalysisException(tblName.getTbl()); + } else if (db instanceof JdbcExternalDatabase) { + JdbcExternalTable jdbcTable = (JdbcExternalTable) db.getTableOrAnalysisException(tblName.getTbl()); + targetTable = jdbcTable.getJdbcTable(); + } else { + throw new AnalysisException("Not support insert target table."); + } + } + + if (targetTable instanceof OlapTable) { + OlapTable olapTable = (OlapTable) targetTable; + + // partition + if (targetPartitionNames != null) { + targetPartitionIds = Lists.newArrayList(); + if (olapTable.getPartitionInfo().getType() == PartitionType.UNPARTITIONED) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_PARTITION_CLAUSE_NO_ALLOWED); + } + for (String partName : targetPartitionNames.getPartitionNames()) { + Partition part = olapTable.getPartition(partName, targetPartitionNames.isTemp()); + if (part == null) { + ErrorReport.reportAnalysisException( + ErrorCode.ERR_UNKNOWN_PARTITION, partName, targetTable.getName()); + } + targetPartitionIds.add(part.getId()); + } + } + // need a descriptor + DescriptorTable descTable = analyzer.getDescTbl(); + olapTuple = descTable.createTupleDescriptor(); + for (Column col : olapTable.getFullSchema()) { + SlotDescriptor slotDesc = descTable.addSlotDescriptor(olapTuple); + slotDesc.setIsMaterialized(true); + slotDesc.setType(col.getType()); + slotDesc.setColumn(col); + slotDesc.setIsNullable(col.isAllowNull()); + } + } else if (targetTable instanceof MysqlTable || targetTable instanceof OdbcTable + || targetTable instanceof JdbcTable) { + if (targetPartitionNames != null) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_PARTITION_CLAUSE_NO_ALLOWED); + } + } else if (targetTable instanceof BrokerTable) { + if (targetPartitionNames != null) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_PARTITION_CLAUSE_NO_ALLOWED); + } + + BrokerTable brokerTable = (BrokerTable) targetTable; + if (!brokerTable.isWritable()) { + throw new AnalysisException("table " + brokerTable.getName() + + "is not writable. path should be an dir"); + } + + } else { + ErrorReport.reportAnalysisException( + ErrorCode.ERR_NON_INSERTABLE_TABLE, targetTable.getName(), targetTable.getType()); + } + } + + private void checkColumnCoverage(Set mentionedCols, List baseColumns) + throws AnalysisException { + + // check columns of target table + for (Column col : baseColumns) { + if (mentionedCols.contains(col.getName())) { + continue; + } + if (col.getDefaultValue() == null && !col.isAllowNull()) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_COL_NOT_MENTIONED, col.getName()); + } + } + } + + private void analyzeSubquery(Analyzer analyzer) throws UserException { + // Analyze columns mentioned in the statement. + Set mentionedColumns = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER); + if (targetColumnNames == null) { + // the mentioned columns are columns which are visible to user, so here we use + // getBaseSchema(), not getFullSchema() + for (Column col : targetTable.getBaseSchema(false)) { + mentionedColumns.add(col.getName()); + targetColumns.add(col); + } + } else { + for (String colName : targetColumnNames) { + Column col = targetTable.getColumn(colName); + if (col == null) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_BAD_FIELD_ERROR, colName, targetTable.getName()); + } + if (!mentionedColumns.add(colName)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_FIELD_SPECIFIED_TWICE, colName); + } + targetColumns.add(col); + } + // hll column mush in mentionedColumns + for (Column col : targetTable.getBaseSchema()) { + if (col.getType().isObjectStored() && !mentionedColumns.contains(col.getName())) { + throw new AnalysisException(" object-stored column " + col.getName() + + " mush in insert into columns"); + } + } + } + + /* + * When doing schema change, there may be some shadow columns. we should add + * them to the end of targetColumns. And use 'origColIdxsForExtendCols' to save + * the index of column in 'targetColumns' which the shadow column related to. + * eg: origin targetColumns: (A,B,C), shadow column: __doris_shadow_B after + * processing, targetColumns: (A, B, C, __doris_shadow_B), and + * origColIdxsForExtendCols has 1 element: "1", which is the index of column B + * in targetColumns. + * + * Rule A: If the column which the shadow column related to is not mentioned, + * then do not add the shadow column to targetColumns. They will be filled by + * null or default value when loading. + * + * When table have materialized view, there may be some materialized view columns. + * we should add them to the end of targetColumns. + * eg: origin targetColumns: (A,B,C), shadow column: mv_bitmap_union_C + * after processing, targetColumns: (A, B, C, mv_bitmap_union_C), and + * origColIdx2MVColumn has 1 element: "2, mv_bitmap_union_C" + * will be used in as a mapping from queryStmt.getResultExprs() to targetColumns define expr + */ + List> origColIdxsForExtendCols = Lists.newArrayList(); + for (Column column : targetTable.getFullSchema()) { + if (column.isNameWithPrefix(SchemaChangeHandler.SHADOW_NAME_PREFIX)) { + String origName = Column.removeNamePrefix(column.getName()); + for (int i = 0; i < targetColumns.size(); i++) { + if (targetColumns.get(i).nameEquals(origName, false)) { + // Rule A + origColIdxsForExtendCols.add(Pair.of(i, null)); + targetColumns.add(column); + break; + } + } + } + if (column.isNameWithPrefix(CreateMaterializedViewStmt.MATERIALIZED_VIEW_NAME_PREFIX) + || column.isNameWithPrefix(CreateMaterializedViewStmt.MATERIALIZED_VIEW_AGGREGATE_NAME_PREFIX)) { + List refColumns = column.getRefColumns(); + if (refColumns == null) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_BAD_FIELD_ERROR, + column.getName(), targetTable.getName()); + } + for (SlotRef refColumn : refColumns) { + String origName = refColumn.getColumnName(); + for (int originColumnIdx = 0; originColumnIdx < targetColumns.size(); originColumnIdx++) { + if (targetColumns.get(originColumnIdx).nameEquals(origName, false)) { + origColIdxsForExtendCols.add(Pair.of(originColumnIdx, column)); + targetColumns.add(column); + break; + } + } + } + } + } + + // parse query statement + queryStmt.setFromInsert(true); + queryStmt.analyze(analyzer); + + // check if size of select item equal with columns mentioned in statement + if (mentionedColumns.size() != queryStmt.getResultExprs().size()) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_WRONG_VALUE_COUNT); + } + + // Check if all columns mentioned is enough + checkColumnCoverage(mentionedColumns, targetTable.getBaseSchema()); + + Map slotToIndex = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); + List baseColumns = targetTable.getBaseSchema(); + int size = Math.min(baseColumns.size(), queryStmt.getResultExprs().size()); + for (int i = 0; i < size; i++) { + slotToIndex.put(baseColumns.get(i).getName(), queryStmt.getResultExprs().get(i)); + } + + // handle VALUES() or SELECT constant list + if (isValuesOrConstantSelect) { + SelectStmt selectStmt = (SelectStmt) queryStmt; + if (selectStmt.getValueList() != null) { + // INSERT INTO VALUES(...) + List> rows = selectStmt.getValueList().getRows(); + for (int rowIdx = 0; rowIdx < rows.size(); ++rowIdx) { + analyzeRow(analyzer, targetColumns, rows, rowIdx, origColIdxsForExtendCols, slotToIndex); + } + + // clear these 2 structures, rebuild them using VALUES exprs + selectStmt.getResultExprs().clear(); + selectStmt.getBaseTblResultExprs().clear(); + + for (int i = 0; i < selectStmt.getValueList().getFirstRow().size(); ++i) { + selectStmt.getResultExprs().add(selectStmt.getValueList().getFirstRow().get(i)); + selectStmt.getBaseTblResultExprs().add(selectStmt.getValueList().getFirstRow().get(i)); + } + } else { + // INSERT INTO SELECT 1,2,3 ... + List> rows = Lists.newArrayList(); + // ATTN: must copy the `selectStmt.getResultExprs()`, otherwise the following + // `selectStmt.getResultExprs().clear();` will clear the `rows` too, causing + // error. + rows.add(Lists.newArrayList(selectStmt.getResultExprs())); + analyzeRow(analyzer, targetColumns, rows, 0, origColIdxsForExtendCols, slotToIndex); + // rows may be changed in analyzeRow(), so rebuild the result exprs + selectStmt.getResultExprs().clear(); + for (Expr expr : rows.get(0)) { + selectStmt.getResultExprs().add(expr); + } + } + } else { + // INSERT INTO SELECT ... FROM tbl + if (!origColIdxsForExtendCols.isEmpty()) { + // extend the result expr by duplicating the related exprs + for (Pair entry : origColIdxsForExtendCols) { + if (entry.second == null) { + queryStmt.getResultExprs().add(queryStmt.getResultExprs().get(entry.first)); + } else { + //substitute define expr slot with select statement result expr + ExprSubstitutionMap smap = new ExprSubstitutionMap(); + List columns = entry.second.getRefColumns(); + for (SlotRef slot : columns) { + smap.getLhs().add(slot); + smap.getRhs().add(slotToIndex.get(slot.getColumnName())); + } + Expr e = Expr.substituteList(Lists.newArrayList(entry.second.getDefineExpr()), + smap, analyzer, false).get(0); + queryStmt.getResultExprs().add(e); + } + } + } + + // check compatibility + for (int i = 0; i < targetColumns.size(); ++i) { + Column column = targetColumns.get(i); + Expr expr = queryStmt.getResultExprs().get(i); + queryStmt.getResultExprs().set(i, expr.checkTypeCompatibility(column.getType())); + } + } + + // expand colLabels in QueryStmt + if (!origColIdxsForExtendCols.isEmpty()) { + if (queryStmt.getResultExprs().size() != queryStmt.getBaseTblResultExprs().size()) { + for (Pair entry : origColIdxsForExtendCols) { + if (entry.second == null) { + queryStmt.getBaseTblResultExprs().add(queryStmt.getBaseTblResultExprs().get(entry.first)); + } else { + //substitute define expr slot with select statement result expr + ExprSubstitutionMap smap = new ExprSubstitutionMap(); + List columns = entry.second.getRefColumns(); + for (SlotRef slot : columns) { + smap.getLhs().add(slot); + smap.getRhs().add(slotToIndex.get(slot.getColumnName())); + } + Expr e = Expr.substituteList(Lists.newArrayList(entry.second.getDefineExpr()), + smap, analyzer, false).get(0); + queryStmt.getBaseTblResultExprs().add(e); + } + } + } + + if (queryStmt.getResultExprs().size() != queryStmt.getColLabels().size()) { + for (Pair entry : origColIdxsForExtendCols) { + queryStmt.getColLabels().add(queryStmt.getColLabels().get(entry.first)); + } + } + } + + if (LOG.isDebugEnabled()) { + for (Expr expr : queryStmt.getResultExprs()) { + LOG.debug("final result expr: {}, {}", expr, System.identityHashCode(expr)); + } + for (Expr expr : queryStmt.getBaseTblResultExprs()) { + LOG.debug("final base table result expr: {}, {}", expr, System.identityHashCode(expr)); + } + for (String colLabel : queryStmt.getColLabels()) { + LOG.debug("final col label: {}", colLabel); + } + } + } + + private void analyzeRow(Analyzer analyzer, List targetColumns, List> rows, + int rowIdx, List> origColIdxsForExtendCols, Map slotToIndex) + throws AnalysisException { + // 1. check number of fields if equal with first row + // targetColumns contains some shadow columns, which is added by system, + // so we should minus this + if (rows.get(rowIdx).size() != targetColumns.size() - origColIdxsForExtendCols.size()) { + throw new AnalysisException("Column count doesn't match value count at row " + (rowIdx + 1)); + } + + ArrayList row = rows.get(rowIdx); + if (!origColIdxsForExtendCols.isEmpty()) { + /** + * we should extend the row for shadow columns. + * eg: + * the origin row has exprs: (expr1, expr2, expr3), and targetColumns is (A, B, C, __doris_shadow_b) + * after processing, extentedRow is (expr1, expr2, expr3, expr2) + */ + ArrayList extentedRow = Lists.newArrayList(); + extentedRow.addAll(row); + + for (Pair entry : origColIdxsForExtendCols) { + if (entry != null) { + if (entry.second == null) { + extentedRow.add(extentedRow.get(entry.first)); + } else { + ExprSubstitutionMap smap = new ExprSubstitutionMap(); + List columns = entry.second.getRefColumns(); + for (SlotRef slot : columns) { + smap.getLhs().add(slot); + smap.getRhs().add(slotToIndex.get(slot.getColumnName())); + } + extentedRow.add(Expr.substituteList(Lists.newArrayList(entry.second.getDefineExpr()), + smap, analyzer, false).get(0)); + } + } + } + + row = extentedRow; + rows.set(rowIdx, row); + } + // check the compatibility of expr in row and column in targetColumns + for (int i = 0; i < row.size(); ++i) { + Expr expr = row.get(i); + Column col = targetColumns.get(i); + + if (expr instanceof DefaultValueExpr) { + if (targetColumns.get(i).getDefaultValue() == null) { + throw new AnalysisException("Column has no default value, column=" + + targetColumns.get(i).getName()); + } + expr = new StringLiteral(targetColumns.get(i).getDefaultValue()); + } + + expr.analyze(analyzer); + + row.set(i, expr.checkTypeCompatibility(col.getType())); + } + } + + private void analyzePlanHints() throws AnalysisException { + if (planHints == null) { + return; + } + for (String hint : planHints) { + if (SHUFFLE_HINT.equalsIgnoreCase(hint)) { + if (!targetTable.isPartitioned()) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_INSERT_HINT_NOT_SUPPORT); + } + if (isRepartition != null && !isRepartition) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_PLAN_HINT_CONFILT, hint); + } + isRepartition = Boolean.TRUE; + } else if (NOSHUFFLE_HINT.equalsIgnoreCase(hint)) { + if (!targetTable.isPartitioned()) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_INSERT_HINT_NOT_SUPPORT); + } + if (isRepartition != null && isRepartition) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_PLAN_HINT_CONFILT, hint); + } + isRepartition = Boolean.FALSE; + } else { + ErrorReport.reportAnalysisException(ErrorCode.ERR_UNKNOWN_PLAN_HINT, hint); + } + } + } + + public void prepareExpressions() throws UserException { + List selectList = Expr.cloneList(queryStmt.getResultExprs()); + // check type compatibility + int numCols = targetColumns.size(); + for (int i = 0; i < numCols; ++i) { + Column col = targetColumns.get(i); + Expr expr = selectList.get(i).checkTypeCompatibility(col.getType()); + selectList.set(i, expr); + exprByName.put(col.getName(), expr); + } + List> resultExprByName = Lists.newArrayList(); + // reorder resultExprs in table column order + for (Column col : targetTable.getFullSchema()) { + if (exprByName.containsKey(col.getName())) { + resultExprByName.add(Pair.of(col.getName(), exprByName.get(col.getName()))); + } else { + // process sequence col, map sequence column to other column + if (targetTable instanceof OlapTable && ((OlapTable) targetTable).hasSequenceCol() + && col.getName().equals(Column.SEQUENCE_COL) + && ((OlapTable) targetTable).getSequenceMapCol() != null) { + if (resultExprByName.stream().map(Pair::key) + .anyMatch(key -> key.equals(((OlapTable) targetTable).getSequenceMapCol()))) { + resultExprByName.add(Pair.of(Column.SEQUENCE_COL, + resultExprByName.stream() + .filter(p -> p.key().equals(((OlapTable) targetTable).getSequenceMapCol())) + .map(Pair::value).findFirst().orElse(null))); + } + } else if (col.getDefaultValue() == null) { + resultExprByName.add(Pair.of(col.getName(), NullLiteral.create(col.getType()))); + } else { + if (col.getDefaultValueExprDef() != null) { + resultExprByName.add(Pair.of(col.getName(), col.getDefaultValueExpr())); + } else { + StringLiteral defaultValueExpr; + defaultValueExpr = new StringLiteral(col.getDefaultValue()); + resultExprByName.add(Pair.of(col.getName(), + defaultValueExpr.checkTypeCompatibility(col.getType()))); + } + } + } + } + resultExprs.addAll(resultExprByName.stream().map(Pair::value).collect(Collectors.toList())); + } + + private DataSink createDataSink() throws AnalysisException { + if (dataSink != null) { + return dataSink; + } + if (targetTable instanceof OlapTable) { + dataSink = new OlapTableSink((OlapTable) targetTable, olapTuple, targetPartitionIds, + analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert()); + dataPartition = dataSink.getOutputPartition(); + } else if (targetTable instanceof BrokerTable) { + BrokerTable table = (BrokerTable) targetTable; + // TODO(lingbin): think use which one if have more than one path + // Map brokerProperties = Maps.newHashMap(); + // BrokerDesc brokerDesc = new BrokerDesc("test_broker", brokerProperties); + BrokerDesc brokerDesc = new BrokerDesc(table.getBrokerName(), table.getBrokerProperties()); + dataSink = new ExportSink( + table.getWritablePath(), + table.getColumnSeparator(), + table.getLineDelimiter(), + brokerDesc); + dataPartition = dataSink.getOutputPartition(); + } else { + dataSink = DataSink.createDataSink(targetTable); + dataPartition = DataPartition.UNPARTITIONED; + } + return dataSink; + } + + public void complete() throws UserException { + if (!isExplain() && targetTable instanceof OlapTable) { + ((OlapTableSink) dataSink).complete(); + // add table indexes to transaction state + TransactionState txnState = Env.getCurrentGlobalTransactionMgr() + .getTransactionState(db.getId(), transactionId); + if (txnState == null) { + throw new DdlException("txn does not exist: " + transactionId); + } + txnState.addTableIndexes((OlapTable) targetTable); + } + } + + public DataPartition getDataPartition() { + return dataPartition; + } + + @Override + public List getDataDescList() { + throw new UnsupportedOperationException("only invoked for external load currently"); + } + + @Override + public ResourceDesc getResourceDesc() { + throw new UnsupportedOperationException("only invoked for external load currently"); + } + + @Override + public LoadType getLoadType() { + return LoadType.NATIVE_INSERT; + } + + @Override + public NativeInsertStmt getNativeInsertStmt() { + return this; + } + + @Override + public void rewriteExprs(ExprRewriter rewriter) throws AnalysisException { + Preconditions.checkState(isAnalyzed()); + queryStmt.rewriteExprs(rewriter); + } + + @Override + public void foldConstant(ExprRewriter rewriter, TQueryOptions tQueryOptions) throws AnalysisException { + Preconditions.checkState(isAnalyzed()); + queryStmt.foldConstant(rewriter, tQueryOptions); + } + + @Override + public List getResultExprs() { + return resultExprs; + } + + @Override + public void reset() { + super.reset(); + if (targetPartitionIds != null) { + targetPartitionIds.clear(); + } + queryStmt.reset(); + resultExprs.clear(); + exprByName.clear(); + dataSink = null; + dataPartition = null; + targetColumns.clear(); + } + + @Override + public RedirectStatus getRedirectStatus() { + if (isExplain()) { + return RedirectStatus.NO_FORWARD; + } else { + return RedirectStatus.FORWARD_WITH_SYNC; + } + } + + @Override + public String toSql() { + return null; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ResourceDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ResourceDesc.java index e6211ef4c9..d3c421234c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ResourceDesc.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ResourceDesc.java @@ -40,10 +40,15 @@ import java.util.Map; public class ResourceDesc { protected String name; protected Map properties; + /** + * TODO(tsy): transfer to LoadType + */ protected EtlJobType etlJobType; + protected LoadType loadType; + // Only used for recovery - private ResourceDesc() { + public ResourceDesc() { } public ResourceDesc(String name, Map properties) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SparkLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SparkLoadStmt.java new file mode 100644 index 0000000000..6223a9bc3f --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SparkLoadStmt.java @@ -0,0 +1,101 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/StatementBase.java +// and modified by Doris + +package org.apache.doris.analysis; + +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.UserException; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.qe.ConnectContext; + +import com.google.common.base.Preconditions; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class SparkLoadStmt extends InsertStmt { + + private final DataDescription dataDescription; + + private final ResourceDesc resourceDesc; + + public SparkLoadStmt(LabelName label, List dataDescList, ResourceDesc resourceDesc, + Map properties, String comments) { + this.label = label; + Preconditions.checkState(dataDescList.size() == 1, + "spark load could only have one desc"); + this.dataDescription = dataDescList.get(0); + this.resourceDesc = resourceDesc; + this.properties = properties; + this.comments = comments; + } + + @Override + public List getDataDescList() { + return Collections.singletonList(dataDescription); + } + + @Override + public ResourceDesc getResourceDesc() { + return resourceDesc; + } + + @Override + public LoadType getLoadType() { + return LoadType.SPARK_LOAD; + } + + @Override + public void analyzeProperties() throws DdlException { + + } + + @Override + public void analyze(Analyzer analyzer) throws UserException { + super.analyze(analyzer); + label.analyze(analyzer); + Preconditions.checkNotNull(dataDescription, new AnalysisException("No data file in load statement.")); + Preconditions.checkNotNull(resourceDesc, new AnalysisException("Resource desc not found")); + String fullDbName = dataDescription.analyzeFullDbName(label.getDbName(), analyzer); + dataDescription.analyze(fullDbName); + resourceDesc.analyze(); + Database db = analyzer.getEnv().getInternalCatalog().getDbOrAnalysisException(fullDbName); + OlapTable table = db.getOlapTableOrAnalysisException(dataDescription.getTableName()); + dataDescription.checkKeyTypeForLoad(table); + // check resource usage privilege + if (!Env.getCurrentEnv().getAccessManager().checkResourcePriv(ConnectContext.get(), + resourceDesc.getName(), + PrivPredicate.USAGE)) { + throw new AnalysisException("USAGE denied to user '" + ConnectContext.get().getQualifiedUser() + + "'@'" + ConnectContext.get().getRemoteIP() + + "' for resource '" + resourceDesc.getName() + "'"); + } + } + + @Override + public String toSql() { + return super.toSql(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageDesc.java index 11ce1f26e4..6c99b1d0f6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageDesc.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageDesc.java @@ -29,12 +29,12 @@ import java.util.Map; * | * The broker's StorageBackend.StorageType desc */ -public class StorageDesc { - protected String name; - protected StorageBackend.StorageType storageType; - protected Map properties; +public class StorageDesc extends ResourceDesc { - public StorageDesc() {} + protected StorageBackend.StorageType storageType; + + public StorageDesc() { + } public StorageDesc(String name, StorageBackend.StorageType storageType, Map properties) { this.name = name; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/UnifiedLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/UnifiedLoadStmt.java new file mode 100644 index 0000000000..4b36793109 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/UnifiedLoadStmt.java @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.qe.ConnectContext; + +import com.google.common.base.Preconditions; + +import java.util.List; +import java.util.Map; + +/** + * Used for load refactor, as an adapter for original load stmt, will proxy to insert stmt or original load stmt, chosen + * by configuration + */ +public class UnifiedLoadStmt extends DdlStmt { + + private final StatementBase proxyStmt; + + public UnifiedLoadStmt(LabelName label, List dataDescriptions, + BrokerDesc brokerDesc, String cluster, Map properties, String comment, LoadType loadType) { + final ConnectContext connectContext = ConnectContext.get(); + if (connectContext != null && connectContext.getSessionVariable().isEnableUnifiedLoad()) { + switch (loadType) { + case BROKER_LOAD: + proxyStmt = new BrokerLoadStmt(label, dataDescriptions, brokerDesc, properties, comment); + break; + case MYSQL_LOAD: + default: + throw new IllegalStateException("does not support load type: " + loadType); + } + } else { + proxyStmt = new LoadStmt(label, dataDescriptions, brokerDesc, cluster, properties, comment); + } + } + + public void init() { + Preconditions.checkNotNull(proxyStmt, "impossible state, proxy stmt should be not null"); + proxyStmt.setOrigStmt(getOrigStmt()); + proxyStmt.setUserInfo(getUserInfo()); + } + + public StatementBase getProxyStmt() { + return proxyStmt; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/UpdateStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/UpdateStmt.java index d34e192ddb..91e1707295 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/UpdateStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/UpdateStmt.java @@ -119,7 +119,7 @@ public class UpdateStmt extends DdlStmt { LimitElement.NO_LIMIT ); - insertStmt = new InsertStmt( + insertStmt = new NativeInsertStmt( new InsertTarget(tableName, null), null, cols, diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index ed720a8992..5666e5541b 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -150,6 +150,7 @@ import org.apache.doris.load.loadv2.LoadEtlChecker; import org.apache.doris.load.loadv2.LoadJobScheduler; import org.apache.doris.load.loadv2.LoadLoadingChecker; import org.apache.doris.load.loadv2.LoadManager; +import org.apache.doris.load.loadv2.LoadManagerAdapter; import org.apache.doris.load.loadv2.ProgressManager; import org.apache.doris.load.routineload.RoutineLoadManager; import org.apache.doris.load.routineload.RoutineLoadScheduler; @@ -439,6 +440,11 @@ public class Env { private StatisticsCleaner statisticsCleaner; + /** + * TODO(tsy): to be removed after load refactor + */ + private final LoadManagerAdapter loadManagerAdapter; + private StatisticsAutoAnalyzer statisticsAutoAnalyzer; public List getFrontends(FrontendNodeType nodeType) { @@ -645,6 +651,7 @@ public class Env { } this.globalFunctionMgr = new GlobalFunctionMgr(); this.resourceGroupMgr = new ResourceGroupMgr(); + this.loadManagerAdapter = new LoadManagerAdapter(); } public static void destroyCheckpoint() { @@ -1038,7 +1045,8 @@ public class Env { clusterId = storage.getClusterID(); token = storage.getToken(); try { - URL idURL = new URL("http://" + NetUtils.getHostPortInAccessibleFormat(rightHelperNode.getIp(), Config.http_port) + "/check"); + URL idURL = new URL("http://" + NetUtils.getHostPortInAccessibleFormat(rightHelperNode.getIp(), + Config.http_port) + "/check"); HttpURLConnection conn = null; conn = (HttpURLConnection) idURL.openConnection(); conn.setConnectTimeout(2 * 1000); @@ -1110,7 +1118,8 @@ public class Env { try { // For upgrade compatibility, the host parameter name remains the same // and the new hostname parameter is added - URL url = new URL("http://" + NetUtils.getHostPortInAccessibleFormat(helperNode.getIp(), Config.http_port) + URL url = new URL( + "http://" + NetUtils.getHostPortInAccessibleFormat(helperNode.getIp(), Config.http_port) + "/role?host=" + selfNode.getIp() + "&hostname=" + selfNode.getHostName() + "&port=" + selfNode.getPort()); HttpURLConnection conn = null; @@ -5261,6 +5270,10 @@ public class Env { return statisticsCleaner; } + public LoadManagerAdapter getLoadManagerAdapter() { + return loadManagerAdapter; + } + public StatisticsAutoAnalyzer getStatisticsAutoAnalyzer() { return statisticsAutoAnalyzer; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java index 677bd449e3..0ea617f2b0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java @@ -19,6 +19,7 @@ package org.apache.doris.load.loadv2; import org.apache.doris.analysis.BrokerDesc; import org.apache.doris.analysis.DataDescription; +import org.apache.doris.analysis.InsertStmt; import org.apache.doris.analysis.LoadStmt; import org.apache.doris.analysis.SqlParser; import org.apache.doris.analysis.SqlScanner; @@ -371,4 +372,37 @@ public abstract class BulkLoadJob extends LoadJob { } return null; } + + // ---------------- for load stmt ---------------- + public static BulkLoadJob fromInsertStmt(InsertStmt insertStmt) throws DdlException { + // get db id + String dbName = insertStmt.getLoadLabel().getDbName(); + Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName); + + // create job + BulkLoadJob bulkLoadJob; + try { + switch (insertStmt.getLoadType()) { + case BROKER_LOAD: + bulkLoadJob = new BrokerLoadJob(db.getId(), insertStmt.getLoadLabel().getLabelName(), + (BrokerDesc) insertStmt.getResourceDesc(), + insertStmt.getOrigStmt(), insertStmt.getUserInfo()); + break; + case SPARK_LOAD: + bulkLoadJob = new SparkLoadJob(db.getId(), insertStmt.getLoadLabel().getLabelName(), + insertStmt.getResourceDesc(), + insertStmt.getOrigStmt(), insertStmt.getUserInfo()); + break; + default: + throw new DdlException("Unknown load job type."); + } + bulkLoadJob.setComment(insertStmt.getComments()); + bulkLoadJob.setJobProperties(insertStmt.getProperties()); + // TODO(tsy): use generic and change the param in checkAndSetDataSourceInfo + bulkLoadJob.checkAndSetDataSourceInfo(db, (List) insertStmt.getDataDescList()); + return bulkLoadJob; + } catch (MetaNotFoundException e) { + throw new DdlException(e.getMessage()); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index e63bce7c12..e601bca8fd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -17,9 +17,11 @@ package org.apache.doris.load.loadv2; +import org.apache.doris.analysis.BrokerDesc; import org.apache.doris.analysis.CancelLoadStmt; import org.apache.doris.analysis.CleanLabelStmt; import org.apache.doris.analysis.CompoundPredicate.Operator; +import org.apache.doris.analysis.InsertStmt; import org.apache.doris.analysis.LoadStmt; import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Database; @@ -109,7 +111,7 @@ public class LoadManager implements Writable { public long createLoadJobFromStmt(LoadStmt stmt) throws DdlException { Database database = checkDb(stmt.getLabel().getDbName()); long dbId = database.getId(); - LoadJob loadJob = null; + LoadJob loadJob; writeLock(); try { if (stmt.getBrokerDesc() != null && stmt.getBrokerDesc().isMultiLoadBroker()) { @@ -877,4 +879,43 @@ public class LoadManager implements Writable { } } } + + // ------------------------ for load refactor ------------------------ + public long createLoadJobFromStmt(InsertStmt insertStmt) throws DdlException { + Database database = checkDb(insertStmt.getLoadLabel().getDbName()); + long dbId = database.getId(); + LoadJob loadJob; + writeLock(); + BrokerDesc brokerDesc = (BrokerDesc) insertStmt.getResourceDesc(); + try { + if (brokerDesc != null && brokerDesc.isMultiLoadBroker()) { + if (!Env.getCurrentEnv().getLoadInstance() + .isUncommittedLabel(dbId, insertStmt.getLoadLabel().getLabelName())) { + throw new DdlException("label: " + insertStmt.getLoadLabel().getLabelName() + " not found!"); + } + } else { + checkLabelUsed(dbId, insertStmt.getLoadLabel().getLabelName()); + if (brokerDesc == null && insertStmt.getResourceDesc() == null) { + throw new DdlException("LoadManager only support the broker and spark load."); + } + if (unprotectedGetUnfinishedJobNum() >= Config.desired_max_waiting_jobs) { + throw new DdlException( + "There are more than " + Config.desired_max_waiting_jobs + + " unfinished load jobs, please retry later. " + + "You can use `SHOW LOAD` to view submitted jobs"); + } + } + + loadJob = BulkLoadJob.fromInsertStmt(insertStmt); + createLoadJob(loadJob); + } finally { + writeUnlock(); + } + Env.getCurrentEnv().getEditLog().logCreateLoadJob(loadJob); + + // The job must be submitted after edit log. + // It guarantee that load job has not been changed before edit log. + loadJobScheduler.submitJob(loadJob); + return loadJob.getId(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManagerAdapter.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManagerAdapter.java new file mode 100644 index 0000000000..83dddb9959 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManagerAdapter.java @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load.loadv2; + +import org.apache.doris.analysis.InsertStmt; +import org.apache.doris.analysis.LoadType; +import org.apache.doris.catalog.Env; +import org.apache.doris.common.DdlException; +import org.apache.doris.qe.StmtExecutor; + +/** + * This class is temporary for load refactor, all unified external load should use this adapter in + * {@link StmtExecutor#handleExternalInsertStmt()} + *

+ * TODO(tsy): removed after job-manager system for loads is unified + */ +public class LoadManagerAdapter { + + public void startLoadFromInsertStmt(InsertStmt insertStmt) throws DdlException { + final LoadType loadType = insertStmt.getLoadType(); + final LoadManager loadManager = Env.getCurrentEnv().getLoadManager(); + switch (loadType) { + case BROKER_LOAD: + loadManager.createLoadJobFromStmt(insertStmt); + break; + case MYSQL_LOAD: + // TODO: implement + break; + case ROUTINE_LOAD: + // TODO: implement + break; + case STREAM_LOAD: + // TODO: implement + break; + default: + throw new DdlException("unsupported load type:" + loadType); + } + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java index 6891ef415f..c53670544a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java @@ -245,7 +245,7 @@ public class OriginalPlanner extends Planner { rootFragment = distributedPlanner.createInsertFragment(rootFragment, insertStmt, fragments); rootFragment.setSink(insertStmt.getDataSink()); insertStmt.complete(); - ArrayList exprs = ((InsertStmt) statement).getResultExprs(); + List exprs = statement.getResultExprs(); List resExprs = Expr.substituteList( exprs, rootFragment.getPlanRoot().getOutputSmap(), analyzer, true); rootFragment.setOutputExprs(resExprs); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index fcde985f42..27eabb5c43 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -313,6 +313,11 @@ public class SessionVariable implements Serializable, Writable { // Split size for ExternalFileScanNode. Default value 0 means use the block size of HDFS/S3. public static final String FILE_SPLIT_SIZE = "file_split_size"; + /** + * use insert stmt as the unified backend for all loads + */ + public static final String ENABLE_UNIFIED_LOAD = "enable_unified_load"; + public static final String ENABLE_PARQUET_LAZY_MAT = "enable_parquet_lazy_materialization"; public static final String ENABLE_ORC_LAZY_MAT = "enable_orc_lazy_materialization"; @@ -800,7 +805,7 @@ public class SessionVariable implements Serializable, Writable { public static final int MIN_EXTERNAL_AGG_PARTITION_BITS = 4; public static final int MAX_EXTERNAL_AGG_PARTITION_BITS = 8; @VariableMgr.VarAttr(name = EXTERNAL_AGG_PARTITION_BITS, - checker = "checkExternalAggPartitionBits", fuzzy = true) + checker = "checkExternalAggPartitionBits", fuzzy = true) public int externalAggPartitionBits = 8; // means that the hash table will be partitioned into 256 blocks. // Whether enable two phase read optimization @@ -856,6 +861,12 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = FILE_SPLIT_SIZE, needForward = true) public long fileSplitSize = 0; + /** + * determine should we enable unified load (use insert stmt as the backend for all load) + */ + @VariableMgr.VarAttr(name = ENABLE_UNIFIED_LOAD, needForward = true) + public boolean enableUnifiedLoad = false; + @VariableMgr.VarAttr( name = ENABLE_PARQUET_LAZY_MAT, description = {"控制 parquet reader 是否启用延迟物化技术。默认为 true。", @@ -2086,4 +2097,8 @@ public class SessionVariable implements Serializable, Writable { } return num; } + + public boolean isEnableUnifiedLoad() { + return enableUnifiedLoad; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index fd7c8fc003..634abe69c6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -34,6 +34,7 @@ import org.apache.doris.analysis.InsertStmt; import org.apache.doris.analysis.KillStmt; import org.apache.doris.analysis.LiteralExpr; import org.apache.doris.analysis.LoadStmt; +import org.apache.doris.analysis.LoadType; import org.apache.doris.analysis.LockTablesStmt; import org.apache.doris.analysis.NullLiteral; import org.apache.doris.analysis.OutFileClause; @@ -58,6 +59,7 @@ import org.apache.doris.analysis.TransactionBeginStmt; import org.apache.doris.analysis.TransactionCommitStmt; import org.apache.doris.analysis.TransactionRollbackStmt; import org.apache.doris.analysis.TransactionStmt; +import org.apache.doris.analysis.UnifiedLoadStmt; import org.apache.doris.analysis.UnlockTablesStmt; import org.apache.doris.analysis.UnsupportedStmt; import org.apache.doris.analysis.UpdateStmt; @@ -94,6 +96,7 @@ import org.apache.doris.common.util.Util; import org.apache.doris.load.EtlJobType; import org.apache.doris.load.LoadJobRowResult; import org.apache.doris.load.loadv2.LoadManager; +import org.apache.doris.load.loadv2.LoadManagerAdapter; import org.apache.doris.mysql.MysqlChannel; import org.apache.doris.mysql.MysqlCommand; import org.apache.doris.mysql.MysqlEofPacket; @@ -572,6 +575,18 @@ public class StmtExecutor { if (parsedStmt instanceof QueryStmt) { context.getState().setIsQuery(true); } + if (parsedStmt instanceof UnifiedLoadStmt) { + // glue code for unified load + final UnifiedLoadStmt unifiedLoadStmt = (UnifiedLoadStmt) parsedStmt; + unifiedLoadStmt.init(); + final StatementBase proxyStmt = unifiedLoadStmt.getProxyStmt(); + parsedStmt = proxyStmt; + if (!(proxyStmt instanceof LoadStmt)) { + Preconditions.checkState( + parsedStmt instanceof InsertStmt && ((InsertStmt) parsedStmt).needLoadManager(), + new IllegalStateException("enable_unified_load=true, should be external insert stmt")); + } + } try { if (context.isTxnModel() && !(parsedStmt instanceof InsertStmt) @@ -584,7 +599,7 @@ public class StmtExecutor { if (!context.isTxnModel()) { Span queryAnalysisSpan = context.getTracer().spanBuilder("query analysis").setParent(Context.current()).startSpan(); - try (Scope scope = queryAnalysisSpan.makeCurrent()) { + try (Scope ignored = queryAnalysisSpan.makeCurrent()) { // analyze this query analyze(context.getSessionVariable().toThrift()); } catch (Exception e) { @@ -639,15 +654,22 @@ public class StmtExecutor { } else if (parsedStmt instanceof CreateTableAsSelectStmt) { handleCtasStmt(); } else if (parsedStmt instanceof InsertStmt) { // Must ahead of DdlStmt because InsertStmt is its subclass - try { - if (!((InsertStmt) parsedStmt).getQueryStmt().isExplain()) { - profileType = ProfileType.LOAD; + InsertStmt insertStmt = (InsertStmt) parsedStmt; + if (insertStmt.needLoadManager()) { + // TODO(tsy): will eventually try to handle native insert and external insert together + // add a branch for external load + handleExternalInsertStmt(); + } else { + try { + if (!insertStmt.getQueryStmt().isExplain()) { + profileType = ProfileType.LOAD; + } + handleInsertStmt(); + } catch (Throwable t) { + LOG.warn("handle insert stmt fail: {}", t.getMessage()); + // the transaction of this insert may already begin, we will abort it at outer finally block. + throw t; } - handleInsertStmt(); - } catch (Throwable t) { - LOG.warn("handle insert stmt fail: {}", t.getMessage()); - // the transaction of this insert may already begin, we will abort it at outer finally block. - throw t; } } else if (parsedStmt instanceof LoadStmt) { handleLoadStmt(); @@ -698,7 +720,8 @@ public class StmtExecutor { InsertStmt insertStmt = (InsertStmt) parsedStmt; // The transaction of an insert operation begin at analyze phase. // So we should abort the transaction at this finally block if it encounters exception. - if (insertStmt.isTransactionBegin() && context.getState().getStateType() == MysqlStateType.ERR) { + if (!insertStmt.needLoadManager() && insertStmt.isTransactionBegin() + && context.getState().getStateType() == MysqlStateType.ERR) { try { String errMsg = Strings.emptyToNull(context.getState().getErrorMessage()); Env.getCurrentGlobalTransactionMgr().abortTransaction( @@ -827,7 +850,7 @@ public class StmtExecutor { } if (parsedStmt instanceof QueryStmt - || parsedStmt instanceof InsertStmt + || (parsedStmt instanceof InsertStmt && !((InsertStmt) parsedStmt).needLoadManager()) || parsedStmt instanceof CreateTableAsSelectStmt) { if (Config.enable_resource_group && context.sessionVariable.enablePipelineEngine()) { analyzer.setResourceGroups(analyzer.getEnv().getResourceGroupMgr() @@ -1483,9 +1506,10 @@ public class StmtExecutor { private int executeForTxn(InsertStmt insertStmt) throws UserException, TException, InterruptedException, ExecutionException, TimeoutException { if (context.isTxnIniting()) { // first time, begin txn - beginTxn(insertStmt.getDb(), insertStmt.getTbl()); + beginTxn(insertStmt.getDbName(), + insertStmt.getTbl()); } - if (!context.getTxnEntry().getTxnConf().getDb().equals(insertStmt.getDb()) + if (!context.getTxnEntry().getTxnConf().getDb().equals(insertStmt.getDbName()) || !context.getTxnEntry().getTxnConf().getTbl().equals(insertStmt.getTbl())) { throw new TException("Only one table can be inserted in one transaction."); } @@ -1585,8 +1609,8 @@ public class StmtExecutor { if (context.getMysqlChannel() != null) { context.getMysqlChannel().reset(); } - // create plan InsertStmt insertStmt = (InsertStmt) parsedStmt; + // create plan if (insertStmt.getQueryStmt().hasOutFileClause()) { throw new DdlException("Not support OUTFILE clause in INSERT statement"); } @@ -1728,7 +1752,8 @@ public class StmtExecutor { txnId = insertStmt.getTransactionId(); try { context.getEnv().getLoadManager() - .recordFinishedLoadJob(label, txnId, insertStmt.getDb(), insertStmt.getTargetTable().getId(), + .recordFinishedLoadJob(label, txnId, insertStmt.getDbName(), + insertStmt.getTargetTable().getId(), EtlJobType.INSERT, createTime, throwable == null ? "" : throwable.getMessage(), coord.getTrackingUrl(), insertStmt.getUserInfo()); } catch (MetaNotFoundException e) { @@ -1754,12 +1779,34 @@ public class StmtExecutor { // set insert result in connection context, // so that user can use `show insert result` to get info of the last insert operation. - context.setOrUpdateInsertResult(txnId, label, insertStmt.getDb(), insertStmt.getTbl(), + context.setOrUpdateInsertResult(txnId, label, insertStmt.getDbName(), insertStmt.getTbl(), txnStatus, loadedRows, filteredRows); // update it, so that user can get loaded rows in fe.audit.log context.updateReturnRows((int) loadedRows); } + private void handleExternalInsertStmt() { + // TODO(tsy): load refactor, handle external load here + try { + InsertStmt insertStmt = (InsertStmt) parsedStmt; + LoadType loadType = insertStmt.getLoadType(); + if (loadType == LoadType.UNKNOWN) { + throw new DdlException("Unknown load job type"); + } + LoadManagerAdapter loadManagerAdapter = context.getEnv().getLoadManagerAdapter(); + loadManagerAdapter.startLoadFromInsertStmt(insertStmt); + context.getState().setOk(); + } catch (UserException e) { + // Return message to info client what happened. + LOG.debug("DDL statement({}) process failed.", originStmt.originStmt, e); + context.getState().setError(e.getMysqlErrorCode(), e.getMessage()); + } catch (Exception e) { + // Maybe our bug + LOG.warn("DDL statement(" + originStmt.originStmt + ") process failed.", e); + context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + e.getMessage()); + } + } + private void handleUnsupportedStmt() { context.getMysqlChannel().reset(); // do nothing diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/InsertStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/InsertStmtTest.java index 0b19e54f51..3101db6937 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/InsertStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/InsertStmtTest.java @@ -148,16 +148,20 @@ public class InsertStmtTest { } - @Injectable InsertTarget target; - @Injectable InsertSource source; - @Injectable Table targetTable; + @Injectable + InsertTarget target; + @Injectable + InsertSource source; + @Injectable + Table targetTable; @Test public void testNormal() throws Exception { ConnectContext ctx = UtFrameUtils.createDefaultCtx(); String sql = "values(1,'a',2,'b')"; - SqlScanner input = new SqlScanner(new StringReader(sql), ctx.getSessionVariable().getSqlMode()); + SqlScanner input = new SqlScanner(new StringReader(sql), + ctx.getSessionVariable().getSqlMode()); SqlParser parser = new SqlParser(input); Analyzer analyzer = new Analyzer(ctx.getEnv(), ctx); StatementBase statementBase = null; @@ -188,7 +192,7 @@ public class InsertStmtTest { }; - InsertStmt stmt = new InsertStmt(target, "label", null, source, new ArrayList<>()); + InsertStmt stmt = new NativeInsertStmt(target, "label", null, source, new ArrayList<>()); stmt.setTargetTable(targetTable); stmt.setQueryStmt(queryStmt); @@ -221,7 +225,8 @@ public class InsertStmtTest { ConnectContext ctx = UtFrameUtils.createDefaultCtx(); String sql = "select kk1, kk2, kk3, kk4 from db.tbl"; - SqlScanner input = new SqlScanner(new StringReader(sql), ctx.getSessionVariable().getSqlMode()); + SqlScanner input = new SqlScanner(new StringReader(sql), + ctx.getSessionVariable().getSqlMode()); SqlParser parser = new SqlParser(input); Analyzer analyzer = new Analyzer(ctx.getEnv(), ctx); StatementBase statementBase = null; @@ -252,7 +257,7 @@ public class InsertStmtTest { }; - InsertStmt stmt = new InsertStmt(target, "label", null, source, new ArrayList<>()); + InsertStmt stmt = new NativeInsertStmt(target, "label", null, source, new ArrayList<>()); stmt.setTargetTable(targetTable); stmt.setQueryStmt(queryStmt); diff --git a/regression-test/suites/load_p0/broker_load/test_array_load.groovy b/regression-test/suites/load_p0/broker_load/test_array_load.groovy index 2697d08db2..5d7e04d883 100644 --- a/regression-test/suites/load_p0/broker_load/test_array_load.groovy +++ b/regression-test/suites/load_p0/broker_load/test_array_load.groovy @@ -271,6 +271,23 @@ suite("test_array_load", "load_p0") { } finally { try_sql("DROP TABLE IF EXISTS ${testTable}") } + // test unified load + try { + sql "DROP TABLE IF EXISTS ${testTable}" + sql """ set enable_unified_load=true; """ + + create_test_table.call(testTable) + + def test_load_label = UUID.randomUUID().toString().replaceAll("-", "") + load_from_hdfs.call(testTable, test_load_label, hdfs_json_file_path, "json", + brokerName, hdfsUser, hdfsPasswd) + + check_load_result.call(test_load_label, testTable) + + } finally { + try_sql("DROP TABLE IF EXISTS ${testTable}") + sql """ set enable_unified_load=false; """ + } // case7: import array data by hdfs in csv format and enable vectorized try { @@ -287,6 +304,23 @@ suite("test_array_load", "load_p0") { } finally { try_sql("DROP TABLE IF EXISTS ${testTable}") } + // test unified load + try { + sql "DROP TABLE IF EXISTS ${testTable}" + sql """ set enable_unified_load=true; """ + + create_test_table.call(testTable) + + def test_load_label = UUID.randomUUID().toString().replaceAll("-", "") + load_from_hdfs1.call(testTable, test_load_label, hdfs_csv_file_path, "csv", + brokerName, hdfsUser, hdfsPasswd) + + check_load_result.call(test_load_label, testTable) + + } finally { + try_sql("DROP TABLE IF EXISTS ${testTable}") + sql """ set enable_unified_load=false; """ + } // case9: import array data by hdfs in orc format and enable vectorized try { @@ -303,6 +337,23 @@ suite("test_array_load", "load_p0") { } finally { try_sql("DROP TABLE IF EXISTS ${testTable}") } + // test unified load + try { + sql "DROP TABLE IF EXISTS ${testTable}" + sql """ set enable_unified_load=true; """ + + create_test_table.call(testTable) + + def test_load_label = UUID.randomUUID().toString().replaceAll("-", "") + load_from_hdfs1.call(testTable, test_load_label, hdfs_orc_file_path, "orc", + brokerName, hdfsUser, hdfsPasswd) + + check_load_result.call(test_load_label, testTable) + + } finally { + try_sql("DROP TABLE IF EXISTS ${testTable}") + sql """ set enable_unified_load=false; """ + } // case11: import array data by hdfs in parquet format and enable vectorized try { @@ -319,6 +370,23 @@ suite("test_array_load", "load_p0") { } finally { try_sql("DROP TABLE IF EXISTS ${testTable}") } + // test unified load + try { + sql "DROP TABLE IF EXISTS ${testTable}" + sql """ set enable_unified_load=true; """ + + create_test_table.call(testTable) + + def test_load_label = UUID.randomUUID().toString().replaceAll("-", "") + load_from_hdfs1.call(testTable, test_load_label, hdfs_parquet_file_path, "parquet", + brokerName, hdfsUser, hdfsPasswd) + + check_load_result.call(test_load_label, testTable) + + } finally { + try_sql("DROP TABLE IF EXISTS ${testTable}") + sql """ set enable_unified_load=false; """ + } // case13: import array data by hdfs in orc format(with array type) and enable vectorized try { @@ -335,5 +403,22 @@ suite("test_array_load", "load_p0") { } finally { try_sql("DROP TABLE IF EXISTS ${testTable}") } + // test unified load + try { + sql "DROP TABLE IF EXISTS ${testTable}" + sql """ set enable_unified_load=true; """ + + create_test_table.call(testTable) + + def test_load_label = UUID.randomUUID().toString().replaceAll("-", "") + load_from_hdfs1.call(testTable, test_load_label, hdfs_orc_file_path2, "orc", + brokerName, hdfsUser, hdfsPasswd) + + check_load_result.call(test_load_label, testTable) + + } finally { + try_sql("DROP TABLE IF EXISTS ${testTable}") + sql """ set enable_unified_load=false; """ + } } } diff --git a/regression-test/suites/load_p2/broker_load/test_broker_load.groovy b/regression-test/suites/load_p2/broker_load/test_broker_load.groovy index 15a28dc827..5d2b68c415 100644 --- a/regression-test/suites/load_p2/broker_load/test_broker_load.groovy +++ b/regression-test/suites/load_p2/broker_load/test_broker_load.groovy @@ -315,5 +315,68 @@ suite("test_broker_load_p2", "p2") { } } } + + // test unified load + if (enabled != null && enabled.equalsIgnoreCase("true")) { + sql """ set enable_unified_load=true; """ + def uuids = [] + try { + def i = 0 + for (String table in tables) { + sql new File("""${context.file.parent}/ddl/${table}_drop.sql""").text + sql new File("""${context.file.parent}/ddl/${table}_create.sql""").text + + def uuid = UUID.randomUUID().toString().replace("-", "0") + uuids.add(uuid) + do_load_job.call(uuid, paths[i], table, columns_list[i], column_in_paths[i], preceding_filters[i], + set_values[i], where_exprs[i]) + i++ + } + + i = 0 + for (String label in uuids) { + def max_try_milli_secs = 600000 + while (max_try_milli_secs > 0) { + String[][] result = sql """ show load where label="$label" order by createtime desc limit 1; """ + if (result[0][2].equals("FINISHED")) { + logger.info("Load FINISHED " + label) + assertTrue(etl_info[i] == result[0][5], "expected: " + etl_info[i] + ", actual: " + result[0][5] + ", label: $label") + break; + } + if (result[0][2].equals("CANCELLED")) { + assertTrue(result[0][7].contains(error_msg[i])) + break; + } + Thread.sleep(1000) + max_try_milli_secs -= 1000 + if(max_try_milli_secs <= 0) { + assertTrue(1 == 2, "load Timeout: $label") + } + } + i++ + } + + def orc_expect_result = """[[20, 15901, 6025915247311731176, 1373910657, 8863282788606566657], [38, 15901, -9154375582268094750, 1373853561, 4923892366467329038], [38, 15901, -9154375582268094750, 1373853561, 8447995939656287502], [38, 15901, -9154375582268094750, 1373853565, 7451966001310881759], [38, 15901, -9154375582268094750, 1373853565, 7746521994248163870], [38, 15901, -9154375582268094750, 1373853577, 6795654975682437824], [38, 15901, -9154375582268094750, 1373853577, 9009208035649338594], [38, 15901, -9154375582268094750, 1373853608, 6374361939566017108], [38, 15901, -9154375582268094750, 1373853608, 7387298457456465364], [38, 15901, -9154375582268094750, 1373853616, 7463736180224933002]]""" + for (String table in tables) { + if (table.matches("orc_s3_case[23456789]")) { + String[][] orc_actual_result = sql """select CounterID, EventDate, UserID, EventTime, WatchID from $table order by CounterID, EventDate, UserID, EventTime, WatchID limit 10;""" + assertTrue("$orc_actual_result" == "$orc_expect_result") + } + } + + order_qt_parquet_s3_case1 """select count(*) from parquet_s3_case1 where col1=10""" + order_qt_parquet_s3_case3 """select count(*) from parquet_s3_case3 where p_partkey < 100000""" + order_qt_parquet_s3_case6 """select count(*) from parquet_s3_case6 where p_partkey < 100000""" + order_qt_parquet_s3_case7 """select count(*) from parquet_s3_case7 where col4=4""" + order_qt_parquet_s3_case8 """ select count(*) from parquet_s3_case8 where p_partkey=1""" + order_qt_parquet_s3_case9 """ select * from parquet_s3_case9""" + + } finally { + for (String table in tables) { + sql new File("""${context.file.parent}/ddl/${table}_drop.sql""").text + } + sql """ set enable_unified_load=false; """ + } + } }