[refactor](insert) remove unused insert code in FE #29924

Mingyu Chen
2024-01-16 22:41:18 +08:00
committed by yiguolei
parent f0a4ec5f85
commit 3f2a794c2e
13 changed files with 4 additions and 1048 deletions

View File: org/apache/doris/analysis/BrokerLoadStmt.java

@@ -1,125 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.PrintableMap;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;
import java.util.List;
import java.util.Map;
public class BrokerLoadStmt extends InsertStmt {
private final List<DataDescription> dataDescList;
private final BrokerDesc brokerDesc;
private String cluster;
public BrokerLoadStmt(LabelName label, List<DataDescription> dataDescList, BrokerDesc brokerDesc,
Map<String, String> properties, String comments) {
super(label, properties, comments);
this.dataDescList = dataDescList;
this.brokerDesc = brokerDesc;
}
@Override
public List<DataDescription> getDataDescList() {
return dataDescList;
}
@Override
public BrokerDesc getResourceDesc() {
return brokerDesc;
}
@Override
public LoadType getLoadType() {
return LoadType.BROKER_LOAD;
}
@Override
public void analyzeProperties() throws DdlException {
// public check should be in base class
}
@Override
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
label.analyze(analyzer);
Preconditions.checkState(!CollectionUtils.isEmpty(dataDescList),
new AnalysisException("No data file in load statement."));
Preconditions.checkNotNull(brokerDesc, "No broker desc found.");
// check data descriptions
for (DataDescription dataDescription : dataDescList) {
final String fullDbName = dataDescription.analyzeFullDbName(label.getDbName(), analyzer);
dataDescription.analyze(fullDbName);
Preconditions.checkState(!dataDescription.isLoadFromTable(),
new AnalysisException("Load from table should use Spark Load"));
Database db = analyzer.getEnv().getInternalCatalog().getDbOrAnalysisException(fullDbName);
OlapTable table = db.getOlapTableOrAnalysisException(dataDescription.getTableName());
dataDescription.checkKeyTypeForLoad(table);
if (!brokerDesc.isMultiLoadBroker()) {
for (int i = 0; i < dataDescription.getFilePaths().size(); i++) {
String location = brokerDesc.getFileLocation(dataDescription.getFilePaths().get(i));
dataDescription.getFilePaths().set(i, location);
StorageBackend.checkPath(dataDescription.getFilePaths().get(i),
brokerDesc.getStorageType(), "DATA INFILE must be specified.");
dataDescription.getFilePaths().set(i, dataDescription.getFilePaths().get(i));
}
}
}
}
@Override
public boolean needAuditEncryption() {
return true;
}
@Override
public String toSql() {
StringBuilder sb = new StringBuilder();
sb.append("LOAD LABEL ").append(label.toSql()).append("\n");
sb.append("(");
Joiner.on(",\n").appendTo(sb, Lists.transform(dataDescList, DataDesc::toSql)).append(")");
if (cluster != null) {
sb.append("\nBY '");
sb.append(cluster);
sb.append("'");
}
if (brokerDesc != null) {
sb.append("\n").append(brokerDesc.toSql());
}
if (properties != null && !properties.isEmpty()) {
sb.append("\nPROPERTIES (");
sb.append(new PrintableMap<>(properties, "=", true, false, true));
sb.append(")");
}
return sb.toString();
}
}
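
The toSql() above reassembles the statement back into Doris LOAD LABEL syntax. As a rough sketch of the shape it produced, with a hypothetical label, file path, table, broker name, and property, and assuming BrokerDesc.toSql() renders the usual WITH BROKER clause:

    // Sketch only: every concrete value below (label, path, table, broker,
    // property) is a hypothetical example, not taken from the commit.
    public final class BrokerLoadSqlShape {
        public static void main(String[] args) {
            String example = "LOAD LABEL example_db.label1\n"
                    + "(DATA INFILE('s3://bucket/file.csv') INTO TABLE tbl1)\n"
                    + "WITH BROKER 'broker1'\n"
                    + "PROPERTIES (\"timeout\" = \"3600\")";
            System.out.println(example);
        }
    }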

View File: org/apache/doris/analysis/MysqlLoadStmt.java

@@ -1,100 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/StatementBase.java
// and modified by Doris
package org.apache.doris.analysis;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class MysqlLoadStmt extends InsertStmt {
private final DataDescription dataDescription;
public MysqlLoadStmt(DataDescription dataDescription, Map<String, String> properties, String comments) {
super(new LabelName(), properties, comments);
this.dataDescription = dataDescription;
}
@Override
public List<? extends DataDesc> getDataDescList() {
return Collections.singletonList(dataDescription);
}
@Override
public ResourceDesc getResourceDesc() {
// mysql load does not have resource desc
return null;
}
@Override
public LoadType getLoadType() {
return LoadType.MYSQL_LOAD;
}
@Override
public void analyzeProperties() throws DdlException {
}
@Override
public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
super.analyze(analyzer);
String fullDbName = dataDescription.analyzeFullDbName(label.getDbName(), analyzer);
dataDescription.analyze(fullDbName);
Database db = analyzer.getEnv().getInternalCatalog().getDbOrAnalysisException(fullDbName);
OlapTable table = db.getOlapTableOrAnalysisException(dataDescription.getTableName());
dataDescription.checkKeyTypeForLoad(table);
if (!dataDescription.isClientLocal()) {
for (String path : dataDescription.getFilePaths()) {
if (Config.mysql_load_server_secure_path.isEmpty()) {
throw new AnalysisException("Load local data from fe local is not enabled. If you want to use it,"
+ " plz set the `mysql_load_server_secure_path` for FE to be a right path.");
} else {
File file = new File(path);
try {
if (!(file.getCanonicalPath().startsWith(Config.mysql_load_server_secure_path))) {
throw new AnalysisException("Local file should be under the secure path of FE.");
}
} catch (IOException e) {
throw new RuntimeException(e);
}
if (!file.exists()) {
throw new AnalysisException("File: " + path + " is not exists.");
}
}
}
}
}
@Override
public RedirectStatus getRedirectStatus() {
return RedirectStatus.NO_FORWARD;
}
}
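
The analyze() above only accepts a server-local file when it resolves under Config.mysql_load_server_secure_path. A minimal standalone sketch of that check, with secureDir standing in for the config value:

    import java.io.File;
    import java.io.IOException;

    // Sketch of the secure-path check the removed MysqlLoadStmt performed for
    // non-client-local LOAD DATA; secureDir stands in for
    // Config.mysql_load_server_secure_path.
    public final class SecurePathCheck {
        public static boolean isUnderSecureDir(String path, String secureDir) throws IOException {
            if (secureDir.isEmpty()) {
                return false; // loading from the FE host is disabled entirely
            }
            File file = new File(path);
            // Canonicalization resolves ".." and symlinks before the prefix test,
            // so a path like "/secure/../etc/passwd" cannot escape the directory.
            return file.getCanonicalPath().startsWith(secureDir) && file.exists();
        }
    }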

View File: org/apache/doris/analysis/NativeInsertStmt.java

@@ -187,6 +187,7 @@ public class NativeInsertStmt extends InsertStmt {
&& ((SelectStmt) queryStmt).getTableRefs().isEmpty());
}
// Ctor of group commit in sql parser
public NativeInsertStmt(long tableId, String label, List<String> cols, InsertSource source,
List<String> hints) {
this(new InsertTarget(new TableName(null, null, null), null), label, cols, source, hints);

View File: org/apache/doris/analysis/S3TvfLoadStmt.java

@@ -1,437 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.analysis.CompoundPredicate.Operator;
import org.apache.doris.analysis.StorageBackend.StorageType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.FileFormatConstants;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.datasource.property.constants.S3Properties.Env;
import org.apache.doris.tablefunction.S3TableValuedFunction;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* S3 Load based on S3 TVF
*/
public class S3TvfLoadStmt extends NativeInsertStmt {
private static final Logger LOG = LogManager.getLogger(S3TvfLoadStmt.class);
private static final String FORMAT_CSV = "csv";
private static final String DEFAULT_FORMAT = FORMAT_CSV;
private final DataDescription dataDescription;
/**
* for csv format, we need some particular process
*/
private final boolean isCsvFormat;
/**
* only used for loading from csv format tvf
* with mapping from col name to csv-format-tvf-style col name (c1, c2, c3...)
*/
private Map<String, String> selectColNameToCsvColName;
@VisibleForTesting
private Set<String> functionGenTableColNames;
public S3TvfLoadStmt(LabelName label, List<DataDescription> dataDescList, BrokerDesc brokerDesc,
Map<String, String> properties, String comments) throws DdlException {
super(buildInsertTarget(dataDescList.get(0)),
label.getLabelName(), /*insert all columns by default*/null,
buildInsertSource(dataDescList.get(0), brokerDesc), null);
this.label = label;
this.dataDescription = dataDescList.get(0);
this.properties = properties;
this.comments = comments;
this.isCsvFormat = isCsvFormat(dataDescription.getFileFormat());
}
// ------------------------------------ init helpers ------------------------------------
private static InsertTarget buildInsertTarget(DataDescription dataDescription) {
final TableName tableName = new TableName(null, null, dataDescription.getTableName());
return new InsertTarget(tableName, dataDescription.getPartitionNames());
}
private static InsertSource buildInsertSource(DataDescription dataDescription, BrokerDesc brokerDesc)
throws DdlException {
final SelectList selectList = new SelectList();
// use `select *` by default
final SelectListItem item = new SelectListItem(SelectListItem.createStarItem(null));
selectList.addItem(item);
// build from
final FromClause fromClause = new FromClause(
Collections.singletonList(buildTvfRef(dataDescription, brokerDesc))
);
// trans order by in load stmt
final String sequenceCol = dataDescription.getSequenceCol();
final ArrayList<OrderByElement> orderByElementList = Lists.newArrayList();
if (!Strings.isNullOrEmpty(sequenceCol)) {
final OrderByElement orderByElement = new OrderByElement(
new SlotRef(null, sequenceCol),
true, null
);
orderByElementList.add(orderByElement);
}
// merge preceding filter and where expr
final BoolLiteral trueLiteral = new BoolLiteral(true);
final Expr whereExpr = Optional.ofNullable(dataDescription.getWhereExpr()).orElse(trueLiteral);
final Expr precdingFilterExpr =
Optional.ofNullable(dataDescription.getPrecdingFilterExpr()).orElse(trueLiteral);
final Expr compoundPredicate = new CompoundPredicate(Operator.AND, precdingFilterExpr, whereExpr);
final SelectStmt selectStmt = new SelectStmt(
selectList, fromClause, compoundPredicate,
null, null,
orderByElementList, LimitElement.NO_LIMIT
);
return new InsertSource(selectStmt);
}
private static TableRef buildTvfRef(DataDescription dataDescription, BrokerDesc brokerDesc) throws DdlException {
final Map<String, String> params = Maps.newHashMap();
final List<String> filePaths = dataDescription.getFilePaths();
Preconditions.checkState(filePaths.size() == 1, "there should be only one file path");
final String s3FilePath = filePaths.get(0);
params.put(S3TableValuedFunction.PROP_URI, s3FilePath);
final Map<String, String> dataDescProp = dataDescription.getProperties();
if (dataDescProp != null) {
params.putAll(dataDescProp);
}
final String format = Optional.ofNullable(dataDescription.getFileFormat()).orElse(DEFAULT_FORMAT);
params.put(FileFormatConstants.PROP_FORMAT, format);
if (isCsvFormat(format)) {
parseSeparator(dataDescription.getColumnSeparatorObj(), params);
parseSeparator(dataDescription.getLineDelimiterObj(), params);
}
List<String> columnsFromPath = dataDescription.getColumnsFromPath();
if (columnsFromPath != null) {
params.put(FileFormatConstants.PROP_PATH_PARTITION_KEYS,
String.join(",", columnsFromPath));
}
Preconditions.checkState(!brokerDesc.isMultiLoadBroker(), "do not support multi broker load currently");
Preconditions.checkState(brokerDesc.getStorageType() == StorageType.S3, "only support S3 load");
final Map<String, String> s3ResourceProp = brokerDesc.getProperties();
S3Properties.convertToStdProperties(s3ResourceProp);
s3ResourceProp.keySet().removeIf(Env.FS_KEYS::contains);
params.putAll(s3ResourceProp);
try {
return new TableValuedFunctionRef(S3TableValuedFunction.NAME, null, params);
} catch (AnalysisException e) {
throw new DdlException("failed to create s3 tvf ref", e);
}
}
private static void parseSeparator(Separator separator, Map<String, String> tvfParams) throws DdlException {
if (separator == null) {
return;
}
try {
separator.analyze();
} catch (AnalysisException e) {
throw new DdlException(String.format("failed to parse separator:%s", separator), e);
}
tvfParams.put(FileFormatConstants.PROP_COLUMN_SEPARATOR, separator.getSeparator());
}
private static boolean isCsvFormat(String format) {
return Strings.isNullOrEmpty(format) || StringUtils.equalsIgnoreCase(format, FORMAT_CSV);
}
// --------------------------------------------------------------------------------------
@Override
public void convertSemantic(Analyzer analyzer) throws UserException {
label.analyze(analyzer);
initTargetTable(analyzer);
analyzeColumns(analyzer);
}
@Override
public void getTables(Analyzer analyzer, Map<Long, TableIf> tableMap, Set<String> parentViewNameSet)
throws AnalysisException {
super.getTables(analyzer, tableMap, parentViewNameSet);
final List<TableIf> tables = Lists.newArrayList(tableMap.values());
Preconditions.checkState(tables.size() == 2,
"table map should only contain the unique function generated table and the unique target table");
functionGenTableColNames = tables.get(0)
.getBaseSchema()
.stream()
.map(Column::getName)
.collect(Collectors.toSet());
}
// ------------------------------------ columns mapping ------------------------------------
private void analyzeColumns(Analyzer analyzer) throws AnalysisException {
final String fullDbName = dataDescription.analyzeFullDbName(label.getDbName(), analyzer);
dataDescription.analyze(fullDbName);
// copy a list for analyzing
List<ImportColumnDesc> columnExprList = Lists.newArrayList(dataDescription.getParsedColumnExprList());
if (CollectionUtils.isEmpty(columnExprList)) {
padExprListIfNeeded(columnExprList);
}
rewriteExpr(columnExprList);
boolean isFileFieldSpecified = columnExprList.stream().anyMatch(ImportColumnDesc::isColumn);
if (!isFileFieldSpecified) {
return;
}
if (isCsvFormat) {
// in tvf, csv format column names are like "c1, c2, c3", record for correctness of select list
recordCsvColNames(columnExprList);
}
columnExprList = filterColumns(columnExprList);
if (CollectionUtils.isEmpty(columnExprList)) {
return;
}
resetTargetColumnNames(columnExprList);
resetSelectList(columnExprList);
}
/**
* deal with the case that not all columns in table are in file
*/
private void padExprListIfNeeded(List<ImportColumnDesc> columnExprList) {
if (isCsvFormat) {
return;
}
columnExprList.addAll(
functionGenTableColNames
.stream()
.map(ImportColumnDesc::new)
.collect(Collectors.toList())
);
}
/**
* find and rewrite the derivative columns
* e.g. (v1,v2=v1+1,v3=v2+1) --> (v1, v2=v1+1, v3=v1+1+1)
*/
private void rewriteExpr(List<ImportColumnDesc> columnDescList) {
Preconditions.checkNotNull(columnDescList, "columns should be not null");
Preconditions.checkNotNull(targetTable, "target table is unset");
LOG.debug("original columnExpr:{}", columnDescList);
Map<String, Expr> derivativeColumns = Maps.newHashMap();
columnDescList
.stream()
.filter(Predicates.not(ImportColumnDesc::isColumn))
.forEach(desc -> {
final Expr expr = desc.getExpr();
if (expr instanceof SlotRef) {
final String columnName = ((SlotRef) expr).getColumnName();
if (derivativeColumns.containsKey(columnName)) {
desc.setExpr(derivativeColumns.get(columnName));
}
} else {
recursiveRewrite(expr, derivativeColumns);
}
derivativeColumns.put(desc.getColumnName(), expr);
});
// `tmp` columns with expr can be removed after expr rewritten
columnDescList.removeIf(
Predicates.not(columnDesc ->
columnDesc.isColumn() || Objects.nonNull(targetTable.getColumn(columnDesc.getColumnName()))
)
);
LOG.debug("rewrite result:{}", columnDescList);
}
private void recursiveRewrite(Expr expr, Map<String, Expr> derivativeColumns) {
final ArrayList<Expr> children = expr.getChildren();
if (CollectionUtils.isEmpty(children)) {
return;
}
for (int i = 0; i < children.size(); i++) {
Expr child = expr.getChild(i);
if (child instanceof SlotRef) {
final String columnName = ((SlotRef) child).getColumnName();
if (derivativeColumns.containsKey(columnName)) {
expr.setChild(i, derivativeColumns.get(columnName));
}
continue;
}
recursiveRewrite(child, derivativeColumns);
}
}
/**
* record mapping from col name to csv-format-tvf-style col name
*
* @see selectColNameToCsvColName
*/
private void recordCsvColNames(List<ImportColumnDesc> columnDescList) {
AtomicInteger counter = new AtomicInteger(1);
selectColNameToCsvColName = columnDescList.stream()
.filter(ImportColumnDesc::isColumn)
.collect(Collectors.toMap(
ImportColumnDesc::getColumnName,
name -> "c" + counter.getAndIncrement(),
(v1, v2) -> v1,
LinkedHashMap::new
));
LOG.debug("select column name to csv colum name:{}", selectColNameToCsvColName);
}
private List<ImportColumnDesc> filterColumns(List<ImportColumnDesc> columnExprList) {
Preconditions.checkNotNull(targetTable, "target table is unset");
// remove all `tmp` columns, which are not in target table
columnExprList.removeIf(
Predicates.and(
ImportColumnDesc::isColumn,
desc -> Objects.isNull(targetTable.getColumn(desc.getColumnName())),
desc -> functionGenTableColNames.contains(desc.getColumnName())
)
);
// deal with the case like:
// (k1, k2) SET(k1 = `upper(k1)`)
columnExprList = Lists.newArrayList(columnExprList.stream()
.collect(Collectors.toMap(
ImportColumnDesc::getColumnName,
Function.identity(),
(lhs, rhs) -> {
if (lhs.getExpr() != null && rhs.getExpr() == null) {
return lhs;
}
if (lhs.getExpr() == null && rhs.getExpr() != null) {
return rhs;
}
throw new IllegalArgumentException(
String.format("column `%s` specified twice", lhs.getColumnName()));
}
)).values());
// deal with the case that column in target table but not in tvf
columnExprList.removeIf(desc ->
!functionGenTableColNames.contains(desc.getColumnName())
&& Objects.nonNull(targetTable.getColumn(desc.getColumnName()))
&& desc.isColumn()
);
LOG.debug("filtered result:{}", columnExprList);
return columnExprList;
}
private void resetTargetColumnNames(List<ImportColumnDesc> columnExprList) {
targetColumnNames = columnExprList
.stream()
.map(ImportColumnDesc::getColumnName)
.collect(Collectors.toList());
LOG.debug("target cols:{}", targetColumnNames);
}
private void resetSelectList(List<ImportColumnDesc> columnExprList) {
if (isCsvFormat) {
rewriteExprColNameToCsvStyle(columnExprList);
}
LOG.debug("select list:{}", columnExprList);
final SelectList selectList = new SelectList();
columnExprList.forEach(desc -> {
if (!desc.isColumn()) {
selectList.addItem(new SelectListItem(desc.getExpr(), desc.getColumnName()));
return;
}
if (isCsvFormat) {
// use csv-style-column name and target column name as alias
final Expr slotRef = new SlotRef(null, selectColNameToCsvColName.get(desc.getColumnName()));
selectList.addItem(new SelectListItem(slotRef, desc.getColumnName()));
} else {
selectList.addItem(new SelectListItem(new SlotRef(null, desc.getColumnName()), null));
}
});
((SelectStmt) getQueryStmt()).resetSelectList(selectList);
}
private void rewriteExprColNameToCsvStyle(List<ImportColumnDesc> columnExprList) {
Preconditions.checkNotNull(selectColNameToCsvColName,
"SelectColName To CsvColName is not recorded");
columnExprList
.stream()
.filter(Predicates.not(ImportColumnDesc::isColumn))
.forEach(desc -> rewriteSlotRefInExpr(desc.getExpr()));
// rewrite where predicate and order by elements
final SelectStmt selectStmt = (SelectStmt) getQueryStmt();
rewriteSlotRefInExpr(selectStmt.getWhereClause());
if (selectStmt.getOrderByElements() != null) {
selectStmt.getOrderByElements().forEach(orderByElement -> rewriteSlotRefInExpr(orderByElement.getExpr()));
}
}
private void rewriteSlotRefInExpr(Expr expr) {
final Predicate<? super Expr> rewritePredicate = e -> e instanceof SlotRef
&& selectColNameToCsvColName.containsKey(((SlotRef) e).getColumnName());
final Consumer<? super Expr> rewriteOperation = e -> {
final SlotRef slotRef = (SlotRef) e;
slotRef.setCol(selectColNameToCsvColName.get(slotRef.getColumnName()));
};
List<Expr> slotRefs = Lists.newArrayList();
expr.collect(rewritePredicate, slotRefs);
slotRefs.forEach(rewriteOperation);
}
// -----------------------------------------------------------------------------------------
}
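
Of the removed logic, rewriteExpr() is the least obvious: derived columns are folded left to right so that each later expression refers only to columns that actually exist in the file, per the (v1, v2=v1+1, v3=v2+1) example in its comment. A string-based stand-in for that substitution (illustrative only; the real code rewrites SlotRefs inside Expr trees, not substrings):

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Worked example of the left-to-right folding done by the removed
    // rewriteExpr(): (v1, v2 = v1 + 1, v3 = v2 + 1) becomes
    // (v1, v2 = v1 + 1, v3 = (v1 + 1) + 1).
    public final class DerivativeRewriteSketch {
        public static void main(String[] args) {
            Map<String, String> derived = new LinkedHashMap<>();
            String[][] cols = {{"v2", "v1 + 1"}, {"v3", "v2 + 1"}};
            for (String[] col : cols) {
                String expr = col[1];
                for (Map.Entry<String, String> e : derived.entrySet()) {
                    // substitute previously derived columns into this expression
                    expr = expr.replace(e.getKey(), "(" + e.getValue() + ")");
                }
                derived.put(col[0], expr);
                System.out.println(col[0] + " = " + expr);
            }
        }
    }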

View File: org/apache/doris/analysis/SparkLoadStmt.java

@@ -1,99 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/StatementBase.java
// and modified by Doris
package org.apache.doris.analysis;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class SparkLoadStmt extends InsertStmt {
private final DataDescription dataDescription;
private final ResourceDesc resourceDesc;
public SparkLoadStmt(LabelName label, List<DataDescription> dataDescList, ResourceDesc resourceDesc,
Map<String, String> properties, String comments) {
super(label, properties, comments);
Preconditions.checkState(dataDescList.size() == 1,
"spark load could only have one desc");
this.dataDescription = dataDescList.get(0);
this.resourceDesc = resourceDesc;
}
@Override
public List<? extends DataDesc> getDataDescList() {
return Collections.singletonList(dataDescription);
}
@Override
public ResourceDesc getResourceDesc() {
return resourceDesc;
}
@Override
public LoadType getLoadType() {
return LoadType.SPARK_LOAD;
}
@Override
public void analyzeProperties() throws DdlException {
}
@Override
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
label.analyze(analyzer);
Preconditions.checkNotNull(dataDescription, new AnalysisException("No data file in load statement."));
Preconditions.checkNotNull(resourceDesc, new AnalysisException("Resource desc not found"));
String fullDbName = dataDescription.analyzeFullDbName(label.getDbName(), analyzer);
dataDescription.analyze(fullDbName);
resourceDesc.analyze();
Database db = analyzer.getEnv().getInternalCatalog().getDbOrAnalysisException(fullDbName);
OlapTable table = db.getOlapTableOrAnalysisException(dataDescription.getTableName());
dataDescription.checkKeyTypeForLoad(table);
// check resource usage privilege
if (!Env.getCurrentEnv().getAccessManager().checkResourcePriv(ConnectContext.get(),
resourceDesc.getName(),
PrivPredicate.USAGE)) {
throw new AnalysisException("USAGE denied to user '" + ConnectContext.get().getQualifiedUser()
+ "'@'" + ConnectContext.get().getRemoteIP()
+ "' for resource '" + resourceDesc.getName() + "'");
}
}
@Override
public String toSql() {
return super.toSql();
}
}
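
One pattern worth noting in the removed analyze() methods (here and in BrokerLoadStmt) is Preconditions.checkNotNull(x, new AnalysisException(...)): Guava treats the second argument as an error message object rather than an exception to throw, so the AnalysisException was only ever stringified into a NullPointerException message. A small demonstration, assuming Guava is on the classpath and using a standard exception in place of Doris's AnalysisException:

    import com.google.common.base.Preconditions;

    // checkNotNull(ref, errorMessage) calls String.valueOf(errorMessage); the
    // exception object passed here is never thrown, only embedded in the message.
    public final class PreconditionsPitfall {
        public static void main(String[] args) {
            try {
                Object dataDescription = null;
                Preconditions.checkNotNull(dataDescription,
                        new IllegalArgumentException("No data file in load statement."));
            } catch (NullPointerException e) {
                // prints: java.lang.IllegalArgumentException: No data file in load statement.
                System.out.println(e.getMessage());
            }
        }
    }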

View File: org/apache/doris/analysis/UnifiedLoadStmt.java

@@ -17,9 +17,7 @@
package org.apache.doris.analysis;
import org.apache.doris.analysis.StorageBackend.StorageType;
import org.apache.doris.common.DdlException;
import org.apache.doris.qe.ConnectContext;
import com.google.common.base.Preconditions;
@@ -46,25 +44,12 @@ public class UnifiedLoadStmt extends DdlStmt {
public static UnifiedLoadStmt buildMysqlLoadStmt(DataDescription dataDescription, Map<String, String> properties,
String comment) {
final ConnectContext connectContext = ConnectContext.get();
if (connectContext != null && connectContext.getSessionVariable().isEnableUnifiedLoad()) {
return new UnifiedLoadStmt(new MysqlLoadStmt(dataDescription, properties, comment));
}
return new UnifiedLoadStmt(new LoadStmt(dataDescription, properties, comment));
}
public static UnifiedLoadStmt buildBrokerLoadStmt(LabelName label, List<DataDescription> dataDescriptions,
BrokerDesc brokerDesc,
Map<String, String> properties, String comment) throws DdlException {
final ConnectContext connectContext = ConnectContext.get();
if (connectContext != null && connectContext.getSessionVariable().isEnableUnifiedLoad()) {
if (brokerDesc != null && brokerDesc.getStorageType() == StorageType.S3) {
// for tvf solution validation
return new UnifiedLoadStmt(new S3TvfLoadStmt(label, dataDescriptions, brokerDesc, properties, comment));
}
return new UnifiedLoadStmt(new BrokerLoadStmt(label, dataDescriptions, brokerDesc, properties, comment));
}
return new UnifiedLoadStmt(new LoadStmt(label, dataDescriptions, brokerDesc, properties, comment));
}

View File: org/apache/doris/qe/SessionVariable.java

@@ -1319,12 +1319,6 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = FILE_SPLIT_SIZE, needForward = true)
public long fileSplitSize = 0;
/**
* determine should we enable unified load (use insert stmt as the backend for all load)
*/
@VariableMgr.VarAttr(name = ENABLE_UNIFIED_LOAD, needForward = true)
public boolean enableUnifiedLoad = false;
@VariableMgr.VarAttr(
name = ENABLE_PARQUET_LAZY_MAT,
description = {"控制 parquet reader 是否启用延迟物化技术。默认为 true。",
@@ -3158,10 +3152,6 @@ public class SessionVariable implements Serializable, Writable {
return num;
}
public boolean isEnableUnifiedLoad() {
return enableUnifiedLoad;
}
public boolean getEnablePipelineEngine() {
return enablePipelineEngine || enablePipelineXEngine;
}
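
For context, the removed enableUnifiedLoad followed the standard SessionVariable shape: a public field bound to a SQL variable via @VariableMgr.VarAttr, plus a plain getter. A self-contained sketch of that shape, with VarAttr re-declared as a stand-in for the real annotation and example_flag as a hypothetical variable:

    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;

    // VarAttr below is a stand-in for org.apache.doris.qe.VariableMgr.VarAttr;
    // "example_flag" is hypothetical, mirroring how enable_unified_load was declared.
    public class SessionVariableSketch {
        @Retention(RetentionPolicy.RUNTIME)
        @interface VarAttr {
            String name();
            boolean needForward() default false;
        }

        public static final String EXAMPLE_FLAG = "example_flag";

        // needForward = true forwards the setting when a statement is proxied
        // to the master FE, matching the removed ENABLE_UNIFIED_LOAD attribute.
        @VarAttr(name = EXAMPLE_FLAG, needForward = true)
        public boolean exampleFlag = false;

        public boolean isExampleFlag() {
            return exampleFlag;
        }
    }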