[improve](export) Support partition data consistency (#31290)

This commit is contained in:
walter
2024-02-29 22:08:29 +08:00
committed by yiguolei
parent 82faa7469b
commit 1f825ee2d6
7 changed files with 611 additions and 25 deletions

View File

@ -63,6 +63,7 @@ import java.util.stream.Collectors;
public class ExportStmt extends StatementBase {
public static final String PARALLELISM = "parallelism";
public static final String LABEL = "label";
public static final String DATA_CONSISTENCY = "data_consistency";
private static final String DEFAULT_COLUMN_SEPARATOR = "\t";
private static final String DEFAULT_LINE_DELIMITER = "\n";
@ -72,6 +73,7 @@ public class ExportStmt extends StatementBase {
private static final ImmutableSet<String> PROPERTIES_SET = new ImmutableSet.Builder<String>()
.add(LABEL)
.add(PARALLELISM)
.add(DATA_CONSISTENCY)
.add(LoadStmt.KEY_IN_PARAM_COLUMNS)
.add(OutFileClause.PROP_MAX_FILE_SIZE)
.add(OutFileClause.PROP_DELETE_EXISTING_FILES)
@ -104,6 +106,7 @@ public class ExportStmt extends StatementBase {
private String maxFileSize;
private String deleteExistingFiles;
private String withBom;
private String dataConsistency;
private SessionVariable sessionVariables;
private String qualifiedUser;
@ -230,6 +233,7 @@ public class ExportStmt extends StatementBase {
exportJob.setMaxFileSize(this.maxFileSize);
exportJob.setDeleteExistingFiles(this.deleteExistingFiles);
exportJob.setWithBom(this.withBom);
exportJob.setDataConsistency(this.dataConsistency);
if (columns != null) {
Splitter split = Splitter.on(',').trimResults().omitEmptyStrings();
@ -359,6 +363,17 @@ public class ExportStmt extends StatementBase {
// with bom
this.withBom = properties.getOrDefault(OutFileClause.PROP_WITH_BOM, "false");
// data consistency
String dataConsistencyStr = properties.get(DATA_CONSISTENCY);
if (dataConsistencyStr != null) {
if (!dataConsistencyStr.equalsIgnoreCase(ExportJob.CONSISTENT_PARTITION)) {
throw new UserException("The value of data_consistency is invalid, only `partition` is allowed");
}
this.dataConsistency = ExportJob.CONSISTENT_PARTITION;
} else {
this.dataConsistency = ExportJob.CONSISTENT_ALL;
}
}
private void checkColumns() throws DdlException {

View File

@ -99,6 +99,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@Data
public class ExportJob implements Writable {
@ -108,6 +109,9 @@ public class ExportJob implements Writable {
private static final int MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT = Config.maximum_tablets_of_outfile_in_export;
public static final String CONSISTENT_ALL = "all";
public static final String CONSISTENT_PARTITION = "partition";
@SerializedName("id")
private long id;
@SerializedName("label")
@ -168,6 +172,8 @@ public class ExportJob implements Writable {
private Integer tabletsNum;
@SerializedName("withBom")
private String withBom;
@SerializedName("dataConsistency")
private String dataConsistency;
private TableRef tableRef;
@ -222,6 +228,7 @@ public class ExportJob implements Writable {
this.lineDelimiter = "\n";
this.columns = "";
this.withBom = "false";
this.dataConsistency = "all";
}
public ExportJob(long jobId) {
@ -229,6 +236,10 @@ public class ExportJob implements Writable {
this.id = jobId;
}
/**
 * Whether this export job was configured with partition-level data
 * consistency (property {@code data_consistency = "partition"}).
 * Constant-first {@code equals} makes the check null-safe without an
 * explicit null guard.
 */
public boolean isPartitionConsistency() {
    return CONSISTENT_PARTITION.equals(dataConsistency);
}
public void generateOutfileStatement() throws UserException {
exportTable.readLock();
try {
@ -302,16 +313,12 @@ public class ExportJob implements Writable {
}
// get all tablets
List<List<Long>> tabletsListPerParallel = splitTablets();
List<List<List<Long>>> tabletsListPerParallel = splitTablets();
// Each Outfile clause responsible for MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT tablets
for (List<Long> tabletsList : tabletsListPerParallel) {
for (List<List<Long>> tabletsList : tabletsListPerParallel) {
List<StatementBase> logicalPlanAdapters = Lists.newArrayList();
for (int i = 0; i < tabletsList.size(); i += MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT) {
int end = i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT < tabletsList.size()
? i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT : tabletsList.size();
List<Long> tabletIds = new ArrayList<>(tabletsList.subList(i, end));
for (List<Long> tabletIds : tabletsList) {
// generate LogicalPlan
LogicalPlan plan = generateOneLogicalPlan(qualifiedTableName, tabletIds,
this.partitionNames, selectLists);
@ -471,15 +478,12 @@ public class ExportJob implements Writable {
}
private List<List<TableRef>> getTableRefListPerParallel() throws UserException {
List<List<Long>> tabletsListPerParallel = splitTablets();
List<List<List<Long>>> tabletsListPerParallel = splitTablets();
List<List<TableRef>> tableRefListPerParallel = Lists.newArrayList();
for (List<Long> tabletsList : tabletsListPerParallel) {
for (List<List<Long>> tabletsList : tabletsListPerParallel) {
List<TableRef> tableRefList = Lists.newArrayList();
for (int i = 0; i < tabletsList.size(); i += MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT) {
int end = i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT < tabletsList.size()
? i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT : tabletsList.size();
List<Long> tablets = new ArrayList<>(tabletsList.subList(i, end));
for (List<Long> tablets : tabletsList) {
// Since export does not support the alias, here we pass the null value.
// we can not use this.tableRef.getAlias(),
// because the constructor of `Tableref` will convert this.tableRef.getAlias()
@ -494,11 +498,13 @@ public class ExportJob implements Writable {
return tableRefListPerParallel;
}
private List<List<Long>> splitTablets() throws UserException {
private List<List<List<Long>>> splitTablets() throws UserException {
// get tablets
Database db = Env.getCurrentEnv().getInternalCatalog().getDbOrAnalysisException(this.tableName.getDb());
OlapTable table = db.getOlapTableOrAnalysisException(this.tableName.getTbl());
List<Long> tabletIdList = Lists.newArrayList();
Integer tabletsAllNum = 0;
List<List<Long>> tabletIdList = Lists.newArrayList();
table.readLock();
try {
final Collection<Partition> partitions = new ArrayList<Partition>();
@ -516,26 +522,56 @@ public class ExportJob implements Writable {
// get tablets
for (Partition partition : partitions) {
partitionToVersion.put(partition.getName(), partition.getVisibleVersion());
// Partition-level data consistency does not need to verify the partition version.
if (!isPartitionConsistency()) {
partitionToVersion.put(partition.getName(), partition.getVisibleVersion());
}
for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
tabletIdList.addAll(index.getTabletIdsInOrder());
List<Long> tablets = index.getTabletIdsInOrder();
tabletsAllNum += tablets.size();
tabletIdList.add(tablets);
}
}
} finally {
table.readUnlock();
}
if (isPartitionConsistency()) {
// Assign tablets of a partition to per parallel.
int totalPartitions = tabletIdList.size();
int numPerParallel = totalPartitions / this.parallelism;
int numPerQueryRemainder = totalPartitions - numPerParallel * this.parallelism;
int realParallelism = this.parallelism;
if (totalPartitions < this.parallelism) {
realParallelism = totalPartitions;
LOG.warn("Export Job [{}]: The number of partitions ({}) is smaller than parallelism ({}), "
+ "set parallelism to partition num.", id, totalPartitions, this.parallelism);
}
int start = 0;
List<List<List<Long>>> tabletsListPerParallel = new ArrayList<>();
for (int i = 0; i < realParallelism; ++i) {
int partitionNum = numPerParallel;
if (numPerQueryRemainder > 0) {
partitionNum += 1;
--numPerQueryRemainder;
}
List<List<Long>> tablets = new ArrayList<>(tabletIdList.subList(start, start + partitionNum));
start += partitionNum;
tabletsListPerParallel.add(tablets);
}
return tabletsListPerParallel;
}
/**
* Assign tablets to per parallel, for example:
* If the number of all tablets is 10, and the real parallelism is 4,
* then, the number of tablets of per parallel should be: 3 3 2 2.
*/
Integer tabletsAllNum = tabletIdList.size();
tabletsNum = tabletsAllNum;
Integer tabletsNumPerParallel = tabletsAllNum / this.parallelism;
Integer tabletsNumPerQueryRemainder = tabletsAllNum - tabletsNumPerParallel * this.parallelism;
List<List<Long>> tabletsListPerParallel = Lists.newArrayList();
List<List<List<Long>>> tabletsListPerParallel = Lists.newArrayList();
Integer realParallelism = this.parallelism;
if (tabletsAllNum < this.parallelism) {
realParallelism = tabletsAllNum;
@ -543,15 +579,22 @@ public class ExportJob implements Writable {
+ "set parallelism to tablets num.", id, tabletsAllNum, this.parallelism);
}
Integer start = 0;
for (int i = 0; i < realParallelism; ++i) {
List<Long> flatTabletIdList = tabletIdList.stream().flatMap(List::stream).collect(Collectors.toList());
for (int j = 0; j < realParallelism; ++j) {
Integer tabletsNum = tabletsNumPerParallel;
if (tabletsNumPerQueryRemainder > 0) {
tabletsNum = tabletsNum + 1;
--tabletsNumPerQueryRemainder;
}
ArrayList<Long> tablets = new ArrayList<>(tabletIdList.subList(start, start + tabletsNum));
start += tabletsNum;
List<Long> tabletsList = new ArrayList<>(flatTabletIdList.subList(start, start + tabletsNum));
List<List<Long>> tablets = new ArrayList<>();
for (int i = 0; i < tabletsList.size(); i += MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT) {
int end = i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT < tabletsList.size()
? i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT : tabletsList.size();
tablets.add(new ArrayList<>(tabletsList.subList(i, end)));
}
start += tabletsNum;
tabletsListPerParallel.add(tablets);
}
return tabletsListPerParallel;

View File

@ -355,6 +355,7 @@ public class ExportMgr {
infoMap.put("tablet_num", job.getTabletsNum());
infoMap.put("max_file_size", job.getMaxFileSize());
infoMap.put("delete_existing_files", job.getDeleteExistingFiles());
infoMap.put("data_consistency", job.getDataConsistency());
jobInfo.add(new Gson().toJson(infoMap));
// path
jobInfo.add(job.getExportPath());

View File

@ -89,8 +89,8 @@ public class ExportTaskExecutor implements TransientTaskExecutor {
if (isCanceled.get()) {
throw new JobException("Export executor has been canceled, task id: {}", taskId);
}
// check the version of tablets
if (exportJob.getExportTable().getType() == TableType.OLAP) {
// check the version of tablets, skip if the consistency is in partition level.
if (exportJob.getExportTable().getType() == TableType.OLAP && !exportJob.isPartitionConsistency()) {
try {
Database db = Env.getCurrentEnv().getInternalCatalog().getDbOrAnalysisException(
exportJob.getTableName().getDb());
@ -136,7 +136,6 @@ public class ExportTaskExecutor implements TransientTaskExecutor {
}
try (AutoCloseConnectContext r = buildConnectContext()) {
StatementBase statementBase = selectStmtLists.get(idx);
OriginStatement originStatement = new OriginStatement(
StringUtils.isEmpty(statementBase.getOrigStmt().originStmt)

View File

@ -73,6 +73,7 @@ import java.util.stream.Collectors;
public class ExportCommand extends Command implements ForwardWithSync {
public static final String PARALLELISM = "parallelism";
public static final String LABEL = "label";
public static final String DATA_CONSISTENCY = "data_consistency";
private static final String DEFAULT_COLUMN_SEPARATOR = "\t";
private static final String DEFAULT_LINE_DELIMITER = "\n";
private static final String DEFAULT_PARALLELISM = "1";
@ -81,6 +82,7 @@ public class ExportCommand extends Command implements ForwardWithSync {
private static final ImmutableSet<String> PROPERTIES_SET = new ImmutableSet.Builder<String>()
.add(LABEL)
.add(PARALLELISM)
.add(DATA_CONSISTENCY)
.add(LoadStmt.KEY_IN_PARAM_COLUMNS)
.add(OutFileClause.PROP_MAX_FILE_SIZE)
.add(OutFileClause.PROP_DELETE_EXISTING_FILES)
@ -310,6 +312,17 @@ public class ExportCommand extends Command implements ForwardWithSync {
exportJob.setQualifiedUser(ctx.getQualifiedUser());
exportJob.setUserIdentity(ctx.getCurrentUserIdentity());
// set data consistency
String dataConsistencyStr = fileProperties.get(DATA_CONSISTENCY);
if (dataConsistencyStr != null) {
if (!dataConsistencyStr.equalsIgnoreCase(ExportJob.CONSISTENT_PARTITION)) {
throw new AnalysisException("The value of data_consistency is invalid, only partition is allowed!");
}
exportJob.setDataConsistency(ExportJob.CONSISTENT_PARTITION);
} else {
exportJob.setDataConsistency(ExportJob.CONSISTENT_ALL);
}
// Must copy session variable, because session variable may be changed during export job running.
SessionVariable clonedSessionVariable = VariableMgr.cloneSessionVariable(Optional.ofNullable(
ConnectContext.get().getSessionVariable()).orElse(VariableMgr.getDefaultSessionVariable()));