[fix](Export) Concatenation the outfile sql for Export (#23635)

In the original logic, the `Export` statement generates `Selectstmt` for execution. But there is no way to make the `SelectStmt` use the new optimizer.

Now, we change the `Export` statement to generate the `outfile SQL`, and then use the new optimizer to parse the SQL so that outfile can use the new optimizer.
This commit is contained in:
Tiewei Fang
2023-09-08 10:20:18 +08:00
committed by GitHub
parent cdb1b341c7
commit a27349c83a
36 changed files with 3214 additions and 511 deletions

View File

@ -366,7 +366,5 @@ WITH BROKER "broker_name"
- If the Export job fails, the generated files will not be deleted, and the user needs to delete it manually.
- The Export job only exports the data of the Base table, not the data of the materialized view.
- The export job scans data and occupies IO resources, which may affect the query latency of the system.
- The maximum number of export jobs running simultaneously in a cluster is 5. Only jobs submitted after that will be queued.
- Currently, The `Export Job` is simply check whether the `Tablets version` is the same, it is recommended not to import data during the execution of the `Export Job`.
- The maximum parallelism of all `Export jobs` in a cluster is `50`. You can change the value by adding the parameter `maximum_parallelism_of_export_job` to fe.conf and restart FE.
- The maximum number of partitions that an `Export job` allows is 2000. You can add a parameter to the fe.conf `maximum_number_of_export_partitions` and restart FE to modify the setting.

View File

@ -359,7 +359,5 @@ WITH BROKER "broker_name"
- 如果 Export 作业运行失败,已经生成的文件不会被删除,需要用户手动删除。
- Export 作业只会导出 Base 表的数据,不会导出物化视图的数据。
- Export 作业会扫描数据,占用 IO 资源,可能会影响系统的查询延迟。
- 一个集群内同时运行的 Export 作业最大个数为 5。之后提交的作业将会排队。
- 目前在export时只是简单检查tablets版本是否一致,建议在执行export过程中不要对该表进行导入数据操作。
- 一个集群内所有Export Job的parallelism加起来最多是50。可以在fe.conf中添加参数`maximum_parallelism_of_export_job`并重启FE来修改该设置。
- 一个Export Job允许导出的分区数量最大为2000,可以在fe.conf中添加参数`maximum_number_of_export_partitions`并重启FE来修改该设置

View File

@ -56,7 +56,7 @@ statement
| EXPORT TABLE tableName=multipartIdentifier
(PARTITION partition=identifierList)?
(whereClause)?
TO filePath=constant
TO filePath=STRING_LITERAL
(propertyClause)?
(withRemoteStorageSystem)? #export
;

View File

@ -66,9 +66,9 @@ public class BrokerDesc extends StorageDesc implements Writable {
public BrokerDesc(String name, Map<String, String> properties) {
this.name = name;
this.properties = properties;
if (this.properties == null) {
this.properties = Maps.newHashMap();
this.properties = Maps.newHashMap();
if (properties != null) {
this.properties.putAll(properties);
}
if (isMultiLoadBroker()) {
this.storageType = StorageBackend.StorageType.LOCAL;
@ -84,9 +84,9 @@ public class BrokerDesc extends StorageDesc implements Writable {
public BrokerDesc(String name, StorageBackend.StorageType storageType, Map<String, String> properties) {
this.name = name;
this.properties = properties;
if (this.properties == null) {
this.properties = Maps.newHashMap();
this.properties = Maps.newHashMap();
if (properties != null) {
this.properties.putAll(properties);
}
this.storageType = storageType;
this.properties.putAll(S3ClientBEProperties.getBeFSProperties(this.properties));

View File

@ -87,8 +87,9 @@ public class BrokerLoadStmt extends InsertStmt {
for (int i = 0; i < dataDescription.getFilePaths().size(); i++) {
String location = brokerDesc.getFileLocation(dataDescription.getFilePaths().get(i));
dataDescription.getFilePaths().set(i, location);
dataDescription.getFilePaths().set(i,
ExportStmt.checkPath(dataDescription.getFilePaths().get(i), brokerDesc.getStorageType()));
StorageBackend.checkPath(dataDescription.getFilePaths().get(i),
brokerDesc.getStorageType());
dataDescription.getFilePaths().set(i, dataDescription.getFilePaths().get(i));
}
}
}

View File

@ -31,7 +31,6 @@ import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.URI;
import org.apache.doris.common.util.Util;
import org.apache.doris.load.ExportJob;
import org.apache.doris.mysql.privilege.PrivPredicate;
@ -42,7 +41,7 @@ import org.apache.doris.qe.VariableMgr;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import lombok.Getter;
@ -86,7 +85,6 @@ public class ExportStmt extends StatementBase {
private TableName tblName;
private List<String> partitionStringNames;
private Expr whereExpr;
private String whereSql;
private String path;
private BrokerDesc brokerDesc;
private Map<String, String> properties = Maps.newHashMap();
@ -129,15 +127,6 @@ public class ExportStmt extends StatementBase {
this.sessionVariables = optionalSessionVariable.orElse(VariableMgr.getDefaultSessionVariable());
}
/**
* This constructor used by nereids planner
*/
public ExportStmt(TableRef tableRef, String whereSql, String path,
Map<String, String> properties, BrokerDesc brokerDesc) {
this(tableRef, (Expr) null, path, properties, brokerDesc);
this.whereSql = whereSql;
}
@Override
public boolean needAuditEncryption() {
return brokerDesc != null;
@ -162,6 +151,8 @@ public class ExportStmt extends StatementBase {
throw new AnalysisException("Do not support exporting temporary partitions");
}
partitionStringNames = optionalPartitionNames.get().getPartitionNames();
} else {
partitionStringNames = ImmutableList.of();
}
// check auth
@ -185,7 +176,7 @@ public class ExportStmt extends StatementBase {
}
// check path is valid
path = checkPath(path, brokerDesc.getStorageType());
StorageBackend.checkPath(path, brokerDesc.getStorageType());
if (brokerDesc.getStorageType() == StorageBackend.StorageType.BROKER) {
BrokerMgr brokerMgr = analyzer.getEnv().getBrokerMgr();
if (!brokerMgr.containsBroker(brokerDesc.getName())) {
@ -201,7 +192,7 @@ public class ExportStmt extends StatementBase {
// create job and analyze job
setJob();
exportJob.analyze();
exportJob.generateOutfileStatement();
}
private void setJob() throws UserException {
@ -219,7 +210,6 @@ public class ExportStmt extends StatementBase {
// set where expr
exportJob.setWhereExpr(this.whereExpr);
exportJob.setWhereSql(this.whereSql);
// set path
exportJob.setExportPath(this.path);
@ -248,13 +238,12 @@ public class ExportStmt extends StatementBase {
exportJob.setSessionVariables(this.sessionVariables);
exportJob.setTimeoutSecond(this.sessionVariables.getQueryTimeoutS());
exportJob.setSql(this.toSql());
exportJob.setOrigStmt(this.getOrigStmt());
}
// check partitions specified by user are belonged to the table.
private void checkPartitions(Env env) throws AnalysisException {
if (partitionStringNames == null) {
if (partitionStringNames.isEmpty()) {
return;
}
@ -299,43 +288,6 @@ public class ExportStmt extends StatementBase {
}
}
public static String checkPath(String path, StorageBackend.StorageType type) throws AnalysisException {
if (Strings.isNullOrEmpty(path)) {
throw new AnalysisException("No destination path specified.");
}
URI uri = URI.create(path);
String schema = uri.getScheme();
if (schema == null) {
throw new AnalysisException(
"Invalid export path, there is no schema of URI found. please check your path.");
}
if (type == StorageBackend.StorageType.BROKER) {
if (!schema.equalsIgnoreCase("bos")
&& !schema.equalsIgnoreCase("afs")
&& !schema.equalsIgnoreCase("hdfs")
&& !schema.equalsIgnoreCase("ofs")
&& !schema.equalsIgnoreCase("obs")
&& !schema.equalsIgnoreCase("oss")
&& !schema.equalsIgnoreCase("s3a")
&& !schema.equalsIgnoreCase("cosn")
&& !schema.equalsIgnoreCase("gfs")
&& !schema.equalsIgnoreCase("jfs")
&& !schema.equalsIgnoreCase("gs")) {
throw new AnalysisException("Invalid broker path. please use valid 'hdfs://', 'afs://' , 'bos://',"
+ " 'ofs://', 'obs://', 'oss://', 's3a://', 'cosn://', 'gfs://', 'gs://' or 'jfs://' path.");
}
} else if (type == StorageBackend.StorageType.S3 && !schema.equalsIgnoreCase("s3")) {
throw new AnalysisException("Invalid export path. please use valid 's3://' path.");
} else if (type == StorageBackend.StorageType.HDFS && !schema.equalsIgnoreCase("hdfs")) {
throw new AnalysisException("Invalid export path. please use valid 'HDFS://' path.");
} else if (type == StorageBackend.StorageType.LOCAL && !schema.equalsIgnoreCase("file")) {
throw new AnalysisException(
"Invalid export path. please use valid '" + OutFileClause.LOCAL_FILE_PREFIX + "' path.");
}
return path;
}
private void checkProperties(Map<String, String> properties) throws UserException {
for (String key : properties.keySet()) {
if (!PROPERTIES_SET.contains(key.toLowerCase())) {

View File

@ -429,8 +429,9 @@ public class LoadStmt extends DdlStmt {
for (int i = 0; i < dataDescription.getFilePaths().size(); i++) {
String location = brokerDesc.getFileLocation(dataDescription.getFilePaths().get(i));
dataDescription.getFilePaths().set(i, location);
dataDescription.getFilePaths().set(i,
ExportStmt.checkPath(dataDescription.getFilePaths().get(i), brokerDesc.getStorageType()));
StorageBackend.checkPath(dataDescription.getFilePaths().get(i),
brokerDesc.getStorageType());
dataDescription.getFilePaths().set(i, dataDescription.getFilePaths().get(i));
}
}
}

View File

@ -22,6 +22,7 @@ import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.NotImplementedException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.URI;
import org.apache.doris.datasource.property.constants.BosProperties;
import org.apache.doris.thrift.TStorageBackendType;
@ -34,6 +35,42 @@ public class StorageBackend implements ParseNode {
private String location;
private StorageDesc storageDesc;
public static void checkPath(String path, StorageBackend.StorageType type) throws AnalysisException {
if (Strings.isNullOrEmpty(path)) {
throw new AnalysisException("No destination path specified.");
}
URI uri = URI.create(path);
String schema = uri.getScheme();
if (schema == null) {
throw new AnalysisException(
"Invalid export path, there is no schema of URI found. please check your path.");
}
if (type == StorageBackend.StorageType.BROKER) {
if (!schema.equalsIgnoreCase("bos")
&& !schema.equalsIgnoreCase("afs")
&& !schema.equalsIgnoreCase("hdfs")
&& !schema.equalsIgnoreCase("ofs")
&& !schema.equalsIgnoreCase("obs")
&& !schema.equalsIgnoreCase("oss")
&& !schema.equalsIgnoreCase("s3a")
&& !schema.equalsIgnoreCase("cosn")
&& !schema.equalsIgnoreCase("gfs")
&& !schema.equalsIgnoreCase("jfs")
&& !schema.equalsIgnoreCase("gs")) {
throw new AnalysisException("Invalid broker path. please use valid 'hdfs://', 'afs://' , 'bos://',"
+ " 'ofs://', 'obs://', 'oss://', 's3a://', 'cosn://', 'gfs://', 'gs://' or 'jfs://' path.");
}
} else if (type == StorageBackend.StorageType.S3 && !schema.equalsIgnoreCase("s3")) {
throw new AnalysisException("Invalid export path. please use valid 's3://' path.");
} else if (type == StorageBackend.StorageType.HDFS && !schema.equalsIgnoreCase("hdfs")) {
throw new AnalysisException("Invalid export path. please use valid 'HDFS://' path.");
} else if (type == StorageBackend.StorageType.LOCAL && !schema.equalsIgnoreCase("file")) {
throw new AnalysisException(
"Invalid export path. please use valid '" + OutFileClause.LOCAL_FILE_PREFIX + "' path.");
}
}
public StorageBackend(String storageName, String location,
StorageType storageType, Map<String, String> properties) {
this.storageDesc = new StorageDesc(storageName, storageType, properties);
@ -78,7 +115,7 @@ public class StorageBackend implements ParseNode {
if (Strings.isNullOrEmpty(location)) {
throw new AnalysisException("You must specify a location on the repository");
}
location = ExportStmt.checkPath(location, storageType);
checkPath(location, storageType);
}
@Override

View File

@ -219,6 +219,7 @@ import org.apache.doris.scheduler.disruptor.TaskDisruptor;
import org.apache.doris.scheduler.manager.JobTaskManager;
import org.apache.doris.scheduler.manager.TimerJobManager;
import org.apache.doris.scheduler.manager.TransientTaskManager;
import org.apache.doris.scheduler.registry.ExportTaskRegister;
import org.apache.doris.scheduler.registry.PersistentJobRegister;
import org.apache.doris.scheduler.registry.TimerJobRegister;
import org.apache.doris.service.ExecuteEnv;
@ -338,6 +339,7 @@ public class Env {
private MetastoreEventsProcessor metastoreEventsProcessor;
private PersistentJobRegister persistentJobRegister;
private ExportTaskRegister exportTaskRegister;
private TimerJobManager timerJobManager;
private TransientTaskManager transientTaskManager;
private JobTaskManager jobTaskManager;
@ -621,6 +623,7 @@ public class Env {
this.timerJobManager.setDisruptor(taskDisruptor);
this.transientTaskManager.setDisruptor(taskDisruptor);
this.persistentJobRegister = new TimerJobRegister(timerJobManager);
this.exportTaskRegister = new ExportTaskRegister(transientTaskManager);
this.replayedJournalId = new AtomicLong(0L);
this.stmtIdCounter = new AtomicLong(0L);
this.isElectable = false;
@ -1840,6 +1843,7 @@ public class Env {
long jobId = dis.readLong();
newChecksum ^= jobId;
ExportJob job = ExportJob.read(dis);
job.cancelReplayedExportJob();
if (!job.isExpired(curTime)) {
exportMgr.unprotectAddJob(job);
}
@ -3788,6 +3792,10 @@ public class Env {
return persistentJobRegister;
}
public ExportTaskRegister getExportTaskRegister() {
return exportTaskRegister;
}
public TimerJobManager getAsyncJobManager() {
return timerJobManager;
}

View File

@ -30,6 +30,7 @@ import org.apache.doris.analysis.SelectStmt;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.analysis.StorageBackend.StorageType;
import org.apache.doris.analysis.TableName;
import org.apache.doris.analysis.TableRef;
@ -50,19 +51,33 @@ import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.analyzer.UnboundRelation;
import org.apache.doris.nereids.analyzer.UnboundSlot;
import org.apache.doris.nereids.analyzer.UnboundStar;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.util.ExpressionUtils;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.scheduler.exception.JobException;
import org.apache.doris.scheduler.registry.ExportTaskRegister;
import org.apache.doris.scheduler.registry.TransientTaskRegister;
import org.apache.doris.task.ExportExportingTask;
import org.apache.doris.thrift.TNetworkAddress;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
@ -81,6 +96,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
@Data
@ -91,9 +107,6 @@ public class ExportJob implements Writable {
private static final int MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT = Config.maximum_tablets_of_outfile_in_export;
public static final TransientTaskRegister register = new ExportTaskRegister(
Env.getCurrentEnv().getTransientTaskManager());
@SerializedName("id")
private long id;
@SerializedName("label")
@ -156,11 +169,10 @@ public class ExportJob implements Writable {
private TableRef tableRef;
private Expr whereExpr;
private String whereSql;
private String sql = "";
private Optional<Expression> whereExpression;
private Integer parallelism;
private int parallelism;
public Map<String, Long> getPartitionToVersion() {
return partitionToVersion;
@ -172,7 +184,16 @@ public class ExportJob implements Writable {
// TODO(ftw): delete
private List<SelectStmt> selectStmtList = Lists.newArrayList();
private List<List<SelectStmt>> selectStmtListPerParallel = Lists.newArrayList();
/**
* Each parallel has an associated Outfile list
* which are organized into a two-dimensional list.
* Therefore, we can access the selectStmtListPerParallel
* to get the outfile logical plans list responsible for each parallel task.
*/
private List<List<StatementBase>> selectStmtListPerParallel = Lists.newArrayList();
private List<List<String>> outfileSqlPerParallel = Lists.newArrayList();
private List<StmtExecutor> stmtExecutorList;
@ -232,7 +253,7 @@ public class ExportJob implements Writable {
*
* @throws UserException
*/
public void analyze() throws UserException {
public void generateOutfileStatement() throws UserException {
exportTable.readLock();
try {
// generateQueryStmtOld
@ -243,9 +264,92 @@ public class ExportJob implements Writable {
generateExportJobExecutor();
}
public void generateExportJobExecutor() {
public void generateOutfileLogicalPlans(List<String> nameParts)
throws UserException {
exportTable.readLock();
try {
// build source columns
List<NamedExpression> selectLists = Lists.newArrayList();
if (exportColumns.isEmpty()) {
selectLists.add(new UnboundStar(ImmutableList.of()));
} else {
this.exportColumns.stream().forEach(col -> {
selectLists.add(new UnboundSlot(this.tableName.getTbl(), col));
});
}
// get all tablets
List<List<Long>> tabletsListPerParallel = splitTablets();
// Each Outfile clause responsible for MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT tablets
for (List<Long> tabletsList : tabletsListPerParallel) {
List<StatementBase> logicalPlanAdapters = Lists.newArrayList();
for (int i = 0; i < tabletsList.size(); i += MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT) {
int end = i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT < tabletsList.size()
? i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT : tabletsList.size();
List<Long> tabletIds = new ArrayList<>(tabletsList.subList(i, end));
// generate LogicalPlan
LogicalPlan plan = generateOneLogicalPlan(nameParts, tabletIds, selectLists);
// generate LogicalPlanAdapter
StatementBase statementBase = generateLogicalPlanAdapter(plan);
logicalPlanAdapters.add(statementBase);
}
selectStmtListPerParallel.add(logicalPlanAdapters);
}
// debug LOG output
if (LOG.isDebugEnabled()) {
for (int i = 0; i < selectStmtListPerParallel.size(); ++i) {
LOG.debug("ExportTaskExecutor {} is responsible for outfile:", i);
for (StatementBase outfile : selectStmtListPerParallel.get(i)) {
LOG.debug("outfile sql: [{}]", outfile.toSql());
}
}
}
} finally {
exportTable.readUnlock();
}
generateExportJobExecutor();
}
private LogicalPlan generateOneLogicalPlan(List<String> nameParts, List<Long> tabletIds,
List<NamedExpression> selectLists) {
// UnboundRelation
LogicalPlan plan = new UnboundRelation(StatementScopeIdGenerator.newRelationId(), nameParts,
this.partitionNames, false, tabletIds, ImmutableList.of());
// LogicalCheckPolicy
plan = new LogicalCheckPolicy<>(plan);
// LogicalFilter
if (this.whereExpression.isPresent()) {
plan = new LogicalFilter<>(ExpressionUtils.extractConjunctionToSet(this.whereExpression.get()), plan);
}
// LogicalFilter
plan = new LogicalProject(selectLists, plan);
// LogicalFileSink
plan = new LogicalFileSink<>(this.exportPath, this.format, convertOutfileProperties(),
ImmutableList.of(), plan);
return plan;
}
private StatementBase generateLogicalPlanAdapter(LogicalPlan outfileLogicalPlan) {
StatementContext statementContext = new StatementContext();
ConnectContext connectContext = ConnectContext.get();
if (connectContext != null) {
connectContext.setStatementContext(statementContext);
statementContext.setConnectContext(connectContext);
}
StatementBase statementBase = new LogicalPlanAdapter(outfileLogicalPlan, statementContext);
statementBase.setOrigStmt(new OriginStatement(statementBase.toSql(), 0));
return statementBase;
}
private void generateExportJobExecutor() {
jobExecutorList = Lists.newArrayList();
for (List<SelectStmt> selectStmts : selectStmtListPerParallel) {
for (List<StatementBase> selectStmts : selectStmtListPerParallel) {
ExportTaskExecutor executor = new ExportTaskExecutor(selectStmts, this);
jobExecutorList.add(executor);
}
@ -266,13 +370,13 @@ public class ExportJob implements Writable {
}
}
ArrayList<ArrayList<Long>> tabletsListPerQuery = splitTablets();
List<List<Long>> tabletsListPerQuery = splitTablets();
ArrayList<ArrayList<TableRef>> tableRefListPerQuery = Lists.newArrayList();
for (ArrayList<Long> tabletsList : tabletsListPerQuery) {
TableRef tblRef = new TableRef(this.tableRef.getName(), this.tableRef.getAlias(), null, tabletsList,
this.tableRef.getTableSample(), this.tableRef.getCommonHints());
ArrayList<TableRef> tableRefList = Lists.newArrayList();
List<List<TableRef>> tableRefListPerQuery = Lists.newArrayList();
for (List<Long> tabletsList : tabletsListPerQuery) {
TableRef tblRef = new TableRef(this.tableRef.getName(), this.tableRef.getAlias(), null,
(ArrayList) tabletsList, this.tableRef.getTableSample(), this.tableRef.getCommonHints());
List<TableRef> tableRefList = Lists.newArrayList();
tableRefList.add(tblRef);
tableRefListPerQuery.add(tableRefList);
}
@ -285,7 +389,7 @@ public class ExportJob implements Writable {
}
}
for (ArrayList<TableRef> tableRefList : tableRefListPerQuery) {
for (List<TableRef> tableRefList : tableRefListPerQuery) {
FromClause fromClause = new FromClause(tableRefList);
// generate outfile clause
OutFileClause outfile = new OutFileClause(this.exportPath, this.format, convertOutfileProperties());
@ -322,7 +426,7 @@ public class ExportJob implements Writable {
}
}
ArrayList<ArrayList<TableRef>> tableRefListPerParallel = getTableRefListPerParallel();
List<List<TableRef>> tableRefListPerParallel = getTableRefListPerParallel();
LOG.info("Export Job [{}] is split into {} Export Task Executor.", id, tableRefListPerParallel.size());
// debug LOG output
@ -336,10 +440,10 @@ public class ExportJob implements Writable {
}
// generate 'select..outfile..' statement
for (ArrayList<TableRef> tableRefList : tableRefListPerParallel) {
List<SelectStmt> selectStmtLists = Lists.newArrayList();
for (List<TableRef> tableRefList : tableRefListPerParallel) {
List<StatementBase> selectStmtLists = Lists.newArrayList();
for (TableRef tableRef : tableRefList) {
ArrayList<TableRef> tmpTableRefList = Lists.newArrayList(tableRef);
List<TableRef> tmpTableRefList = Lists.newArrayList(tableRef);
FromClause fromClause = new FromClause(tmpTableRefList);
// generate outfile clause
OutFileClause outfile = new OutFileClause(this.exportPath, this.format, convertOutfileProperties());
@ -356,25 +460,25 @@ public class ExportJob implements Writable {
if (LOG.isDebugEnabled()) {
for (int i = 0; i < selectStmtListPerParallel.size(); ++i) {
LOG.debug("ExportTaskExecutor {} is responsible for outfile:", i);
for (SelectStmt outfile : selectStmtListPerParallel.get(i)) {
for (StatementBase outfile : selectStmtListPerParallel.get(i)) {
LOG.debug("outfile sql: [{}]", outfile.toSql());
}
}
}
}
private ArrayList<ArrayList<TableRef>> getTableRefListPerParallel() throws UserException {
ArrayList<ArrayList<Long>> tabletsListPerParallel = splitTablets();
private List<List<TableRef>> getTableRefListPerParallel() throws UserException {
List<List<Long>> tabletsListPerParallel = splitTablets();
ArrayList<ArrayList<TableRef>> tableRefListPerParallel = Lists.newArrayList();
for (ArrayList<Long> tabletsList : tabletsListPerParallel) {
ArrayList<TableRef> tableRefList = Lists.newArrayList();
List<List<TableRef>> tableRefListPerParallel = Lists.newArrayList();
for (List<Long> tabletsList : tabletsListPerParallel) {
List<TableRef> tableRefList = Lists.newArrayList();
for (int i = 0; i < tabletsList.size(); i += MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT) {
int end = i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT < tabletsList.size()
? i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT : tabletsList.size();
ArrayList<Long> tablets = new ArrayList<>(tabletsList.subList(i, end));
List<Long> tablets = new ArrayList<>(tabletsList.subList(i, end));
TableRef tblRef = new TableRef(this.tableRef.getName(), this.tableRef.getAlias(),
this.tableRef.getPartitionNames(), tablets,
this.tableRef.getPartitionNames(), (ArrayList) tablets,
this.tableRef.getTableSample(), this.tableRef.getCommonHints());
tableRefList.add(tblRef);
}
@ -383,7 +487,7 @@ public class ExportJob implements Writable {
return tableRefListPerParallel;
}
private ArrayList<ArrayList<Long>> splitTablets() throws UserException {
private List<List<Long>> splitTablets() throws UserException {
// get tablets
Database db = Env.getCurrentEnv().getInternalCatalog().getDbOrAnalysisException(this.tableName.getDb());
OlapTable table = db.getOlapTableOrAnalysisException(this.tableName.getTbl());
@ -392,8 +496,8 @@ public class ExportJob implements Writable {
try {
final Collection<Partition> partitions = new ArrayList<Partition>();
// get partitions
// user specifies partitions, already checked in ExportStmt
if (this.partitionNames != null) {
// user specifies partitions, already checked in ExportCommand
if (!this.partitionNames.isEmpty()) {
this.partitionNames.forEach(partitionName -> partitions.add(table.getPartition(partitionName)));
} else {
if (table.getPartitions().size() > Config.maximum_number_of_export_partitions) {
@ -414,12 +518,17 @@ public class ExportJob implements Writable {
table.readUnlock();
}
/**
* Assign tablets to per parallel, for example:
* If the number of all tablets if 10, and the real parallelism is 4,
* then, the number of tablets of per parallel should be: 3 3 2 2.
*/
Integer tabletsAllNum = tabletIdList.size();
tabletsNum = tabletsAllNum;
Integer tabletsNumPerParallel = tabletsAllNum / this.parallelism;
Integer tabletsNumPerQueryRemainder = tabletsAllNum - tabletsNumPerParallel * this.parallelism;
ArrayList<ArrayList<Long>> tabletsListPerParallel = Lists.newArrayList();
List<List<Long>> tabletsListPerParallel = Lists.newArrayList();
Integer realParallelism = this.parallelism;
if (tabletsAllNum < this.parallelism) {
realParallelism = tabletsAllNum;
@ -540,11 +649,6 @@ public class ExportJob implements Writable {
}
}
public void cancelReplayedExportJob(ExportFailMsg.CancelType type, String msg) {
setExportJobState(ExportJobState.CANCELLED);
failMsg = new ExportFailMsg(type, msg);
}
private void cancelExportTask(ExportFailMsg.CancelType type, String msg) throws JobException {
if (getState() == ExportJobState.CANCELLED) {
return;
@ -561,7 +665,7 @@ public class ExportJob implements Writable {
// we need cancel all task
taskIdToExecutor.keySet().forEach(id -> {
try {
register.cancelTask(id);
Env.getCurrentEnv().getExportTaskRegister().cancelTask(id);
} catch (JobException e) {
LOG.warn("cancel export task {} exception: {}", id, e);
}
@ -653,6 +757,20 @@ public class ExportJob implements Writable {
setExportJobState(newState);
}
/**
* If there are export which state is PENDING or EXPORTING or IN_QUEUE
* in checkpoint, we translate their state to CANCELLED.
*
* This function is only used in replay catalog phase.
*/
public void cancelReplayedExportJob() {
if (state == ExportJobState.PENDING || state == ExportJobState.EXPORTING || state == ExportJobState.IN_QUEUE) {
final String failMsg = "FE restarted or Master changed during exporting. Job must be cancelled.";
this.failMsg = new ExportFailMsg(ExportFailMsg.CancelType.RUN_FAIL, failMsg);
setExportJobState(ExportJobState.CANCELLED);
}
}
// TODO(ftw): delete
public synchronized boolean updateState(ExportJobState newState) {
return this.updateState(newState, false);

View File

@ -38,8 +38,6 @@ import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.scheduler.exception.JobException;
import org.apache.doris.task.ExportExportingTask;
import org.apache.doris.task.MasterTask;
import org.apache.doris.task.MasterTaskExecutor;
import com.google.common.annotations.VisibleForTesting;
@ -103,27 +101,27 @@ public class ExportMgr extends MasterDaemon {
@Override
protected void runAfterCatalogReady() {
List<ExportJob> pendingJobs = getExportJobs(ExportJobState.PENDING);
List<ExportJob> newInQueueJobs = Lists.newArrayList();
for (ExportJob job : pendingJobs) {
if (handlePendingJobs(job)) {
newInQueueJobs.add(job);
}
}
LOG.debug("new IN_QUEUE export job num: {}", newInQueueJobs.size());
for (ExportJob job : newInQueueJobs) {
try {
MasterTask task = new ExportExportingTask(job);
job.setTask((ExportExportingTask) task);
if (exportingExecutor.submit(task)) {
LOG.info("success to submit IN_QUEUE export job. job: {}", job);
} else {
LOG.info("fail to submit IN_QUEUE job to executor. job: {}", job);
}
} catch (Exception e) {
LOG.warn("run export exporting job {}.", job, e);
}
}
// List<ExportJob> pendingJobs = getExportJobs(ExportJobState.PENDING);
// List<ExportJob> newInQueueJobs = Lists.newArrayList();
// for (ExportJob job : pendingJobs) {
// if (handlePendingJobs(job)) {
// newInQueueJobs.add(job);
// }
// }
// LOG.debug("new IN_QUEUE export job num: {}", newInQueueJobs.size());
// for (ExportJob job : newInQueueJobs) {
// try {
// MasterTask task = new ExportExportingTask(job);
// job.setTask((ExportExportingTask) task);
// if (exportingExecutor.submit(task)) {
// LOG.info("success to submit IN_QUEUE export job. job: {}", job);
// } else {
// LOG.info("fail to submit IN_QUEUE job to executor. job: {}", job);
// }
// } catch (Exception e) {
// LOG.warn("run export exporting job {}.", job, e);
// }
// }
}
private boolean handlePendingJobs(ExportJob job) {
@ -136,8 +134,7 @@ public class ExportMgr extends MasterDaemon {
// If the job is created from replay thread, all plan info will be lost.
// so the job has to be cancelled.
String failMsg = "FE restarted or Master changed during exporting. Job must be cancelled.";
// job.cancel(ExportFailMsg.CancelType.RUN_FAIL, failMsg);
job.cancelReplayedExportJob(ExportFailMsg.CancelType.RUN_FAIL, failMsg);
job.cancel(ExportFailMsg.CancelType.RUN_FAIL, failMsg);
return false;
}
@ -168,8 +165,7 @@ public class ExportMgr extends MasterDaemon {
LOG.info("add export job. {}", job);
}
public void addExportJobAndRegisterTask(ExportStmt stmt) throws Exception {
ExportJob job = stmt.getExportJob();
public void addExportJobAndRegisterTask(ExportJob job) throws Exception {
long jobId = Env.getCurrentEnv().getNextId();
job.setId(jobId);
writeLock();
@ -179,7 +175,7 @@ public class ExportMgr extends MasterDaemon {
}
unprotectAddJob(job);
job.getJobExecutorList().forEach(executor -> {
Long taskId = ExportJob.register.registerTask(executor);
Long taskId = Env.getCurrentEnv().getExportTaskRegister().registerTask(executor);
executor.setTaskId(taskId);
job.getTaskIdToExecutor().put(taskId, executor);
});
@ -515,7 +511,6 @@ public class ExportMgr extends MasterDaemon {
readLock();
try {
ExportJob job = exportIdToJob.get(stateTransfer.getJobId());
// job.updateState(stateTransfer.getState(), true);
job.replayExportJobState(stateTransfer.getState());
job.setStartTimeMs(stateTransfer.getStartTimeMs());
job.setFinishTimeMs(stateTransfer.getFinishTimeMs());

View File

@ -19,6 +19,7 @@ package org.apache.doris.load;
import org.apache.doris.analysis.OutFileClause;
import org.apache.doris.analysis.SelectStmt;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
@ -26,6 +27,9 @@ import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.load.ExportFailMsg.CancelType;
import org.apache.doris.nereids.analyzer.UnboundRelation;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.qe.AutoCloseConnectContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState.MysqlStateType;
@ -41,13 +45,14 @@ import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j
public class ExportTaskExecutor implements TransientTaskExecutor {
List<SelectStmt> selectStmtLists;
List<StatementBase> selectStmtLists;
ExportJob exportJob;
@ -60,7 +65,7 @@ public class ExportTaskExecutor implements TransientTaskExecutor {
private AtomicBoolean isFinished;
ExportTaskExecutor(List<SelectStmt> selectStmtLists, ExportJob exportJob) {
ExportTaskExecutor(List<StatementBase> selectStmtLists, ExportJob exportJob) {
this.selectStmtLists = selectStmtLists;
this.exportJob = exportJob;
this.isCanceled = new AtomicBoolean(false);
@ -85,8 +90,17 @@ public class ExportTaskExecutor implements TransientTaskExecutor {
OlapTable table = db.getOlapTableOrAnalysisException(exportJob.getTableName().getTbl());
table.readLock();
try {
SelectStmt selectStmt = selectStmtLists.get(idx);
List<Long> tabletIds = selectStmt.getTableRefs().get(0).getSampleTabletIds();
List<Long> tabletIds;
if (exportJob.getSessionVariables().isEnableNereidsPlanner()) {
LogicalPlanAdapter logicalPlanAdapter = (LogicalPlanAdapter) selectStmtLists.get(idx);
Optional<UnboundRelation> unboundRelation = findUnboundRelation(
logicalPlanAdapter.getLogicalPlan());
tabletIds = unboundRelation.get().getTabletIds();
} else {
SelectStmt selectStmt = (SelectStmt) selectStmtLists.get(idx);
tabletIds = selectStmt.getTableRefs().get(0).getSampleTabletIds();
}
for (Long tabletId : tabletIds) {
TabletMeta tabletMeta = Env.getCurrentEnv().getTabletInvertedIndex().getTabletMeta(
tabletId);
@ -100,6 +114,10 @@ public class ExportTaskExecutor implements TransientTaskExecutor {
+ "now version = {}", exportJob.getId(), tabletId, oldVersion, nowVersion);
}
}
} catch (Exception e) {
exportJob.updateExportJobState(ExportJobState.CANCELLED, taskId, null,
ExportFailMsg.CancelType.RUN_FAIL, e.getMessage());
throw new JobException(e);
} finally {
table.readUnlock();
}
@ -168,4 +186,17 @@ public class ExportTaskExecutor implements TransientTaskExecutor {
outfileInfo.setUrl(resultAttachedInfo.get(OutFileClause.URL));
return outfileInfo;
}
private Optional<UnboundRelation> findUnboundRelation(LogicalPlan plan) {
if (plan instanceof UnboundRelation) {
return Optional.of((UnboundRelation) plan);
}
for (int i = 0; i < plan.children().size(); ++i) {
Optional<UnboundRelation> optional = findUnboundRelation((LogicalPlan) plan.children().get(i));
if (optional.isPresent()) {
return optional;
}
}
return Optional.empty();
}
}

View File

@ -429,28 +429,26 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
public LogicalPlan visitExport(ExportContext ctx) {
List<String> tableName = visitMultipartIdentifier(ctx.tableName);
List<String> partitions = ctx.partition == null ? ImmutableList.of() : visitIdentifierList(ctx.partition);
String path = parseConstant(ctx.filePath);
String whereSql = null;
// handle path string
String tmpPath = ctx.filePath.getText();
String path = escapeBackSlash(tmpPath.substring(1, tmpPath.length() - 1));
Optional<Expression> expr = Optional.empty();
if (ctx.whereClause() != null) {
WhereClauseContext whereClauseContext = ctx.whereClause();
int startIndex = whereClauseContext.start.getStartIndex();
int stopIndex = whereClauseContext.stop.getStopIndex();
org.antlr.v4.runtime.misc.Interval interval = new org.antlr.v4.runtime.misc.Interval(startIndex,
stopIndex);
whereSql = whereClauseContext.start.getInputStream().getText(interval);
expr = Optional.of(getExpression(ctx.whereClause().booleanExpression()));
}
Map<String, String> filePropertiesMap = null;
Map<String, String> filePropertiesMap = ImmutableMap.of();
if (ctx.propertyClause() != null) {
filePropertiesMap = visitPropertyClause(ctx.propertyClause());
}
BrokerDesc brokerDesc = null;
Optional<BrokerDesc> brokerDesc = Optional.empty();
if (ctx.withRemoteStorageSystem() != null) {
brokerDesc = visitWithRemoteStorageSystem(ctx.withRemoteStorageSystem());
brokerDesc = Optional.ofNullable(visitWithRemoteStorageSystem(ctx.withRemoteStorageSystem()));
}
return new ExportCommand(tableName, partitions, whereSql, path, filePropertiesMap, brokerDesc);
return new ExportCommand(tableName, partitions, expr, path, filePropertiesMap, brokerDesc);
}
@Override

View File

@ -17,67 +17,326 @@
package org.apache.doris.nereids.trees.plans.commands;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.ExportStmt;
import org.apache.doris.analysis.PartitionNames;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.OutFileClause;
import org.apache.doris.analysis.Separator;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.analysis.TableName;
import org.apache.doris.analysis.TableRef;
import org.apache.doris.catalog.BrokerMgr;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.Util;
import org.apache.doris.load.ExportJob;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.nereids.util.RelationUtil;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.qe.VariableMgr;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
/**
* export table
* EXPORT statement, export data to dirs by broker.
*
* syntax:
* EXPORT TABLE table_name [PARTITION (name1[, ...])]
* TO 'export_target_path'
* [PROPERTIES("key"="value")]
* BY BROKER 'broker_name' [( $broker_attrs)]
*/
public class ExportCommand extends Command implements ForwardWithSync {
private List<String> nameParts;
private String whereSql;
private String path;
private List<String> partitionsNameList;
private Map<String, String> fileProperties;
private BrokerDesc brokerDesc;
public static final String PARALLELISM = "parallelism";
public static final String LABEL = "label";
private static final String DEFAULT_COLUMN_SEPARATOR = "\t";
private static final String DEFAULT_LINE_DELIMITER = "\n";
private static final String DEFAULT_PARALLELISM = "1";
private static final ImmutableSet<String> PROPERTIES_SET = new ImmutableSet.Builder<String>()
.add(LABEL)
.add(PARALLELISM)
.add(LoadStmt.EXEC_MEM_LIMIT)
.add(LoadStmt.TIMEOUT_PROPERTY)
.add(LoadStmt.KEY_IN_PARAM_COLUMNS)
.add(LoadStmt.TIMEOUT_PROPERTY)
.add(OutFileClause.PROP_MAX_FILE_SIZE)
.add(OutFileClause.PROP_DELETE_EXISTING_FILES)
.add(PropertyAnalyzer.PROPERTIES_COLUMN_SEPARATOR)
.add(PropertyAnalyzer.PROPERTIES_LINE_DELIMITER)
.add("format")
.build();
private final List<String> nameParts;
private final Optional<Expression> expr;
private final String path;
private final List<String> partitionsNames;
private final Map<String, String> fileProperties;
private final Optional<BrokerDesc> brokerDesc;
/**
* constructor of ExportCommand
*/
public ExportCommand(List<String> nameParts, List<String> partitions, String whereSql, String path,
Map<String, String> fileProperties, BrokerDesc brokerDesc) {
public ExportCommand(List<String> nameParts, List<String> partitions, Optional<Expression> expr,
String path, Map<String, String> fileProperties, Optional<BrokerDesc> brokerDesc) {
super(PlanType.EXPORT_COMMAND);
this.nameParts = nameParts;
this.partitionsNameList = partitions;
this.whereSql = whereSql;
this.path = path.trim();
this.fileProperties = fileProperties;
this.brokerDesc = brokerDesc;
this.nameParts = ImmutableList.copyOf(Objects.requireNonNull(nameParts, "nameParts should not be null"));
this.path = Objects.requireNonNull(path.trim(), "export path should not be null");
this.partitionsNames = ImmutableList.copyOf(
Objects.requireNonNull(partitions, "partitions should not be null"));
this.fileProperties = ImmutableSortedMap.copyOf(
Objects.requireNonNull(fileProperties, "fileProperties should not be null"),
String.CASE_INSENSITIVE_ORDER);
this.expr = expr;
if (!brokerDesc.isPresent()) {
this.brokerDesc = Optional.of(new BrokerDesc("local", StorageBackend.StorageType.LOCAL, null));
} else {
this.brokerDesc = brokerDesc;
}
}
@Override
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
ExportStmt exportStmt = generateExportStmt();
Analyzer analyzer = new Analyzer(ctx.getEnv(), ctx);
exportStmt.analyze(analyzer);
ctx.getEnv().getExportMgr().addExportJobAndRegisterTask(exportStmt);
}
// get tblName
TableName tblName = getTableName(ctx);
private ExportStmt generateExportStmt() {
// generate tableRef
PartitionNames partitionNames = null;
if (!this.partitionsNameList.isEmpty()) {
partitionNames = new PartitionNames(false, this.partitionsNameList);
// check auth
if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ctx, tblName.getDb(), tblName.getTbl(),
PrivPredicate.SELECT)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "EXPORT",
ctx.getQualifiedUser(),
ctx.getRemoteIP(),
tblName.getDb() + ": " + tblName.getTbl());
}
TableRef tableRef = new TableRef(new TableName(getTableName()), null, partitionNames, null, null, null);
return new ExportStmt(tableRef, whereSql, path, fileProperties, brokerDesc);
// check phases
checkAllParameters(ctx, tblName, fileProperties);
ExportJob exportJob = generateExportJob(ctx, fileProperties, tblName);
// register job
ctx.getEnv().getExportMgr().addExportJobAndRegisterTask(exportJob);
}
public String getTableName() {
return nameParts.stream().map(Utils::quoteIfNeeded)
.reduce((left, right) -> left + "." + right).orElse("");
private void checkAllParameters(ConnectContext ctx, TableName tblName, Map<String, String> fileProperties)
throws UserException {
checkPropertyKey(fileProperties);
checkPartitions(ctx.getEnv(), tblName);
checkBrokerDesc(ctx);
checkFileProperties(fileProperties, tblName);
}
// check property key
private void checkPropertyKey(Map<String, String> properties) throws AnalysisException {
for (String key : properties.keySet()) {
if (!PROPERTIES_SET.contains(key.toLowerCase())) {
throw new AnalysisException("Invalid property key: [" + key + "]");
}
}
}
// check partitions specified by user are belonged to the table.
private void checkPartitions(Env env, TableName tblName) throws AnalysisException, UserException {
if (this.partitionsNames.isEmpty()) {
return;
}
if (this.partitionsNames.size() > Config.maximum_number_of_export_partitions) {
throw new AnalysisException("The partitions number of this export job is larger than the maximum number"
+ " of partitions allowed by an export job");
}
Database db = env.getInternalCatalog().getDbOrAnalysisException(tblName.getDb());
Table table = db.getTableOrAnalysisException(tblName.getTbl());
table.readLock();
try {
// check table
if (!table.isPartitioned()) {
throw new AnalysisException("Table[" + tblName.getTbl() + "] is not partitioned.");
}
Table.TableType tblType = table.getType();
switch (tblType) {
case MYSQL:
case ODBC:
case JDBC:
case OLAP:
break;
case BROKER:
case SCHEMA:
case INLINE_VIEW:
case VIEW:
default:
throw new AnalysisException("Table[" + tblName.getTbl() + "] is "
+ tblType + " type, do not support EXPORT.");
}
for (String partitionName : this.partitionsNames) {
Partition partition = table.getPartition(partitionName);
if (partition == null) {
throw new AnalysisException("Partition [" + partitionName + "] does not exist "
+ "in Table[" + tblName.getTbl() + "]");
}
}
} finally {
table.readUnlock();
}
}
private void checkBrokerDesc(ConnectContext ctx) throws UserException {
// check path is valid
StorageBackend.checkPath(this.path, this.brokerDesc.get().getStorageType());
if (brokerDesc.get().getStorageType() == StorageBackend.StorageType.BROKER) {
BrokerMgr brokerMgr = ctx.getEnv().getBrokerMgr();
if (!brokerMgr.containsBroker(brokerDesc.get().getName())) {
throw new AnalysisException("broker " + brokerDesc.get().getName() + " does not exist");
}
if (null == brokerMgr.getAnyBroker(brokerDesc.get().getName())) {
throw new AnalysisException("failed to get alive broker");
}
}
}
private ExportJob generateExportJob(ConnectContext ctx, Map<String, String> fileProperties, TableName tblName)
throws UserException {
ExportJob exportJob = new ExportJob();
// set export job
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(tblName.getDb());
exportJob.setDbId(db.getId());
exportJob.setTableName(tblName);
exportJob.setExportTable(db.getTableOrDdlException(tblName.getTbl()));
exportJob.setTableId(db.getTableOrDdlException(tblName.getTbl()).getId());
// set partitions
exportJob.setPartitionNames(this.partitionsNames);
// set where expression
exportJob.setWhereExpression(this.expr);
// set path
exportJob.setExportPath(this.path);
// set column separator
String columnSeparator = Separator.convertSeparator(fileProperties.getOrDefault(
PropertyAnalyzer.PROPERTIES_COLUMN_SEPARATOR, DEFAULT_COLUMN_SEPARATOR));
exportJob.setColumnSeparator(columnSeparator);
// set line delimiter
String lineDelimiter = Separator.convertSeparator(fileProperties.getOrDefault(
PropertyAnalyzer.PROPERTIES_LINE_DELIMITER, DEFAULT_LINE_DELIMITER));
exportJob.setLineDelimiter(lineDelimiter);
// set format
exportJob.setFormat(fileProperties.getOrDefault(LoadStmt.KEY_IN_PARAM_FORMAT_TYPE, "csv")
.toLowerCase());
// set parallelism
int parallelism;
try {
parallelism = Integer.parseInt(fileProperties.getOrDefault(PARALLELISM, DEFAULT_PARALLELISM));
} catch (NumberFormatException e) {
throw new AnalysisException("The value of parallelism is invalid!");
}
exportJob.setParallelism(parallelism);
// set label
// if fileProperties contains LABEL, the label has been checked in check phases
String defaultLabel = "export_" + UUID.randomUUID();
exportJob.setLabel(fileProperties.getOrDefault(LABEL, defaultLabel));
// set max_file_size
exportJob.setMaxFileSize(fileProperties.getOrDefault(OutFileClause.PROP_MAX_FILE_SIZE, ""));
// set delete_existing_files
exportJob.setDeleteExistingFiles(fileProperties.getOrDefault(
OutFileClause.PROP_DELETE_EXISTING_FILES, ""));
// null means not specified
// "" means user specified zero columns
// if fileProperties contains KEY_IN_PARAM_COLUMNS, the columns have been checked in check phases
String columns = fileProperties.getOrDefault(LoadStmt.KEY_IN_PARAM_COLUMNS, null);
exportJob.setColumns(columns);
if (columns != null) {
Splitter split = Splitter.on(',').trimResults().omitEmptyStrings();
exportJob.setExportColumns(split.splitToList(columns.toLowerCase()));
}
// set broker desc
exportJob.setBrokerDesc(this.brokerDesc.get());
// set sessions
exportJob.setQualifiedUser(ctx.getQualifiedUser());
exportJob.setUserIdentity(ctx.getCurrentUserIdentity());
Optional<SessionVariable> optionalSessionVariable = Optional.ofNullable(
ConnectContext.get().getSessionVariable());
exportJob.setSessionVariables(optionalSessionVariable.orElse(VariableMgr.getDefaultSessionVariable()));
exportJob.setTimeoutSecond(optionalSessionVariable.orElse(VariableMgr.getDefaultSessionVariable())
.getQueryTimeoutS());
// exportJob generate outfile sql
exportJob.generateOutfileLogicalPlans(this.nameParts);
return exportJob;
}
private TableName getTableName(ConnectContext ctx) throws UserException {
// get tblName
List<String> qualifiedTableName = RelationUtil.getQualifierName(ctx, this.nameParts);
TableName tblName = new TableName(qualifiedTableName.get(0), qualifiedTableName.get(1),
qualifiedTableName.get(2));
Util.prohibitExternalCatalog(tblName.getCtl(), this.getClass().getSimpleName());
return tblName;
}
private void checkFileProperties(Map<String, String> fileProperties, TableName tblName) throws UserException {
// check user specified columns
if (fileProperties.containsKey(LoadStmt.KEY_IN_PARAM_COLUMNS)) {
checkColumns(fileProperties.get(LoadStmt.KEY_IN_PARAM_COLUMNS), tblName);
}
// check user specified label
if (fileProperties.containsKey(LABEL)) {
FeNameFormat.checkLabel(fileProperties.get(LABEL));
}
}
private void checkColumns(String columns, TableName tblName) throws AnalysisException, UserException {
if (columns.isEmpty()) {
throw new AnalysisException("columns can not be empty");
}
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(tblName.getDb());
Table table = db.getTableOrDdlException(tblName.getTbl());
List<String> tableColumns = table.getBaseSchema().stream().map(column -> column.getName())
.collect(Collectors.toList());
Splitter split = Splitter.on(',').trimResults().omitEmptyStrings();
List<String> columnsSpecified = split.splitToList(columns.toLowerCase());
for (String columnName : columnsSpecified) {
if (!tableColumns.contains(columnName)) {
throw new AnalysisException("unknown column [" + columnName + "] in table [" + tblName.getTbl() + "]");
}
}
}
public Map<String, String> getFileProperties() {
return this.fileProperties;
}
@Override

View File

@ -367,6 +367,7 @@ public class EditLog {
case OperationType.OP_EXPORT_CREATE: {
ExportJob job = (ExportJob) journal.getData();
ExportMgr exportMgr = env.getExportMgr();
job.cancelReplayedExportJob();
exportMgr.replayCreateExportJob(job);
break;
}

View File

@ -1342,6 +1342,7 @@ public class StmtExecutor {
}
sendResult(isOutfileQuery, false, queryStmt, channel, null, null);
LOG.info("--ftw: over");
}
private void sendResult(boolean isOutfileQuery, boolean isSendFields, Queriable queryStmt, MysqlChannel channel,
@ -1353,6 +1354,7 @@ public class StmtExecutor {
// Query OK, 10 rows affected (0.01 sec)
//
// 2. If this is a query, send the result expr fields first, and send result data back to client.
LOG.info("--ftw: begin send result");
RowBatch batch;
coord = new Coordinator(context, analyzer, planner, context.getStatsErrorEstimator());
if (Config.enable_workload_group && context.sessionVariable.getEnablePipelineEngine()) {
@ -1392,6 +1394,7 @@ public class StmtExecutor {
// register the fetch result time.
profile.getSummaryProfile().setTempStartTime();
batch = coord.getNext();
LOG.info("--ftw: get Next");
profile.getSummaryProfile().freshFetchResultConsumeTime();
// for outfile query, there will be only one empty batch send back with eos flag
@ -1405,26 +1408,33 @@ public class StmtExecutor {
// For some language driver, getting error packet after fields packet
// will be recognized as a success result
// so We need to send fields after first batch arrived
LOG.info("--ftw: judge");
if (!isSendFields) {
LOG.info("--ftw: !isSendFields");
if (!isOutfileQuery) {
LOG.info("--ftw: !isOutfileQuery send fiedl");
sendFields(queryStmt.getColLabels(), exprToType(queryStmt.getResultExprs()));
} else {
LOG.info("--ftw: isOutfileQuery send field");
sendFields(OutFileClause.RESULT_COL_NAMES, OutFileClause.RESULT_COL_TYPES);
}
isSendFields = true;
}
for (ByteBuffer row : batch.getBatch().getRows()) {
channel.sendOnePacket(row);
LOG.info("--ftw: channel send packet, row = " + row);
}
profile.getSummaryProfile().freshWriteResultConsumeTime();
context.updateReturnRows(batch.getBatch().getRows().size());
context.setResultAttachedInfo(batch.getBatch().getAttachedInfos());
}
if (batch.isEos()) {
LOG.info("--ftw: isEos");
break;
}
}
if (cacheAnalyzer != null) {
LOG.info("--ftw: cacheAnalyzer");
if (cacheResult != null && cacheAnalyzer.getHitRange() == Cache.HitRange.Right) {
isSendFields =
sendCachedValues(channel, cacheResult.getValuesList(), (Queriable) queryStmt, isSendFields,
@ -1434,6 +1444,7 @@ public class StmtExecutor {
cacheAnalyzer.updateCache();
}
if (!isSendFields) {
LOG.info("--ftw: !isSendFields 2");
if (!isOutfileQuery) {
if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().dryRunQuery) {
// Return a one row one column result set, with the real result number
@ -1451,9 +1462,11 @@ public class StmtExecutor {
}
}
LOG.info("--ftw: statisticsForAuditLog");
statisticsForAuditLog = batch.getQueryStatistics() == null ? null : batch.getQueryStatistics().toBuilder();
context.getState().setEof();
profile.getSummaryProfile().setQueryFetchResultFinishTime();
LOG.info("--ftw: profile");
} catch (Exception e) {
// notify all be cancel runing fragment
// in some case may block all fragment handle threads
@ -1463,7 +1476,9 @@ public class StmtExecutor {
fetchResultSpan.recordException(e);
throw e;
} finally {
LOG.info("--ftw: end begin");
fetchResultSpan.end();
LOG.info("--ftw: end");
if (coord.getInstanceTotalNum() > 1 && LOG.isDebugEnabled()) {
try {
LOG.debug("Finish to execute fragment. user: {}, db: {}, sql: {}, fragment instance num: {}",
@ -1474,6 +1489,7 @@ public class StmtExecutor {
LOG.warn("Fail to print fragment concurrency for Query.", e);
}
}
LOG.info("--ftw: last");
}
}
@ -2189,7 +2205,7 @@ public class StmtExecutor {
private void handleExportStmt() throws Exception {
ExportStmt exportStmt = (ExportStmt) parsedStmt;
// context.getEnv().getExportMgr().addExportJob(exportStmt);
context.getEnv().getExportMgr().addExportJobAndRegisterTask(exportStmt);
context.getEnv().getExportMgr().addExportJobAndRegisterTask(exportStmt.getExportJob());
}
private void handleCtasStmt() {

View File

@ -18,7 +18,7 @@
package org.apache.doris.tablefunction;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.ExportStmt;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.analysis.StorageBackend.StorageType;
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.common.AnalysisException;
@ -80,7 +80,7 @@ public class HdfsTableValuedFunction extends ExternalFileTableValuedFunction {
if (!locationProperties.containsKey(HDFS_URI)) {
throw new AnalysisException(String.format("Configuration '%s' is required.", HDFS_URI));
}
ExportStmt.checkPath(locationProperties.get(HDFS_URI), StorageType.HDFS);
StorageBackend.checkPath(locationProperties.get(HDFS_URI), StorageType.HDFS);
hdfsUri = URI.create(locationProperties.get(HDFS_URI));
filePath = locationProperties.get(HdfsResource.HADOOP_FS_NAME) + hdfsUri.getPath();

View File

@ -0,0 +1,462 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.UserException;
import org.apache.doris.load.ExportJob;
import org.apache.doris.nereids.analyzer.UnboundRelation;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.trees.plans.commands.ExportCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.utframe.TestWithFeService;
import com.google.common.collect.Lists;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
* The `Export` sql finally generates the `Outfile` sql.
* This test is to test whether the generated outfile sql is correct.
*/
public class ExportToOutfileLogicalPlanTest extends TestWithFeService {
private String dbName = "testDb";
private String tblName = "table1";
/**
* create a database and a table
*
* @throws Exception
*/
@Override
protected void runBeforeAll() throws Exception {
FeConstants.runningUnitTest = true;
createDatabase(dbName);
useDatabase(dbName);
createTable("create table " + tblName + "\n" + "(k1 int, k2 int, k3 int) "
+ "PARTITION BY RANGE(k1)\n" + "(\n"
+ "PARTITION p1 VALUES LESS THAN (\"20\"),\n"
+ "PARTITION p2 VALUES [(\"20\"),(\"30\")),\n"
+ "PARTITION p3 VALUES [(\"30\"),(\"40\")),\n"
+ "PARTITION p4 VALUES LESS THAN (\"50\")\n" + ")\n"
+ " distributed by hash(k1) buckets 10\n"
+ "properties(\"replication_num\" = \"1\");");
}
/**
* test normal export, sql:
*
* EXPORT TABLE testDb.table1
* TO "file:///tmp/exp_"
*
* @throws UserException
*/
@Test
public void testNormal() throws UserException {
// The origin export sql
String exportSql = "EXPORT TABLE testDb.table1\n"
+ "TO \"file:///tmp/exp_\";";
List<Long> currentTablets1 = Arrays.asList(10010L, 10012L, 10014L, 10016L, 10018L, 10020L, 10022L, 10024L,
10026L, 10028L);
List<Long> currentTablets2 = Arrays.asList(10030L, 10032L, 10034L, 10036L, 10038L, 10040L, 10042L, 10044L,
10046L, 10048L);
List<Long> currentTablets3 = Arrays.asList(10050L, 10052L, 10054L, 10056L, 10058L, 10060L, 10062L, 10064L,
10066L, 10068L);
List<Long> currentTablets4 = Arrays.asList(10070L, 10072L, 10074L, 10076L, 10078L, 10080L, 10082L, 10084L,
10086L, 10088L);
// generate outfile
List<List<StatementBase>> outfileSqlPerParallel = getOutfileSqlPerParallel(exportSql);
// check
Assert.assertEquals(1, outfileSqlPerParallel.size());
Assert.assertEquals(4, outfileSqlPerParallel.get(0).size());
LogicalPlan plan1 = ((LogicalPlanAdapter) outfileSqlPerParallel.get(0).get(0)).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan1, false), Lists.newArrayList(), currentTablets1);
LogicalPlan plan2 = ((LogicalPlanAdapter) outfileSqlPerParallel.get(0).get(1)).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan2, false), Lists.newArrayList(), currentTablets2);
LogicalPlan plan3 = ((LogicalPlanAdapter) outfileSqlPerParallel.get(0).get(2)).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan3, false), Lists.newArrayList(), currentTablets3);
LogicalPlan plan4 = ((LogicalPlanAdapter) outfileSqlPerParallel.get(0).get(3)).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan4, false), Lists.newArrayList(), currentTablets4);
}
/**
* test normal parallelism, sql:
*
* EXPORT TABLE testDb.table1
* TO "file:///tmp/exp_"
* PROPERTIES(
* "parallelism" = "4"
* );
*
* @throws UserException
*/
@Test
public void testNormalParallelism() throws UserException {
// The origin export sql
String exportSql = "EXPORT TABLE testDb.table1\n"
+ "TO \"file:///tmp/exp_\" "
+ "PROPERTIES(\n"
+ "\"parallelism\" = \"4\"\n"
+ ");";
// This export sql should generate 4 array, and there should be 1 outfile sql in per array.
// The only difference between them is the TABLET(). They are:
List<Long> currentTablets1 = Arrays.asList(10010L, 10012L, 10014L, 10016L, 10018L, 10020L, 10022L, 10024L,
10026L, 10028L);
List<Long> currentTablets2 = Arrays.asList(10030L, 10032L, 10034L, 10036L, 10038L, 10040L, 10042L, 10044L,
10046L, 10048L);
List<Long> currentTablets3 = Arrays.asList(10050L, 10052L, 10054L, 10056L, 10058L, 10060L, 10062L, 10064L,
10066L, 10068L);
List<Long> currentTablets4 = Arrays.asList(10070L, 10072L, 10074L, 10076L, 10078L, 10080L, 10082L, 10084L,
10086L, 10088L);
// generate outfile
List<List<StatementBase>> outfileSqlPerParallel = getOutfileSqlPerParallel(exportSql);
// check
Assert.assertEquals(4, outfileSqlPerParallel.size());
Assert.assertEquals(1, outfileSqlPerParallel.get(0).size());
Assert.assertEquals(1, outfileSqlPerParallel.get(1).size());
Assert.assertEquals(1, outfileSqlPerParallel.get(2).size());
Assert.assertEquals(1, outfileSqlPerParallel.get(3).size());
LogicalPlan plan1 = ((LogicalPlanAdapter) outfileSqlPerParallel.get(0).get(0)).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan1, false), Lists.newArrayList(), currentTablets1);
LogicalPlan plan2 = ((LogicalPlanAdapter) outfileSqlPerParallel.get(1).get(0)).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan2, false), Lists.newArrayList(), currentTablets2);
LogicalPlan plan3 = ((LogicalPlanAdapter) outfileSqlPerParallel.get(2).get(0)).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan3, false), Lists.newArrayList(), currentTablets3);
LogicalPlan plan4 = ((LogicalPlanAdapter) outfileSqlPerParallel.get(3).get(0)).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan4, false), Lists.newArrayList(), currentTablets4);
}
/**
* test normal parallelism, sql:
*
* EXPORT TABLE testDb.table1
* TO "file:///tmp/exp_"
* PROPERTIES(
* "parallelism" = "3"
* );
*
* @throws UserException
*/
@Test
public void testMultiOutfilePerParalle() throws UserException {
// The origin export sql
String exportSql = "EXPORT TABLE testDb.table1\n"
+ "TO \"file:///tmp/exp_\" "
+ "PROPERTIES(\n"
+ "\"parallelism\" = \"3\"\n"
+ ");";
// This export sql should generate 4 array, and there should be 1 outfile sql in per array.
// The only difference between them is the TABLET(). They are:
List<Long> currentTablets1 = Arrays.asList(10010L, 10012L, 10014L, 10016L, 10018L, 10020L, 10022L, 10024L,
10026L, 10028L);
List<Long> currentTablets12 = Arrays.asList(10030L, 10032L, 10034L, 10036L);
List<Long> currentTablets2 = Arrays.asList(10038L, 10040L, 10042L, 10044L, 10046L, 10048L, 10050L, 10052L,
10054L, 10056L);
List<Long> currentTablets22 = Arrays.asList(10058L, 10060L, 10062L);
List<Long> currentTablets3 = Arrays.asList(10064L, 10066L, 10068L, 10070L, 10072L, 10074L, 10076L, 10078L,
10080L, 10082L);
List<Long> currentTablets32 = Arrays.asList(10084L, 10086L, 10088L);
// generate outfile
List<List<StatementBase>> outfileSqlPerParallel = getOutfileSqlPerParallel(exportSql);
// check
Assert.assertEquals(3, outfileSqlPerParallel.size());
Assert.assertEquals(2, outfileSqlPerParallel.get(0).size());
Assert.assertEquals(2, outfileSqlPerParallel.get(1).size());
Assert.assertEquals(2, outfileSqlPerParallel.get(2).size());
LogicalPlan plan1 = ((LogicalPlanAdapter) outfileSqlPerParallel.get(0).get(0)).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan1, false), Lists.newArrayList(), currentTablets1);
LogicalPlan plan12 = ((LogicalPlanAdapter) outfileSqlPerParallel.get(0).get(1)).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan12, false), Lists.newArrayList(), currentTablets12);
LogicalPlan plan2 = ((LogicalPlanAdapter) outfileSqlPerParallel.get(1).get(0)).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan2, false), Lists.newArrayList(), currentTablets2);
LogicalPlan plan22 = ((LogicalPlanAdapter) outfileSqlPerParallel.get(1).get(1)).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan22, false), Lists.newArrayList(), currentTablets22);
LogicalPlan plan3 = ((LogicalPlanAdapter) outfileSqlPerParallel.get(2).get(0)).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan3, false), Lists.newArrayList(), currentTablets3);
LogicalPlan plan32 = ((LogicalPlanAdapter) outfileSqlPerParallel.get(2).get(1)).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan32, false), Lists.newArrayList(), currentTablets32);
}
/**
* test export single partition, sql:
*
* EXPORT TABLE testDb.table1 PARTITION (p1)
* TO "file:///tmp/exp_"
* PROPERTIES(
* "parallelism" = "4"
* );
*
* @throws UserException
*/
@Test
public void testPartitionParallelism() throws UserException {
// The origin export sql
String exportSql = "EXPORT TABLE testDb.table1 PARTITION (p1)\n"
+ "TO \"file:///tmp/exp_\" "
+ "PROPERTIES(\n"
+ "\"parallelism\" = \"4\"\n"
+ ");";
// This export sql should generate 4 array, and there should be 1 outfile sql in per array.
// The only difference between them is the TABLET(). They are:
List<Long> currentTablets1 = Arrays.asList(10010L, 10012L, 10014L);
List<Long> currentTablets2 = Arrays.asList(10016L, 10018L, 10020L);
List<Long> currentTablets3 = Arrays.asList(10022L, 10024L);
List<Long> currentTablets4 = Arrays.asList(10026L, 10028L);
List<String> currentPartitions = Arrays.asList("p1");
// generate outfile
List<List<StatementBase>> outfileSqlPerParallel = getOutfileSqlPerParallel(exportSql);
// check
Assert.assertEquals(4, outfileSqlPerParallel.size());
Assert.assertEquals(1, outfileSqlPerParallel.get(0).size());
Assert.assertEquals(1, outfileSqlPerParallel.get(1).size());
Assert.assertEquals(1, outfileSqlPerParallel.get(2).size());
Assert.assertEquals(1, outfileSqlPerParallel.get(3).size());
LogicalPlan plan1 = ((LogicalPlanAdapter) outfileSqlPerParallel.get(0).get(0)).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan1, false), currentPartitions, currentTablets1);
LogicalPlan plan2 = ((LogicalPlanAdapter) outfileSqlPerParallel.get(1).get(0)).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan2, false), currentPartitions, currentTablets2);
LogicalPlan plan3 = ((LogicalPlanAdapter) outfileSqlPerParallel.get(2).get(0)).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan3, false), currentPartitions, currentTablets3);
LogicalPlan plan4 = ((LogicalPlanAdapter) outfileSqlPerParallel.get(3).get(0)).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan4, false), currentPartitions, currentTablets4);
}
/**
* test export multiple partition, sql:
*
* EXPORT TABLE testDb.table1 PARTITION (p1, p4)
* TO "file:///tmp/exp_"
* PROPERTIES(
* "parallelism" = "4"
* );
*
* @throws UserException
*/
@Test
public void testMultiPartitionParallelism() throws UserException {
// The origin export sql
String exportSql = "EXPORT TABLE testDb.table1 PARTITION (p1, p4)\n"
+ "TO \"file:///tmp/exp_\" "
+ "PROPERTIES(\n"
+ "\"parallelism\" = \"4\"\n"
+ ");";
// This export sql should generate 4 array, and there should be 1 outfile sql in per array.
// The only difference between them is the TABLET(). They are:
List<Long> currentTablets1 = Arrays.asList(10010L, 10012L, 10014L, 10016L, 10018L);
List<Long> currentTablets2 = Arrays.asList(10020L, 10022L, 10024L, 10026L, 10028L);
List<Long> currentTablets3 = Arrays.asList(10070L, 10072L, 10074L, 10076L, 10078L);
List<Long> currentTablets4 = Arrays.asList(10080L, 10082L, 10084L, 10086L, 10088L);
List<String> currentPartitions = Arrays.asList("p1", "p4");
// generate outfile
List<List<StatementBase>> outfileSqlPerParallel = getOutfileSqlPerParallel(exportSql);
// check
Assert.assertEquals(4, outfileSqlPerParallel.size());
Assert.assertEquals(1, outfileSqlPerParallel.get(0).size());
Assert.assertEquals(1, outfileSqlPerParallel.get(1).size());
Assert.assertEquals(1, outfileSqlPerParallel.get(2).size());
Assert.assertEquals(1, outfileSqlPerParallel.get(3).size());
LogicalPlan plan1 = ((LogicalPlanAdapter) outfileSqlPerParallel.get(0).get(0)).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan1, false), currentPartitions, currentTablets1);
LogicalPlan plan2 = ((LogicalPlanAdapter) outfileSqlPerParallel.get(1).get(0)).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan2, false), currentPartitions, currentTablets2);
LogicalPlan plan3 = ((LogicalPlanAdapter) outfileSqlPerParallel.get(2).get(0)).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan3, false), currentPartitions, currentTablets3);
LogicalPlan plan4 = ((LogicalPlanAdapter) outfileSqlPerParallel.get(3).get(0)).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan4, false), currentPartitions, currentTablets4);
}
/**
* test parallelism less than tablets, sql:
*
* EXPORT TABLE testDb.table1 PARTITION (p1)
* TO "file:///tmp/exp_"
* PROPERTIES(
* "parallelism" = "20"
* );
*
* @throws UserException
*/
@Test
public void testParallelismLessThanTablets() throws UserException {
// The origin export sql
String exportSql = "EXPORT TABLE testDb.table1 PARTITION (p1)\n"
+ "TO \"file:///tmp/exp_\" "
+ "PROPERTIES(\n"
+ "\"parallelism\" = \"20\"\n"
+ ");";
// This export sql should generate 10 array because parallelism is less than the number of tablets,
// so set parallelism = num(tablets)
// There should be 1 outfile sql in per array.
// The only difference between them is the TABLET(). They are:
List<Long> currentTablets1 = Arrays.asList(10010L);
List<Long> currentTablets2 = Arrays.asList(10012L);
List<Long> currentTablets3 = Arrays.asList(10014L);
List<Long> currentTablets4 = Arrays.asList(10016L);
List<Long> currentTablets5 = Arrays.asList(10018L);
List<Long> currentTablets6 = Arrays.asList(10020L);
List<Long> currentTablets7 = Arrays.asList(10022L);
List<Long> currentTablets8 = Arrays.asList(10024L);
List<Long> currentTablets9 = Arrays.asList(10026L);
List<Long> currentTablets10 = Arrays.asList(10028L);
List<String> currentPartitions = Arrays.asList("p1");
// generate outfile
List<List<StatementBase>> outfileSqlPerParallel = getOutfileSqlPerParallel(exportSql);
// check
Assert.assertEquals(10, outfileSqlPerParallel.size());
for (int i = 0; i < 10; ++i) {
Assert.assertEquals(1, outfileSqlPerParallel.get(i).size());
}
LogicalPlan plan1 = ((LogicalPlanAdapter) outfileSqlPerParallel.get(0).get(0)).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan1, false), currentPartitions, currentTablets1);
LogicalPlan plan2 = ((LogicalPlanAdapter) outfileSqlPerParallel.get(1).get(0)).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan2, false), currentPartitions, currentTablets2);
LogicalPlan plan3 = ((LogicalPlanAdapter) outfileSqlPerParallel.get(2).get(0)).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan3, false), currentPartitions, currentTablets3);
LogicalPlan plan4 = ((LogicalPlanAdapter) outfileSqlPerParallel.get(3).get(0)).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan4, false), currentPartitions, currentTablets4);
LogicalPlan plan5 = ((LogicalPlanAdapter) outfileSqlPerParallel.get(4).get(0)).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan5, false), currentPartitions, currentTablets5);
LogicalPlan plan6 = ((LogicalPlanAdapter) outfileSqlPerParallel.get(5).get(0)).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan6, false), currentPartitions, currentTablets6);
LogicalPlan plan7 = ((LogicalPlanAdapter) outfileSqlPerParallel.get(6).get(0)).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan7, false), currentPartitions, currentTablets7);
LogicalPlan plan8 = ((LogicalPlanAdapter) outfileSqlPerParallel.get(7).get(0)).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan8, false), currentPartitions, currentTablets8);
LogicalPlan plan9 = ((LogicalPlanAdapter) outfileSqlPerParallel.get(8).get(0)).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan9, false), currentPartitions, currentTablets9);
LogicalPlan plan10 = ((LogicalPlanAdapter) outfileSqlPerParallel.get(9).get(0)).getLogicalPlan();
checkPartitionsAndTablets(getUnboundRelation(plan10, false), currentPartitions, currentTablets10);
}
private LogicalPlan parseSql(String exportSql) {
StatementBase statementBase = new NereidsParser().parseSQL(exportSql).get(0);
return ((LogicalPlanAdapter) statementBase).getLogicalPlan();
}
// need open EnableNereidsPlanner
private List<List<StatementBase>> getOutfileSqlPerParallel(String exportSql) throws UserException {
ExportCommand exportCommand = (ExportCommand) parseSql(exportSql);
List<List<StatementBase>> selectStmtListPerParallel = Lists.newArrayList();
try {
Method getTableName = exportCommand.getClass().getDeclaredMethod("getTableName", ConnectContext.class);
getTableName.setAccessible(true);
Method checkAllParameters = exportCommand.getClass().getDeclaredMethod("checkAllParameters",
ConnectContext.class, TableName.class, Map.class);
checkAllParameters.setAccessible(true);
Method generateExportJob = exportCommand.getClass().getDeclaredMethod("generateExportJob",
ConnectContext.class, Map.class, TableName.class);
generateExportJob.setAccessible(true);
TableName tblName = (TableName) getTableName.invoke(exportCommand, connectContext);
checkAllParameters.invoke(exportCommand, connectContext, tblName, exportCommand.getFileProperties());
ExportJob job = (ExportJob) generateExportJob.invoke(
exportCommand, connectContext, exportCommand.getFileProperties(), tblName);
selectStmtListPerParallel = job.getSelectStmtListPerParallel();
} catch (NoSuchMethodException e) {
throw new UserException(e);
} catch (InvocationTargetException e) {
throw new UserException(e);
} catch (IllegalAccessException e) {
throw new UserException(e);
}
return selectStmtListPerParallel;
}
private void checkPartitionsAndTablets(UnboundRelation relation, List<String> currentPartitionNames,
List<Long> currentTabletIds) {
List<Long> tabletIdsToCheck = relation.getTabletIds();
List<String> partNamesToCheck = relation.getPartNames();
Assert.assertTrue(currentTabletIds.containsAll(tabletIdsToCheck));
Assert.assertTrue(tabletIdsToCheck.containsAll(currentTabletIds));
Assert.assertTrue(partNamesToCheck.containsAll(currentPartitionNames));
Assert.assertTrue(currentPartitionNames.containsAll(partNamesToCheck));
}
private UnboundRelation getUnboundRelation(LogicalPlan plan, boolean isHaveFilter) {
LogicalPlan lastPlan = (LogicalPlan) plan.children().get(0);
if (isHaveFilter) {
lastPlan = (LogicalPlan) lastPlan.children().get(0);
}
return (UnboundRelation) lastPlan.children().get(0).children().get(0);
}
}

View File

@ -377,37 +377,6 @@
69 ftw-69 87
-- !select_load3 --
70 ftw-70 88
71 ftw-71 89
72 ftw-72 90
73 ftw-73 91
74 ftw-74 92
75 ftw-75 93
76 ftw-76 94
77 ftw-77 95
78 ftw-78 96
79 ftw-79 97
80 ftw-80 98
81 ftw-81 99
82 ftw-82 100
83 ftw-83 101
84 ftw-84 102
85 ftw-85 103
86 ftw-86 104
87 ftw-87 105
88 ftw-88 106
89 ftw-89 107
90 ftw-90 108
91 ftw-91 109
92 ftw-92 110
93 ftw-93 111
94 ftw-94 112
95 ftw-95 113
96 ftw-96 114
97 ftw-97 115
98 ftw-98 116
99 ftw-99 117
100 ftw-100 118
101 ftw-101 119
102 ftw-102 120
103 ftw-103 121
@ -459,3 +428,141 @@
149 ftw-149 167
150 \N \N
-- !select_load6 --
83 ftw-83
84 ftw-84
85 ftw-85
86 ftw-86
87 ftw-87
88 ftw-88
89 ftw-89
90 ftw-90
91 ftw-91
92 ftw-92
93 ftw-93
94 ftw-94
95 ftw-95
96 ftw-96
97 ftw-97
98 ftw-98
99 ftw-99
100 ftw-100
101 ftw-101
102 ftw-102
103 ftw-103
104 ftw-104
105 ftw-105
106 ftw-106
107 ftw-107
108 ftw-108
109 ftw-109
110 ftw-110
111 ftw-111
112 ftw-112
113 ftw-113
114 ftw-114
115 ftw-115
116 ftw-116
117 ftw-117
118 ftw-118
119 ftw-119
120 ftw-120
121 ftw-121
122 ftw-122
123 ftw-123
124 ftw-124
125 ftw-125
126 ftw-126
127 ftw-127
128 ftw-128
129 ftw-129
130 ftw-130
131 ftw-131
132 ftw-132
133 ftw-133
134 ftw-134
135 ftw-135
136 ftw-136
137 ftw-137
138 ftw-138
139 ftw-139
140 ftw-140
141 ftw-141
142 ftw-142
143 ftw-143
144 ftw-144
145 ftw-145
146 ftw-146
147 ftw-147
148 ftw-148
149 ftw-149
-- !select_load7 --
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149

View File

@ -214,96 +214,6 @@
8 2017-10-01 2017-10-01T00:00 Beijing 8 8 true 8 8 8 8.8 8.8 char8 8
9 2017-10-01 2017-10-01T00:00 Beijing 9 9 true 9 9 9 9.9 9.9 char9 9
10 2017-10-01 2017-10-01T00:00 Beijing 10 10 true 10 10 10 10.1 10.1 char10 10
11 2017-10-01 2017-10-01T00:00 Beijing 11 11 true 11 11 11 11.11 11.11 char11 11
12 2017-10-01 2017-10-01T00:00 Beijing 12 12 true 12 12 12 12.12 12.12 char12 12
13 2017-10-01 2017-10-01T00:00 Beijing 13 13 true 13 13 13 13.13 13.13 char13 13
14 2017-10-01 2017-10-01T00:00 Beijing 14 14 true 14 14 14 14.14 14.14 char14 14
15 2017-10-01 2017-10-01T00:00 Beijing 15 15 true 15 15 15 15.15 15.15 char15 15
16 2017-10-01 2017-10-01T00:00 Beijing 16 16 true 16 16 16 16.16 16.16 char16 16
17 2017-10-01 2017-10-01T00:00 Beijing 17 17 true 17 17 17 17.17 17.17 char17 17
18 2017-10-01 2017-10-01T00:00 Beijing 18 18 true 18 18 18 18.18 18.18 char18 18
19 2017-10-01 2017-10-01T00:00 Beijing 19 19 true 19 19 19 19.19 19.19 char19 19
20 2017-10-01 2017-10-01T00:00 Beijing 20 20 true 20 20 20 20.2 20.2 char20 20
21 2017-10-01 2017-10-01T00:00 Beijing 21 21 true 21 21 21 21.21 21.21 char21 21
22 2017-10-01 2017-10-01T00:00 Beijing 22 22 true 22 22 22 22.22 22.22 char22 22
23 2017-10-01 2017-10-01T00:00 Beijing 23 23 true 23 23 23 23.23 23.23 char23 23
24 2017-10-01 2017-10-01T00:00 Beijing 24 24 true 24 24 24 24.24 24.24 char24 24
25 2017-10-01 2017-10-01T00:00 Beijing 25 25 true 25 25 25 25.25 25.25 char25 25
26 2017-10-01 2017-10-01T00:00 Beijing 26 26 true 26 26 26 26.26 26.26 char26 26
27 2017-10-01 2017-10-01T00:00 Beijing 27 27 true 27 27 27 27.27 27.27 char27 27
28 2017-10-01 2017-10-01T00:00 Beijing 28 28 true 28 28 28 28.28 28.28 char28 28
29 2017-10-01 2017-10-01T00:00 Beijing 29 29 true 29 29 29 29.29 29.29 char29 29
30 2017-10-01 2017-10-01T00:00 Beijing 30 30 true 30 30 30 30.3 30.3 char30 30
31 2017-10-01 2017-10-01T00:00 Beijing 31 31 true 31 31 31 31.31 31.31 char31 31
32 2017-10-01 2017-10-01T00:00 Beijing 32 32 true 32 32 32 32.32 32.32 char32 32
33 2017-10-01 2017-10-01T00:00 Beijing 33 33 true 33 33 33 33.33 33.33 char33 33
34 2017-10-01 2017-10-01T00:00 Beijing 34 34 true 34 34 34 34.34 34.34 char34 34
35 2017-10-01 2017-10-01T00:00 Beijing 35 35 true 35 35 35 35.35 35.35 char35 35
36 2017-10-01 2017-10-01T00:00 Beijing 36 36 true 36 36 36 36.36 36.36 char36 36
37 2017-10-01 2017-10-01T00:00 Beijing 37 37 true 37 37 37 37.37 37.37 char37 37
38 2017-10-01 2017-10-01T00:00 Beijing 38 38 true 38 38 38 38.38 38.38 char38 38
39 2017-10-01 2017-10-01T00:00 Beijing 39 39 true 39 39 39 39.39 39.39 char39 39
40 2017-10-01 2017-10-01T00:00 Beijing 40 40 true 40 40 40 40.4 40.4 char40 40
41 2017-10-01 2017-10-01T00:00 Beijing 41 41 true 41 41 41 41.41 41.41 char41 41
42 2017-10-01 2017-10-01T00:00 Beijing 42 42 true 42 42 42 42.42 42.42 char42 42
43 2017-10-01 2017-10-01T00:00 Beijing 43 43 true 43 43 43 43.43 43.43 char43 43
44 2017-10-01 2017-10-01T00:00 Beijing 44 44 true 44 44 44 44.44 44.44 char44 44
45 2017-10-01 2017-10-01T00:00 Beijing 45 45 true 45 45 45 45.45 45.45 char45 45
46 2017-10-01 2017-10-01T00:00 Beijing 46 46 true 46 46 46 46.46 46.46 char46 46
47 2017-10-01 2017-10-01T00:00 Beijing 47 47 true 47 47 47 47.47 47.47 char47 47
48 2017-10-01 2017-10-01T00:00 Beijing 48 48 true 48 48 48 48.48 48.48 char48 48
49 2017-10-01 2017-10-01T00:00 Beijing 49 49 true 49 49 49 49.49 49.49 char49 49
50 2017-10-01 2017-10-01T00:00 Beijing 50 50 true 50 50 50 50.5 50.5 char50 50
51 2017-10-01 2017-10-01T00:00 Beijing 51 51 true 51 51 51 51.51 51.51 char51 51
52 2017-10-01 2017-10-01T00:00 Beijing 52 52 true 52 52 52 52.52 52.52 char52 52
53 2017-10-01 2017-10-01T00:00 Beijing 53 53 true 53 53 53 53.53 53.53 char53 53
54 2017-10-01 2017-10-01T00:00 Beijing 54 54 true 54 54 54 54.54 54.54 char54 54
55 2017-10-01 2017-10-01T00:00 Beijing 55 55 true 55 55 55 55.55 55.55 char55 55
56 2017-10-01 2017-10-01T00:00 Beijing 56 56 true 56 56 56 56.56 56.56 char56 56
57 2017-10-01 2017-10-01T00:00 Beijing 57 57 true 57 57 57 57.57 57.57 char57 57
58 2017-10-01 2017-10-01T00:00 Beijing 58 58 true 58 58 58 58.58 58.58 char58 58
59 2017-10-01 2017-10-01T00:00 Beijing 59 59 true 59 59 59 59.59 59.59 char59 59
60 2017-10-01 2017-10-01T00:00 Beijing 60 60 true 60 60 60 60.6 60.6 char60 60
61 2017-10-01 2017-10-01T00:00 Beijing 61 61 true 61 61 61 61.61 61.61 char61 61
62 2017-10-01 2017-10-01T00:00 Beijing 62 62 true 62 62 62 62.62 62.62 char62 62
63 2017-10-01 2017-10-01T00:00 Beijing 63 63 true 63 63 63 63.63 63.63 char63 63
64 2017-10-01 2017-10-01T00:00 Beijing 64 64 true 64 64 64 64.64 64.64 char64 64
65 2017-10-01 2017-10-01T00:00 Beijing 65 65 true 65 65 65 65.65 65.65 char65 65
66 2017-10-01 2017-10-01T00:00 Beijing 66 66 true 66 66 66 66.66 66.66 char66 66
67 2017-10-01 2017-10-01T00:00 Beijing 67 67 true 67 67 67 67.67 67.67 char67 67
68 2017-10-01 2017-10-01T00:00 Beijing 68 68 true 68 68 68 68.68 68.68 char68 68
69 2017-10-01 2017-10-01T00:00 Beijing 69 69 true 69 69 69 69.69 69.69 char69 69
70 2017-10-01 2017-10-01T00:00 Beijing 70 70 true 70 70 70 70.7 70.7 char70 70
71 2017-10-01 2017-10-01T00:00 Beijing 71 71 true 71 71 71 71.71 71.71 char71 71
72 2017-10-01 2017-10-01T00:00 Beijing 72 72 true 72 72 72 72.72 72.72 char72 72
73 2017-10-01 2017-10-01T00:00 Beijing 73 73 true 73 73 73 73.73 73.73 char73 73
74 2017-10-01 2017-10-01T00:00 Beijing 74 74 true 74 74 74 74.74 74.74 char74 74
75 2017-10-01 2017-10-01T00:00 Beijing 75 75 true 75 75 75 75.75 75.75 char75 75
76 2017-10-01 2017-10-01T00:00 Beijing 76 76 true 76 76 76 76.76 76.76 char76 76
77 2017-10-01 2017-10-01T00:00 Beijing 77 77 true 77 77 77 77.77 77.77 char77 77
78 2017-10-01 2017-10-01T00:00 Beijing 78 78 true 78 78 78 78.78 78.78 char78 78
79 2017-10-01 2017-10-01T00:00 Beijing 79 79 true 79 79 79 79.79 79.79 char79 79
80 2017-10-01 2017-10-01T00:00 Beijing 80 80 true 80 80 80 80.8 80.8 char80 80
81 2017-10-01 2017-10-01T00:00 Beijing 81 81 true 81 81 81 81.81 81.81 char81 81
82 2017-10-01 2017-10-01T00:00 Beijing 82 82 true 82 82 82 82.82 82.82 char82 82
83 2017-10-01 2017-10-01T00:00 Beijing 83 83 true 83 83 83 83.83 83.83 char83 83
84 2017-10-01 2017-10-01T00:00 Beijing 84 84 true 84 84 84 84.84 84.84 char84 84
85 2017-10-01 2017-10-01T00:00 Beijing 85 85 true 85 85 85 85.85 85.85 char85 85
86 2017-10-01 2017-10-01T00:00 Beijing 86 86 true 86 86 86 86.86 86.86 char86 86
87 2017-10-01 2017-10-01T00:00 Beijing 87 87 true 87 87 87 87.87 87.87 char87 87
88 2017-10-01 2017-10-01T00:00 Beijing 88 88 true 88 88 88 88.88 88.88 char88 88
89 2017-10-01 2017-10-01T00:00 Beijing 89 89 true 89 89 89 89.89 89.89 char89 89
90 2017-10-01 2017-10-01T00:00 Beijing 90 90 true 90 90 90 90.9 90.9 char90 90
91 2017-10-01 2017-10-01T00:00 Beijing 91 91 true 91 91 91 91.91 91.91 char91 91
92 2017-10-01 2017-10-01T00:00 Beijing 92 92 true 92 92 92 92.92 92.92 char92 92
93 2017-10-01 2017-10-01T00:00 Beijing 93 93 true 93 93 93 93.93 93.93 char93 93
94 2017-10-01 2017-10-01T00:00 Beijing 94 94 true 94 94 94 94.94 94.94 char94 94
95 2017-10-01 2017-10-01T00:00 Beijing 95 95 true 95 95 95 95.95 95.95 char95 95
96 2017-10-01 2017-10-01T00:00 Beijing 96 96 true 96 96 96 96.96 96.96 char96 96
97 2017-10-01 2017-10-01T00:00 Beijing 97 97 true 97 97 97 97.97 97.97 char97 97
98 2017-10-01 2017-10-01T00:00 Beijing 98 98 true 98 98 98 98.98 98.98 char98 98
99 2017-10-01 2017-10-01T00:00 Beijing 99 99 true 99 99 99 99.99 99.99 char99 99
100 2017-10-01 2017-10-01T00:00 \N \N \N \N \N \N \N \N \N \N \N
-- !select_load3 --
1 2017-10-01 2017-10-01T00:00 Beijing 1 1 true 1 1 1 1.1 1.1 char1 1
@ -316,96 +226,6 @@
8 2017-10-01 2017-10-01T00:00 Beijing 8 8 true 8 8 8 8.8 8.8 char8 8
9 2017-10-01 2017-10-01T00:00 Beijing 9 9 true 9 9 9 9.9 9.9 char9 9
10 2017-10-01 2017-10-01T00:00 Beijing 10 10 true 10 10 10 10.1 10.1 char10 10
11 2017-10-01 2017-10-01T00:00 Beijing 11 11 true 11 11 11 11.11 11.11 char11 11
12 2017-10-01 2017-10-01T00:00 Beijing 12 12 true 12 12 12 12.12 12.12 char12 12
13 2017-10-01 2017-10-01T00:00 Beijing 13 13 true 13 13 13 13.13 13.13 char13 13
14 2017-10-01 2017-10-01T00:00 Beijing 14 14 true 14 14 14 14.14 14.14 char14 14
15 2017-10-01 2017-10-01T00:00 Beijing 15 15 true 15 15 15 15.15 15.15 char15 15
16 2017-10-01 2017-10-01T00:00 Beijing 16 16 true 16 16 16 16.16 16.16 char16 16
17 2017-10-01 2017-10-01T00:00 Beijing 17 17 true 17 17 17 17.17 17.17 char17 17
18 2017-10-01 2017-10-01T00:00 Beijing 18 18 true 18 18 18 18.18 18.18 char18 18
19 2017-10-01 2017-10-01T00:00 Beijing 19 19 true 19 19 19 19.19 19.19 char19 19
20 2017-10-01 2017-10-01T00:00 Beijing 20 20 true 20 20 20 20.2 20.2 char20 20
21 2017-10-01 2017-10-01T00:00 Beijing 21 21 true 21 21 21 21.21 21.21 char21 21
22 2017-10-01 2017-10-01T00:00 Beijing 22 22 true 22 22 22 22.22 22.22 char22 22
23 2017-10-01 2017-10-01T00:00 Beijing 23 23 true 23 23 23 23.23 23.23 char23 23
24 2017-10-01 2017-10-01T00:00 Beijing 24 24 true 24 24 24 24.24 24.24 char24 24
25 2017-10-01 2017-10-01T00:00 Beijing 25 25 true 25 25 25 25.25 25.25 char25 25
26 2017-10-01 2017-10-01T00:00 Beijing 26 26 true 26 26 26 26.26 26.26 char26 26
27 2017-10-01 2017-10-01T00:00 Beijing 27 27 true 27 27 27 27.27 27.27 char27 27
28 2017-10-01 2017-10-01T00:00 Beijing 28 28 true 28 28 28 28.28 28.28 char28 28
29 2017-10-01 2017-10-01T00:00 Beijing 29 29 true 29 29 29 29.29 29.29 char29 29
30 2017-10-01 2017-10-01T00:00 Beijing 30 30 true 30 30 30 30.3 30.3 char30 30
31 2017-10-01 2017-10-01T00:00 Beijing 31 31 true 31 31 31 31.31 31.31 char31 31
32 2017-10-01 2017-10-01T00:00 Beijing 32 32 true 32 32 32 32.32 32.32 char32 32
33 2017-10-01 2017-10-01T00:00 Beijing 33 33 true 33 33 33 33.33 33.33 char33 33
34 2017-10-01 2017-10-01T00:00 Beijing 34 34 true 34 34 34 34.34 34.34 char34 34
35 2017-10-01 2017-10-01T00:00 Beijing 35 35 true 35 35 35 35.35 35.35 char35 35
36 2017-10-01 2017-10-01T00:00 Beijing 36 36 true 36 36 36 36.36 36.36 char36 36
37 2017-10-01 2017-10-01T00:00 Beijing 37 37 true 37 37 37 37.37 37.37 char37 37
38 2017-10-01 2017-10-01T00:00 Beijing 38 38 true 38 38 38 38.38 38.38 char38 38
39 2017-10-01 2017-10-01T00:00 Beijing 39 39 true 39 39 39 39.39 39.39 char39 39
40 2017-10-01 2017-10-01T00:00 Beijing 40 40 true 40 40 40 40.4 40.4 char40 40
41 2017-10-01 2017-10-01T00:00 Beijing 41 41 true 41 41 41 41.41 41.41 char41 41
42 2017-10-01 2017-10-01T00:00 Beijing 42 42 true 42 42 42 42.42 42.42 char42 42
43 2017-10-01 2017-10-01T00:00 Beijing 43 43 true 43 43 43 43.43 43.43 char43 43
44 2017-10-01 2017-10-01T00:00 Beijing 44 44 true 44 44 44 44.44 44.44 char44 44
45 2017-10-01 2017-10-01T00:00 Beijing 45 45 true 45 45 45 45.45 45.45 char45 45
46 2017-10-01 2017-10-01T00:00 Beijing 46 46 true 46 46 46 46.46 46.46 char46 46
47 2017-10-01 2017-10-01T00:00 Beijing 47 47 true 47 47 47 47.47 47.47 char47 47
48 2017-10-01 2017-10-01T00:00 Beijing 48 48 true 48 48 48 48.48 48.48 char48 48
49 2017-10-01 2017-10-01T00:00 Beijing 49 49 true 49 49 49 49.49 49.49 char49 49
50 2017-10-01 2017-10-01T00:00 Beijing 50 50 true 50 50 50 50.5 50.5 char50 50
51 2017-10-01 2017-10-01T00:00 Beijing 51 51 true 51 51 51 51.51 51.51 char51 51
52 2017-10-01 2017-10-01T00:00 Beijing 52 52 true 52 52 52 52.52 52.52 char52 52
53 2017-10-01 2017-10-01T00:00 Beijing 53 53 true 53 53 53 53.53 53.53 char53 53
54 2017-10-01 2017-10-01T00:00 Beijing 54 54 true 54 54 54 54.54 54.54 char54 54
55 2017-10-01 2017-10-01T00:00 Beijing 55 55 true 55 55 55 55.55 55.55 char55 55
56 2017-10-01 2017-10-01T00:00 Beijing 56 56 true 56 56 56 56.56 56.56 char56 56
57 2017-10-01 2017-10-01T00:00 Beijing 57 57 true 57 57 57 57.57 57.57 char57 57
58 2017-10-01 2017-10-01T00:00 Beijing 58 58 true 58 58 58 58.58 58.58 char58 58
59 2017-10-01 2017-10-01T00:00 Beijing 59 59 true 59 59 59 59.59 59.59 char59 59
60 2017-10-01 2017-10-01T00:00 Beijing 60 60 true 60 60 60 60.6 60.6 char60 60
61 2017-10-01 2017-10-01T00:00 Beijing 61 61 true 61 61 61 61.61 61.61 char61 61
62 2017-10-01 2017-10-01T00:00 Beijing 62 62 true 62 62 62 62.62 62.62 char62 62
63 2017-10-01 2017-10-01T00:00 Beijing 63 63 true 63 63 63 63.63 63.63 char63 63
64 2017-10-01 2017-10-01T00:00 Beijing 64 64 true 64 64 64 64.64 64.64 char64 64
65 2017-10-01 2017-10-01T00:00 Beijing 65 65 true 65 65 65 65.65 65.65 char65 65
66 2017-10-01 2017-10-01T00:00 Beijing 66 66 true 66 66 66 66.66 66.66 char66 66
67 2017-10-01 2017-10-01T00:00 Beijing 67 67 true 67 67 67 67.67 67.67 char67 67
68 2017-10-01 2017-10-01T00:00 Beijing 68 68 true 68 68 68 68.68 68.68 char68 68
69 2017-10-01 2017-10-01T00:00 Beijing 69 69 true 69 69 69 69.69 69.69 char69 69
70 2017-10-01 2017-10-01T00:00 Beijing 70 70 true 70 70 70 70.7 70.7 char70 70
71 2017-10-01 2017-10-01T00:00 Beijing 71 71 true 71 71 71 71.71 71.71 char71 71
72 2017-10-01 2017-10-01T00:00 Beijing 72 72 true 72 72 72 72.72 72.72 char72 72
73 2017-10-01 2017-10-01T00:00 Beijing 73 73 true 73 73 73 73.73 73.73 char73 73
74 2017-10-01 2017-10-01T00:00 Beijing 74 74 true 74 74 74 74.74 74.74 char74 74
75 2017-10-01 2017-10-01T00:00 Beijing 75 75 true 75 75 75 75.75 75.75 char75 75
76 2017-10-01 2017-10-01T00:00 Beijing 76 76 true 76 76 76 76.76 76.76 char76 76
77 2017-10-01 2017-10-01T00:00 Beijing 77 77 true 77 77 77 77.77 77.77 char77 77
78 2017-10-01 2017-10-01T00:00 Beijing 78 78 true 78 78 78 78.78 78.78 char78 78
79 2017-10-01 2017-10-01T00:00 Beijing 79 79 true 79 79 79 79.79 79.79 char79 79
80 2017-10-01 2017-10-01T00:00 Beijing 80 80 true 80 80 80 80.8 80.8 char80 80
81 2017-10-01 2017-10-01T00:00 Beijing 81 81 true 81 81 81 81.81 81.81 char81 81
82 2017-10-01 2017-10-01T00:00 Beijing 82 82 true 82 82 82 82.82 82.82 char82 82
83 2017-10-01 2017-10-01T00:00 Beijing 83 83 true 83 83 83 83.83 83.83 char83 83
84 2017-10-01 2017-10-01T00:00 Beijing 84 84 true 84 84 84 84.84 84.84 char84 84
85 2017-10-01 2017-10-01T00:00 Beijing 85 85 true 85 85 85 85.85 85.85 char85 85
86 2017-10-01 2017-10-01T00:00 Beijing 86 86 true 86 86 86 86.86 86.86 char86 86
87 2017-10-01 2017-10-01T00:00 Beijing 87 87 true 87 87 87 87.87 87.87 char87 87
88 2017-10-01 2017-10-01T00:00 Beijing 88 88 true 88 88 88 88.88 88.88 char88 88
89 2017-10-01 2017-10-01T00:00 Beijing 89 89 true 89 89 89 89.89 89.89 char89 89
90 2017-10-01 2017-10-01T00:00 Beijing 90 90 true 90 90 90 90.9 90.9 char90 90
91 2017-10-01 2017-10-01T00:00 Beijing 91 91 true 91 91 91 91.91 91.91 char91 91
92 2017-10-01 2017-10-01T00:00 Beijing 92 92 true 92 92 92 92.92 92.92 char92 92
93 2017-10-01 2017-10-01T00:00 Beijing 93 93 true 93 93 93 93.93 93.93 char93 93
94 2017-10-01 2017-10-01T00:00 Beijing 94 94 true 94 94 94 94.94 94.94 char94 94
95 2017-10-01 2017-10-01T00:00 Beijing 95 95 true 95 95 95 95.95 95.95 char95 95
96 2017-10-01 2017-10-01T00:00 Beijing 96 96 true 96 96 96 96.96 96.96 char96 96
97 2017-10-01 2017-10-01T00:00 Beijing 97 97 true 97 97 97 97.97 97.97 char97 97
98 2017-10-01 2017-10-01T00:00 Beijing 98 98 true 98 98 98 98.98 98.98 char98 98
99 2017-10-01 2017-10-01T00:00 Beijing 99 99 true 99 99 99 99.99 99.99 char99 99
100 2017-10-01 2017-10-01T00:00 \N \N \N \N \N \N \N \N \N \N \N
-- !select_load4 --
1 2017-10-01 2017-10-01T00:00 Beijing 1 1 true 1 1 1 1.1 1.1 char1 1
@ -418,94 +238,4 @@
8 2017-10-01 2017-10-01T00:00 Beijing 8 8 true 8 8 8 8.8 8.8 char8 8
9 2017-10-01 2017-10-01T00:00 Beijing 9 9 true 9 9 9 9.9 9.9 char9 9
10 2017-10-01 2017-10-01T00:00 Beijing 10 10 true 10 10 10 10.1 10.1 char10 10
11 2017-10-01 2017-10-01T00:00 Beijing 11 11 true 11 11 11 11.11 11.11 char11 11
12 2017-10-01 2017-10-01T00:00 Beijing 12 12 true 12 12 12 12.12 12.12 char12 12
13 2017-10-01 2017-10-01T00:00 Beijing 13 13 true 13 13 13 13.13 13.13 char13 13
14 2017-10-01 2017-10-01T00:00 Beijing 14 14 true 14 14 14 14.14 14.14 char14 14
15 2017-10-01 2017-10-01T00:00 Beijing 15 15 true 15 15 15 15.15 15.15 char15 15
16 2017-10-01 2017-10-01T00:00 Beijing 16 16 true 16 16 16 16.16 16.16 char16 16
17 2017-10-01 2017-10-01T00:00 Beijing 17 17 true 17 17 17 17.17 17.17 char17 17
18 2017-10-01 2017-10-01T00:00 Beijing 18 18 true 18 18 18 18.18 18.18 char18 18
19 2017-10-01 2017-10-01T00:00 Beijing 19 19 true 19 19 19 19.19 19.19 char19 19
20 2017-10-01 2017-10-01T00:00 Beijing 20 20 true 20 20 20 20.2 20.2 char20 20
21 2017-10-01 2017-10-01T00:00 Beijing 21 21 true 21 21 21 21.21 21.21 char21 21
22 2017-10-01 2017-10-01T00:00 Beijing 22 22 true 22 22 22 22.22 22.22 char22 22
23 2017-10-01 2017-10-01T00:00 Beijing 23 23 true 23 23 23 23.23 23.23 char23 23
24 2017-10-01 2017-10-01T00:00 Beijing 24 24 true 24 24 24 24.24 24.24 char24 24
25 2017-10-01 2017-10-01T00:00 Beijing 25 25 true 25 25 25 25.25 25.25 char25 25
26 2017-10-01 2017-10-01T00:00 Beijing 26 26 true 26 26 26 26.26 26.26 char26 26
27 2017-10-01 2017-10-01T00:00 Beijing 27 27 true 27 27 27 27.27 27.27 char27 27
28 2017-10-01 2017-10-01T00:00 Beijing 28 28 true 28 28 28 28.28 28.28 char28 28
29 2017-10-01 2017-10-01T00:00 Beijing 29 29 true 29 29 29 29.29 29.29 char29 29
30 2017-10-01 2017-10-01T00:00 Beijing 30 30 true 30 30 30 30.3 30.3 char30 30
31 2017-10-01 2017-10-01T00:00 Beijing 31 31 true 31 31 31 31.31 31.31 char31 31
32 2017-10-01 2017-10-01T00:00 Beijing 32 32 true 32 32 32 32.32 32.32 char32 32
33 2017-10-01 2017-10-01T00:00 Beijing 33 33 true 33 33 33 33.33 33.33 char33 33
34 2017-10-01 2017-10-01T00:00 Beijing 34 34 true 34 34 34 34.34 34.34 char34 34
35 2017-10-01 2017-10-01T00:00 Beijing 35 35 true 35 35 35 35.35 35.35 char35 35
36 2017-10-01 2017-10-01T00:00 Beijing 36 36 true 36 36 36 36.36 36.36 char36 36
37 2017-10-01 2017-10-01T00:00 Beijing 37 37 true 37 37 37 37.37 37.37 char37 37
38 2017-10-01 2017-10-01T00:00 Beijing 38 38 true 38 38 38 38.38 38.38 char38 38
39 2017-10-01 2017-10-01T00:00 Beijing 39 39 true 39 39 39 39.39 39.39 char39 39
40 2017-10-01 2017-10-01T00:00 Beijing 40 40 true 40 40 40 40.4 40.4 char40 40
41 2017-10-01 2017-10-01T00:00 Beijing 41 41 true 41 41 41 41.41 41.41 char41 41
42 2017-10-01 2017-10-01T00:00 Beijing 42 42 true 42 42 42 42.42 42.42 char42 42
43 2017-10-01 2017-10-01T00:00 Beijing 43 43 true 43 43 43 43.43 43.43 char43 43
44 2017-10-01 2017-10-01T00:00 Beijing 44 44 true 44 44 44 44.44 44.44 char44 44
45 2017-10-01 2017-10-01T00:00 Beijing 45 45 true 45 45 45 45.45 45.45 char45 45
46 2017-10-01 2017-10-01T00:00 Beijing 46 46 true 46 46 46 46.46 46.46 char46 46
47 2017-10-01 2017-10-01T00:00 Beijing 47 47 true 47 47 47 47.47 47.47 char47 47
48 2017-10-01 2017-10-01T00:00 Beijing 48 48 true 48 48 48 48.48 48.48 char48 48
49 2017-10-01 2017-10-01T00:00 Beijing 49 49 true 49 49 49 49.49 49.49 char49 49
50 2017-10-01 2017-10-01T00:00 Beijing 50 50 true 50 50 50 50.5 50.5 char50 50
51 2017-10-01 2017-10-01T00:00 Beijing 51 51 true 51 51 51 51.51 51.51 char51 51
52 2017-10-01 2017-10-01T00:00 Beijing 52 52 true 52 52 52 52.52 52.52 char52 52
53 2017-10-01 2017-10-01T00:00 Beijing 53 53 true 53 53 53 53.53 53.53 char53 53
54 2017-10-01 2017-10-01T00:00 Beijing 54 54 true 54 54 54 54.54 54.54 char54 54
55 2017-10-01 2017-10-01T00:00 Beijing 55 55 true 55 55 55 55.55 55.55 char55 55
56 2017-10-01 2017-10-01T00:00 Beijing 56 56 true 56 56 56 56.56 56.56 char56 56
57 2017-10-01 2017-10-01T00:00 Beijing 57 57 true 57 57 57 57.57 57.57 char57 57
58 2017-10-01 2017-10-01T00:00 Beijing 58 58 true 58 58 58 58.58 58.58 char58 58
59 2017-10-01 2017-10-01T00:00 Beijing 59 59 true 59 59 59 59.59 59.59 char59 59
60 2017-10-01 2017-10-01T00:00 Beijing 60 60 true 60 60 60 60.6 60.6 char60 60
61 2017-10-01 2017-10-01T00:00 Beijing 61 61 true 61 61 61 61.61 61.61 char61 61
62 2017-10-01 2017-10-01T00:00 Beijing 62 62 true 62 62 62 62.62 62.62 char62 62
63 2017-10-01 2017-10-01T00:00 Beijing 63 63 true 63 63 63 63.63 63.63 char63 63
64 2017-10-01 2017-10-01T00:00 Beijing 64 64 true 64 64 64 64.64 64.64 char64 64
65 2017-10-01 2017-10-01T00:00 Beijing 65 65 true 65 65 65 65.65 65.65 char65 65
66 2017-10-01 2017-10-01T00:00 Beijing 66 66 true 66 66 66 66.66 66.66 char66 66
67 2017-10-01 2017-10-01T00:00 Beijing 67 67 true 67 67 67 67.67 67.67 char67 67
68 2017-10-01 2017-10-01T00:00 Beijing 68 68 true 68 68 68 68.68 68.68 char68 68
69 2017-10-01 2017-10-01T00:00 Beijing 69 69 true 69 69 69 69.69 69.69 char69 69
70 2017-10-01 2017-10-01T00:00 Beijing 70 70 true 70 70 70 70.7 70.7 char70 70
71 2017-10-01 2017-10-01T00:00 Beijing 71 71 true 71 71 71 71.71 71.71 char71 71
72 2017-10-01 2017-10-01T00:00 Beijing 72 72 true 72 72 72 72.72 72.72 char72 72
73 2017-10-01 2017-10-01T00:00 Beijing 73 73 true 73 73 73 73.73 73.73 char73 73
74 2017-10-01 2017-10-01T00:00 Beijing 74 74 true 74 74 74 74.74 74.74 char74 74
75 2017-10-01 2017-10-01T00:00 Beijing 75 75 true 75 75 75 75.75 75.75 char75 75
76 2017-10-01 2017-10-01T00:00 Beijing 76 76 true 76 76 76 76.76 76.76 char76 76
77 2017-10-01 2017-10-01T00:00 Beijing 77 77 true 77 77 77 77.77 77.77 char77 77
78 2017-10-01 2017-10-01T00:00 Beijing 78 78 true 78 78 78 78.78 78.78 char78 78
79 2017-10-01 2017-10-01T00:00 Beijing 79 79 true 79 79 79 79.79 79.79 char79 79
80 2017-10-01 2017-10-01T00:00 Beijing 80 80 true 80 80 80 80.8 80.8 char80 80
81 2017-10-01 2017-10-01T00:00 Beijing 81 81 true 81 81 81 81.81 81.81 char81 81
82 2017-10-01 2017-10-01T00:00 Beijing 82 82 true 82 82 82 82.82 82.82 char82 82
83 2017-10-01 2017-10-01T00:00 Beijing 83 83 true 83 83 83 83.83 83.83 char83 83
84 2017-10-01 2017-10-01T00:00 Beijing 84 84 true 84 84 84 84.84 84.84 char84 84
85 2017-10-01 2017-10-01T00:00 Beijing 85 85 true 85 85 85 85.85 85.85 char85 85
86 2017-10-01 2017-10-01T00:00 Beijing 86 86 true 86 86 86 86.86 86.86 char86 86
87 2017-10-01 2017-10-01T00:00 Beijing 87 87 true 87 87 87 87.87 87.87 char87 87
88 2017-10-01 2017-10-01T00:00 Beijing 88 88 true 88 88 88 88.88 88.88 char88 88
89 2017-10-01 2017-10-01T00:00 Beijing 89 89 true 89 89 89 89.89 89.89 char89 89
90 2017-10-01 2017-10-01T00:00 Beijing 90 90 true 90 90 90 90.9 90.9 char90 90
91 2017-10-01 2017-10-01T00:00 Beijing 91 91 true 91 91 91 91.91 91.91 char91 91
92 2017-10-01 2017-10-01T00:00 Beijing 92 92 true 92 92 92 92.92 92.92 char92 92
93 2017-10-01 2017-10-01T00:00 Beijing 93 93 true 93 93 93 93.93 93.93 char93 93
94 2017-10-01 2017-10-01T00:00 Beijing 94 94 true 94 94 94 94.94 94.94 char94 94
95 2017-10-01 2017-10-01T00:00 Beijing 95 95 true 95 95 95 95.95 95.95 char95 95
96 2017-10-01 2017-10-01T00:00 Beijing 96 96 true 96 96 96 96.96 96.96 char96 96
97 2017-10-01 2017-10-01T00:00 Beijing 97 97 true 97 97 97 97.97 97.97 char97 97
98 2017-10-01 2017-10-01T00:00 Beijing 98 98 true 98 98 98 98.98 98.98 char98 98
99 2017-10-01 2017-10-01T00:00 Beijing 99 99 true 99 99 99 99.99 99.99 char99 99
100 2017-10-01 2017-10-01T00:00 \N \N \N \N \N \N \N \N \N \N \N

File diff suppressed because it is too large Load Diff

View File

@ -22,6 +22,11 @@ import java.nio.file.Files
import java.nio.file.Paths
suite("test_export_basic", "p0") {
// open nereids
sql """ set enable_nereids_planner=true """
sql """ set enable_fallback_to_original_planner=false """
// check whether the FE config 'enable_outfile_to_local' is true
StringBuilder strBuilder = new StringBuilder()
strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword)
@ -332,9 +337,8 @@ suite("test_export_basic", "p0") {
check_path_exists.call("${outFilePath}")
// exec export
// TODO(ftw): EXPORT TABLE ${table_export_name} PARTITION (more_than_70) where id >100
sql """
EXPORT TABLE ${table_export_name} PARTITION (more_than_70)
EXPORT TABLE ${table_export_name} PARTITION (more_than_70) where id >100
TO "file://${outFilePath}/"
PROPERTIES(
"label" = "${label}",
@ -377,7 +381,7 @@ suite("test_export_basic", "p0") {
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
assertEquals(81, json.NumberTotalRows)
assertEquals(50, json.NumberTotalRows)
assertEquals(0, json.NumberFilteredRows)
}
}
@ -431,4 +435,134 @@ suite("test_export_basic", "p0") {
} finally {
delete_files.call("${outFilePath}")
}
// 6. test columns property
uuid = UUID.randomUUID().toString()
outFilePath = """${outfile_path_prefix}_${uuid}"""
label = "label_${uuid}"
try {
// check export path
check_path_exists.call("${outFilePath}")
// exec export
sql """
EXPORT TABLE ${table_export_name} PARTITION (more_than_70) where age >100
TO "file://${outFilePath}/"
PROPERTIES(
"label" = "${label}",
"format" = "csv",
"column_separator"=",",
"columns" = "id, name"
);
"""
waiting_export.call(label)
// check file amounts
check_file_amounts.call("${outFilePath}", 1)
// check data correctness
sql """ DROP TABLE IF EXISTS ${table_load_name} """
sql """
CREATE TABLE IF NOT EXISTS ${table_load_name} (
`id` int(11) NULL,
`name` string NULL
)
DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
"""
File[] files = new File("${outFilePath}").listFiles()
String file_path = files[0].getAbsolutePath()
streamLoad {
table "${table_load_name}"
set 'column_separator', ','
set 'columns', 'id, name'
set 'strict_mode', 'true'
file "${file_path}"
time 10000 // limit inflight 10s
check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
assertEquals(67, json.NumberTotalRows)
assertEquals(0, json.NumberFilteredRows)
}
}
qt_select_load6 """ SELECT * FROM ${table_load_name} t ORDER BY id; """
} finally {
try_sql("DROP TABLE IF EXISTS ${table_load_name}")
delete_files.call("${outFilePath}")
}
// 7. test columns property 2
uuid = UUID.randomUUID().toString()
outFilePath = """${outfile_path_prefix}_${uuid}"""
label = "label_${uuid}"
try {
// check export path
check_path_exists.call("${outFilePath}")
// exec export
sql """
EXPORT TABLE ${table_export_name} PARTITION (more_than_70) where age >100
TO "file://${outFilePath}/"
PROPERTIES(
"label" = "${label}",
"format" = "csv",
"column_separator"=",",
"columns" = "id"
);
"""
waiting_export.call(label)
// check file amounts
check_file_amounts.call("${outFilePath}", 1)
// check data correctness
sql """ DROP TABLE IF EXISTS ${table_load_name} """
sql """
CREATE TABLE IF NOT EXISTS ${table_load_name} (
`id` int(11) NULL
)
DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
"""
File[] files = new File("${outFilePath}").listFiles()
String file_path = files[0].getAbsolutePath()
streamLoad {
table "${table_load_name}"
set 'column_separator', ','
set 'columns', 'id'
set 'strict_mode', 'true'
file "${file_path}"
time 10000 // limit inflight 10s
check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
assertEquals(67, json.NumberTotalRows)
assertEquals(0, json.NumberFilteredRows)
}
}
qt_select_load7 """ SELECT * FROM ${table_load_name} t ORDER BY id; """
} finally {
try_sql("DROP TABLE IF EXISTS ${table_load_name}")
delete_files.call("${outFilePath}")
}
}

View File

@ -22,6 +22,10 @@ import java.nio.file.Files
import java.nio.file.Paths
suite("test_export_csv", "p0") {
// open nereids
sql """ set enable_nereids_planner=true """
sql """ set enable_fallback_to_original_planner=false """
// check whether the FE config 'enable_outfile_to_local' is true
StringBuilder strBuilder = new StringBuilder()
strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword)
@ -217,9 +221,8 @@ suite("test_export_csv", "p0") {
check_path_exists.call("${outFilePath}")
// exec export
// TODO(ftw): EXPORT TABLE ${table_export_name} where user_id <11 TO "file://${outFilePath}/"
sql """
EXPORT TABLE ${table_export_name} TO "file://${outFilePath}/"
EXPORT TABLE ${table_export_name} where user_id <11 TO "file://${outFilePath}/"
PROPERTIES(
"label" = "${label}",
"format" = "csv",
@ -274,7 +277,7 @@ suite("test_export_csv", "p0") {
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
assertEquals(100, json.NumberTotalRows)
assertEquals(10, json.NumberTotalRows)
assertEquals(0, json.NumberFilteredRows)
}
}
@ -295,9 +298,8 @@ suite("test_export_csv", "p0") {
check_path_exists.call("${outFilePath}")
// exec export
// TODO(ftw): EXPORT TABLE ${table_export_name} where user_id <11 TO "file://${outFilePath}/"
sql """
EXPORT TABLE ${table_export_name} TO "file://${outFilePath}/"
EXPORT TABLE ${table_export_name} where user_id <11 TO "file://${outFilePath}/"
PROPERTIES(
"label" = "${label}",
"format" = "csv_with_names",
@ -353,7 +355,7 @@ suite("test_export_csv", "p0") {
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
assertEquals(100, json.NumberTotalRows)
assertEquals(10, json.NumberTotalRows)
assertEquals(0, json.NumberFilteredRows)
}
}
@ -374,9 +376,8 @@ suite("test_export_csv", "p0") {
check_path_exists.call("${outFilePath}")
// exec export
// TODO(ftw): EXPORT TABLE ${table_export_name} where user_id <11 TO "file://${outFilePath}/"
sql """
EXPORT TABLE ${table_export_name} TO "file://${outFilePath}/"
EXPORT TABLE ${table_export_name} where user_id <11 TO "file://${outFilePath}/"
PROPERTIES(
"label" = "${label}",
"format" = "csv_with_names_and_types",
@ -432,7 +433,7 @@ suite("test_export_csv", "p0") {
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
assertEquals(100, json.NumberTotalRows)
assertEquals(10, json.NumberTotalRows)
assertEquals(0, json.NumberFilteredRows)
}
}

View File

@ -22,6 +22,11 @@ import java.nio.file.Files
import java.nio.file.Paths
suite("test_export_data_types", "p0") {
// open nereids
sql """ set enable_nereids_planner=true """
sql """ set enable_fallback_to_original_planner=false """
// check whether the FE config 'enable_outfile_to_local' is true
StringBuilder strBuilder = new StringBuilder()
strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword)

View File

@ -22,6 +22,11 @@ import java.nio.file.Files
import java.nio.file.Paths
suite("test_export_empty_table", "p0") {
// open nereids
sql """ set enable_nereids_planner=true """
sql """ set enable_fallback_to_original_planner=false """
// check whether the FE config 'enable_outfile_to_local' is true
StringBuilder strBuilder = new StringBuilder()
strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword)

View File

@ -22,6 +22,11 @@ import java.nio.file.Files
import java.nio.file.Paths
suite("test_export_orc", "p0") {
// open nereids
sql """ set enable_nereids_planner=true """
sql """ set enable_fallback_to_original_planner=false """
// check whether the FE config 'enable_outfile_to_local' is true
StringBuilder strBuilder = new StringBuilder()
strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword)

View File

@ -22,6 +22,11 @@ import java.nio.file.Files
import java.nio.file.Paths
suite("test_export_parquet", "p0") {
// open nereids
sql """ set enable_nereids_planner=true """
sql """ set enable_fallback_to_original_planner=false """
// check whether the FE config 'enable_outfile_to_local' is true
StringBuilder strBuilder = new StringBuilder()
strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword)

View File

@ -22,6 +22,11 @@ import java.nio.file.Files
import java.nio.file.Paths
suite("test_outfile_expr") {
// open nereids
sql """ set enable_nereids_planner=true """
sql """ set enable_fallback_to_original_planner=false """
StringBuilder strBuilder = new StringBuilder()
strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword)
strBuilder.append(" http://" + context.config.feHttpAddress + "/rest/v1/config/fe")

View File

@ -22,6 +22,11 @@ import java.nio.file.Files
import java.nio.file.Paths
suite("test_outfile_parquet") {
// open nereids
sql """ set enable_nereids_planner=true """
sql """ set enable_fallback_to_original_planner=false """
def dbName = "test_outfile_parquet"
sql "CREATE DATABASE IF NOT EXISTS ${dbName}"
sql "USE $dbName"

View File

@ -22,6 +22,11 @@ import java.nio.file.Files
import java.nio.file.Paths
suite("test_outfile_separator") {
// open nereids
sql """ set enable_nereids_planner=true """
sql """ set enable_fallback_to_original_planner=false """
StringBuilder strBuilder = new StringBuilder()
strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword)
strBuilder.append(" http://" + context.config.feHttpAddress + "/rest/v1/config/fe")

View File

@ -22,6 +22,11 @@ import java.nio.file.Files
import java.nio.file.Paths
suite("test_export_big_data", "p2") {
// open nereids
sql """ set enable_nereids_planner=true """
sql """ set enable_fallback_to_original_planner=false """
// check whether the FE config 'enable_outfile_to_local' is true
StringBuilder strBuilder = new StringBuilder()
strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword)

View File

@ -16,6 +16,10 @@
// under the License.
suite("test_export_max_file_size", "p2") {
// open nereids
sql """ set enable_nereids_planner=true """
sql """ set enable_fallback_to_original_planner=false """
String nameNodeHost = context.config.otherConfigs.get("extHiveHmsHost")
String hdfsPort = context.config.otherConfigs.get("extHdfsPort")
String fs = "hdfs://${nameNodeHost}:${hdfsPort}"
@ -117,10 +121,10 @@ suite("test_export_max_file_size", "p2") {
def outfile_info = waiting_export.call(uuid)
def json = parseJson(outfile_info)
assert json instanceof List
assertEquals("3", json.fileNumber[0])
def outfile_url = json.url[0]
assertEquals("3", json.fileNumber[0][0])
def outfile_url = json.url[0][0]
for (int j = 0; j < json.fileNumber[0].toInteger(); ++j ) {
for (int j = 0; j < json.fileNumber[0][0].toInteger(); ++j ) {
// check data correctness
sql """
insert into ${table_load_name}

View File

@ -16,6 +16,11 @@
// under the License.
suite("test_export_with_hdfs", "p2") {
// open nereids
sql """ set enable_nereids_planner=true """
sql """ set enable_fallback_to_original_planner=false """
String nameNodeHost = context.config.otherConfigs.get("extHiveHmsHost")
String hdfsPort = context.config.otherConfigs.get("extHdfsPort")
String fs = "hdfs://${nameNodeHost}:${hdfsPort}"

View File

@ -0,0 +1,165 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
suite("test_export_with_parallelism", "p2") {
// open nereids
sql """ set enable_nereids_planner=true """
sql """ set enable_fallback_to_original_planner=false """
String ak = getS3AK()
String sk = getS3SK()
String s3_endpoint = getS3Endpoint()
String region = getS3Region()
String bucket = context.config.otherConfigs.get("s3BucketName");
def table_export_name = "test_export_with_parallelism"
def table_load_name = "test_load_with_parallelism"
// create table and insert data
sql """ DROP TABLE IF EXISTS ${table_export_name} """
sql """
CREATE TABLE IF NOT EXISTS ${table_export_name} (
`id` int(11) NULL,
`name` string NULL,
`age` int(11) NULL
)
DISTRIBUTED BY HASH(id) BUCKETS 3
PROPERTIES("replication_num" = "1");
"""
StringBuilder sb = new StringBuilder()
int i = 1
for (; i < 100; i ++) {
sb.append("""
(${i}, 'ftw-${i}', ${i + 18}),
""")
}
sb.append("""
(${i}, NULL, NULL)
""")
sql """ INSERT INTO ${table_export_name} VALUES
${sb.toString()}
"""
qt_select_export """ SELECT * FROM ${table_export_name} t ORDER BY id; """
def waiting_export = { export_label, parallelism ->
while (true) {
def res = sql """ show export where label = "${export_label}" """
logger.info("export state: " + res[0][2])
if (res[0][2] == "FINISHED") {
def json = parseJson(res[0][11])
assert json instanceof List
// because parallelism = min(parallelism, tablets_num)
assertEquals(Integer.min(3, parallelism), json.size())
List<String> resultUrl = new ArrayList<String>();
// because parallelism = min(parallelism, tablets_num)
for (int idx = 0; idx < parallelism && idx < 3; ++idx) {
assertEquals("1", json.fileNumber[idx][0])
log.info("outfile_path: ${json.url[idx][0]}")
resultUrl.add(json.url[idx][0])
}
return resultUrl;
} else if (res[0][2] == "CANCELLED") {
throw new IllegalStateException("""export failed: ${res[0][10]}""")
} else {
sleep(5000)
}
}
}
def outFilePath = """${bucket}/export/exp_"""
// def outFilePath = """${bucket}/mac/test_job_scheduel/stu/exp_"""
def test_export = {format, file_suffix, isDelete, parallelism ->
def uuid = UUID.randomUUID().toString()
// exec export
sql """
EXPORT TABLE ${table_export_name} TO "s3://${outFilePath}"
PROPERTIES(
"label" = "${uuid}",
"format" = "${format}",
"column_separator" = ",",
"parallelism" = "${parallelism}",
"delete_existing_files"="${isDelete}"
)
WITH s3 (
"s3.endpoint" = "${s3_endpoint}",
"s3.region" = "${region}",
"s3.secret_key"="${sk}",
"s3.access_key" = "${ak}"
);
"""
def outfile_url_list = waiting_export.call(uuid, parallelism)
// create table and insert
sql """ DROP TABLE IF EXISTS ${table_load_name} """
sql """
CREATE TABLE IF NOT EXISTS ${table_load_name} (
`id` int(11) NULL,
`name` string NULL,
`age` int(11) NULL
)
DISTRIBUTED BY HASH(id) BUCKETS 3
PROPERTIES("replication_num" = "1");
"""
// because parallelism = min(parallelism, tablets_num)
for (int j = 0; j < parallelism && j < 3; ++j) {
// check data correctness
sql """ insert into ${table_load_name}
select * from s3(
"uri" = "http://${s3_endpoint}${outfile_url_list.get(j).substring(4)}0.${file_suffix}",
"s3.access_key"= "${ak}",
"s3.secret_key" = "${sk}",
"format" = "${format}",
"region" = "${region}"
);
"""
}
order_qt_select """ select * from ${table_load_name}; """
}
// parallelism = 2
test_export('csv', 'csv', true, 2);
test_export('parquet', 'parquet', true, 2);
test_export('orc', 'orc', true, 2);
test_export('csv_with_names', 'csv', true, 2);
test_export('csv_with_names_and_types', 'csv', true, 2);
// parallelism = 3
test_export('csv', 'csv', true, 3);
test_export('parquet', 'parquet', true, 3);
test_export('orc', 'orc', true, 3);
test_export('csv_with_names', 'csv', true, 3);
test_export('csv_with_names_and_types', 'csv', true, 3);
// parallelism = 4
test_export('csv', 'csv', true, 4);
test_export('parquet', 'parquet', true, 4);
test_export('orc', 'orc', true, 4);
test_export('csv_with_names', 'csv', true, 4);
test_export('csv_with_names_and_types', 'csv', true, 4);
}

View File

@ -16,6 +16,10 @@
// under the License.
suite("test_export_with_s3", "p2") {
// open nereids
sql """ set enable_nereids_planner=true """
sql """ set enable_fallback_to_original_planner=false """
String ak = getS3AK()
String sk = getS3SK()

View File

@ -16,6 +16,11 @@
// under the License.
suite("test_outfile_orc_max_file_size", "p2") {
// open nereids
sql """ set enable_nereids_planner=true """
sql """ set enable_fallback_to_original_planner=false """
String nameNodeHost = context.config.otherConfigs.get("extHiveHmsHost")
String hdfsPort = context.config.otherConfigs.get("extHdfsPort")
String fs = "hdfs://${nameNodeHost}:${hdfsPort}"