[Fix](multi-catalog) fix FE abnormal exit when replaying OP_REFRESH_EXTERNAL_TABLE (#19120)
When a slave FE node replays an OP_REFRESH_EXTERNAL_TABLE log, it invokes `org.apache.doris.datasource.hive.HiveMetaStoreCache#invalidateTableCache`; for a non-partitioned table this in turn calls `catalog.getClient().getTable`. If a network problem occurs or the table no longer exists, that call throws an exception and the FE exits immediately. The fix is to use a dummy file cache key that contains only the db name and table name, so that when slave FE nodes replay an OP_REFRESH_EXTERNAL_TABLE log they no longer rely on the HMS client and no exception can occur.
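To make the mechanism concrete, here is a minimal, self-contained sketch of the dummy-key idea, modeled on the patch's `FileCacheKey` but simplified; the class name and the `main` demo are illustrative only, not part of the commit. Because a dummy key is identified purely by `dbName + "." + tblName`, the replay path can build a key equal to the one created at load time without ever touching the metastore:

import java.util.List;
import java.util.Objects;

// A minimal sketch of the dummy-key idea, simplified from
// HiveMetaStoreCache.FileCacheKey in this patch. A dummy key is identified
// only by "dbName.tblName", so invalidating the file cache of a
// non-partitioned table never needs the table location from the HMS client.
public class FileCacheKeySketch {
    private String dummyKey;              // non-null only for dummy keys
    private String location;              // HDFS location; unknown during replay
    private List<String> partitionValues; // null for non-partitioned tables

    private FileCacheKeySketch(String location, List<String> partitionValues) {
        this.location = location;
        this.partitionValues = partitionValues;
    }

    // Mirrors FileCacheKey.createDummyCacheKey: both the load path (HiveSplitter)
    // and the replay path (invalidateTableCache) can build the same key from
    // just the db and table name.
    public static FileCacheKeySketch createDummyCacheKey(String dbName, String tblName, String location) {
        FileCacheKeySketch key = new FileCacheKeySketch(location, null);
        key.dummyKey = dbName + "." + tblName;
        return key;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof FileCacheKeySketch)) {
            return false;
        }
        // Dummy keys compare only by dbName.tblName and ignore the location.
        if (dummyKey != null) {
            return dummyKey.equals(((FileCacheKeySketch) obj).dummyKey);
        }
        return Objects.equals(location, ((FileCacheKeySketch) obj).location)
                && Objects.equals(partitionValues, ((FileCacheKeySketch) obj).partitionValues);
    }

    @Override
    public int hashCode() {
        return dummyKey != null ? Objects.hash(dummyKey) : Objects.hash(location, partitionValues);
    }

    public static void main(String[] args) {
        // Load time: the real location is known. Replay time: it is not (null).
        FileCacheKeySketch atLoad = createDummyCacheKey("db1", "t1", "hdfs://nn:8020/warehouse/t1");
        FileCacheKeySketch atReplay = createDummyCacheKey("db1", "t1", null);
        // Prints "true": the replayed key hits the cached entry, so it can be
        // invalidated without calling catalog.getClient().getTable(...).
        System.out.println(atLoad.equals(atReplay));
    }
}

Keeping `location` out of the dummy key's `equals`/`hashCode` is exactly what removes the `getTable` call from the replay path: key equality no longer depends on metadata that only the metastore can supply.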
@@ -59,7 +59,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;

@@ -261,7 +260,7 @@ public class HiveMetaStoreCache {
                     sd.getInputFormat(), sd.getLocation(), key, catalog.getName());
         }
         // TODO: more info?
-        return new HivePartition(sd.getInputFormat(), sd.getLocation(), key.values);
+        return new HivePartition(key.dbName, key.tblName, false, sd.getInputFormat(), sd.getLocation(), key.values);
     }

     private FileCacheValue loadFiles(FileCacheKey key) {

@@ -360,7 +359,10 @@ public class HiveMetaStoreCache {
         long start = System.currentTimeMillis();
         List<FileCacheKey> keys = Lists.newArrayListWithExpectedSize(partitions.size());
         partitions.stream().forEach(p -> {
-            FileCacheKey fileCacheKey = new FileCacheKey(p.getPath(), p.getInputFormat(), p.getPartitionValues());
+            FileCacheKey fileCacheKey = p.isDummyPartition()
+                    ? FileCacheKey.createDummyCacheKey(p.getDbName(), p.getTblName(), p.getPath(),
+                            p.getInputFormat(), useSelfSplitter)
+                    : new FileCacheKey(p.getPath(), p.getInputFormat(), p.getPartitionValues());
             fileCacheKey.setUseSelfSplitter(useSelfSplitter);
             keys.add(fileCacheKey);
         });

@@ -438,12 +440,13 @@ public class HiveMetaStoreCache {
              * A file cache entry can be created reference to
              * {@link org.apache.doris.planner.external.HiveSplitter#getSplits},
              * so we need to invalidate it if this is a non-partitioned table.
+             *
+             * We use {@link org.apache.doris.datasource.hive.HiveMetaStoreCache.FileCacheKey#createDummyCacheKey}
+             * to avoid invocation by the Hms Client, because this method may be invoked when slave FEs replay journal logs,
+             * and FE will exit if some network problems occur.
              */
-            Table table = catalog.getClient().getTable(dbName, tblName);
-            // we just need to assign the `location` filed because the `equals` method of `FileCacheKey`
-            // just compares the value of `location`
-            fileCacheRef.get().invalidate(new FileCacheKey(table.getSd().getLocation(), null, null));
+            FileCacheKey fileCacheKey = FileCacheKey.createDummyCacheKey(
+                    dbName, tblName, null, null, false);
+            fileCacheRef.get().invalidate(fileCacheKey);
         }
     }

@@ -699,6 +702,7 @@ public class HiveMetaStoreCache {

     @Data
     public static class FileCacheKey {
+        private String dummyKey;
         private String location;
         // not in key
         private String inputFormat;

@@ -717,6 +721,14 @@ public class HiveMetaStoreCache {
             this.useSelfSplitter = true;
         }

+        public static FileCacheKey createDummyCacheKey(String dbName, String tblName, String location,
+                String inputFormat, boolean useSelfSplitter) {
+            FileCacheKey fileCacheKey = new FileCacheKey(location, inputFormat, null);
+            fileCacheKey.dummyKey = dbName + "." + tblName;
+            fileCacheKey.useSelfSplitter = useSelfSplitter;
+            return fileCacheKey;
+        }
+
         @Override
         public boolean equals(Object obj) {
             if (this == obj) {

@@ -725,12 +737,18 @@ public class HiveMetaStoreCache {
             if (!(obj instanceof FileCacheKey)) {
                 return false;
             }
+            if (dummyKey != null) {
+                return dummyKey.equals(((FileCacheKey) obj).dummyKey);
+            }
             return location.equals(((FileCacheKey) obj).location)
                     && partitionValues.equals(((FileCacheKey) obj).partitionValues);
         }

         @Override
         public int hashCode() {
+            if (dummyKey != null) {
+                return Objects.hash(dummyKey);
+            }
             return Objects.hash(location, partitionValues);
         }

@@ -23,11 +23,18 @@ import java.util.List;

 @Data
 public class HivePartition {
+    private String dbName;
+    private String tblName;
     private String inputFormat;
     private String path;
     private List<String> partitionValues;
+    private boolean isDummyPartition;

-    public HivePartition(String inputFormat, String path, List<String> partitionValues) {
+    public HivePartition(String dbName, String tblName, boolean isDummyPartition,
+            String inputFormat, String path, List<String> partitionValues) {
+        this.dbName = dbName;
+        this.tblName = tblName;
+        this.isDummyPartition = isDummyPartition;
         // eg: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
         this.inputFormat = inputFormat;
         // eg: hdfs://hk-dev01:8121/user/doris/parquet/partition_table/nation=cn/city=beijing

@@ -36,10 +43,17 @@ public class HivePartition {
         this.partitionValues = partitionValues;
     }

+    public boolean isDummyPartition() {
+        return this.isDummyPartition;
+    }
+
     @Override
     public String toString() {
         return "HivePartition{"
-                + "inputFormat='" + inputFormat + '\''
+                + "dbName='" + dbName + '\''
+                + ", tblName='" + tblName + '\''
+                + ", isDummyPartition='" + isDummyPartition + '\''
+                + ", inputFormat='" + inputFormat + '\''
                 + ", path='" + path + '\''
                 + ", partitionValues=" + partitionValues + '}';
     }

@@ -115,7 +115,8 @@ public class HiveSplitter implements Splitter {
         } else {
             // unpartitioned table, create a dummy partition to save location and inputformat,
             // so that we can unify the interface.
-            HivePartition dummyPartition = new HivePartition(hmsTable.getRemoteTable().getSd().getInputFormat(),
+            HivePartition dummyPartition = new HivePartition(hmsTable.getDbName(), hmsTable.getName(), true,
+                    hmsTable.getRemoteTable().getSd().getInputFormat(),
                     hmsTable.getRemoteTable().getSd().getLocation(), null);
             getFileSplitByPartitions(cache, Lists.newArrayList(dummyPartition), allFiles, useSelfSplitter);
             this.totalPartitionNum = 1;