[opt](hive) save hive table schema in transaction for 2.1 (#37127)
## Proposed changes pick #37008
This commit is contained in:
@ -48,6 +48,7 @@ import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import io.airlift.concurrent.MoreFutures;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hive.metastore.api.FieldSchema;
|
||||
import org.apache.hadoop.hive.metastore.api.Partition;
|
||||
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
|
||||
import org.apache.hadoop.hive.metastore.api.Table;
|
||||
@ -88,6 +89,7 @@ public class HMSTransaction implements Transaction {
|
||||
private final Map<DatabaseTableName, Action<TableAndMore>> tableActions = new HashMap<>();
|
||||
private final Map<DatabaseTableName, Map<List<String>, Action<PartitionAndMore>>>
|
||||
partitionActions = new HashMap<>();
|
||||
private final Map<DatabaseTableName, List<FieldSchema>> tableColumns = new HashMap<>();
|
||||
|
||||
private final Executor fileSystemExecutor;
|
||||
private HmsCommitter hmsCommitter;
|
||||
@ -123,7 +125,7 @@ public class HMSTransaction implements Transaction {
|
||||
}
|
||||
}
|
||||
|
||||
private Set<UncompletedMpuPendingUpload> uncompletedMpuPendingUploads = new HashSet<>();
|
||||
private final Set<UncompletedMpuPendingUpload> uncompletedMpuPendingUploads = new HashSet<>();
|
||||
|
||||
public HMSTransaction(HiveMetadataOps hiveOps, FileSystemProvider fileSystemProvider, Executor fileSystemExecutor) {
|
||||
this.hiveOps = hiveOps;
|
||||
@ -241,7 +243,7 @@ public class HMSTransaction implements Transaction {
|
||||
Maps.newHashMap(),
|
||||
sd.getOutputFormat(),
|
||||
sd.getSerdeInfo().getSerializationLib(),
|
||||
hiveOps.getClient().getSchema(dbName, tbName)
|
||||
getTableColumns(dbName, tbName)
|
||||
);
|
||||
if (updateMode == TUpdateMode.OVERWRITE) {
|
||||
dropPartition(dbName, tbName, hivePartition.getPartitionValues(), true);
|
||||
@ -396,7 +398,7 @@ public class HMSTransaction implements Transaction {
|
||||
partition.getParameters(),
|
||||
sd.getOutputFormat(),
|
||||
sd.getSerdeInfo().getSerializationLib(),
|
||||
hiveOps.getClient().getSchema(dbName, tbName)
|
||||
getTableColumns(dbName, tbName)
|
||||
);
|
||||
|
||||
partitionActionsForTable.put(
|
||||
@ -913,6 +915,11 @@ public class HMSTransaction implements Transaction {
|
||||
throw new RuntimeException("Not Found table: " + databaseName + "." + tableName);
|
||||
}
|
||||
|
||||
public synchronized List<FieldSchema> getTableColumns(String databaseName, String tableName) {
|
||||
return tableColumns.computeIfAbsent(new DatabaseTableName(databaseName, tableName),
|
||||
key -> hiveOps.getClient().getSchema(dbName, tbName));
|
||||
}
|
||||
|
||||
public synchronized void finishChangingExistingTable(
|
||||
ActionType actionType,
|
||||
String databaseName,
|
||||
@ -1276,7 +1283,7 @@ public class HMSTransaction implements Transaction {
|
||||
Maps.newHashMap(),
|
||||
sd.getOutputFormat(),
|
||||
sd.getSerdeInfo().getSerializationLib(),
|
||||
hiveOps.getClient().getSchema(dbName, tbName)
|
||||
getTableColumns(dbName, tbName)
|
||||
);
|
||||
|
||||
HivePartitionWithStatistics partitionWithStats =
|
||||
|
||||
Reference in New Issue
Block a user