From 860ce976229785693366284716f8d20aa4cad59b Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Fri, 12 May 2023 15:32:26 +0800 Subject: [PATCH] [feature](torc) support insert only transactional hive table on FE side (#19419) * [feature](torc) support insert only transactional hive table on FE side * 3 * commit * 1 --- .../java/org/apache/doris/catalog/Env.java | 12 ++ .../catalog/HiveMetaStoreClientHelper.java | 2 +- .../catalog/external/HMSExternalTable.java | 29 +++++ .../datasource/hive/HiveMetaStoreCache.java | 52 +++++++- .../doris/datasource/hive/HivePartition.java | 16 +++ .../datasource/hive/HiveTransaction.java | 85 +++++++++++++ .../datasource/hive/HiveTransactionMgr.java | 55 ++++++++ .../hive/PooledHiveMetaStoreClient.java | 120 ++++++++++++++++++ .../java/org/apache/doris/fs/RemoteFiles.java | 2 +- .../doris/planner/external/HiveScanNode.java | 41 +++++- .../org/apache/doris/qe/QeProcessorImpl.java | 4 + 11 files changed, 409 insertions(+), 9 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTransaction.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTransactionMgr.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index e1a42c4c33..ee4a092ae8 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -126,6 +126,7 @@ import org.apache.doris.datasource.CatalogMgr; import org.apache.doris.datasource.EsExternalCatalog; import org.apache.doris.datasource.ExternalMetaCacheMgr; import org.apache.doris.datasource.InternalCatalog; +import org.apache.doris.datasource.hive.HiveTransactionMgr; import org.apache.doris.datasource.hive.event.MetastoreEventsProcessor; import org.apache.doris.deploy.DeployManager; import org.apache.doris.deploy.impl.AmbariDeployManager; @@ -445,6 +446,8 @@ public class Env { private 
StatisticsAutoAnalyzer statisticsAutoAnalyzer; + private HiveTransactionMgr hiveTransactionMgr; + public List getFrontends(FrontendNodeType nodeType) { if (nodeType == null) { // get all @@ -649,6 +652,7 @@ public class Env { this.globalFunctionMgr = new GlobalFunctionMgr(); this.resourceGroupMgr = new ResourceGroupMgr(); this.loadManagerAdapter = new LoadManagerAdapter(); + this.hiveTransactionMgr = new HiveTransactionMgr(); } public static void destroyCheckpoint() { @@ -768,6 +772,14 @@ public class Env { return checkpointer; } + public HiveTransactionMgr getHiveTransactionMgr() { + return hiveTransactionMgr; + } + + public static HiveTransactionMgr getCurrentHiveTransactionMgr() { + return getCurrentEnv().getHiveTransactionMgr(); + } + // Use tryLock to avoid potential dead lock private boolean tryLock(boolean mustLock) { while (true) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java index 116a9af107..9ad6207525 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java @@ -222,7 +222,7 @@ public class HiveMetaStoreClientHelper { while (queue.peek() != null) { RemoteFiles locs = queue.poll(); try { - for (RemoteFile fileLocation : locs.locations()) { + for (RemoteFile fileLocation : locs.files()) { Path filePath = fileLocation.getPath(); // hdfs://host:port/path/to/partition/file_name String fullUri = filePath.toString(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java index b94804ac61..169ae75610 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java @@ -39,6 +39,7 
@@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.logging.log4j.LogManager; @@ -58,6 +59,9 @@ public class HMSExternalTable extends ExternalTable { private static final Set SUPPORTED_HIVE_FILE_FORMATS; + private static final String TBL_PROP_TXN_PROPERTIES = "transactional_properties"; + private static final String TBL_PROP_INSERT_ONLY = "insert_only"; + static { SUPPORTED_HIVE_FILE_FORMATS = Sets.newHashSet(); SUPPORTED_HIVE_FILE_FORMATS.add("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"); @@ -141,6 +145,27 @@ public class HMSExternalTable extends ExternalTable { * Support managed_table and external_table. */ private boolean supportedHiveTable() { + boolean isTxnTbl = AcidUtils.isTransactionalTable(remoteTable); + if (isTxnTbl) { + // Only support "insert_only" transactional table + // There are 2 types of parameter: + // "transactional_properties" = "insert_only", + // or, + // "insert_only" = "true" + // And must check "insert_only" first, because "transactional_properties" may be "default" + Map parameters = remoteTable.getParameters(); + if (parameters.containsKey(TBL_PROP_INSERT_ONLY)) { + if (!parameters.get(TBL_PROP_INSERT_ONLY).equalsIgnoreCase("true")) { + return false; + } + } else if (parameters.containsKey(TBL_PROP_TXN_PROPERTIES)) { + if (!parameters.get(TBL_PROP_TXN_PROPERTIES).equalsIgnoreCase(TBL_PROP_INSERT_ONLY)) { + return false; + } + } else { + return false; + } + } String inputFileFormat = remoteTable.getSd().getInputFormat(); boolean supportedFileFormat = inputFileFormat != null && SUPPORTED_HIVE_FILE_FORMATS.contains(inputFileFormat); LOG.debug("hms table {} is {} with file format: {}", name, remoteTable.getTableType(), 
inputFileFormat); @@ -167,6 +192,10 @@ public class HMSExternalTable extends ExternalTable { return partitionColumns; } + public boolean isHiveTransactionalTable() { + return dlaType == DLAType.HIVE && AcidUtils.isTransactionalTable(remoteTable); + } + @Override public boolean isView() { makeSureInitialized(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java index f6382b80da..a14ce3086c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java @@ -60,8 +60,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; @@ -94,6 +96,8 @@ public class HiveMetaStoreCache { private static final Logger LOG = LogManager.getLogger(HiveMetaStoreCache.class); private static final int MIN_BATCH_FETCH_PARTITION_NUM = 50; public static final String HIVE_DEFAULT_PARTITION = "__HIVE_DEFAULT_PARTITION__"; + // After hive 3, transactional tables will have a file '_orc_acid_version' with value >= '2'. 
+ public static final String HIVE_ORC_ACID_VERSION_FILE = "_orc_acid_version"; private HMSExternalCatalog catalog; @@ -283,7 +287,7 @@ public class HiveMetaStoreCache { result.setSplittable(HiveUtil.isSplittable(inputFormat, new Path(location), jobConf)); RemoteFileSystem fs = FileSystemFactory.getByLocation(location, jobConf); RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false); - locatedFiles.locations().forEach(result::addFile); + locatedFiles.files().forEach(result::addFile); result.setPartitionValues(partitionValues); return result; } @@ -648,6 +652,52 @@ public class HiveMetaStoreCache { return fileCacheRef; } + public List getFilesByTransaction(List partitions, ValidWriteIdList validWriteIds) { + List fileCacheValues = Lists.newArrayList(); + JobConf jobConf = getJobConf(); + String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME); + try { + for (HivePartition partition : partitions) { + FileCacheValue fileCacheValue = new FileCacheValue(); + AcidUtils.Directory directory; + if (!Strings.isNullOrEmpty(remoteUser)) { + UserGroupInformation ugi = UserGroupInformation.createRemoteUser(remoteUser); + directory = ugi.doAs((PrivilegedExceptionAction) () -> AcidUtils.getAcidState( + new Path(partition.getPath()), jobConf, validWriteIds, false, true)); + } else { + directory = AcidUtils.getAcidState(new Path(partition.getPath()), jobConf, validWriteIds, false, + true); + } + if (!directory.getOriginalFiles().isEmpty()) { + throw new Exception("Original non-ACID files in transactional tables are not supported"); + } + + // delta directories + for (AcidUtils.ParsedDelta delta : directory.getCurrentDirectories()) { + String location = delta.getPath().toString(); + RemoteFileSystem fs = FileSystemFactory.getByLocation(location, jobConf); + RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false); + locatedFiles.files().stream().filter(f -> !f.getName().equals(HIVE_ORC_ACID_VERSION_FILE)) + .forEach(fileCacheValue::addFile); + } + + 
// base + if (directory.getBaseDirectory() != null) { + String location = directory.getBaseDirectory().toString(); + RemoteFileSystem fs = FileSystemFactory.getByLocation(location, jobConf); + RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false); + locatedFiles.files().stream().filter(f -> !f.getName().equals(HIVE_ORC_ACID_VERSION_FILE)) + .forEach(fileCacheValue::addFile); + } + fileCacheValues.add(fileCacheValue); + } + } catch (Exception e) { + throw new CacheException("failed to get input splits for write ids %s in catalog %s", e, + validWriteIds.toString(), catalog.getName()); + } + return fileCacheValues; + } + /** * The Key of hive partition value cache */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartition.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartition.java index 7d805f4f67..1c2e4341ea 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartition.java @@ -17,6 +17,9 @@ package org.apache.doris.datasource.hive; +import org.apache.doris.catalog.Column; + +import com.google.common.base.Preconditions; import lombok.Data; import java.util.List; @@ -43,6 +46,19 @@ public class HivePartition { this.partitionValues = partitionValues; } + // return partition name like: nation=cn/city=beijing + public String getPartitionName(List partColumns) { + Preconditions.checkState(partColumns.size() == partitionValues.size()); + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < partColumns.size(); ++i) { + if (i != 0) { + sb.append("/"); + } + sb.append(partColumns.get(i).getName()).append("=").append(partitionValues.get(i)); + } + return sb.toString(); + } + public boolean isDummyPartition() { return this.isDummyPartition; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTransaction.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTransaction.java new file mode 100644 index 0000000000..7919d451a2 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTransaction.java @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.hive; + +import org.apache.doris.analysis.TableName; +import org.apache.doris.catalog.external.HMSExternalTable; +import org.apache.doris.common.UserException; +import org.apache.doris.datasource.HMSExternalCatalog; + +import com.google.common.collect.Lists; +import org.apache.hadoop.hive.common.ValidWriteIdList; + +import java.util.List; + +/** + * HiveTransaction is used to save info of a hive transaction. + * Used when reading hive transactional table. + * Each HiveTransaction is bound to a query. 
+ */ +public class HiveTransaction { + private final String queryId; + private final String user; + private final HMSExternalTable hiveTable; + + private long txnId; + private List partitionNames = Lists.newArrayList(); + + ValidWriteIdList validWriteIdList = null; + + public HiveTransaction(String queryId, String user, HMSExternalTable hiveTable) { + this.queryId = queryId; + this.user = user; + this.hiveTable = hiveTable; + } + + public String getQueryId() { + return queryId; + } + + public void addPartition(String partitionName) { + this.partitionNames.add(partitionName); + } + + public ValidWriteIdList getValidWriteIds(PooledHiveMetaStoreClient client) { + if (validWriteIdList == null) { + TableName tableName = new TableName(hiveTable.getCatalog().getName(), hiveTable.getDbName(), + hiveTable.getName()); + client.acquireSharedLock(queryId, txnId, user, tableName, partitionNames, 5000); + validWriteIdList = client.getValidWriteIds(tableName.getDb() + "." + tableName.getTbl(), txnId); + } + return validWriteIdList; + } + + public void begin() throws UserException { + try { + this.txnId = ((HMSExternalCatalog) hiveTable.getCatalog()).getClient().openTxn(user); + } catch (RuntimeException e) { + throw new UserException(e.getMessage(), e); + } + } + + public void commit() throws UserException { + try { + ((HMSExternalCatalog) hiveTable.getCatalog()).getClient().commitTxn(txnId); + } catch (RuntimeException e) { + throw new UserException(e.getMessage(), e); + } + } +} + diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTransactionMgr.java new file mode 100644 index 0000000000..e70452b859 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTransactionMgr.java @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.hive; + +import org.apache.doris.common.UserException; + +import com.google.common.collect.Maps; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Map; + +/** + * HiveTransactionMgr is used to manage hive transaction. + * For each query, it will register a HiveTransaction. + * When query is finished, it will deregister the HiveTransaction. 
+ */ +public class HiveTransactionMgr { + private static final Logger LOG = LogManager.getLogger(HiveTransactionMgr.class); + private Map txnMap = Maps.newConcurrentMap(); + + public HiveTransactionMgr() { + } + + public void register(HiveTransaction txn) throws UserException { + txn.begin(); + txnMap.put(txn.getQueryId(), txn); + } + + public void deregister(String queryId) { + HiveTransaction txn = txnMap.remove(queryId); + if (txn != null) { + try { + txn.commit(); + } catch (UserException e) { + LOG.warn("failed to commit hive txn: " + queryId, e); + } + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PooledHiveMetaStoreClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PooledHiveMetaStoreClient.java index e244050198..e26a4c9284 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PooledHiveMetaStoreClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PooledHiveMetaStoreClient.java @@ -17,6 +17,7 @@ package org.apache.doris.datasource.hive; +import org.apache.doris.analysis.TableName; import org.apache.doris.common.Config; import org.apache.doris.datasource.HMSClientException; import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException; @@ -25,25 +26,39 @@ import org.apache.doris.datasource.property.constants.HMSProperties; import com.aliyun.datalake.metastore.hive2.ProxyMetaStoreClient; import com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.common.ValidTxnWriteIdList; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.HiveMetaHookLoader; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import 
org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.LockComponentBuilder; +import org.apache.hadoop.hive.metastore.LockRequestBuilder; import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; +import org.apache.hadoop.hive.metastore.api.DataOperationType; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.LockComponent; +import org.apache.hadoop.hive.metastore.api.LockResponse; +import org.apache.hadoop.hive.metastore.api.LockState; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.TableValidWriteIds; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Queue; /** @@ -175,6 +190,111 @@ public class PooledHiveMetaStoreClient { } } + public long openTxn(String user) { + try (CachedClient client = getClient()) { + return client.client.openTxn(user); + } catch (Exception e) { + throw new RuntimeException("failed to open transaction", e); + } + } + + public void commitTxn(long txnId) { + try (CachedClient client = getClient()) { + client.client.commitTxn(txnId); + } catch (Exception e) { + throw new RuntimeException("failed to commit transaction " + txnId, e); + } + } + + public void acquireSharedLock(String queryId, long txnId, String user, TableName tblName, + List partitionNames, long timeoutMs) { + LockRequestBuilder request = new 
LockRequestBuilder(queryId).setTransactionId(txnId).setUser(user); + List lockComponents = createLockComponentsForRead(tblName, partitionNames); + for (LockComponent component : lockComponents) { + request.addLockComponent(component); + } + try (CachedClient client = getClient()) { + LockResponse response = client.client.lock(request.build()); + long start = System.currentTimeMillis(); + while (response.getState() == LockState.WAITING) { + long lockId = response.getLockid(); + if (System.currentTimeMillis() - start > timeoutMs) { + throw new RuntimeException( + "acquire lock timeout for txn " + txnId + " of query " + queryId + ", timeout(ms): " + + timeoutMs); + } + response = checkLock(lockId); + } + + if (response.getState() != LockState.ACQUIRED) { + throw new RuntimeException("failed to acquire lock, lock in state " + response.getState()); + } + } catch (Exception e) { + throw new RuntimeException("failed to commit transaction " + txnId, e); + } + } + + public ValidWriteIdList getValidWriteIds(String fullTableName, long currentTransactionId) { + try (CachedClient client = getClient()) { + // Pass currentTxn as 0L to get the recent snapshot of valid transactions in Hive + // Do not pass currentTransactionId instead as it will break Hive's listing of delta directories if major compaction + // deletes delta directories for valid transactions that existed at the time transaction is opened + ValidTxnList validTransactions = client.client.getValidTxns(); + List tableValidWriteIdsList = client.client.getValidWriteIds( + Collections.singletonList(fullTableName), validTransactions.toString()); + if (tableValidWriteIdsList.size() != 1) { + throw new Exception("tableValidWriteIdsList's size should be 1"); + } + ValidTxnWriteIdList validTxnWriteIdList = TxnUtils.createValidTxnWriteIdList(currentTransactionId, + tableValidWriteIdsList); + ValidWriteIdList writeIdList = validTxnWriteIdList.getTableValidWriteIdList(fullTableName); + return writeIdList; + } catch (Exception e) 
{ + throw new RuntimeException( + "failed to get valid write ids for " + fullTableName + ", transaction " + currentTransactionId, e); + } + } + + private LockResponse checkLock(long lockId) { + try (CachedClient client = getClient()) { + return client.client.checkLock(lockId); + } catch (Exception e) { + throw new RuntimeException("failed to check lock " + lockId, e); + } + } + + private static List createLockComponentsForRead(TableName tblName, List partitionNames) { + List components = Lists.newArrayListWithCapacity( + partitionNames.isEmpty() ? 1 : partitionNames.size()); + if (partitionNames.isEmpty()) { + components.add(createLockComponentForRead(tblName, Optional.empty())); + } else { + for (String partitionName : partitionNames) { + components.add(createLockComponentForRead(tblName, Optional.of(partitionName))); + } + } + return components; + } + + private static LockComponent createLockComponentForRead(TableName tblName, Optional partitionName) { + LockComponentBuilder builder = new LockComponentBuilder(); + builder.setShared(); + builder.setOperationType(DataOperationType.SELECT); + builder.setDbName(tblName.getDb()); + builder.setTableName(tblName.getTbl()); + partitionName.ifPresent(builder::setPartitionName); + builder.setIsTransactional(true); + return builder.build(); + } + + public void heartbeatForTxn(long txnId) { + try (CachedClient client = getClient()) { + client.client.heartbeat(txnId, txnId); + } catch (Exception e) { + throw new RuntimeException("failed to do heartbeat for transaction " + txnId, e); + } + } + private class CachedClient implements AutoCloseable { private final IMetaStoreClient client; diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteFiles.java b/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteFiles.java index 616e27a7eb..761d988652 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteFiles.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteFiles.java @@ -29,7 +29,7 @@ public class 
RemoteFiles { this.files = files; } - public List locations() { + public List files() { return files; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java index 87d0e55796..560278cb48 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java @@ -29,12 +29,16 @@ import org.apache.doris.catalog.Type; import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.common.DdlException; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.Util; import org.apache.doris.datasource.HMSExternalCatalog; import org.apache.doris.datasource.hive.HiveMetaStoreCache; +import org.apache.doris.datasource.hive.HiveMetaStoreCache.FileCacheValue; import org.apache.doris.datasource.hive.HivePartition; +import org.apache.doris.datasource.hive.HiveTransaction; import org.apache.doris.planner.ListPartitionPrunerV2; import org.apache.doris.planner.PlanNodeId; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.spi.Split; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TFileAttributes; @@ -44,6 +48,7 @@ import org.apache.doris.thrift.TFileType; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -62,6 +67,7 @@ public class HiveScanNode extends FileQueryScanNode { public static final String DEFAULT_LINE_DELIMITER = "\n"; private final HMSExternalTable hmsTable; + private HiveTransaction hiveTransaction = null; /** * * External file scan node for Query Hive table @@ -89,11 +95,17 @@ public class 
HiveScanNode extends FileQueryScanNode { for (SlotDescriptor slot : desc.getSlots()) { if (!slot.getType().isScalarType()) { throw new UserException("For column `" + slot.getColumn().getName() - + "`, The column types ARRAY/MAP/STRUCT are not supported yet" - + " for text input format of Hive. "); + + "`, The column types ARRAY/MAP/STRUCT are not supported yet" + + " for text input format of Hive. "); } } } + + if (hmsTable.isHiveTransactionalTable()) { + this.hiveTransaction = new HiveTransaction(DebugUtil.printId(ConnectContext.get().queryId()), + ConnectContext.get().getQualifiedUser(), hmsTable); + Env.getCurrentHiveTransactionMgr().register(hiveTransaction); + } } @Override @@ -159,9 +171,13 @@ public class HiveScanNode extends FileQueryScanNode { private void getFileSplitByPartitions(HiveMetaStoreCache cache, List partitions, List allFiles, boolean useSelfSplitter) throws IOException { - - for (HiveMetaStoreCache.FileCacheValue fileCacheValue : - cache.getFilesByPartitions(partitions, useSelfSplitter)) { + List fileCaches; + if (hiveTransaction != null) { + fileCaches = getFileSplitByTransaction(cache, partitions); + } else { + fileCaches = cache.getFilesByPartitions(partitions, useSelfSplitter); + } + for (HiveMetaStoreCache.FileCacheValue fileCacheValue : fileCaches) { // This if branch is to support old splitter, will remove later. if (fileCacheValue.getSplits() != null) { allFiles.addAll(fileCacheValue.getSplits()); @@ -177,10 +193,23 @@ public class HiveScanNode extends FileQueryScanNode { } } + private List getFileSplitByTransaction(HiveMetaStoreCache cache, List partitions) { + for (HivePartition partition : partitions) { + if (partition.getPartitionValues() == null || partition.getPartitionValues().isEmpty()) { + // this is unpartitioned table. 
+ continue; + } + hiveTransaction.addPartition(partition.getPartitionName(hmsTable.getPartitionColumns())); + } + ValidWriteIdList validWriteIds = hiveTransaction.getValidWriteIds( + ((HMSExternalCatalog) hmsTable.getCatalog()).getClient()); + return cache.getFilesByTransaction(partitions, validWriteIds); + } + @Override public List getPathPartitionKeys() { return hmsTable.getRemoteTable().getPartitionKeys() - .stream().map(FieldSchema::getName).collect(Collectors.toList()); + .stream().map(FieldSchema::getName).collect(Collectors.toList()); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java index eea008f143..86f0a11134 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java @@ -17,6 +17,7 @@ package org.apache.doris.qe; +import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.UserException; @@ -151,6 +152,9 @@ public final class QeProcessorImpl implements QeProcessor { LOG.debug("not found query {} when unregisterQuery", DebugUtil.printId(queryId)); } } + + // commit hive transaction if needed + Env.getCurrentHiveTransactionMgr().deregister(DebugUtil.printId(queryId)); } @Override