[Enhancement](sql-cache) Add partition update time for hms table and use it at sql-cache. (#24491)
Now FE does not record the update time of hms tbl's partitons, so the sql cache may be hit even the hive table's partitions have changed. This pr add a field to record the partition update time, and use it when enable sql-cache. The cache will be missed if any partition has changed at hive side. Use System.currentTimeMillis() but not the event time of hms event because we would better keep the same measurement with the schemaUpdateTime of external table. Add this value to ExternalObjectLog and let slave FEs replay it because it is better to keep the same value with all FEs, so the sql-cache can be hit by the querys through different FEs.
This commit is contained in:
@ -75,8 +75,8 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable {
|
||||
protected long timestamp;
|
||||
@SerializedName(value = "dbName")
|
||||
protected String dbName;
|
||||
@SerializedName(value = "lastUpdateTime")
|
||||
protected long lastUpdateTime;
|
||||
// this field will be refreshed after reloading schema
|
||||
protected volatile long schemaUpdateTime;
|
||||
|
||||
protected long dbId;
|
||||
protected boolean objectCreated;
|
||||
@ -296,9 +296,12 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable {
|
||||
return 0;
|
||||
}
|
||||
|
||||
// return schema update time as default
|
||||
// override this method if there is some other kinds of update time
|
||||
// use getSchemaUpdateTime if just need the schema update time
|
||||
@Override
|
||||
public long getUpdateTime() {
|
||||
return 0;
|
||||
return this.schemaUpdateTime;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -353,7 +356,7 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable {
|
||||
* @return
|
||||
*/
|
||||
public List<Column> initSchemaAndUpdateTime() {
|
||||
lastUpdateTime = System.currentTimeMillis();
|
||||
schemaUpdateTime = System.currentTimeMillis();
|
||||
return initSchema();
|
||||
}
|
||||
|
||||
|
||||
@ -111,6 +111,9 @@ public class HMSExternalTable extends ExternalTable {
|
||||
// No as precise as row count in TableStats, but better than none.
|
||||
private long estimatedRowCount = -1;
|
||||
|
||||
// record the partition update time when enable hms event listener
|
||||
protected volatile long partitionUpdateTime;
|
||||
|
||||
public enum DLAType {
|
||||
UNKNOWN, HIVE, HUDI, ICEBERG, DELTALAKE
|
||||
}
|
||||
@ -270,11 +273,6 @@ public class HMSExternalTable extends ExternalTable {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getUpdateTime() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getRowCount() {
|
||||
makeSureInitialized();
|
||||
@ -630,6 +628,17 @@ public class HMSExternalTable extends ExternalTable {
|
||||
}
|
||||
}
|
||||
|
||||
public void setPartitionUpdateTime(long updateTime) {
|
||||
this.partitionUpdateTime = updateTime;
|
||||
}
|
||||
|
||||
@Override
|
||||
// get the max value of `schemaUpdateTime` and `partitionUpdateTime`
|
||||
// partitionUpdateTime will be refreshed after processing partition events with hms event listener enabled
|
||||
public long getUpdateTime() {
|
||||
return Math.max(this.schemaUpdateTime, this.partitionUpdateTime);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void gsonPostProcess() throws IOException {
|
||||
super.gsonPostProcess();
|
||||
|
||||
@ -56,7 +56,7 @@ public class PaimonExternalTable extends ExternalTable {
|
||||
super.makeSureInitialized();
|
||||
if (!objectCreated) {
|
||||
originTable = ((PaimonExternalCatalog) catalog).getPaimonTable(dbName, name);
|
||||
lastUpdateTime = System.currentTimeMillis();
|
||||
schemaUpdateTime = System.currentTimeMillis();
|
||||
objectCreated = true;
|
||||
}
|
||||
}
|
||||
|
||||
@ -129,7 +129,7 @@ public class TablesProcDir implements ProcDirInterface {
|
||||
tableInfo.add(FeConstants.null_string);
|
||||
tableInfo.add(replicaCount);
|
||||
}
|
||||
tableInfo.add(TimeUtils.longToTimeString(table.getLastUpdateTime()));
|
||||
tableInfo.add(TimeUtils.longToTimeString(table.getUpdateTime()));
|
||||
tableInfos.add(tableInfo);
|
||||
} finally {
|
||||
table.readUnlock();
|
||||
|
||||
@ -31,6 +31,7 @@ import org.apache.doris.catalog.Resource.ReferenceType;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.catalog.external.ExternalDatabase;
|
||||
import org.apache.doris.catalog.external.ExternalTable;
|
||||
import org.apache.doris.catalog.external.HMSExternalTable;
|
||||
import org.apache.doris.cluster.ClusterNamespace;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.CaseSensibility;
|
||||
@ -928,14 +929,21 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (!(table instanceof HMSExternalTable)) {
|
||||
LOG.warn("only support HMSTable");
|
||||
return;
|
||||
}
|
||||
|
||||
Env.getCurrentEnv().getExtMetaCacheMgr().addPartitionsCache(catalog.getId(),
|
||||
(ExternalTable) table, partitionNames);
|
||||
HMSExternalTable hmsTable = (HMSExternalTable) table;
|
||||
Env.getCurrentEnv().getExtMetaCacheMgr().addPartitionsCache(catalog.getId(), hmsTable, partitionNames);
|
||||
long lastPartitionUpdateTime = System.currentTimeMillis();
|
||||
hmsTable.setPartitionUpdateTime(lastPartitionUpdateTime);
|
||||
ExternalObjectLog log = new ExternalObjectLog();
|
||||
log.setCatalogId(catalog.getId());
|
||||
log.setDbId(db.getId());
|
||||
log.setTableId(table.getId());
|
||||
log.setPartitionNames(partitionNames);
|
||||
log.setLastUpdateTime(lastPartitionUpdateTime);
|
||||
Env.getCurrentEnv().getEditLog().logAddExternalPartitions(log);
|
||||
}
|
||||
|
||||
@ -957,9 +965,16 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
LOG.warn("No table found with id:[{}], it may have been dropped.", log.getTableId());
|
||||
return;
|
||||
}
|
||||
if (!(table instanceof HMSExternalTable)) {
|
||||
LOG.warn("only support HMSTable");
|
||||
return;
|
||||
}
|
||||
|
||||
HMSExternalTable hmsTable = (HMSExternalTable) table;
|
||||
try {
|
||||
Env.getCurrentEnv().getExtMetaCacheMgr()
|
||||
.addPartitionsCache(catalog.getId(), table, log.getPartitionNames());
|
||||
.addPartitionsCache(catalog.getId(), hmsTable, log.getPartitionNames());
|
||||
hmsTable.setPartitionUpdateTime(log.getLastUpdateTime());
|
||||
} catch (HMSClientException e) {
|
||||
LOG.warn("Network problem occurs or hms table has been deleted, fallback to invalidate table cache", e);
|
||||
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(catalog.getId(),
|
||||
@ -998,6 +1013,7 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
log.setDbId(db.getId());
|
||||
log.setTableId(table.getId());
|
||||
log.setPartitionNames(partitionNames);
|
||||
log.setLastUpdateTime(System.currentTimeMillis());
|
||||
replayDropExternalPartitions(log);
|
||||
Env.getCurrentEnv().getEditLog().logDropExternalPartitions(log);
|
||||
}
|
||||
@ -1020,8 +1036,14 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
LOG.warn("No table found with id:[{}], it may have been dropped.", log.getTableId());
|
||||
return;
|
||||
}
|
||||
if (!(table instanceof HMSExternalTable)) {
|
||||
LOG.warn("only support HMSTable");
|
||||
return;
|
||||
}
|
||||
HMSExternalTable hmsTable = (HMSExternalTable) table;
|
||||
Env.getCurrentEnv().getExtMetaCacheMgr()
|
||||
.dropPartitionsCache(catalog.getId(), table, log.getPartitionNames());
|
||||
.dropPartitionsCache(catalog.getId(), hmsTable, log.getPartitionNames());
|
||||
hmsTable.setPartitionUpdateTime(log.getLastUpdateTime());
|
||||
}
|
||||
|
||||
public void refreshExternalPartitions(String catalogName, String dbName, String tableName,
|
||||
@ -1058,6 +1080,7 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
log.setDbId(db.getId());
|
||||
log.setTableId(table.getId());
|
||||
log.setPartitionNames(partitionNames);
|
||||
log.setLastUpdateTime(System.currentTimeMillis());
|
||||
replayRefreshExternalPartitions(log);
|
||||
Env.getCurrentEnv().getEditLog().logInvalidateExternalPartitions(log);
|
||||
}
|
||||
@ -1080,9 +1103,14 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
LOG.warn("No table found with id:[{}], it may have been dropped.", log.getTableId());
|
||||
return;
|
||||
}
|
||||
if (!(table instanceof HMSExternalTable)) {
|
||||
LOG.warn("only support HMSTable");
|
||||
return;
|
||||
}
|
||||
Env.getCurrentEnv().getExtMetaCacheMgr()
|
||||
.invalidatePartitionsCache(catalog.getId(), db.getFullName(), table.getName(),
|
||||
log.getPartitionNames());
|
||||
((HMSExternalTable) table).setPartitionUpdateTime(log.getLastUpdateTime());
|
||||
}
|
||||
|
||||
public void registerCatalogRefreshListener(Env env) {
|
||||
|
||||
@ -17,7 +17,6 @@
|
||||
|
||||
package org.apache.doris.datasource;
|
||||
|
||||
import org.apache.doris.catalog.external.ExternalTable;
|
||||
import org.apache.doris.catalog.external.HMSExternalTable;
|
||||
import org.apache.doris.cluster.ClusterNamespace;
|
||||
import org.apache.doris.common.Config;
|
||||
@ -159,25 +158,16 @@ public class ExternalMetaCacheMgr {
|
||||
LOG.debug("invalid catalog cache for {}", catalogId);
|
||||
}
|
||||
|
||||
public void addPartitionsCache(long catalogId, ExternalTable table, List<String> partitionNames) {
|
||||
if (!(table instanceof HMSExternalTable)) {
|
||||
LOG.warn("only support HMSTable");
|
||||
return;
|
||||
}
|
||||
public void addPartitionsCache(long catalogId, HMSExternalTable table, List<String> partitionNames) {
|
||||
String dbName = ClusterNamespace.getNameFromFullName(table.getDbName());
|
||||
HiveMetaStoreCache metaCache = cacheMap.get(catalogId);
|
||||
if (metaCache != null) {
|
||||
metaCache.addPartitionsCache(dbName, table.getName(), partitionNames,
|
||||
((HMSExternalTable) table).getPartitionColumnTypes());
|
||||
metaCache.addPartitionsCache(dbName, table.getName(), partitionNames, table.getPartitionColumnTypes());
|
||||
}
|
||||
LOG.debug("add partition cache for {}.{} in catalog {}", dbName, table.getName(), catalogId);
|
||||
}
|
||||
|
||||
public void dropPartitionsCache(long catalogId, ExternalTable table, List<String> partitionNames) {
|
||||
if (!(table instanceof HMSExternalTable)) {
|
||||
LOG.warn("only support HMSTable");
|
||||
return;
|
||||
}
|
||||
public void dropPartitionsCache(long catalogId, HMSExternalTable table, List<String> partitionNames) {
|
||||
String dbName = ClusterNamespace.getNameFromFullName(table.getDbName());
|
||||
HiveMetaStoreCache metaCache = cacheMap.get(catalogId);
|
||||
if (metaCache != null) {
|
||||
|
||||
@ -607,7 +607,7 @@ public class CacheAnalyzer {
|
||||
CacheTable cacheTable = new CacheTable();
|
||||
cacheTable.table = node.getTargetTable();
|
||||
cacheTable.partitionNum = node.getReadPartitionNum();
|
||||
cacheTable.latestTime = cacheTable.table.getLastUpdateTime();
|
||||
cacheTable.latestTime = cacheTable.table.getUpdateTime();
|
||||
return cacheTable;
|
||||
}
|
||||
|
||||
|
||||
@ -55,6 +55,10 @@ public class SqlCache extends Cache {
|
||||
return cacheKey;
|
||||
}
|
||||
|
||||
public long getLatestTime() {
|
||||
return latestTable.latestTime;
|
||||
}
|
||||
|
||||
public long getSumOfPartitionNum() {
|
||||
return latestTable.sumOfPartitionNum;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user