[Feature](Export) EXPORT SQL supports exporting data from views and external tables (#24070)

Previously, EXPORT only supported exporting OLAP tables.
This PR adds support for exporting views and external tables.
This commit is contained in:
Tiewei Fang
2023-09-13 22:55:19 +08:00
committed by GitHub
parent d7e5f97b74
commit 9847f7789f
9 changed files with 2065 additions and 118 deletions

View File

@ -42,7 +42,8 @@ import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.Pair;
@ -51,6 +52,7 @@ import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.analyzer.UnboundRelation;
import org.apache.doris.nereids.analyzer.UnboundSlot;
@ -199,7 +201,7 @@ public class ExportJob implements Writable {
private List<String> exportColumns = Lists.newArrayList();
private Table exportTable;
private TableIf exportTable;
// when set to true, means this job instance is created by replay thread(FE restarted or master changed)
private boolean isReplayed = false;
@ -242,17 +244,6 @@ public class ExportJob implements Writable {
this.id = jobId;
}
/**
* For an ExportJob:
* The ExportJob is divided into multiple 'ExportTaskExecutor'
* according to the 'parallelism' set by the user.
* The tablets which will be exported by this ExportJob are divided into 'parallelism' copies,
* and each ExportTaskExecutor is responsible for a list of tablets.
* The tablets responsible for an ExportTaskExecutor will be assigned to multiple OutfileStmt
* according to the 'TABLETS_NUM_PER_OUTFILE_IN_EXPORT'.
*
* @throws UserException
*/
public void generateOutfileStatement() throws UserException {
exportTable.readLock();
try {
@ -264,39 +255,35 @@ public class ExportJob implements Writable {
generateExportJobExecutor();
}
public void generateOutfileLogicalPlans(List<String> nameParts)
/**
* For an ExportJob:
* The ExportJob is divided into multiple 'ExportTaskExecutor'
* according to the 'parallelism' set by the user.
* The tablets which will be exported by this ExportJob are divided into 'parallelism' groups,
* and each ExportTaskExecutor is responsible for one group of tablets.
* The tablets assigned to an ExportTaskExecutor are further split across multiple outfile
* statements, at most 'MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT' tablets per statement.
*
* @throws UserException
*/
public void generateOutfileLogicalPlans(List<String> qualifiedTableName)
throws UserException {
String catalogType = Env.getCurrentEnv().getCatalogMgr().getCatalog(this.tableName.getCtl()).getType();
exportTable.readLock();
try {
// build source columns
List<NamedExpression> selectLists = Lists.newArrayList();
if (exportColumns.isEmpty()) {
selectLists.add(new UnboundStar(ImmutableList.of()));
} else {
this.exportColumns.stream().forEach(col -> {
selectLists.add(new UnboundSlot(this.tableName.getTbl(), col));
});
}
// get all tablets
List<List<Long>> tabletsListPerParallel = splitTablets();
// Each Outfile clause responsible for MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT tablets
for (List<Long> tabletsList : tabletsListPerParallel) {
List<StatementBase> logicalPlanAdapters = Lists.newArrayList();
for (int i = 0; i < tabletsList.size(); i += MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT) {
int end = i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT < tabletsList.size()
? i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT : tabletsList.size();
List<Long> tabletIds = new ArrayList<>(tabletsList.subList(i, end));
// generate LogicalPlan
LogicalPlan plan = generateOneLogicalPlan(nameParts, tabletIds, selectLists);
// generate LogicalPlanAdapter
StatementBase statementBase = generateLogicalPlanAdapter(plan);
logicalPlanAdapters.add(statementBase);
if (InternalCatalog.INTERNAL_CATALOG_NAME.equals(catalogType)) {
if (exportTable.getType() == TableType.VIEW) {
// view table
generateViewOrExternalTableOutfile(qualifiedTableName);
} else if (exportTable.getType() == TableType.OLAP) {
// olap table
generateOlapTableOutfile(qualifiedTableName);
} else {
throw new UserException("Do not support export table type [" + exportTable.getType() + "]");
}
selectStmtListPerParallel.add(logicalPlanAdapters);
} else {
// external table
generateViewOrExternalTableOutfile(qualifiedTableName);
}
// debug LOG output
@ -315,11 +302,77 @@ public class ExportJob implements Writable {
generateExportJobExecutor();
}
private LogicalPlan generateOneLogicalPlan(List<String> nameParts, List<Long> tabletIds,
List<NamedExpression> selectLists) {
/**
 * Generates the outfile statements for exporting an OLAP table.
 *
 * <p>For every tablet list returned by {@code splitTablets()}, the tablets are cut into
 * consecutive chunks of at most {@code MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT} ids. One
 * logical plan (wrapped into a statement) is generated per chunk, and the statements of
 * one tablet list are appended together as a single entry of
 * {@code selectStmtListPerParallel}.
 *
 * @param qualifiedTableName name parts of the table, forwarded to the generated relation
 * @throws UserException declared by this method; NOTE(review): presumably raised by
 *         {@code splitTablets()} - confirm
 */
private void generateOlapTableOutfile(List<String> qualifiedTableName) throws UserException {
    // Projection list: '*' when no columns were requested, otherwise one slot per column.
    List<NamedExpression> projections = new ArrayList<>();
    if (exportColumns.isEmpty()) {
        projections.add(new UnboundStar(ImmutableList.of()));
    } else {
        for (String columnName : this.exportColumns) {
            projections.add(new UnboundSlot(this.tableName.getTbl(), columnName));
        }
    }

    for (List<Long> tabletsOfTask : splitTablets()) {
        List<StatementBase> statements = new ArrayList<>();
        int total = tabletsOfTask.size();
        // Each outfile statement covers at most MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT tablets.
        for (int begin = 0; begin < total; begin += MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT) {
            int end = Math.min(begin + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT, total);
            // Copy the chunk so the plan does not hold a view onto the backing list.
            List<Long> chunk = new ArrayList<>(tabletsOfTask.subList(begin, end));
            LogicalPlan plan = generateOneLogicalPlan(qualifiedTableName, chunk,
                    this.partitionNames, projections);
            statements.add(generateLogicalPlanAdapter(plan));
        }
        selectStmtListPerParallel.add(statements);
    }
}
/**
 * Generates the outfile statement used to export a view or an external table.
 *
 * <p>There is no division into tablets for these table kinds, so {@code parallelism} is
 * forced to 1 and exactly one statement is generated, with empty tablet and partition
 * lists. That statement is appended, as a one-element list, to
 * {@code selectStmtListPerParallel}.
 *
 * @param qualifiedTableName name parts of the table, forwarded to the generated relation
 */
private void generateViewOrExternalTableOutfile(List<String> qualifiedTableName) {
    // No tablets to distribute, so a single task handles the whole export.
    this.parallelism = 1;
    LOG.debug("Because there is no division of tablets in view and external table, we set parallelism = 1");

    // Projection list: '*' when no columns were requested, otherwise one slot per column.
    List<NamedExpression> projections = Lists.newArrayList();
    if (exportColumns.isEmpty()) {
        projections.add(new UnboundStar(ImmutableList.of()));
    } else {
        for (String columnName : this.exportColumns) {
            projections.add(new UnboundSlot(this.tableName.getTbl(), columnName));
        }
    }

    // One plan over the whole relation: no tablet ids, no partitions.
    LogicalPlan plan = generateOneLogicalPlan(qualifiedTableName, ImmutableList.of(),
            ImmutableList.of(), projections);

    List<StatementBase> statements = Lists.newArrayList();
    statements.add(generateLogicalPlanAdapter(plan));
    selectStmtListPerParallel.add(statements);
}
private LogicalPlan generateOneLogicalPlan(List<String> qualifiedTableName, List<Long> tabletIds,
List<String> partitions, List<NamedExpression> selectLists) {
// UnboundRelation
LogicalPlan plan = new UnboundRelation(StatementScopeIdGenerator.newRelationId(), nameParts,
this.partitionNames, false, tabletIds, ImmutableList.of());
LogicalPlan plan = new UnboundRelation(StatementScopeIdGenerator.newRelationId(), qualifiedTableName,
partitions, false, tabletIds, ImmutableList.of());
// LogicalCheckPolicy
plan = new LogicalCheckPolicy<>(plan);
// LogicalFilter

View File

@ -24,6 +24,7 @@ import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.load.ExportFailMsg.CancelType;
@ -84,47 +85,49 @@ public class ExportTaskExecutor implements TransientTaskExecutor {
throw new JobException("Export executor has been canceled, task id: {}", taskId);
}
// check the version of tablets
try {
Database db = Env.getCurrentEnv().getInternalCatalog().getDbOrAnalysisException(
exportJob.getTableName().getDb());
OlapTable table = db.getOlapTableOrAnalysisException(exportJob.getTableName().getTbl());
table.readLock();
if (exportJob.getExportTable().getType() == TableType.OLAP) {
try {
List<Long> tabletIds;
if (exportJob.getSessionVariables().isEnableNereidsPlanner()) {
LogicalPlanAdapter logicalPlanAdapter = (LogicalPlanAdapter) selectStmtLists.get(idx);
Optional<UnboundRelation> unboundRelation = findUnboundRelation(
logicalPlanAdapter.getLogicalPlan());
tabletIds = unboundRelation.get().getTabletIds();
} else {
SelectStmt selectStmt = (SelectStmt) selectStmtLists.get(idx);
tabletIds = selectStmt.getTableRefs().get(0).getSampleTabletIds();
}
for (Long tabletId : tabletIds) {
TabletMeta tabletMeta = Env.getCurrentEnv().getTabletInvertedIndex().getTabletMeta(
tabletId);
Partition partition = table.getPartition(tabletMeta.getPartitionId());
long nowVersion = partition.getVisibleVersion();
long oldVersion = exportJob.getPartitionToVersion().get(partition.getName());
if (nowVersion != oldVersion) {
exportJob.updateExportJobState(ExportJobState.CANCELLED, taskId, null,
CancelType.RUN_FAIL, "The version of tablet {" + tabletId + "} has changed");
throw new JobException("Export Job[{}]: Tablet {} has changed version, old version = {}, "
+ "now version = {}", exportJob.getId(), tabletId, oldVersion, nowVersion);
Database db = Env.getCurrentEnv().getInternalCatalog().getDbOrAnalysisException(
exportJob.getTableName().getDb());
OlapTable table = db.getOlapTableOrAnalysisException(exportJob.getTableName().getTbl());
table.readLock();
try {
List<Long> tabletIds;
if (exportJob.getSessionVariables().isEnableNereidsPlanner()) {
LogicalPlanAdapter logicalPlanAdapter = (LogicalPlanAdapter) selectStmtLists.get(idx);
Optional<UnboundRelation> unboundRelation = findUnboundRelation(
logicalPlanAdapter.getLogicalPlan());
tabletIds = unboundRelation.get().getTabletIds();
} else {
SelectStmt selectStmt = (SelectStmt) selectStmtLists.get(idx);
tabletIds = selectStmt.getTableRefs().get(0).getSampleTabletIds();
}
for (Long tabletId : tabletIds) {
TabletMeta tabletMeta = Env.getCurrentEnv().getTabletInvertedIndex().getTabletMeta(
tabletId);
Partition partition = table.getPartition(tabletMeta.getPartitionId());
long nowVersion = partition.getVisibleVersion();
long oldVersion = exportJob.getPartitionToVersion().get(partition.getName());
if (nowVersion != oldVersion) {
exportJob.updateExportJobState(ExportJobState.CANCELLED, taskId, null,
CancelType.RUN_FAIL, "The version of tablet {" + tabletId + "} has changed");
throw new JobException("Export Job[{}]: Tablet {} has changed version, old version = {}"
+ ", now version = {}", exportJob.getId(), tabletId, oldVersion, nowVersion);
}
}
} catch (Exception e) {
exportJob.updateExportJobState(ExportJobState.CANCELLED, taskId, null,
ExportFailMsg.CancelType.RUN_FAIL, e.getMessage());
throw new JobException(e);
} finally {
table.readUnlock();
}
} catch (Exception e) {
} catch (AnalysisException e) {
exportJob.updateExportJobState(ExportJobState.CANCELLED, taskId, null,
ExportFailMsg.CancelType.RUN_FAIL, e.getMessage());
throw new JobException(e);
} finally {
table.readUnlock();
}
} catch (AnalysisException e) {
exportJob.updateExportJobState(ExportJobState.CANCELLED, taskId, null,
ExportFailMsg.CancelType.RUN_FAIL, e.getMessage());
throw new JobException(e);
}
try (AutoCloseConnectContext r = buildConnectContext()) {

View File

@ -24,17 +24,19 @@ import org.apache.doris.analysis.Separator;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.BrokerMgr;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.load.ExportJob;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.exceptions.AnalysisException;
@ -119,7 +121,9 @@ public class ExportCommand extends Command implements ForwardWithSync {
@Override
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
// get tblName
TableName tblName = getTableName(ctx);
List<String> qualifiedTableName = RelationUtil.getQualifierName(ctx, this.nameParts);
TableName tblName = new TableName(qualifiedTableName.get(0), qualifiedTableName.get(1),
qualifiedTableName.get(2));
// check auth
if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ctx, tblName.getDb(), tblName.getTbl(),
@ -141,9 +145,9 @@ public class ExportCommand extends Command implements ForwardWithSync {
private void checkAllParameters(ConnectContext ctx, TableName tblName, Map<String, String> fileProperties)
throws UserException {
checkPropertyKey(fileProperties);
checkPartitions(ctx.getEnv(), tblName);
checkPartitions(ctx, tblName);
checkBrokerDesc(ctx);
checkFileProperties(fileProperties, tblName);
checkFileProperties(ctx, fileProperties, tblName);
}
// check property key
@ -156,24 +160,28 @@ public class ExportCommand extends Command implements ForwardWithSync {
}
// check partitions specified by user are belonged to the table.
private void checkPartitions(Env env, TableName tblName) throws AnalysisException, UserException {
private void checkPartitions(ConnectContext ctx, TableName tblName) throws AnalysisException, UserException {
if (this.partitionsNames.isEmpty()) {
return;
}
CatalogIf catalog = ctx.getEnv().getCatalogMgr().getCatalogOrAnalysisException(tblName.getCtl());
// As for external table, we do not support export PARTITION
if (!InternalCatalog.INTERNAL_CATALOG_NAME.equals(catalog.getType())) {
throw new AnalysisException("Table[" + tblName.getTbl() + "] is EXTERNAL TABLE type, "
+ "do not support export PARTITION.");
}
DatabaseIf db = catalog.getDbOrAnalysisException(tblName.getDb());
Table table = (Table) db.getTableOrAnalysisException(tblName.getTbl());
if (this.partitionsNames.size() > Config.maximum_number_of_export_partitions) {
throw new AnalysisException("The partitions number of this export job is larger than the maximum number"
+ " of partitions allowed by an export job");
}
Database db = env.getInternalCatalog().getDbOrAnalysisException(tblName.getDb());
Table table = db.getTableOrAnalysisException(tblName.getTbl());
table.readLock();
try {
// check table
if (!table.isPartitioned()) {
throw new AnalysisException("Table[" + tblName.getTbl() + "] is not partitioned.");
}
Table.TableType tblType = table.getType();
switch (tblType) {
case MYSQL:
@ -181,15 +189,23 @@ public class ExportCommand extends Command implements ForwardWithSync {
case JDBC:
case OLAP:
break;
case VIEW: // We support export view, so we do not need to check partition here.
if (this.partitionsNames.size() > 0) {
throw new AnalysisException("Table[" + tblName.getTbl() + "] is VIEW type, "
+ "do not support export PARTITION.");
}
return;
case BROKER:
case SCHEMA:
case INLINE_VIEW:
case VIEW:
default:
throw new AnalysisException("Table[" + tblName.getTbl() + "] is "
+ tblType + " type, do not support EXPORT.");
}
// check table
if (!table.isPartitioned()) {
throw new AnalysisException("Table[" + tblName.getTbl() + "] is not partitioned.");
}
for (String partitionName : this.partitionsNames) {
Partition partition = table.getPartition(partitionName);
if (partition == null) {
@ -220,12 +236,15 @@ public class ExportCommand extends Command implements ForwardWithSync {
private ExportJob generateExportJob(ConnectContext ctx, Map<String, String> fileProperties, TableName tblName)
throws UserException {
ExportJob exportJob = new ExportJob();
// set export job
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(tblName.getDb());
// set export job and check catalog/db/table
CatalogIf catalog = ctx.getEnv().getCatalogMgr().getCatalogOrAnalysisException(tblName.getCtl());
DatabaseIf db = catalog.getDbOrAnalysisException(tblName.getDb());
TableIf table = db.getTableOrAnalysisException(tblName.getTbl());
exportJob.setDbId(db.getId());
exportJob.setTableName(tblName);
exportJob.setExportTable(db.getTableOrDdlException(tblName.getTbl()));
exportJob.setTableId(db.getTableOrDdlException(tblName.getTbl()).getId());
exportJob.setExportTable(table);
exportJob.setTableId(table.getId());
// set partitions
exportJob.setPartitionNames(this.partitionsNames);
@ -292,23 +311,15 @@ public class ExportCommand extends Command implements ForwardWithSync {
.getQueryTimeoutS());
// exportJob generate outfile sql
exportJob.generateOutfileLogicalPlans(this.nameParts);
exportJob.generateOutfileLogicalPlans(RelationUtil.getQualifierName(ctx, this.nameParts));
return exportJob;
}
/**
 * Resolves {@code nameParts} into a {@link TableName}.
 *
 * <p>The first three elements returned by {@code RelationUtil.getQualifierName} are used
 * as catalog, database and table. The catalog is then passed to
 * {@code Util.prohibitExternalCatalog} together with this class's simple name.
 *
 * @param ctx the connection context used to qualify the name parts
 * @return the resolved table name
 * @throws UserException declared by this method; NOTE(review): presumably raised when the
 *         catalog is external - confirm against {@code Util.prohibitExternalCatalog}
 */
private TableName getTableName(ConnectContext ctx) throws UserException {
    List<String> parts = RelationUtil.getQualifierName(ctx, this.nameParts);
    String catalogName = parts.get(0);
    String dbName = parts.get(1);
    String tableName = parts.get(2);
    TableName resolved = new TableName(catalogName, dbName, tableName);
    Util.prohibitExternalCatalog(resolved.getCtl(), this.getClass().getSimpleName());
    return resolved;
}
private void checkFileProperties(Map<String, String> fileProperties, TableName tblName) throws UserException {
private void checkFileProperties(ConnectContext ctx, Map<String, String> fileProperties, TableName tblName)
throws UserException {
// check user specified columns
if (fileProperties.containsKey(LoadStmt.KEY_IN_PARAM_COLUMNS)) {
checkColumns(fileProperties.get(LoadStmt.KEY_IN_PARAM_COLUMNS), tblName);
checkColumns(ctx, fileProperties.get(LoadStmt.KEY_IN_PARAM_COLUMNS), tblName);
}
// check user specified label
@ -317,12 +328,18 @@ public class ExportCommand extends Command implements ForwardWithSync {
}
}
private void checkColumns(String columns, TableName tblName) throws AnalysisException, UserException {
private void checkColumns(ConnectContext ctx, String columns, TableName tblName)
throws AnalysisException, UserException {
if (columns.isEmpty()) {
throw new AnalysisException("columns can not be empty");
}
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(tblName.getDb());
Table table = db.getTableOrDdlException(tblName.getTbl());
CatalogIf catalog = ctx.getEnv().getCatalogMgr().getCatalogOrAnalysisException(tblName.getCtl());
DatabaseIf db = catalog.getDbOrAnalysisException(tblName.getDb());
TableIf table = db.getTableOrAnalysisException(tblName.getTbl());
// As for external table
// their base schemas are equals to full schemas
List<String> tableColumns = table.getBaseSchema().stream().map(column -> column.getName())
.collect(Collectors.toList());
Splitter split = Splitter.on(',').trimResults().omitEmptyStrings();
@ -339,6 +356,10 @@ public class ExportCommand extends Command implements ForwardWithSync {
return this.fileProperties;
}
/**
 * Returns the name parts held by this command, exactly as stored (no copy is made).
 *
 * @return the {@code nameParts} list
 */
public List<String> getNameParts() {
    return nameParts;
}
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitExportCommand(this, context);

View File

@ -1958,7 +1958,7 @@ public class ShowExecutor {
private void handleShowExport() throws AnalysisException {
ShowExportStmt showExportStmt = (ShowExportStmt) stmt;
Env env = Env.getCurrentEnv();
Database db = env.getInternalCatalog().getDbOrAnalysisException(showExportStmt.getDbName());
DatabaseIf db = env.getCurrentCatalog().getDbOrAnalysisException(showExportStmt.getDbName());
long dbId = db.getId();
ExportMgr exportMgr = env.getExportMgr();

View File

@ -25,6 +25,7 @@ import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.trees.plans.commands.ExportCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.util.RelationUtil;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.utframe.TestWithFeService;
@ -412,9 +413,6 @@ public class ExportToOutfileLogicalPlanTest extends TestWithFeService {
ExportCommand exportCommand = (ExportCommand) parseSql(exportSql);
List<List<StatementBase>> selectStmtListPerParallel = Lists.newArrayList();
try {
Method getTableName = exportCommand.getClass().getDeclaredMethod("getTableName", ConnectContext.class);
getTableName.setAccessible(true);
Method checkAllParameters = exportCommand.getClass().getDeclaredMethod("checkAllParameters",
ConnectContext.class, TableName.class, Map.class);
checkAllParameters.setAccessible(true);
@ -423,7 +421,10 @@ public class ExportToOutfileLogicalPlanTest extends TestWithFeService {
ConnectContext.class, Map.class, TableName.class);
generateExportJob.setAccessible(true);
TableName tblName = (TableName) getTableName.invoke(exportCommand, connectContext);
// get tblName
List<String> qualifiedTableName = RelationUtil.getQualifierName(connectContext, exportCommand.getNameParts());
TableName tblName = new TableName(qualifiedTableName.get(0), qualifiedTableName.get(1),
qualifiedTableName.get(2));
checkAllParameters.invoke(exportCommand, connectContext, tblName, exportCommand.getFileProperties());
ExportJob job = (ExportJob) generateExportJob.invoke(