[feature](torc) support insert only transactional hive table on FE side (#19419)
* [feature](torc) support insert only transactional hive table on FE side * 3 * commit * 1
This commit is contained in:
@ -126,6 +126,7 @@ import org.apache.doris.datasource.CatalogMgr;
|
||||
import org.apache.doris.datasource.EsExternalCatalog;
|
||||
import org.apache.doris.datasource.ExternalMetaCacheMgr;
|
||||
import org.apache.doris.datasource.InternalCatalog;
|
||||
import org.apache.doris.datasource.hive.HiveTransactionMgr;
|
||||
import org.apache.doris.datasource.hive.event.MetastoreEventsProcessor;
|
||||
import org.apache.doris.deploy.DeployManager;
|
||||
import org.apache.doris.deploy.impl.AmbariDeployManager;
|
||||
@ -445,6 +446,8 @@ public class Env {
|
||||
|
||||
private StatisticsAutoAnalyzer statisticsAutoAnalyzer;
|
||||
|
||||
private HiveTransactionMgr hiveTransactionMgr;
|
||||
|
||||
public List<Frontend> getFrontends(FrontendNodeType nodeType) {
|
||||
if (nodeType == null) {
|
||||
// get all
|
||||
@ -649,6 +652,7 @@ public class Env {
|
||||
this.globalFunctionMgr = new GlobalFunctionMgr();
|
||||
this.resourceGroupMgr = new ResourceGroupMgr();
|
||||
this.loadManagerAdapter = new LoadManagerAdapter();
|
||||
this.hiveTransactionMgr = new HiveTransactionMgr();
|
||||
}
|
||||
|
||||
public static void destroyCheckpoint() {
|
||||
@ -768,6 +772,14 @@ public class Env {
|
||||
return checkpointer;
|
||||
}
|
||||
|
||||
public HiveTransactionMgr getHiveTransactionMgr() {
|
||||
return hiveTransactionMgr;
|
||||
}
|
||||
|
||||
public static HiveTransactionMgr getCurrentHiveTransactionMgr() {
|
||||
return getCurrentEnv().getHiveTransactionMgr();
|
||||
}
|
||||
|
||||
// Use tryLock to avoid potential dead lock
|
||||
private boolean tryLock(boolean mustLock) {
|
||||
while (true) {
|
||||
|
||||
@ -222,7 +222,7 @@ public class HiveMetaStoreClientHelper {
|
||||
while (queue.peek() != null) {
|
||||
RemoteFiles locs = queue.poll();
|
||||
try {
|
||||
for (RemoteFile fileLocation : locs.locations()) {
|
||||
for (RemoteFile fileLocation : locs.files()) {
|
||||
Path filePath = fileLocation.getPath();
|
||||
// hdfs://host:port/path/to/partition/file_name
|
||||
String fullUri = filePath.toString();
|
||||
|
||||
@ -39,6 +39,7 @@ import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
|
||||
import org.apache.hadoop.hive.metastore.api.FieldSchema;
|
||||
import org.apache.hadoop.hive.metastore.api.Partition;
|
||||
import org.apache.hadoop.hive.ql.io.AcidUtils;
|
||||
import org.apache.iceberg.Schema;
|
||||
import org.apache.iceberg.Table;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
@ -58,6 +59,9 @@ public class HMSExternalTable extends ExternalTable {
|
||||
|
||||
private static final Set<String> SUPPORTED_HIVE_FILE_FORMATS;
|
||||
|
||||
private static final String TBL_PROP_TXN_PROPERTIES = "transactional_properties";
|
||||
private static final String TBL_PROP_INSERT_ONLY = "insert_only";
|
||||
|
||||
static {
|
||||
SUPPORTED_HIVE_FILE_FORMATS = Sets.newHashSet();
|
||||
SUPPORTED_HIVE_FILE_FORMATS.add("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat");
|
||||
@ -141,6 +145,27 @@ public class HMSExternalTable extends ExternalTable {
|
||||
* Support managed_table and external_table.
|
||||
*/
|
||||
private boolean supportedHiveTable() {
|
||||
boolean isTxnTbl = AcidUtils.isTransactionalTable(remoteTable);
|
||||
if (isTxnTbl) {
|
||||
// Only support "insert_only" transactional table
|
||||
// There are 2 types of parameter:
|
||||
// "transactional_properties" = "insert_only",
|
||||
// or,
|
||||
// "insert_only" = "true"
|
||||
// And must check "insert_only" first, because "transactional_properties" may be "default"
|
||||
Map<String, String> parameters = remoteTable.getParameters();
|
||||
if (parameters.containsKey(TBL_PROP_INSERT_ONLY)) {
|
||||
if (!parameters.get(TBL_PROP_INSERT_ONLY).equalsIgnoreCase("true")) {
|
||||
return false;
|
||||
}
|
||||
} else if (parameters.containsKey(TBL_PROP_TXN_PROPERTIES)) {
|
||||
if (!parameters.get(TBL_PROP_TXN_PROPERTIES).equalsIgnoreCase(TBL_PROP_INSERT_ONLY)) {
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
String inputFileFormat = remoteTable.getSd().getInputFormat();
|
||||
boolean supportedFileFormat = inputFileFormat != null && SUPPORTED_HIVE_FILE_FORMATS.contains(inputFileFormat);
|
||||
LOG.debug("hms table {} is {} with file format: {}", name, remoteTable.getTableType(), inputFileFormat);
|
||||
@ -167,6 +192,10 @@ public class HMSExternalTable extends ExternalTable {
|
||||
return partitionColumns;
|
||||
}
|
||||
|
||||
public boolean isHiveTransactionalTable() {
|
||||
return dlaType == DLAType.HIVE && AcidUtils.isTransactionalTable(remoteTable);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isView() {
|
||||
makeSureInitialized();
|
||||
|
||||
@ -60,8 +60,10 @@ import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.BlockLocation;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hive.common.ValidWriteIdList;
|
||||
import org.apache.hadoop.hive.metastore.api.Partition;
|
||||
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
|
||||
import org.apache.hadoop.hive.ql.io.AcidUtils;
|
||||
import org.apache.hadoop.mapred.FileInputFormat;
|
||||
import org.apache.hadoop.mapred.InputFormat;
|
||||
import org.apache.hadoop.mapred.InputSplit;
|
||||
@ -94,6 +96,8 @@ public class HiveMetaStoreCache {
|
||||
private static final Logger LOG = LogManager.getLogger(HiveMetaStoreCache.class);
|
||||
private static final int MIN_BATCH_FETCH_PARTITION_NUM = 50;
|
||||
public static final String HIVE_DEFAULT_PARTITION = "__HIVE_DEFAULT_PARTITION__";
|
||||
// After hive 3, transactional table's will have file '_orc_acid_version' with value >= '2'.
|
||||
public static final String HIVE_ORC_ACID_VERSION_FILE = "_orc_acid_version";
|
||||
|
||||
private HMSExternalCatalog catalog;
|
||||
|
||||
@ -283,7 +287,7 @@ public class HiveMetaStoreCache {
|
||||
result.setSplittable(HiveUtil.isSplittable(inputFormat, new Path(location), jobConf));
|
||||
RemoteFileSystem fs = FileSystemFactory.getByLocation(location, jobConf);
|
||||
RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false);
|
||||
locatedFiles.locations().forEach(result::addFile);
|
||||
locatedFiles.files().forEach(result::addFile);
|
||||
result.setPartitionValues(partitionValues);
|
||||
return result;
|
||||
}
|
||||
@ -648,6 +652,52 @@ public class HiveMetaStoreCache {
|
||||
return fileCacheRef;
|
||||
}
|
||||
|
||||
public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions, ValidWriteIdList validWriteIds) {
|
||||
List<FileCacheValue> fileCacheValues = Lists.newArrayList();
|
||||
JobConf jobConf = getJobConf();
|
||||
String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME);
|
||||
try {
|
||||
for (HivePartition partition : partitions) {
|
||||
FileCacheValue fileCacheValue = new FileCacheValue();
|
||||
AcidUtils.Directory directory;
|
||||
if (!Strings.isNullOrEmpty(remoteUser)) {
|
||||
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(remoteUser);
|
||||
directory = ugi.doAs((PrivilegedExceptionAction<AcidUtils.Directory>) () -> AcidUtils.getAcidState(
|
||||
new Path(partition.getPath()), jobConf, validWriteIds, false, true));
|
||||
} else {
|
||||
directory = AcidUtils.getAcidState(new Path(partition.getPath()), jobConf, validWriteIds, false,
|
||||
true);
|
||||
}
|
||||
if (!directory.getOriginalFiles().isEmpty()) {
|
||||
throw new Exception("Original non-ACID files in transactional tables are not supported");
|
||||
}
|
||||
|
||||
// delta directories
|
||||
for (AcidUtils.ParsedDelta delta : directory.getCurrentDirectories()) {
|
||||
String location = delta.getPath().toString();
|
||||
RemoteFileSystem fs = FileSystemFactory.getByLocation(location, jobConf);
|
||||
RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false);
|
||||
locatedFiles.files().stream().filter(f -> !f.getName().equals(HIVE_ORC_ACID_VERSION_FILE))
|
||||
.forEach(fileCacheValue::addFile);
|
||||
}
|
||||
|
||||
// base
|
||||
if (directory.getBaseDirectory() != null) {
|
||||
String location = directory.getBaseDirectory().toString();
|
||||
RemoteFileSystem fs = FileSystemFactory.getByLocation(location, jobConf);
|
||||
RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false);
|
||||
locatedFiles.files().stream().filter(f -> !f.getName().equals(HIVE_ORC_ACID_VERSION_FILE))
|
||||
.forEach(fileCacheValue::addFile);
|
||||
}
|
||||
fileCacheValues.add(fileCacheValue);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new CacheException("failed to get input splits for write ids %s in catalog %s", e,
|
||||
validWriteIds.toString(), catalog.getName());
|
||||
}
|
||||
return fileCacheValues;
|
||||
}
|
||||
|
||||
/**
|
||||
* The Key of hive partition value cache
|
||||
*/
|
||||
|
||||
@ -17,6 +17,9 @@
|
||||
|
||||
package org.apache.doris.datasource.hive;
|
||||
|
||||
import org.apache.doris.catalog.Column;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.List;
|
||||
@ -43,6 +46,19 @@ public class HivePartition {
|
||||
this.partitionValues = partitionValues;
|
||||
}
|
||||
|
||||
// return partition name like: nation=cn/city=beijing
|
||||
public String getPartitionName(List<Column> partColumns) {
|
||||
Preconditions.checkState(partColumns.size() == partitionValues.size());
|
||||
StringBuilder sb = new StringBuilder();
|
||||
for (int i = 0; i < partColumns.size(); ++i) {
|
||||
if (i != 0) {
|
||||
sb.append("/");
|
||||
}
|
||||
sb.append(partColumns.get(i).getName()).append("=").append(partitionValues.get(i));
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
public boolean isDummyPartition() {
|
||||
return this.isDummyPartition;
|
||||
}
|
||||
|
||||
@ -0,0 +1,85 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.datasource.hive;
|
||||
|
||||
import org.apache.doris.analysis.TableName;
|
||||
import org.apache.doris.catalog.external.HMSExternalTable;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.datasource.HMSExternalCatalog;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.hadoop.hive.common.ValidWriteIdList;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* HiveTransaction is used to save info of a hive transaction.
|
||||
* Used when reading hive transactional table.
|
||||
* Each HiveTransaction is bound to a query.
|
||||
*/
|
||||
public class HiveTransaction {
|
||||
private final String queryId;
|
||||
private final String user;
|
||||
private final HMSExternalTable hiveTable;
|
||||
|
||||
private long txnId;
|
||||
private List<String> partitionNames = Lists.newArrayList();
|
||||
|
||||
ValidWriteIdList validWriteIdList = null;
|
||||
|
||||
public HiveTransaction(String queryId, String user, HMSExternalTable hiveTable) {
|
||||
this.queryId = queryId;
|
||||
this.user = user;
|
||||
this.hiveTable = hiveTable;
|
||||
}
|
||||
|
||||
public String getQueryId() {
|
||||
return queryId;
|
||||
}
|
||||
|
||||
public void addPartition(String partitionName) {
|
||||
this.partitionNames.add(partitionName);
|
||||
}
|
||||
|
||||
public ValidWriteIdList getValidWriteIds(PooledHiveMetaStoreClient client) {
|
||||
if (validWriteIdList == null) {
|
||||
TableName tableName = new TableName(hiveTable.getCatalog().getName(), hiveTable.getDbName(),
|
||||
hiveTable.getName());
|
||||
client.acquireSharedLock(queryId, txnId, user, tableName, partitionNames, 5000);
|
||||
validWriteIdList = client.getValidWriteIds(tableName.getDb() + "." + tableName.getTbl(), txnId);
|
||||
}
|
||||
return validWriteIdList;
|
||||
}
|
||||
|
||||
public void begin() throws UserException {
|
||||
try {
|
||||
this.txnId = ((HMSExternalCatalog) hiveTable.getCatalog()).getClient().openTxn(user);
|
||||
} catch (RuntimeException e) {
|
||||
throw new UserException(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
public void commit() throws UserException {
|
||||
try {
|
||||
((HMSExternalCatalog) hiveTable.getCatalog()).getClient().commitTxn(txnId);
|
||||
} catch (RuntimeException e) {
|
||||
throw new UserException(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,55 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.datasource.hive;
|
||||
|
||||
import org.apache.doris.common.UserException;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* HiveTransactionMgr is used to manage hive transaction.
|
||||
* For each query, it will register a HiveTransaction.
|
||||
* When query is finished, it will deregister the HiveTransaction.
|
||||
*/
|
||||
public class HiveTransactionMgr {
|
||||
private static final Logger LOG = LogManager.getLogger(HiveTransactionMgr.class);
|
||||
private Map<String, HiveTransaction> txnMap = Maps.newConcurrentMap();
|
||||
|
||||
public HiveTransactionMgr() {
|
||||
}
|
||||
|
||||
public void register(HiveTransaction txn) throws UserException {
|
||||
txn.begin();
|
||||
txnMap.put(txn.getQueryId(), txn);
|
||||
}
|
||||
|
||||
public void deregister(String queryId) {
|
||||
HiveTransaction txn = txnMap.remove(queryId);
|
||||
if (txn != null) {
|
||||
try {
|
||||
txn.commit();
|
||||
} catch (UserException e) {
|
||||
LOG.warn("failed to commit hive txn: " + queryId, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -17,6 +17,7 @@
|
||||
|
||||
package org.apache.doris.datasource.hive;
|
||||
|
||||
import org.apache.doris.analysis.TableName;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.datasource.HMSClientException;
|
||||
import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException;
|
||||
@ -25,25 +26,39 @@ import org.apache.doris.datasource.property.constants.HMSProperties;
|
||||
import com.aliyun.datalake.metastore.hive2.ProxyMetaStoreClient;
|
||||
import com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.hadoop.hive.common.ValidTxnList;
|
||||
import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
|
||||
import org.apache.hadoop.hive.common.ValidWriteIdList;
|
||||
import org.apache.hadoop.hive.conf.HiveConf;
|
||||
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
|
||||
import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
|
||||
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
|
||||
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
|
||||
import org.apache.hadoop.hive.metastore.LockComponentBuilder;
|
||||
import org.apache.hadoop.hive.metastore.LockRequestBuilder;
|
||||
import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
|
||||
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
|
||||
import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
|
||||
import org.apache.hadoop.hive.metastore.api.DataOperationType;
|
||||
import org.apache.hadoop.hive.metastore.api.FieldSchema;
|
||||
import org.apache.hadoop.hive.metastore.api.LockComponent;
|
||||
import org.apache.hadoop.hive.metastore.api.LockResponse;
|
||||
import org.apache.hadoop.hive.metastore.api.LockState;
|
||||
import org.apache.hadoop.hive.metastore.api.MetaException;
|
||||
import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
|
||||
import org.apache.hadoop.hive.metastore.api.Partition;
|
||||
import org.apache.hadoop.hive.metastore.api.Table;
|
||||
import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
|
||||
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Queue;
|
||||
|
||||
/**
|
||||
@ -175,6 +190,111 @@ public class PooledHiveMetaStoreClient {
|
||||
}
|
||||
}
|
||||
|
||||
public long openTxn(String user) {
|
||||
try (CachedClient client = getClient()) {
|
||||
return client.client.openTxn(user);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("failed to open transaction", e);
|
||||
}
|
||||
}
|
||||
|
||||
public void commitTxn(long txnId) {
|
||||
try (CachedClient client = getClient()) {
|
||||
client.client.commitTxn(txnId);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("failed to commit transaction " + txnId, e);
|
||||
}
|
||||
}
|
||||
|
||||
public void acquireSharedLock(String queryId, long txnId, String user, TableName tblName,
|
||||
List<String> partitionNames, long timeoutMs) {
|
||||
LockRequestBuilder request = new LockRequestBuilder(queryId).setTransactionId(txnId).setUser(user);
|
||||
List<LockComponent> lockComponents = createLockComponentsForRead(tblName, partitionNames);
|
||||
for (LockComponent component : lockComponents) {
|
||||
request.addLockComponent(component);
|
||||
}
|
||||
try (CachedClient client = getClient()) {
|
||||
LockResponse response = client.client.lock(request.build());
|
||||
long start = System.currentTimeMillis();
|
||||
while (response.getState() == LockState.WAITING) {
|
||||
long lockId = response.getLockid();
|
||||
if (System.currentTimeMillis() - start > timeoutMs) {
|
||||
throw new RuntimeException(
|
||||
"acquire lock timeout for txn " + txnId + " of query " + queryId + ", timeout(ms): "
|
||||
+ timeoutMs);
|
||||
}
|
||||
response = checkLock(lockId);
|
||||
}
|
||||
|
||||
if (response.getState() != LockState.ACQUIRED) {
|
||||
throw new RuntimeException("failed to acquire lock, lock in state " + response.getState());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("failed to commit transaction " + txnId, e);
|
||||
}
|
||||
}
|
||||
|
||||
public ValidWriteIdList getValidWriteIds(String fullTableName, long currentTransactionId) {
|
||||
try (CachedClient client = getClient()) {
|
||||
// Pass currentTxn as 0L to get the recent snapshot of valid transactions in Hive
|
||||
// Do not pass currentTransactionId instead as it will break Hive's listing of delta directories if major compaction
|
||||
// deletes delta directories for valid transactions that existed at the time transaction is opened
|
||||
ValidTxnList validTransactions = client.client.getValidTxns();
|
||||
List<TableValidWriteIds> tableValidWriteIdsList = client.client.getValidWriteIds(
|
||||
Collections.singletonList(fullTableName), validTransactions.toString());
|
||||
if (tableValidWriteIdsList.size() != 1) {
|
||||
throw new Exception("tableValidWriteIdsList's size should be 1");
|
||||
}
|
||||
ValidTxnWriteIdList validTxnWriteIdList = TxnUtils.createValidTxnWriteIdList(currentTransactionId,
|
||||
tableValidWriteIdsList);
|
||||
ValidWriteIdList writeIdList = validTxnWriteIdList.getTableValidWriteIdList(fullTableName);
|
||||
return writeIdList;
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(
|
||||
"failed to get valid write ids for " + fullTableName + ", transaction " + currentTransactionId, e);
|
||||
}
|
||||
}
|
||||
|
||||
private LockResponse checkLock(long lockId) {
|
||||
try (CachedClient client = getClient()) {
|
||||
return client.client.checkLock(lockId);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("failed to check lock " + lockId, e);
|
||||
}
|
||||
}
|
||||
|
||||
private static List<LockComponent> createLockComponentsForRead(TableName tblName, List<String> partitionNames) {
|
||||
List<LockComponent> components = Lists.newArrayListWithCapacity(
|
||||
partitionNames.isEmpty() ? 1 : partitionNames.size());
|
||||
if (partitionNames.isEmpty()) {
|
||||
components.add(createLockComponentForRead(tblName, Optional.empty()));
|
||||
} else {
|
||||
for (String partitionName : partitionNames) {
|
||||
components.add(createLockComponentForRead(tblName, Optional.of(partitionName)));
|
||||
}
|
||||
}
|
||||
return components;
|
||||
}
|
||||
|
||||
private static LockComponent createLockComponentForRead(TableName tblName, Optional<String> partitionName) {
|
||||
LockComponentBuilder builder = new LockComponentBuilder();
|
||||
builder.setShared();
|
||||
builder.setOperationType(DataOperationType.SELECT);
|
||||
builder.setDbName(tblName.getDb());
|
||||
builder.setTableName(tblName.getTbl());
|
||||
partitionName.ifPresent(builder::setPartitionName);
|
||||
builder.setIsTransactional(true);
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
public void heartbeatForTxn(long txnId) {
|
||||
try (CachedClient client = getClient()) {
|
||||
client.client.heartbeat(txnId, txnId);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("failed to do heartbeat for transaction " + txnId, e);
|
||||
}
|
||||
}
|
||||
|
||||
private class CachedClient implements AutoCloseable {
|
||||
private final IMetaStoreClient client;
|
||||
|
||||
|
||||
@ -29,7 +29,7 @@ public class RemoteFiles {
|
||||
this.files = files;
|
||||
}
|
||||
|
||||
public List<RemoteFile> locations() {
|
||||
public List<RemoteFile> files() {
|
||||
return files;
|
||||
}
|
||||
}
|
||||
|
||||
@ -29,12 +29,16 @@ import org.apache.doris.catalog.Type;
|
||||
import org.apache.doris.catalog.external.HMSExternalTable;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.util.DebugUtil;
|
||||
import org.apache.doris.common.util.Util;
|
||||
import org.apache.doris.datasource.HMSExternalCatalog;
|
||||
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
|
||||
import org.apache.doris.datasource.hive.HiveMetaStoreCache.FileCacheValue;
|
||||
import org.apache.doris.datasource.hive.HivePartition;
|
||||
import org.apache.doris.datasource.hive.HiveTransaction;
|
||||
import org.apache.doris.planner.ListPartitionPrunerV2;
|
||||
import org.apache.doris.planner.PlanNodeId;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.spi.Split;
|
||||
import org.apache.doris.statistics.StatisticalType;
|
||||
import org.apache.doris.thrift.TFileAttributes;
|
||||
@ -44,6 +48,7 @@ import org.apache.doris.thrift.TFileType;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.apache.hadoop.hive.common.ValidWriteIdList;
|
||||
import org.apache.hadoop.hive.metastore.api.FieldSchema;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
@ -62,6 +67,7 @@ public class HiveScanNode extends FileQueryScanNode {
|
||||
public static final String DEFAULT_LINE_DELIMITER = "\n";
|
||||
|
||||
private final HMSExternalTable hmsTable;
|
||||
private HiveTransaction hiveTransaction = null;
|
||||
|
||||
/**
|
||||
* * External file scan node for Query Hive table
|
||||
@ -89,11 +95,17 @@ public class HiveScanNode extends FileQueryScanNode {
|
||||
for (SlotDescriptor slot : desc.getSlots()) {
|
||||
if (!slot.getType().isScalarType()) {
|
||||
throw new UserException("For column `" + slot.getColumn().getName()
|
||||
+ "`, The column types ARRAY/MAP/STRUCT are not supported yet"
|
||||
+ " for text input format of Hive. ");
|
||||
+ "`, The column types ARRAY/MAP/STRUCT are not supported yet"
|
||||
+ " for text input format of Hive. ");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (hmsTable.isHiveTransactionalTable()) {
|
||||
this.hiveTransaction = new HiveTransaction(DebugUtil.printId(ConnectContext.get().queryId()),
|
||||
ConnectContext.get().getQualifiedUser(), hmsTable);
|
||||
Env.getCurrentHiveTransactionMgr().register(hiveTransaction);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -159,9 +171,13 @@ public class HiveScanNode extends FileQueryScanNode {
|
||||
|
||||
private void getFileSplitByPartitions(HiveMetaStoreCache cache, List<HivePartition> partitions,
|
||||
List<Split> allFiles, boolean useSelfSplitter) throws IOException {
|
||||
|
||||
for (HiveMetaStoreCache.FileCacheValue fileCacheValue :
|
||||
cache.getFilesByPartitions(partitions, useSelfSplitter)) {
|
||||
List<FileCacheValue> fileCaches;
|
||||
if (hiveTransaction != null) {
|
||||
fileCaches = getFileSplitByTransaction(cache, partitions);
|
||||
} else {
|
||||
fileCaches = cache.getFilesByPartitions(partitions, useSelfSplitter);
|
||||
}
|
||||
for (HiveMetaStoreCache.FileCacheValue fileCacheValue : fileCaches) {
|
||||
// This if branch is to support old splitter, will remove later.
|
||||
if (fileCacheValue.getSplits() != null) {
|
||||
allFiles.addAll(fileCacheValue.getSplits());
|
||||
@ -177,10 +193,23 @@ public class HiveScanNode extends FileQueryScanNode {
|
||||
}
|
||||
}
|
||||
|
||||
private List<FileCacheValue> getFileSplitByTransaction(HiveMetaStoreCache cache, List<HivePartition> partitions) {
|
||||
for (HivePartition partition : partitions) {
|
||||
if (partition.getPartitionValues() == null || partition.getPartitionValues().isEmpty()) {
|
||||
// this is unpartitioned table.
|
||||
continue;
|
||||
}
|
||||
hiveTransaction.addPartition(partition.getPartitionName(hmsTable.getPartitionColumns()));
|
||||
}
|
||||
ValidWriteIdList validWriteIds = hiveTransaction.getValidWriteIds(
|
||||
((HMSExternalCatalog) hmsTable.getCatalog()).getClient());
|
||||
return cache.getFilesByTransaction(partitions, validWriteIds);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getPathPartitionKeys() {
|
||||
return hmsTable.getRemoteTable().getPartitionKeys()
|
||||
.stream().map(FieldSchema::getName).collect(Collectors.toList());
|
||||
.stream().map(FieldSchema::getName).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -17,6 +17,7 @@
|
||||
|
||||
package org.apache.doris.qe;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.ThreadPoolManager;
|
||||
import org.apache.doris.common.UserException;
|
||||
@ -151,6 +152,9 @@ public final class QeProcessorImpl implements QeProcessor {
|
||||
LOG.debug("not found query {} when unregisterQuery", DebugUtil.printId(queryId));
|
||||
}
|
||||
}
|
||||
|
||||
// commit hive tranaction if needed
|
||||
Env.getCurrentHiveTransactionMgr().deregister(DebugUtil.printId(queryId));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
Reference in New Issue
Block a user