[feature](iceberg) Support INSERT OVERWRITE for Iceberg writes and optimize Hive write transaction statistics (#37191) (#38097)

bp #37191

Co-authored-by: kang <35803862+ghkang98@users.noreply.github.com>
Co-authored-by: lik40 <lik40@chinatelecom.cn>
Committed by Mingyu Chen on 2024-07-19 09:45:41 +08:00 (via GitHub)
parent 4d03e288c2
commit bb2b7774df
14 changed files with 1359 additions and 417 deletions


@ -24,7 +24,9 @@ package org.apache.doris.datasource.hive;
import org.apache.doris.backup.Status;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.info.SimpleTableInfo;
import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.datasource.statistics.CommonStatistics;
import org.apache.doris.fs.FileSystem;
import org.apache.doris.fs.FileSystemProvider;
import org.apache.doris.fs.FileSystemUtil;
@ -82,15 +84,13 @@ public class HMSTransaction implements Transaction {
private static final Logger LOG = LogManager.getLogger(HMSTransaction.class);
private final HiveMetadataOps hiveOps;
private final FileSystem fs;
private String dbName;
private String tbName;
private Optional<SummaryProfile> summaryProfile = Optional.empty();
private String queryId;
private final Map<DatabaseTableName, Action<TableAndMore>> tableActions = new HashMap<>();
private final Map<DatabaseTableName, Map<List<String>, Action<PartitionAndMore>>>
private final Map<SimpleTableInfo, Action<TableAndMore>> tableActions = new HashMap<>();
private final Map<SimpleTableInfo, Map<List<String>, Action<PartitionAndMore>>>
partitionActions = new HashMap<>();
private final Map<DatabaseTableName, List<FieldSchema>> tableColumns = new HashMap<>();
private final Map<SimpleTableInfo, List<FieldSchema>> tableColumns = new HashMap<>();
private final Executor fileSystemExecutor;
private HmsCommitter hmsCommitter;
@ -145,14 +145,6 @@ public class HMSTransaction implements Transaction {
doCommit();
}
public String getDbName() {
return dbName;
}
public String getTbName() {
return tbName;
}
public List<THivePartitionUpdate> mergePartitions(List<THivePartitionUpdate> hivePUs) {
Map<String, THivePartitionUpdate> mm = new HashMap<>();
for (THivePartitionUpdate pu : hivePUs) {
@ -183,9 +175,7 @@ public class HMSTransaction implements Transaction {
queryId = ctx.getQueryId();
}
public void finishInsertTable(String dbName, String tbName) {
this.dbName = dbName;
this.tbName = tbName;
public void finishInsertTable(SimpleTableInfo tableInfo) {
List<THivePartitionUpdate> mergedPUs = mergePartitions(hivePartitionUpdates);
for (THivePartitionUpdate pu : mergedPUs) {
if (pu.getS3MpuPendingUploads() != null) {
@ -195,7 +185,7 @@ public class HMSTransaction implements Transaction {
}
}
}
Table table = getTable(dbName, tbName);
Table table = getTable(tableInfo);
List<Pair<THivePartitionUpdate, HivePartitionStatistics>> insertExistsPartitions = new ArrayList<>();
for (THivePartitionUpdate pu : mergedPUs) {
TUpdateMode updateMode = pu.getUpdateMode();
@ -211,16 +201,15 @@ public class HMSTransaction implements Transaction {
case APPEND:
finishChangingExistingTable(
ActionType.INSERT_EXISTING,
dbName,
tbName,
tableInfo,
writePath,
pu.getFileNames(),
hivePartitionStatistics,
pu);
break;
case OVERWRITE:
dropTable(dbName, tbName);
createTable(table, writePath, pu.getFileNames(), hivePartitionStatistics, pu);
dropTable(tableInfo);
createTable(tableInfo, table, writePath, pu.getFileNames(), hivePartitionStatistics, pu);
break;
default:
throw new RuntimeException("Not support mode:[" + updateMode + "] in unPartitioned table");
@ -235,8 +224,7 @@ public class HMSTransaction implements Transaction {
case OVERWRITE:
StorageDescriptor sd = table.getSd();
HivePartition hivePartition = new HivePartition(
dbName,
tbName,
tableInfo,
false,
sd.getInputFormat(),
pu.getLocation().getTargetPath(),
@ -244,13 +232,13 @@ public class HMSTransaction implements Transaction {
Maps.newHashMap(),
sd.getOutputFormat(),
sd.getSerdeInfo().getSerializationLib(),
getTableColumns(dbName, tbName)
getTableColumns(tableInfo)
);
if (updateMode == TUpdateMode.OVERWRITE) {
dropPartition(dbName, tbName, hivePartition.getPartitionValues(), true);
dropPartition(tableInfo, hivePartition.getPartitionValues(), true);
}
addPartition(
dbName, tbName, hivePartition, writePath,
tableInfo, hivePartition, writePath,
pu.getName(), pu.getFileNames(), hivePartitionStatistics, pu);
break;
default:
@ -260,7 +248,7 @@ public class HMSTransaction implements Transaction {
}
if (!insertExistsPartitions.isEmpty()) {
convertToInsertExistingPartitionAction(insertExistsPartitions);
convertToInsertExistingPartitionAction(tableInfo, insertExistsPartitions);
}
}
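
For orientation, the caller-facing lifecycle of HMSTransaction is unchanged except that finishInsertTable now takes a SimpleTableInfo instead of two strings. A minimal driving sequence, mirroring the HmsCommitTest usage further down in this diff (hmsOps, fileSystemProvider, fileSystemExecutor, hivePUs, queryId, writeLocation, dbName and tbName are assumed to exist):

HMSTransaction txn = new HMSTransaction(hmsOps, fileSystemProvider, fileSystemExecutor);
txn.setHivePartitionUpdates(hivePUs);            // THivePartitionUpdate list reported by the write sinks
HiveInsertCommandContext ctx = new HiveInsertCommandContext();
ctx.setQueryId(queryId);
ctx.setWritePath(writeLocation + queryId + "/");
txn.beginInsertTable(ctx);
// the table identity is no longer kept as transaction state, it is passed in explicitly
txn.finishInsertTable(new SimpleTableInfo(dbName, tbName));
txn.commit();
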
@ -268,45 +256,47 @@ public class HMSTransaction implements Transaction {
hmsCommitter = new HmsCommitter();
try {
for (Map.Entry<DatabaseTableName, Action<TableAndMore>> entry : tableActions.entrySet()) {
for (Map.Entry<SimpleTableInfo, Action<TableAndMore>> entry : tableActions.entrySet()) {
SimpleTableInfo tableInfo = entry.getKey();
Action<TableAndMore> action = entry.getValue();
switch (action.getType()) {
case INSERT_EXISTING:
hmsCommitter.prepareInsertExistingTable(action.getData());
hmsCommitter.prepareInsertExistingTable(tableInfo, action.getData());
break;
case ALTER:
hmsCommitter.prepareAlterTable(action.getData());
hmsCommitter.prepareAlterTable(tableInfo, action.getData());
break;
default:
throw new UnsupportedOperationException("Unsupported table action type: " + action.getType());
}
}
for (Map.Entry<DatabaseTableName, Map<List<String>, Action<PartitionAndMore>>> tableEntry
for (Map.Entry<SimpleTableInfo, Map<List<String>, Action<PartitionAndMore>>> tableEntry
: partitionActions.entrySet()) {
SimpleTableInfo tableInfo = tableEntry.getKey();
for (Map.Entry<List<String>, Action<PartitionAndMore>> partitionEntry :
tableEntry.getValue().entrySet()) {
Action<PartitionAndMore> action = partitionEntry.getValue();
switch (action.getType()) {
case INSERT_EXISTING:
hmsCommitter.prepareInsertExistPartition(action.getData());
hmsCommitter.prepareInsertExistPartition(tableInfo, action.getData());
break;
case ADD:
hmsCommitter.prepareAddPartition(action.getData());
hmsCommitter.prepareAddPartition(tableInfo, action.getData());
break;
case ALTER:
hmsCommitter.prepareAlterPartition(action.getData());
hmsCommitter.prepareAlterPartition(tableInfo, action.getData());
break;
default:
throw new UnsupportedOperationException(
"Unsupported partition action type: " + action.getType());
"Unsupported partition action type: " + action.getType());
}
}
}
hmsCommitter.doCommit();
} catch (Throwable t) {
LOG.warn("Failed to commit for {}.{}, abort it.", dbName, tbName);
LOG.warn("Failed to commit for {}, abort it.", queryId);
try {
hmsCommitter.abort();
hmsCommitter.rollback();
@ -336,10 +326,10 @@ public class HMSTransaction implements Transaction {
}
private void convertToInsertExistingPartitionAction(
SimpleTableInfo tableInfo,
List<Pair<THivePartitionUpdate, HivePartitionStatistics>> partitions) {
DatabaseTableName databaseTableName = new DatabaseTableName(dbName, tbName);
Map<List<String>, Action<PartitionAndMore>> partitionActionsForTable =
partitionActions.computeIfAbsent(databaseTableName, k -> new HashMap<>());
partitionActions.computeIfAbsent(tableInfo, k -> new HashMap<>());
for (List<Pair<THivePartitionUpdate, HivePartitionStatistics>> partitionBatch :
Iterables.partition(partitions, 100)) {
@ -357,7 +347,7 @@ public class HMSTransaction implements Transaction {
case DROP_PRESERVE_DATA:
throw new RuntimeException(
"Not found partition from partition actions"
+ "for " + databaseTableName + ", partitions: " + partitionNames);
+ "for " + tableInfo + ", partitions: " + partitionNames);
case ADD:
case ALTER:
case INSERT_EXISTING:
@ -372,7 +362,7 @@ public class HMSTransaction implements Transaction {
Map<String, Partition> partitionsByNamesMap = HiveUtil.convertToNamePartitionMap(
partitionNames,
hiveOps.getClient().getPartitions(dbName, tbName, partitionNames));
hiveOps.getClient().getPartitions(tableInfo.getDbName(), tableInfo.getTbName(), partitionNames));
for (int i = 0; i < partitionsByNamesMap.size(); i++) {
String partitionName = partitionNames.get(i);
@ -381,7 +371,7 @@ public class HMSTransaction implements Transaction {
if (partition == null) {
// Prevent this partition from being deleted by other engines
throw new RuntimeException(
"Not found partition from hms for " + databaseTableName
"Not found partition from hms for " + tableInfo
+ ", partitions: " + partitionNames);
}
THivePartitionUpdate pu = partitionBatch.get(i).first;
@ -391,8 +381,7 @@ public class HMSTransaction implements Transaction {
List<String> partitionValues = HiveUtil.toPartitionValues(pu.getName());
HivePartition hivePartition = new HivePartition(
dbName,
tbName,
tableInfo,
false,
sd.getInputFormat(),
partition.getSd().getLocation(),
@ -400,7 +389,7 @@ public class HMSTransaction implements Transaction {
partition.getParameters(),
sd.getOutputFormat(),
sd.getSerdeInfo().getSerializationLib(),
getTableColumns(dbName, tbName)
getTableColumns(tableInfo)
);
partitionActionsForTable.put(
@ -408,12 +397,12 @@ public class HMSTransaction implements Transaction {
new Action<>(
ActionType.INSERT_EXISTING,
new PartitionAndMore(
hivePartition,
pu.getLocation().getWritePath(),
pu.getName(),
pu.getFileNames(),
updateStats,
pu
hivePartition,
pu.getLocation().getWritePath(),
pu.getName(),
pu.getFileNames(),
updateStats,
pu
))
);
}
@ -433,18 +422,16 @@ public class HMSTransaction implements Transaction {
}
public static class UpdateStatisticsTask {
private final String dbName;
private final String tableName;
private final SimpleTableInfo tableInfo;
private final Optional<String> partitionName;
private final HivePartitionStatistics updatePartitionStat;
private final boolean merge;
private boolean done;
public UpdateStatisticsTask(String dbName, String tableName, Optional<String> partitionName,
HivePartitionStatistics statistics, boolean merge) {
this.dbName = Objects.requireNonNull(dbName, "dbName is null");
this.tableName = Objects.requireNonNull(tableName, "tableName is null");
public UpdateStatisticsTask(SimpleTableInfo tableInfo, Optional<String> partitionName,
HivePartitionStatistics statistics, boolean merge) {
this.tableInfo = Objects.requireNonNull(tableInfo, "tableInfo is null");
this.partitionName = Objects.requireNonNull(partitionName, "partitionName is null");
this.updatePartitionStat = Objects.requireNonNull(statistics, "statistics is null");
this.merge = merge;
@ -452,9 +439,9 @@ public class HMSTransaction implements Transaction {
public void run(HiveMetadataOps hiveOps) {
if (partitionName.isPresent()) {
hiveOps.updatePartitionStatistics(dbName, tableName, partitionName.get(), this::updateStatistics);
hiveOps.updatePartitionStatistics(tableInfo, partitionName.get(), this::updateStatistics);
} else {
hiveOps.updateTableStatistics(dbName, tableName, this::updateStatistics);
hiveOps.updateTableStatistics(tableInfo, this::updateStatistics);
}
done = true;
}
@ -464,17 +451,17 @@ public class HMSTransaction implements Transaction {
return;
}
if (partitionName.isPresent()) {
hmsOps.updatePartitionStatistics(dbName, tableName, partitionName.get(), this::resetStatistics);
hmsOps.updatePartitionStatistics(tableInfo, partitionName.get(), this::resetStatistics);
} else {
hmsOps.updateTableStatistics(dbName, tableName, this::resetStatistics);
hmsOps.updateTableStatistics(tableInfo, this::resetStatistics);
}
}
public String getDescription() {
if (partitionName.isPresent()) {
return "alter partition parameters " + tableName + " " + partitionName.get();
return "alter partition parameters " + tableInfo + " " + partitionName.get();
} else {
return "alter table parameters " + tableName;
return "alter table parameters " + tableInfo;
}
}
@ -484,7 +471,7 @@ public class HMSTransaction implements Transaction {
private HivePartitionStatistics resetStatistics(HivePartitionStatistics currentStatistics) {
return HivePartitionStatistics
.reduce(currentStatistics, updatePartitionStat, HivePartitionStatistics.ReduceOperator.SUBTRACT);
.reduce(currentStatistics, updatePartitionStat, CommonStatistics.ReduceOperator.SUBTRACT);
}
}
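
As a hedged illustration of the new UpdateStatisticsTask signature: the db/table pair now travels inside a SimpleTableInfo, and the statistics value here is a made-up example built with fromCommonStatistics (hiveOps is assumed to exist):

UpdateStatisticsTask task = new UpdateStatisticsTask(
        new SimpleTableInfo("tpch", "lineitem"),                         // hypothetical db/table
        Optional.empty(),                                                // empty -> table-level statistics
        HivePartitionStatistics.fromCommonStatistics(1000L, 5L, 4096L),  // rows, files, bytes written
        true);                                                           // merge into existing statistics
task.run(hiveOps);   // delegates to hiveOps.updateTableStatistics(tableInfo, this::updateStatistics)
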
@ -506,12 +493,11 @@ public class HMSTransaction implements Transaction {
public void run(HiveMetadataOps hiveOps) {
HivePartition firstPartition = partitions.get(0).getPartition();
String dbName = firstPartition.getDbName();
String tableName = firstPartition.getTblName();
SimpleTableInfo tableInfo = firstPartition.getTableInfo();
List<List<HivePartitionWithStatistics>> batchedPartitions = Lists.partition(partitions, 20);
for (List<HivePartitionWithStatistics> batch : batchedPartitions) {
try {
hiveOps.addPartitions(dbName, tableName, batch);
hiveOps.addPartitions(tableInfo, batch);
for (HivePartitionWithStatistics partition : batch) {
createdPartitionValues.add(partition.getPartition().getPartitionValues());
}
@ -524,15 +510,14 @@ public class HMSTransaction implements Transaction {
public List<List<String>> rollback(HiveMetadataOps hiveOps) {
HivePartition firstPartition = partitions.get(0).getPartition();
String dbName = firstPartition.getDbName();
String tableName = firstPartition.getTblName();
SimpleTableInfo tableInfo = firstPartition.getTableInfo();
List<List<String>> rollbackFailedPartitions = new ArrayList<>();
for (List<String> createdPartitionValue : createdPartitionValues) {
try {
hiveOps.dropPartition(dbName, tableName, createdPartitionValue, false);
hiveOps.dropPartition(tableInfo, createdPartitionValue, false);
} catch (Throwable t) {
LOG.warn("Failed to drop partition on {}.{}.{} when rollback",
dbName, tableName, rollbackFailedPartitions);
LOG.warn("Failed to drop partition on {}.{} when rollback",
tableInfo, rollbackFailedPartitions);
rollbackFailedPartitions.add(createdPartitionValue);
}
}
@ -560,9 +545,9 @@ public class HMSTransaction implements Transaction {
@Override
public String toString() {
return new StringJoiner(", ", DirectoryCleanUpTask.class.getSimpleName() + "[", "]")
.add("path=" + path)
.add("deleteEmptyDir=" + deleteEmptyDir)
.toString();
.add("path=" + path)
.add("deleteEmptyDir=" + deleteEmptyDir)
.toString();
}
}
@ -604,14 +589,13 @@ public class HMSTransaction implements Transaction {
@Override
public String toString() {
return new StringJoiner(", ", RenameDirectoryTask.class.getSimpleName() + "[", "]")
.add("renameFrom:" + renameFrom)
.add("renameTo:" + renameTo)
.toString();
.add("renameFrom:" + renameFrom)
.add("renameTo:" + renameTo)
.toString();
}
}
private void recursiveDeleteItems(Path directory, boolean deleteEmptyDir, boolean reverse) {
DeleteRecursivelyResult deleteResult = recursiveDeleteFiles(directory, deleteEmptyDir, reverse);
@ -699,48 +683,6 @@ public class HMSTransaction implements Transaction {
return !fs.directoryExists(path.toString()).ok();
}
public static class DatabaseTableName {
private final String dbName;
private final String tbName;
public DatabaseTableName(String dbName, String tbName) {
this.dbName = dbName;
this.tbName = tbName;
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other == null || getClass() != other.getClass()) {
return false;
}
DatabaseTableName that = (DatabaseTableName) other;
return Objects.equals(dbName, that.dbName) && Objects.equals(tbName, that.tbName);
}
@Override
public String toString() {
return dbName + "." + tbName;
}
@Override
public int hashCode() {
return Objects.hash(dbName, tbName);
}
public String getTbName() {
return tbName;
}
public String getDbName() {
return dbName;
}
}
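
The removed DatabaseTableName key is replaced everywhere by org.apache.doris.common.info.SimpleTableInfo, whose source is not part of this diff. Judging from the call sites (two-argument constructor, getDbName()/getTbName(), use as a HashMap and ConcurrentHashMap key), it needs roughly the shape below; this is a sketch inferred from usage, not the actual class:

// assumes: import java.util.Objects;
public class SimpleTableInfo {
    private final String dbName;
    private final String tbName;

    public SimpleTableInfo(String dbName, String tbName) {
        this.dbName = dbName;
        this.tbName = tbName;
    }

    public String getDbName() {
        return dbName;
    }

    public String getTbName() {
        return tbName;
    }

    // equals/hashCode are required because the class is used as a map key
    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (other == null || getClass() != other.getClass()) {
            return false;
        }
        SimpleTableInfo that = (SimpleTableInfo) other;
        return Objects.equals(dbName, that.dbName) && Objects.equals(tbName, that.tbName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(dbName, tbName);
    }

    @Override
    public String toString() {
        return dbName + "." + tbName;
    }
}
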
private static class TableAndMore {
private final Table table;
private final String currentLocation;
@ -785,9 +727,9 @@ public class HMSTransaction implements Transaction {
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("table", table)
.add("statisticsUpdate", statisticsUpdate)
.toString();
.add("table", table)
.add("statisticsUpdate", statisticsUpdate)
.toString();
}
}
@ -843,10 +785,10 @@ public class HMSTransaction implements Transaction {
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("partition", partition)
.add("currentLocation", currentLocation)
.add("fileNames", fileNames)
.toString();
.add("partition", partition)
.add("currentLocation", currentLocation)
.add("fileNames", fileNames)
.toString();
}
}
@ -891,16 +833,16 @@ public class HMSTransaction implements Transaction {
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("type", type)
.add("data", data)
.toString();
.add("type", type)
.add("data", data)
.toString();
}
}
public synchronized Table getTable(String databaseName, String tableName) {
Action<TableAndMore> tableAction = tableActions.get(new DatabaseTableName(databaseName, tableName));
public synchronized Table getTable(SimpleTableInfo tableInfo) {
Action<TableAndMore> tableAction = tableActions.get(tableInfo);
if (tableAction == null) {
return hiveOps.getClient().getTable(databaseName, tableName);
return hiveOps.getClient().getTable(tableInfo.getDbName(), tableInfo.getTbName());
}
switch (tableAction.getType()) {
case ADD:
@ -914,28 +856,26 @@ public class HMSTransaction implements Transaction {
default:
throw new IllegalStateException("Unknown action type: " + tableAction.getType());
}
throw new RuntimeException("Not Found table: " + databaseName + "." + tableName);
throw new RuntimeException("Not Found table: " + tableInfo);
}
public synchronized List<FieldSchema> getTableColumns(String databaseName, String tableName) {
return tableColumns.computeIfAbsent(new DatabaseTableName(databaseName, tableName),
key -> hiveOps.getClient().getSchema(dbName, tbName));
public synchronized List<FieldSchema> getTableColumns(SimpleTableInfo tableInfo) {
return tableColumns.computeIfAbsent(tableInfo,
key -> hiveOps.getClient().getSchema(tableInfo.getDbName(), tableInfo.getTbName()));
}
public synchronized void finishChangingExistingTable(
ActionType actionType,
String databaseName,
String tableName,
SimpleTableInfo tableInfo,
String location,
List<String> fileNames,
HivePartitionStatistics statisticsUpdate,
THivePartitionUpdate hivePartitionUpdate) {
DatabaseTableName databaseTableName = new DatabaseTableName(databaseName, tableName);
Action<TableAndMore> oldTableAction = tableActions.get(databaseTableName);
Action<TableAndMore> oldTableAction = tableActions.get(tableInfo);
if (oldTableAction == null) {
Table table = hiveOps.getClient().getTable(databaseTableName.getDbName(), databaseTableName.getTbName());
Table table = hiveOps.getClient().getTable(tableInfo.getDbName(), tableInfo.getTbName());
tableActions.put(
databaseTableName,
tableInfo,
new Action<>(
actionType,
new TableAndMore(
@ -949,7 +889,7 @@ public class HMSTransaction implements Transaction {
switch (oldTableAction.getType()) {
case DROP:
throw new RuntimeException("Not found table: " + databaseTableName);
throw new RuntimeException("Not found table: " + tableInfo);
case ADD:
case ALTER:
case INSERT_EXISTING:
@ -965,27 +905,28 @@ public class HMSTransaction implements Transaction {
}
public synchronized void createTable(
Table table, String location, List<String> fileNames, HivePartitionStatistics statistics,
SimpleTableInfo tableInfo,
Table table, String location, List<String> fileNames,
HivePartitionStatistics statistics,
THivePartitionUpdate hivePartitionUpdate) {
// When creating a table, it should never have partition actions. This is just a sanity check.
checkNoPartitionAction(dbName, tbName);
DatabaseTableName databaseTableName = new DatabaseTableName(dbName, tbName);
Action<TableAndMore> oldTableAction = tableActions.get(databaseTableName);
checkNoPartitionAction(tableInfo);
Action<TableAndMore> oldTableAction = tableActions.get(tableInfo);
TableAndMore tableAndMore = new TableAndMore(table, location, fileNames, statistics, hivePartitionUpdate);
if (oldTableAction == null) {
tableActions.put(databaseTableName, new Action<>(ActionType.ADD, tableAndMore));
tableActions.put(tableInfo, new Action<>(ActionType.ADD, tableAndMore));
return;
}
switch (oldTableAction.getType()) {
case DROP:
tableActions.put(databaseTableName, new Action<>(ActionType.ALTER, tableAndMore));
tableActions.put(tableInfo, new Action<>(ActionType.ALTER, tableAndMore));
return;
case ADD:
case ALTER:
case INSERT_EXISTING:
case MERGE:
throw new RuntimeException("Table already exists: " + databaseTableName);
throw new RuntimeException("Table already exists: " + tableInfo);
case DROP_PRESERVE_DATA:
break;
default:
@ -994,18 +935,17 @@ public class HMSTransaction implements Transaction {
}
public synchronized void dropTable(String databaseName, String tableName) {
public synchronized void dropTable(SimpleTableInfo tableInfo) {
// Dropping table with partition actions requires cleaning up staging data, which is not implemented yet.
checkNoPartitionAction(databaseName, tableName);
DatabaseTableName databaseTableName = new DatabaseTableName(databaseName, tableName);
Action<TableAndMore> oldTableAction = tableActions.get(databaseTableName);
checkNoPartitionAction(tableInfo);
Action<TableAndMore> oldTableAction = tableActions.get(tableInfo);
if (oldTableAction == null || oldTableAction.getType() == ActionType.ALTER) {
tableActions.put(databaseTableName, new Action<>(ActionType.DROP, null));
tableActions.put(tableInfo, new Action<>(ActionType.DROP, null));
return;
}
switch (oldTableAction.getType()) {
case DROP:
throw new RuntimeException("Not found table: " + databaseTableName);
throw new RuntimeException("Not found table: " + tableInfo);
case ADD:
case ALTER:
case INSERT_EXISTING:
@ -1019,9 +959,9 @@ public class HMSTransaction implements Transaction {
}
private void checkNoPartitionAction(String databaseName, String tableName) {
private void checkNoPartitionAction(SimpleTableInfo tableInfo) {
Map<List<String>, Action<PartitionAndMore>> partitionActionsForTable =
partitionActions.get(new DatabaseTableName(databaseName, tableName));
partitionActions.get(tableInfo);
if (partitionActionsForTable != null && !partitionActionsForTable.isEmpty()) {
throw new RuntimeException(
"Cannot make schema changes to a table with modified partitions in the same transaction");
@ -1029,8 +969,7 @@ public class HMSTransaction implements Transaction {
}
public synchronized void addPartition(
String databaseName,
String tableName,
SimpleTableInfo tableInfo,
HivePartition partition,
String currentLocation,
String partitionName,
@ -1038,7 +977,7 @@ public class HMSTransaction implements Transaction {
HivePartitionStatistics statistics,
THivePartitionUpdate hivePartitionUpdate) {
Map<List<String>, Action<PartitionAndMore>> partitionActionsForTable =
partitionActions.computeIfAbsent(new DatabaseTableName(databaseName, tableName), k -> new HashMap<>());
partitionActions.computeIfAbsent(tableInfo, k -> new HashMap<>());
Action<PartitionAndMore> oldPartitionAction = partitionActionsForTable.get(partition.getPartitionValues());
if (oldPartitionAction == null) {
partitionActionsForTable.put(
@ -1066,21 +1005,20 @@ public class HMSTransaction implements Transaction {
case INSERT_EXISTING:
case MERGE:
throw new RuntimeException(
"Partition already exists for table: "
+ databaseName + "." + tableName + ", partition values: " + partition.getPartitionValues());
"Partition already exists for table: "
+ tableInfo + ", partition values: " + partition
.getPartitionValues());
default:
throw new IllegalStateException("Unknown action type: " + oldPartitionAction.getType());
}
}
public synchronized void dropPartition(
String databaseName,
String tableName,
SimpleTableInfo tableInfo,
List<String> partitionValues,
boolean deleteData) {
DatabaseTableName databaseTableName = new DatabaseTableName(databaseName, tableName);
Map<List<String>, Action<PartitionAndMore>> partitionActionsForTable =
partitionActions.computeIfAbsent(databaseTableName, k -> new HashMap<>());
partitionActions.computeIfAbsent(tableInfo, k -> new HashMap<>());
Action<PartitionAndMore> oldPartitionAction = partitionActionsForTable.get(partitionValues);
if (oldPartitionAction == null) {
if (deleteData) {
@ -1094,7 +1032,7 @@ public class HMSTransaction implements Transaction {
case DROP:
case DROP_PRESERVE_DATA:
throw new RuntimeException(
"Not found partition from partition actions for " + databaseTableName
"Not found partition from partition actions for " + tableInfo
+ ", partitions: " + partitionValues);
case ADD:
case ALTER:
@ -1102,7 +1040,7 @@ public class HMSTransaction implements Transaction {
case MERGE:
throw new RuntimeException(
"Dropping a partition added in the same transaction is not supported: "
+ databaseTableName + ", partition values: " + partitionValues);
+ tableInfo + ", partition values: " + partitionValues);
default:
throw new IllegalStateException("Unknown action type: " + oldPartitionAction.getType());
}
@ -1158,12 +1096,11 @@ public class HMSTransaction implements Transaction {
}
HivePartition firstPartition = addPartitionsTask.getPartitions().get(0).getPartition();
String dbName = firstPartition.getDbName();
String tableName = firstPartition.getTblName();
SimpleTableInfo tableInfo = firstPartition.getTableInfo();
List<List<String>> rollbackFailedPartitions = addPartitionsTask.rollback(hiveOps);
if (!rollbackFailedPartitions.isEmpty()) {
LOG.warn("Failed to rollback: add_partition for partition values {}.{}.{}",
dbName, tableName, rollbackFailedPartitions);
LOG.warn("Failed to rollback: add_partition for partition values {}.{}",
tableInfo, rollbackFailedPartitions);
}
}
@ -1179,7 +1116,7 @@ public class HMSTransaction implements Transaction {
}
}
public void prepareInsertExistingTable(TableAndMore tableAndMore) {
public void prepareInsertExistingTable(SimpleTableInfo tableInfo, TableAndMore tableAndMore) {
Table table = tableAndMore.getTable();
String targetPath = table.getSd().getLocation();
String writePath = tableAndMore.getCurrentLocation();
@ -1200,15 +1137,14 @@ public class HMSTransaction implements Transaction {
directoryCleanUpTasksForAbort.add(new DirectoryCleanUpTask(targetPath, false));
updateStatisticsTasks.add(
new UpdateStatisticsTask(
dbName,
tbName,
tableInfo,
Optional.empty(),
tableAndMore.getStatisticsUpdate(),
true
));
}
public void prepareAlterTable(TableAndMore tableAndMore) {
public void prepareAlterTable(SimpleTableInfo tableInfo, TableAndMore tableAndMore) {
Table table = tableAndMore.getTable();
String targetPath = table.getSd().getLocation();
String writePath = tableAndMore.getCurrentLocation();
@ -1222,18 +1158,18 @@ public class HMSTransaction implements Transaction {
() -> renameDirectoryTasksForAbort.add(new RenameDirectoryTask(oldTablePath, targetPath)));
if (!status.ok()) {
throw new RuntimeException(
"Error to rename dir from " + targetPath + " to " + oldTablePath + status.getErrMsg());
"Error to rename dir from " + targetPath + " to " + oldTablePath + status.getErrMsg());
}
clearDirsForFinish.add(oldTablePath);
status = wrapperRenameDirWithProfileSummary(
status = wrapperRenameDirWithProfileSummary(
writePath,
targetPath,
() -> directoryCleanUpTasksForAbort.add(
new DirectoryCleanUpTask(targetPath, true)));
if (!status.ok()) {
throw new RuntimeException(
"Error to rename dir from " + writePath + " to " + targetPath + ":" + status.getErrMsg());
"Error to rename dir from " + writePath + " to " + targetPath + ":" + status.getErrMsg());
}
} else {
if (!tableAndMore.hivePartitionUpdate.s3_mpu_pending_uploads.isEmpty()) {
@ -1243,16 +1179,15 @@ public class HMSTransaction implements Transaction {
}
}
updateStatisticsTasks.add(
new UpdateStatisticsTask(
dbName,
tbName,
Optional.empty(),
tableAndMore.getStatisticsUpdate(),
false
));
new UpdateStatisticsTask(
tableInfo,
Optional.empty(),
tableAndMore.getStatisticsUpdate(),
false
));
}
public void prepareAddPartition(PartitionAndMore partitionAndMore) {
public void prepareAddPartition(SimpleTableInfo tableInfo, PartitionAndMore partitionAndMore) {
HivePartition partition = partitionAndMore.getPartition();
String targetPath = partition.getPath();
@ -1273,11 +1208,10 @@ public class HMSTransaction implements Transaction {
}
}
StorageDescriptor sd = getTable(dbName, tbName).getSd();
StorageDescriptor sd = getTable(tableInfo).getSd();
HivePartition hivePartition = new HivePartition(
dbName,
tbName,
tableInfo,
false,
sd.getInputFormat(),
targetPath,
@ -1285,7 +1219,7 @@ public class HMSTransaction implements Transaction {
Maps.newHashMap(),
sd.getOutputFormat(),
sd.getSerdeInfo().getSerializationLib(),
getTableColumns(dbName, tbName)
getTableColumns(tableInfo)
);
HivePartitionWithStatistics partitionWithStats =
@ -1296,7 +1230,7 @@ public class HMSTransaction implements Transaction {
addPartitionsTask.addPartition(partitionWithStats);
}
public void prepareInsertExistPartition(PartitionAndMore partitionAndMore) {
public void prepareInsertExistPartition(SimpleTableInfo tableInfo, PartitionAndMore partitionAndMore) {
HivePartition partition = partitionAndMore.getPartition();
String targetPath = partition.getPath();
@ -1319,12 +1253,11 @@ public class HMSTransaction implements Transaction {
}
updateStatisticsTasks.add(
new UpdateStatisticsTask(
dbName,
tbName,
Optional.of(partitionAndMore.getPartitionName()),
partitionAndMore.getStatisticsUpdate(),
true));
new UpdateStatisticsTask(
tableInfo,
Optional.of(partitionAndMore.getPartitionName()),
partitionAndMore.getStatisticsUpdate(),
true));
}
private void runDirectoryClearUpTasksForAbort() {
@ -1338,7 +1271,8 @@ public class HMSTransaction implements Transaction {
for (RenameDirectoryTask task : renameDirectoryTasksForAbort) {
status = fs.exists(task.getRenameFrom());
if (status.ok()) {
status = wrapperRenameDirWithProfileSummary(task.getRenameFrom(), task.getRenameTo(), () -> {});
status = wrapperRenameDirWithProfileSummary(task.getRenameFrom(), task.getRenameTo(), () -> {
});
if (!status.ok()) {
LOG.warn("Failed to abort rename dir from {} to {}:{}",
task.getRenameFrom(), task.getRenameTo(), status.getErrMsg());
@ -1363,7 +1297,7 @@ public class HMSTransaction implements Transaction {
}
}
public void prepareAlterPartition(PartitionAndMore partitionAndMore) {
public void prepareAlterPartition(SimpleTableInfo tableInfo, PartitionAndMore partitionAndMore) {
HivePartition partition = partitionAndMore.getPartition();
String targetPath = partition.getPath();
String writePath = partitionAndMore.getCurrentLocation();
@ -1378,19 +1312,19 @@ public class HMSTransaction implements Transaction {
() -> renameDirectoryTasksForAbort.add(new RenameDirectoryTask(oldPartitionPath, targetPath)));
if (!status.ok()) {
throw new RuntimeException(
"Error to rename dir "
+ "from " + targetPath
+ " to " + oldPartitionPath + ":" + status.getErrMsg());
"Error to rename dir "
+ "from " + targetPath
+ " to " + oldPartitionPath + ":" + status.getErrMsg());
}
clearDirsForFinish.add(oldPartitionPath);
status = wrapperRenameDirWithProfileSummary(
writePath,
targetPath,
() -> directoryCleanUpTasksForAbort.add(new DirectoryCleanUpTask(targetPath, true)));
writePath,
targetPath,
() -> directoryCleanUpTasksForAbort.add(new DirectoryCleanUpTask(targetPath, true)));
if (!status.ok()) {
throw new RuntimeException(
"Error to rename dir from " + writePath + " to " + targetPath + ":" + status.getErrMsg());
"Error to rename dir from " + writePath + " to " + targetPath + ":" + status.getErrMsg());
}
} else {
if (!partitionAndMore.hivePartitionUpdate.s3_mpu_pending_uploads.isEmpty()) {
@ -1401,13 +1335,12 @@ public class HMSTransaction implements Transaction {
}
updateStatisticsTasks.add(
new UpdateStatisticsTask(
dbName,
tbName,
Optional.of(partitionAndMore.getPartitionName()),
partitionAndMore.getStatisticsUpdate(),
false
));
new UpdateStatisticsTask(
tableInfo,
Optional.of(partitionAndMore.getPartitionName()),
partitionAndMore.getStatisticsUpdate(),
false
));
}
@ -1538,8 +1471,8 @@ public class HMSTransaction implements Transaction {
}
public Status wrapperRenameDirWithProfileSummary(String origFilePath,
String destFilePath,
Runnable runWhenPathNotExist) {
String destFilePath,
Runnable runWhenPathNotExist) {
summaryProfile.ifPresent(profile -> {
profile.setTempStartTime();
profile.incRenameDirCnt();
@ -1576,22 +1509,22 @@ public class HMSTransaction implements Transaction {
}
public void wrapperAsyncRenameWithProfileSummary(Executor executor,
List<CompletableFuture<?>> renameFileFutures,
AtomicBoolean cancelled,
String origFilePath,
String destFilePath,
List<String> fileNames) {
List<CompletableFuture<?>> renameFileFutures,
AtomicBoolean cancelled,
String origFilePath,
String destFilePath,
List<String> fileNames) {
FileSystemUtil.asyncRenameFiles(
fs, executor, renameFileFutures, cancelled, origFilePath, destFilePath, fileNames);
summaryProfile.ifPresent(profile -> profile.addRenameFileCnt(fileNames.size()));
}
public void wrapperAsyncRenameDirWithProfileSummary(Executor executor,
List<CompletableFuture<?>> renameFileFutures,
AtomicBoolean cancelled,
String origFilePath,
String destFilePath,
Runnable runWhenPathNotExist) {
List<CompletableFuture<?>> renameFileFutures,
AtomicBoolean cancelled,
String origFilePath,
String destFilePath,
Runnable runWhenPathNotExist) {
FileSystemUtil.asyncRenameDir(
fs, executor, renameFileFutures, cancelled, origFilePath, destFilePath, runWhenPathNotExist);
summaryProfile.ifPresent(SummaryProfile::incRenameDirCnt);


@ -32,6 +32,7 @@ import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.common.info.SimpleTableInfo;
import org.apache.doris.datasource.ExternalDatabase;
import org.apache.doris.datasource.jdbc.client.JdbcClient;
import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
@ -292,25 +293,23 @@ public class HiveMetadataOps implements ExternalMetadataOps {
}
public void updateTableStatistics(
String dbName,
String tableName,
SimpleTableInfo tableInfo,
Function<HivePartitionStatistics, HivePartitionStatistics> update) {
client.updateTableStatistics(dbName, tableName, update);
client.updateTableStatistics(tableInfo.getDbName(), tableInfo.getTbName(), update);
}
void updatePartitionStatistics(
String dbName,
String tableName,
SimpleTableInfo tableInfo,
String partitionName,
Function<HivePartitionStatistics, HivePartitionStatistics> update) {
client.updatePartitionStatistics(dbName, tableName, partitionName, update);
client.updatePartitionStatistics(tableInfo.getDbName(), tableInfo.getTbName(), partitionName, update);
}
public void addPartitions(String dbName, String tableName, List<HivePartitionWithStatistics> partitions) {
client.addPartitions(dbName, tableName, partitions);
public void addPartitions(SimpleTableInfo tableInfo, List<HivePartitionWithStatistics> partitions) {
client.addPartitions(tableInfo.getDbName(), tableInfo.getTbName(), partitions);
}
public void dropPartition(String dbName, String tableName, List<String> partitionValues, boolean deleteData) {
client.dropPartition(dbName, tableName, partitionValues, deleteData);
public void dropPartition(SimpleTableInfo tableInfo, List<String> partitionValues, boolean deleteData) {
client.dropPartition(tableInfo.getDbName(), tableInfo.getTbName(), partitionValues, deleteData);
}
}
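
The statistics updaters still take a Function over HivePartitionStatistics; only the table identity argument changes. A hedged call-site sketch (tableInfo and writtenStats are assumed to exist; the partition-level variant is package-private and is driven from HMSTransaction):

// Add the statistics of the newly written files to whatever is currently stored in HMS.
hiveOps.updateTableStatistics(tableInfo,
        current -> HivePartitionStatistics.reduce(current, writtenStats,
                CommonStatistics.ReduceOperator.ADD));

// Partition-level variant additionally takes the Hive partition name, e.g. "c3=2024-01-01".
hiveOps.updatePartitionStatistics(tableInfo, "c3=2024-01-01",
        current -> HivePartitionStatistics.reduce(current, writtenStats,
                CommonStatistics.ReduceOperator.ADD));
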


@ -18,6 +18,7 @@
package org.apache.doris.datasource.hive;
import org.apache.doris.catalog.Column;
import org.apache.doris.common.info.SimpleTableInfo;
import com.google.common.base.Preconditions;
import lombok.Data;
@ -31,8 +32,7 @@ public class HivePartition {
public static final String LAST_MODIFY_TIME_KEY = "transient_lastDdlTime";
public static final String FILE_NUM_KEY = "numFiles";
private String dbName;
private String tblName;
private SimpleTableInfo tableInfo;
private String inputFormat;
private String path;
private List<String> partitionValues;
@ -43,10 +43,9 @@ public class HivePartition {
private List<FieldSchema> columns;
// If you want to read the data under a partition, you can use this constructor
public HivePartition(String dbName, String tblName, boolean isDummyPartition,
public HivePartition(SimpleTableInfo tableInfo, boolean isDummyPartition,
String inputFormat, String path, List<String> partitionValues, Map<String, String> parameters) {
this.dbName = dbName;
this.tblName = tblName;
this.tableInfo = tableInfo;
this.isDummyPartition = isDummyPartition;
// eg: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
this.inputFormat = inputFormat;
@ -57,17 +56,31 @@ public class HivePartition {
this.parameters = parameters;
}
public HivePartition(String database, String tableName, boolean isDummyPartition,
String inputFormat, String path, List<String> partitionValues, Map<String, String> parameters) {
this(new SimpleTableInfo(database, tableName), isDummyPartition, inputFormat, path, partitionValues,
parameters);
}
// Use this constructor when updating HMS with this partition, since updating HMS requires
// additional information such as outputFormat, serde and columns
public HivePartition(String dbName, String tblName, boolean isDummyPartition,
String inputFormat, String path, List<String> partitionValues, Map<String, String> parameters,
String outputFormat, String serde, List<FieldSchema> columns) {
this(dbName, tblName, isDummyPartition, inputFormat, path, partitionValues, parameters);
public HivePartition(SimpleTableInfo tableInfo, boolean isDummyPartition,
String inputFormat, String path, List<String> partitionValues, Map<String, String> parameters,
String outputFormat, String serde, List<FieldSchema> columns) {
this(tableInfo, isDummyPartition, inputFormat, path, partitionValues, parameters);
this.outputFormat = outputFormat;
this.serde = serde;
this.columns = columns;
}
public String getDbName() {
return tableInfo.getDbName();
}
public String getTblName() {
return tableInfo.getTbName();
}
// return partition name like: nation=cn/city=beijing
public String getPartitionName(List<Column> partColumns) {
Preconditions.checkState(partColumns.size() == partitionValues.size());
@ -94,6 +107,7 @@ public class HivePartition {
/**
* If there are no files, there is no data under the partition, so we return 0
*
* @return
*/
public long getLastModifiedTimeIgnoreInit() {
@ -112,12 +126,17 @@ public class HivePartition {
@Override
public String toString() {
return "HivePartition{"
+ "dbName='" + dbName + '\''
+ ", tblName='" + tblName + '\''
+ ", isDummyPartition='" + isDummyPartition + '\''
+ ", inputFormat='" + inputFormat + '\''
+ ", path='" + path + '\''
+ ", partitionValues=" + partitionValues + '}';
final StringBuilder sb = new StringBuilder("HivePartition{");
sb.append("tableInfo=").append(tableInfo);
sb.append(", inputFormat='").append(inputFormat).append('\'');
sb.append(", path='").append(path).append('\'');
sb.append(", partitionValues=").append(partitionValues);
sb.append(", isDummyPartition=").append(isDummyPartition);
sb.append(", parameters=").append(parameters);
sb.append(", outputFormat='").append(outputFormat).append('\'');
sb.append(", serde='").append(serde).append('\'');
sb.append(", columns=").append(columns);
sb.append('}');
return sb.toString();
}
}
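
A hedged construction example for the new SimpleTableInfo-based read constructor; every value is hypothetical and only illustrates the argument order:

HivePartition partition = new HivePartition(
        new SimpleTableInfo("tpch", "orders"),                              // db and table
        false,                                                              // not a dummy partition
        "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",    // inputFormat
        "hdfs://ns1/warehouse/tpch.db/orders/o_orderdate=1995-01-01",       // partition path
        Arrays.asList("1995-01-01"),                                        // partitionValues
        new HashMap<>());                                                   // parameters

The old String-based constructor is kept and simply wraps the two names in a SimpleTableInfo, so existing read-path callers do not have to change.
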


@ -20,25 +20,28 @@
package org.apache.doris.datasource.hive;
import org.apache.doris.datasource.statistics.CommonStatistics;
import org.apache.doris.datasource.statistics.CommonStatistics.ReduceOperator;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
public class HivePartitionStatistics {
public static final HivePartitionStatistics EMPTY =
new HivePartitionStatistics(HiveCommonStatistics.EMPTY, ImmutableMap.of());
new HivePartitionStatistics(CommonStatistics.EMPTY, ImmutableMap.of());
private final HiveCommonStatistics commonStatistics;
private final CommonStatistics commonStatistics;
private final Map<String, HiveColumnStatistics> columnStatisticsMap;
public HivePartitionStatistics(
HiveCommonStatistics commonStatistics,
CommonStatistics commonStatistics,
Map<String, HiveColumnStatistics> columnStatisticsMap) {
this.commonStatistics = commonStatistics;
this.columnStatisticsMap = columnStatisticsMap;
}
public HiveCommonStatistics getCommonStatistics() {
public CommonStatistics getCommonStatistics() {
return commonStatistics;
}
@ -48,7 +51,7 @@ public class HivePartitionStatistics {
public static HivePartitionStatistics fromCommonStatistics(long rowCount, long fileCount, long totalFileBytes) {
return new HivePartitionStatistics(
new HiveCommonStatistics(rowCount, fileCount, totalFileBytes),
new CommonStatistics(rowCount, fileCount, totalFileBytes),
ImmutableMap.of()
);
}
@ -62,56 +65,32 @@ public class HivePartitionStatistics {
}
return new HivePartitionStatistics(
reduce(current.getCommonStatistics(), update.getCommonStatistics(), ReduceOperator.ADD),
// TODO merge columnStatisticsMap
current.getColumnStatisticsMap());
CommonStatistics
.reduce(current.getCommonStatistics(), update.getCommonStatistics(), ReduceOperator.ADD),
// TODO merge columnStatisticsMap
current.getColumnStatisticsMap());
}
public static HivePartitionStatistics reduce(
HivePartitionStatistics first,
HivePartitionStatistics second,
ReduceOperator operator) {
HiveCommonStatistics left = first.getCommonStatistics();
HiveCommonStatistics right = second.getCommonStatistics();
CommonStatistics left = first.getCommonStatistics();
CommonStatistics right = second.getCommonStatistics();
return HivePartitionStatistics.fromCommonStatistics(
reduce(left.getRowCount(), right.getRowCount(), operator),
reduce(left.getFileCount(), right.getFileCount(), operator),
reduce(left.getTotalFileBytes(), right.getTotalFileBytes(), operator));
CommonStatistics.reduce(left.getRowCount(), right.getRowCount(), operator),
CommonStatistics.reduce(left.getFileCount(), right.getFileCount(), operator),
CommonStatistics.reduce(left.getTotalFileBytes(), right.getTotalFileBytes(), operator));
}
public static HiveCommonStatistics reduce(
HiveCommonStatistics current,
HiveCommonStatistics update,
public static CommonStatistics reduce(
CommonStatistics current,
CommonStatistics update,
ReduceOperator operator) {
return new HiveCommonStatistics(
reduce(current.getRowCount(), update.getRowCount(), operator),
reduce(current.getFileCount(), update.getFileCount(), operator),
reduce(current.getTotalFileBytes(), update.getTotalFileBytes(), operator));
return new CommonStatistics(
CommonStatistics.reduce(current.getRowCount(), update.getRowCount(), operator),
CommonStatistics.reduce(current.getFileCount(), update.getFileCount(), operator),
CommonStatistics.reduce(current.getTotalFileBytes(), update.getTotalFileBytes(), operator));
}
public static long reduce(long current, long update, ReduceOperator operator) {
if (current >= 0 && update >= 0) {
switch (operator) {
case ADD:
return current + update;
case SUBTRACT:
return current - update;
case MAX:
return Math.max(current, update);
case MIN:
return Math.min(current, update);
default:
throw new IllegalArgumentException("Unexpected operator: " + operator);
}
}
return 0;
}
public enum ReduceOperator {
ADD,
SUBTRACT,
MIN,
MAX,
}
}
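
To make the delegation concrete, a small hedged example of combining two statistics values now that the counter arithmetic lives in CommonStatistics:

HivePartitionStatistics before = HivePartitionStatistics.fromCommonStatistics(100L, 2L, 1024L);
HivePartitionStatistics written = HivePartitionStatistics.fromCommonStatistics(50L, 1L, 512L);

// Row/file/byte counts are reduced by CommonStatistics.reduce(long, long, ReduceOperator).
HivePartitionStatistics total =
        HivePartitionStatistics.reduce(before, written, CommonStatistics.ReduceOperator.ADD);
// total.getCommonStatistics().getRowCount() == 150
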


@ -21,6 +21,7 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.statistics.CommonStatistics;
import org.apache.doris.fs.remote.BrokerFileSystem;
import org.apache.doris.fs.remote.RemoteFileSystem;
import org.apache.doris.nereids.exceptions.AnalysisException;
@ -75,14 +76,14 @@ public final class HiveUtil {
/**
* get input format class from inputFormatName.
*
* @param jobConf jobConf used when getInputFormatClass
* @param jobConf jobConf used when getInputFormatClass
* @param inputFormatName inputFormat class name
* @param symlinkTarget use target inputFormat class when inputFormat is SymlinkTextInputFormat
* @param symlinkTarget use target inputFormat class when inputFormat is SymlinkTextInputFormat
* @return a class of inputFormat.
* @throws UserException when class not found.
*/
public static InputFormat<?, ?> getInputFormat(JobConf jobConf,
String inputFormatName, boolean symlinkTarget) throws UserException {
String inputFormatName, boolean symlinkTarget) throws UserException {
try {
Class<? extends InputFormat<?, ?>> inputFormatClass = getInputFormatClass(jobConf, inputFormatName);
if (symlinkTarget && (inputFormatClass == SymlinkTextInputFormat.class)) {
@ -167,12 +168,12 @@ public final class HiveUtil {
Map<String, List<String>> partitionNameToPartitionValues =
partitionNames
.stream()
.collect(Collectors.toMap(partitionName -> partitionName, HiveUtil::toPartitionValues));
.stream()
.collect(Collectors.toMap(partitionName -> partitionName, HiveUtil::toPartitionValues));
Map<List<String>, Partition> partitionValuesToPartition =
partitions.stream()
.collect(Collectors.toMap(Partition::getValues, partition -> partition));
.collect(Collectors.toMap(Partition::getValues, partition -> partition));
ImmutableMap.Builder<String, Partition> resultBuilder = ImmutableMap.builder();
for (Map.Entry<String, List<String>> entry : partitionNameToPartitionValues.entrySet()) {
@ -312,7 +313,7 @@ public final class HiveUtil {
public static Map<String, String> updateStatisticsParameters(
Map<String, String> parameters,
HiveCommonStatistics statistics) {
CommonStatistics statistics) {
HashMap<String, String> result = new HashMap<>(parameters);
result.put(StatsSetupConst.NUM_FILES, String.valueOf(statistics.getFileCount()));
@ -345,8 +346,8 @@ public final class HiveUtil {
public static Partition toMetastoreApiPartition(HivePartition hivePartition) {
Partition result = new Partition();
result.setDbName(hivePartition.getDbName());
result.setTableName(hivePartition.getTblName());
result.setDbName(hivePartition.getTableInfo().getDbName());
result.setTableName(hivePartition.getTableInfo().getTbName());
result.setValues(hivePartition.getPartitionValues());
result.setSd(makeStorageDescriptorFromHivePartition(hivePartition));
result.setParameters(hivePartition.getParameters());
@ -355,7 +356,7 @@ public final class HiveUtil {
public static StorageDescriptor makeStorageDescriptorFromHivePartition(HivePartition partition) {
SerDeInfo serdeInfo = new SerDeInfo();
serdeInfo.setName(partition.getTblName());
serdeInfo.setName(partition.getTableInfo().getTbName());
serdeInfo.setSerializationLib(partition.getSerde());
StorageDescriptor sd = new StorageDescriptor();


@ -96,18 +96,10 @@ public class IcebergTransaction implements Transaction {
.convertToWriterResult(fileFormat, spec, commitDataList);
List<WriteResult> pendingResults = Lists.newArrayList(writeResult);
if (spec.isPartitioned()) {
partitionManifestUpdate(updateMode, table, pendingResults);
if (LOG.isDebugEnabled()) {
LOG.info("{} {} table partition manifest successful and writeResult : {}..", tableInfo, updateMode,
writeResult);
}
if (updateMode == TUpdateMode.APPEND) {
commitAppendTxn(table, pendingResults);
} else {
tableManifestUpdate(updateMode, table, pendingResults);
if (LOG.isDebugEnabled()) {
LOG.info("{} {} table manifest successful and writeResult : {}..", tableInfo, updateMode,
writeResult);
}
commitReplaceTxn(table, pendingResults);
}
}
@ -133,50 +125,8 @@ public class IcebergTransaction implements Transaction {
return IcebergUtils.getRemoteTable(externalCatalog, tableInfo);
}
private void partitionManifestUpdate(TUpdateMode updateMode, Table table, List<WriteResult> pendingResults) {
if (Objects.isNull(pendingResults) || pendingResults.isEmpty()) {
LOG.warn("{} partitionManifestUp method call but pendingResults is null or empty!", table.name());
return;
}
// Commit the appendPartitionOperator transaction.
if (updateMode == TUpdateMode.APPEND) {
commitAppendTxn(table, pendingResults);
} else {
ReplacePartitions appendPartitionOp = table.newReplacePartitions();
for (WriteResult result : pendingResults) {
Preconditions.checkState(result.referencedDataFiles().length == 0,
"Should have no referenced data files.");
Arrays.stream(result.dataFiles()).forEach(appendPartitionOp::addFile);
}
appendPartitionOp.commit();
}
}
private void tableManifestUpdate(TUpdateMode updateMode, Table table, List<WriteResult> pendingResults) {
if (Objects.isNull(pendingResults) || pendingResults.isEmpty()) {
LOG.warn("{} tableManifestUp method call but pendingResults is null or empty!", table.name());
return;
}
// Commit the appendPartitionOperator transaction.
if (LOG.isDebugEnabled()) {
LOG.info("{} tableManifestUp method call ", table.name());
}
if (updateMode == TUpdateMode.APPEND) {
commitAppendTxn(table, pendingResults);
} else {
ReplacePartitions appendPartitionOp = table.newReplacePartitions();
for (WriteResult result : pendingResults) {
Preconditions.checkState(result.referencedDataFiles().length == 0,
"Should have no referenced data files.");
Arrays.stream(result.dataFiles()).forEach(appendPartitionOp::addFile);
}
appendPartitionOp.commit();
}
}
private void commitAppendTxn(Table table, List<WriteResult> pendingResults) {
// To be compatible with iceberg format V1.
// commit append files.
AppendFiles appendFiles = table.newAppend();
for (WriteResult result : pendingResults) {
Preconditions.checkState(result.referencedDataFiles().length == 0,
@ -186,4 +136,16 @@ public class IcebergTransaction implements Transaction {
appendFiles.commit();
}
private void commitReplaceTxn(Table table, List<WriteResult> pendingResults) {
// commit replace partitions
ReplacePartitions appendPartitionOp = table.newReplacePartitions();
for (WriteResult result : pendingResults) {
Preconditions.checkState(result.referencedDataFiles().length == 0,
"Should have no referenced data files.");
Arrays.stream(result.dataFiles()).forEach(appendPartitionOp::addFile);
}
appendPartitionOp.commit();
}
}
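
The refactor collapses the duplicated partitionManifestUpdate/tableManifestUpdate logic into the two helpers above; the commit path is chosen by update mode. A condensed hedged sketch of that choice using Iceberg's public API (table, updateMode and dataFiles, a List of org.apache.iceberg.DataFile, are assumed):

if (updateMode == TUpdateMode.APPEND) {
    AppendFiles append = table.newAppend();                    // plain append, compatible with format v1
    dataFiles.forEach(append::appendFile);
    append.commit();
} else {
    ReplacePartitions replace = table.newReplacePartitions();  // INSERT OVERWRITE: replace touched partitions
    dataFiles.forEach(replace::addFile);
    replace.commit();
}
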


@ -17,6 +17,10 @@
package org.apache.doris.datasource.statistics;
/**
* This class provides operations related to file statistics, including add, subtract, min, max
* and other merge operations at object and field granularity.
*/
public class CommonStatistics {
public static final CommonStatistics EMPTY = new CommonStatistics(0L, 0L, 0L);
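
Only the javadoc and the EMPTY constant are visible in this hunk; the rest of CommonStatistics is essentially the code that used to live in HiveCommonStatistics and HivePartitionStatistics. A hedged reconstruction of the shape the call sites in this diff rely on:

public class CommonStatistics {
    public static final CommonStatistics EMPTY = new CommonStatistics(0L, 0L, 0L);

    private final long rowCount;
    private final long fileCount;
    private final long totalFileBytes;

    public CommonStatistics(long rowCount, long fileCount, long totalFileBytes) {
        this.rowCount = rowCount;
        this.fileCount = fileCount;
        this.totalFileBytes = totalFileBytes;
    }

    public long getRowCount() {
        return rowCount;
    }

    public long getFileCount() {
        return fileCount;
    }

    public long getTotalFileBytes() {
        return totalFileBytes;
    }

    // Moved from HivePartitionStatistics: combine two counters according to the operator.
    public static long reduce(long current, long update, ReduceOperator operator) {
        if (current >= 0 && update >= 0) {
            switch (operator) {
                case ADD:
                    return current + update;
                case SUBTRACT:
                    return current - update;
                case MAX:
                    return Math.max(current, update);
                case MIN:
                    return Math.min(current, update);
                default:
                    throw new IllegalArgumentException("Unexpected operator: " + operator);
            }
        }
        return 0;
    }

    public static CommonStatistics reduce(
            CommonStatistics current, CommonStatistics update, ReduceOperator operator) {
        return new CommonStatistics(
                reduce(current.getRowCount(), update.getRowCount(), operator),
                reduce(current.getFileCount(), update.getFileCount(), operator),
                reduce(current.getTotalFileBytes(), update.getTotalFileBytes(), operator));
    }

    public enum ReduceOperator {
        ADD,
        SUBTRACT,
        MIN,
        MAX,
    }
}
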


@ -18,6 +18,7 @@
package org.apache.doris.nereids.trees.plans.commands.insert;
import org.apache.doris.common.UserException;
import org.apache.doris.common.info.SimpleTableInfo;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HMSTransaction;
@ -42,8 +43,8 @@ public class HiveInsertExecutor extends BaseExternalTableInsertExecutor {
* constructor
*/
public HiveInsertExecutor(ConnectContext ctx, HMSExternalTable table,
String labelName, NereidsPlanner planner,
Optional<InsertCommandContext> insertCtx, boolean emptyInsert) {
String labelName, NereidsPlanner planner,
Optional<InsertCommandContext> insertCtx, boolean emptyInsert) {
super(ctx, table, labelName, planner, insertCtx, emptyInsert);
}
@ -71,7 +72,7 @@ public class HiveInsertExecutor extends BaseExternalTableInsertExecutor {
loadedRows = transaction.getUpdateCnt();
String dbName = ((HMSExternalTable) table).getDbName();
String tbName = table.getName();
transaction.finishInsertTable(dbName, tbName);
transaction.finishInsertTable(new SimpleTableInfo(dbName, tbName));
}
@Override


@ -15,30 +15,10 @@
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.hive;
package org.apache.doris.nereids.trees.plans.commands.insert;
public class HiveCommonStatistics {
public static final HiveCommonStatistics EMPTY = new HiveCommonStatistics(0L, 0L, 0L);
private final long rowCount;
private final long fileCount;
private final long totalFileBytes;
public HiveCommonStatistics(long rowCount, long fileCount, long totalFileBytes) {
this.fileCount = fileCount;
this.rowCount = rowCount;
this.totalFileBytes = totalFileBytes;
}
public long getRowCount() {
return rowCount;
}
public long getFileCount() {
return fileCount;
}
public long getTotalFileBytes() {
return totalFileBytes;
}
/**
* Insert command context for Iceberg external tables.
*/
public class IcebergInsertCommandContext extends BaseExternalTableInsertCommandContext {
}


@ -26,11 +26,13 @@ import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.InternalDatabaseUtil;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.insertoverwrite.InsertOverwriteUtil;
import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.analyzer.UnboundHiveTableSink;
import org.apache.doris.nereids.analyzer.UnboundIcebergTableSink;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.analyzer.UnboundTableSinkCreator;
import org.apache.doris.nereids.exceptions.AnalysisException;
@ -111,10 +113,14 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS
}
TableIf targetTableIf = InsertUtils.getTargetTable(logicalQuery, ctx);
if (!(targetTableIf instanceof OlapTable || targetTableIf instanceof HMSExternalTable)) {
throw new AnalysisException("insert into overwrite only support OLAP and HMS table."
+ " But current table type is " + targetTableIf.getType());
// check whether insert overwrite is allowed for this table type
if (!allowInsertOverwrite(targetTableIf)) {
String errMsg = "insert into overwrite only support OLAP and HMS/ICEBERG table."
+ " But current table type is " + targetTableIf.getType();
LOG.error(errMsg);
throw new AnalysisException(errMsg);
}
// check whether modifying MTMV data is allowed
if (targetTableIf instanceof MTMV && !MTMVUtil.allowModifyMTMVData(ctx)) {
throw new AnalysisException("Not allowed to perform current operation on async materialized view");
}
@ -190,8 +196,16 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS
}
}
private boolean allowInsertOverwrite(TableIf targetTable) {
if (targetTable instanceof OlapTable) {
return true;
} else {
return targetTable instanceof HMSExternalTable || targetTable instanceof IcebergExternalTable;
}
}
private void runInsertCommand(LogicalPlan logicalQuery, InsertCommandContext insertCtx,
ConnectContext ctx, StmtExecutor executor) throws Exception {
ConnectContext ctx, StmtExecutor executor) throws Exception {
InsertIntoTableCommand insertCommand = new InsertIntoTableCommand(logicalQuery, labelName,
Optional.of(insertCtx), Optional.empty());
insertCommand.run(ctx, executor);
@ -205,8 +219,8 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS
/**
* insert into select, for specified temp partitions
*
* @param ctx ctx
* @param executor executor
* @param ctx ctx
* @param executor executor
* @param tempPartitionNames tempPartitionNames
*/
private void insertInto(ConnectContext ctx, StmtExecutor executor, List<String> tempPartitionNames)
@ -241,6 +255,19 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS
(LogicalPlan) (sink.child(0)));
insertCtx = new HiveInsertCommandContext();
((HiveInsertCommandContext) insertCtx).setOverwrite(true);
} else if (logicalQuery instanceof UnboundIcebergTableSink) {
UnboundIcebergTableSink<?> sink = (UnboundIcebergTableSink<?>) logicalQuery;
copySink = (UnboundLogicalSink<?>) UnboundTableSinkCreator.createUnboundTableSink(
sink.getNameParts(),
sink.getColNames(),
sink.getHints(),
false,
sink.getPartitions(),
false,
sink.getDMLCommandType(),
(LogicalPlan) (sink.child(0)));
insertCtx = new IcebergInsertCommandContext();
((IcebergInsertCommandContext) insertCtx).setOverwrite(true);
} else {
throw new UserException("Current catalog does not support insert overwrite yet.");
}
@ -250,7 +277,7 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS
/**
* insert into auto detect partition.
*
* @param ctx ctx
* @param ctx ctx
* @param executor executor
*/
private void insertInto(ConnectContext ctx, StmtExecutor executor, long groupId) throws Exception {
@ -263,6 +290,9 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS
} else if (logicalQuery instanceof UnboundHiveTableSink) {
insertCtx = new HiveInsertCommandContext();
((HiveInsertCommandContext) insertCtx).setOverwrite(true);
} else if (logicalQuery instanceof UnboundIcebergTableSink) {
insertCtx = new IcebergInsertCommandContext();
((IcebergInsertCommandContext) insertCtx).setOverwrite(true);
} else {
throw new UserException("Current catalog does not support insert overwrite yet.");
}


@ -18,8 +18,8 @@
package org.apache.doris.datasource;
import org.apache.doris.analysis.TableName;
import org.apache.doris.common.info.SimpleTableInfo;
import org.apache.doris.datasource.hive.HMSCachedClient;
import org.apache.doris.datasource.hive.HMSTransaction;
import org.apache.doris.datasource.hive.HiveDatabaseMetadata;
import org.apache.doris.datasource.hive.HivePartitionStatistics;
import org.apache.doris.datasource.hive.HivePartitionWithStatistics;
@ -48,7 +48,7 @@ import java.util.stream.Collectors;
public class TestHMSCachedClient implements HMSCachedClient {
public Map<HMSTransaction.DatabaseTableName, List<Partition>> partitions = new ConcurrentHashMap<>();
public Map<SimpleTableInfo, List<Partition>> partitions = new ConcurrentHashMap<>();
public Map<String, List<Table>> tables = new HashMap<>();
public List<Database> dbs = new ArrayList<>();
@ -232,7 +232,7 @@ public class TestHMSCachedClient implements HMSCachedClient {
public void dropTable(String dbName, String tableName) {
Table table = getTable(dbName, tableName);
this.tables.get(dbName).remove(table);
this.partitions.remove(new HMSTransaction.DatabaseTableName(dbName, tableName));
this.partitions.remove(new SimpleTableInfo(dbName, tableName));
}
@Override
@ -248,7 +248,7 @@ public class TestHMSCachedClient implements HMSCachedClient {
List<Table> tableList = getTableList(tbl.getDbName());
tableList.add(HiveUtil.toHiveTable((HiveTableMetadata) tbl));
HMSTransaction.DatabaseTableName key = new HMSTransaction.DatabaseTableName(dbName, tbName);
SimpleTableInfo key = new SimpleTableInfo(dbName, tbName);
partitions.put(key, new ArrayList<>());
}
@ -322,7 +322,7 @@ public class TestHMSCachedClient implements HMSCachedClient {
}
public List<Partition> getPartitionList(String dbName, String tableName) {
HMSTransaction.DatabaseTableName key = new HMSTransaction.DatabaseTableName(dbName, tableName);
SimpleTableInfo key = new SimpleTableInfo(dbName, tableName);
List<Partition> partitionList = this.partitions.get(key);
if (partitionList == null) {
throw new RuntimeException("can't found table: " + key);


@ -19,6 +19,7 @@ package org.apache.doris.datasource.hive;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.common.info.SimpleTableInfo;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.datasource.TestHMSCachedClient;
import org.apache.doris.fs.FileSystem;
@ -142,9 +143,9 @@ public class HmsCommitTest {
// create table for tbWithoutPartition
HiveTableMetadata tableMetadata2 = new HiveTableMetadata(
dbName, tbWithoutPartition, Optional.of(dbLocation + tbWithPartition + UUID.randomUUID()),
columns, new ArrayList<>(),
new HashMap<>(), fileFormat, "");
dbName, tbWithoutPartition, Optional.of(dbLocation + tbWithPartition + UUID.randomUUID()),
columns, new ArrayList<>(),
new HashMap<>(), fileFormat, "");
hmsClient.createTable(tableMetadata2, true);
// context
@ -363,22 +364,22 @@ public class HmsCommitTest {
public THivePartitionUpdate createRandomNew(String partition) throws IOException {
return partition == null ? genOnePartitionUpdate(TUpdateMode.NEW) :
genOnePartitionUpdate("c3=" + partition, TUpdateMode.NEW);
genOnePartitionUpdate("c3=" + partition, TUpdateMode.NEW);
}
public THivePartitionUpdate createRandomAppend(String partition) throws IOException {
return partition == null ? genOnePartitionUpdate(TUpdateMode.APPEND) :
genOnePartitionUpdate("c3=" + partition, TUpdateMode.APPEND);
genOnePartitionUpdate("c3=" + partition, TUpdateMode.APPEND);
}
public THivePartitionUpdate createRandomOverwrite(String partition) throws IOException {
return partition == null ? genOnePartitionUpdate(TUpdateMode.OVERWRITE) :
genOnePartitionUpdate("c3=" + partition, TUpdateMode.OVERWRITE);
genOnePartitionUpdate("c3=" + partition, TUpdateMode.OVERWRITE);
}
public void commit(String dbName,
String tableName,
List<THivePartitionUpdate> hivePUs) {
String tableName,
List<THivePartitionUpdate> hivePUs) {
HMSTransaction hmsTransaction = new HMSTransaction(hmsOps, fileSystemProvider, fileSystemExecutor);
hmsTransaction.setHivePartitionUpdates(hivePUs);
HiveInsertCommandContext ctx = new HiveInsertCommandContext();
@ -386,7 +387,7 @@ public class HmsCommitTest {
ctx.setQueryId(queryId);
ctx.setWritePath(writeLocation + queryId + "/");
hmsTransaction.beginInsertTable(ctx);
hmsTransaction.finishInsertTable(dbName, tableName);
hmsTransaction.finishInsertTable(new SimpleTableInfo(dbName, tableName));
hmsTransaction.commit();
}
@ -404,11 +405,11 @@ public class HmsCommitTest {
new MockUp<HMSTransaction>(HMSTransaction.class) {
@Mock
private void wrapperAsyncRenameDirWithProfileSummary(Executor executor,
List<CompletableFuture<?>> renameFileFutures,
AtomicBoolean cancelled,
String origFilePath,
String destFilePath,
Runnable runWhenPathNotExist) {
List<CompletableFuture<?>> renameFileFutures,
AtomicBoolean cancelled,
String origFilePath,
String destFilePath,
Runnable runWhenPathNotExist) {
runnable.run();
throw new RuntimeException("failed to rename dir");
}