[Enhancement](sql-cache) Use the Hive update time to avoid SQL cache misses across multiple FE nodes. (#26424)
Currently the update time of an HMS table is generated by each FE node separately (using `System.currentTimeMillis()`), so the update time of the same HMS table may differ between FE nodes. As a result, the same query cannot hit the SQL cache when it is submitted more than once through different FE nodes. This PR makes the following changes to avoid the problem: - Use `transient_lastDdlTime` instead of `System.currentTimeMillis()` as the `schemaUpdateTime` of HMS tables. - Use the `eventTime` of the HMS event instead of `System.currentTimeMillis()` as the update time when processing HMS events.
This commit is contained in:
@ -41,6 +41,7 @@ import org.apache.doris.thrift.TTableType;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Sets;
|
||||
import org.apache.commons.collections.MapUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
|
||||
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
|
||||
@ -82,6 +83,9 @@ public class HMSExternalTable extends ExternalTable {
|
||||
|
||||
private static final String TBL_PROP_TXN_PROPERTIES = "transactional_properties";
|
||||
private static final String TBL_PROP_INSERT_ONLY = "insert_only";
|
||||
|
||||
private static final String TBL_PROP_TRANSIENT_LAST_DDL_TIME = "transient_lastDdlTime";
|
||||
|
||||
private static final String NUM_ROWS = "numRows";
|
||||
|
||||
static {
|
||||
@ -112,8 +116,8 @@ public class HMSExternalTable extends ExternalTable {
|
||||
// Not as precise as the row count in TableStats, but better than none.
|
||||
private long estimatedRowCount = -1;
|
||||
|
||||
// record the partition update time when enable hms event listener
|
||||
protected volatile long partitionUpdateTime;
|
||||
// record the event update time when enable hms event listener
|
||||
protected volatile long eventUpdateTime;
|
||||
|
||||
public enum DLAType {
|
||||
UNKNOWN, HIVE, HUDI, ICEBERG, DELTALAKE
|
||||
@ -405,6 +409,20 @@ public class HMSExternalTable extends ExternalTable {
|
||||
return new HashSet<>(names);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Column> initSchemaAndUpdateTime() {
|
||||
org.apache.hadoop.hive.metastore.api.Table table = ((HMSExternalCatalog) catalog).getClient()
|
||||
.getTable(dbName, name);
|
||||
// try to use transient_lastDdlTime from hms client
|
||||
schemaUpdateTime = MapUtils.isNotEmpty(table.getParameters())
|
||||
&& table.getParameters().containsKey(TBL_PROP_TRANSIENT_LAST_DDL_TIME)
|
||||
? Long.parseLong(table.getParameters().get(TBL_PROP_TRANSIENT_LAST_DDL_TIME)) * 1000
|
||||
// use current timestamp if lastDdlTime does not exist (hive views don't have this prop)
|
||||
: System.currentTimeMillis();
|
||||
return initSchema();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public List<Column> initSchema() {
|
||||
makeSureInitialized();
|
||||
@ -635,15 +653,15 @@ public class HMSExternalTable extends ExternalTable {
|
||||
}
|
||||
}
|
||||
|
||||
public void setPartitionUpdateTime(long updateTime) {
|
||||
this.partitionUpdateTime = updateTime;
|
||||
public void setEventUpdateTime(long updateTime) {
|
||||
this.eventUpdateTime = updateTime;
|
||||
}
|
||||
|
||||
@Override
|
||||
// get the max value of `schemaUpdateTime` and `partitionUpdateTime`
|
||||
// partitionUpdateTime will be refreshed after processing partition events with hms event listener enabled
|
||||
// get the max value of `schemaUpdateTime` and `eventUpdateTime`
|
||||
// eventUpdateTime will be refreshed after processing events with hms event listener enabled
|
||||
public long getUpdateTime() {
|
||||
return Math.max(this.schemaUpdateTime, this.partitionUpdateTime);
|
||||
return Math.max(this.schemaUpdateTime, this.eventUpdateTime);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -650,6 +650,44 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
}
|
||||
}
|
||||
|
||||
public void refreshExternalTableFromEvent(String dbName, String tableName, String catalogName,
|
||||
long updateTime, boolean ignoreIfNotExists) throws DdlException {
|
||||
CatalogIf catalog = nameToCatalog.get(catalogName);
|
||||
if (catalog == null) {
|
||||
throw new DdlException("No catalog found with name: " + catalogName);
|
||||
}
|
||||
if (!(catalog instanceof ExternalCatalog)) {
|
||||
throw new DdlException("Only support refresh ExternalCatalog Tables");
|
||||
}
|
||||
DatabaseIf db = catalog.getDbNullable(dbName);
|
||||
if (db == null) {
|
||||
if (!ignoreIfNotExists) {
|
||||
throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName());
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
TableIf table = db.getTableNullable(tableName);
|
||||
if (table == null) {
|
||||
if (!ignoreIfNotExists) {
|
||||
throw new DdlException("Table " + tableName + " does not exist in db " + dbName);
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (!(table instanceof HMSExternalTable)) {
|
||||
return;
|
||||
}
|
||||
((HMSExternalTable) table).unsetObjectCreated();
|
||||
((HMSExternalTable) table).setEventUpdateTime(updateTime);
|
||||
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(catalog.getId(), dbName, tableName);
|
||||
ExternalObjectLog log = new ExternalObjectLog();
|
||||
log.setCatalogId(catalog.getId());
|
||||
log.setDbId(db.getId());
|
||||
log.setTableId(table.getId());
|
||||
log.setLastUpdateTime(updateTime);
|
||||
Env.getCurrentEnv().getEditLog().logRefreshExternalTable(log);
|
||||
}
|
||||
|
||||
public void refreshExternalTable(String dbName, String tableName, String catalogName, boolean ignoreIfNotExists)
|
||||
throws DdlException {
|
||||
CatalogIf catalog = nameToCatalog.get(catalogName);
|
||||
@ -704,6 +742,9 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
table.unsetObjectCreated();
|
||||
Env.getCurrentEnv().getExtMetaCacheMgr()
|
||||
.invalidateTableCache(catalog.getId(), db.getFullName(), table.getName());
|
||||
if (table instanceof HMSExternalTable && log.getLastUpdateTime() > 0) {
|
||||
((HMSExternalTable) table).setEventUpdateTime(log.getLastUpdateTime());
|
||||
}
|
||||
}
|
||||
|
||||
public void dropExternalTable(String dbName, String tableName, String catalogName, boolean ignoreIfExists)
|
||||
@ -923,8 +964,8 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
}
|
||||
}
|
||||
|
||||
public void addExternalPartitions(String catalogName, String dbName, String tableName, List<String> partitionNames,
|
||||
boolean ignoreIfNotExists)
|
||||
public void addExternalPartitions(String catalogName, String dbName, String tableName,
|
||||
List<String> partitionNames, long updateTime, boolean ignoreIfNotExists)
|
||||
throws DdlException {
|
||||
CatalogIf catalog = nameToCatalog.get(catalogName);
|
||||
if (catalog == null) {
|
||||
@ -955,14 +996,13 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
|
||||
HMSExternalTable hmsTable = (HMSExternalTable) table;
|
||||
Env.getCurrentEnv().getExtMetaCacheMgr().addPartitionsCache(catalog.getId(), hmsTable, partitionNames);
|
||||
long lastPartitionUpdateTime = System.currentTimeMillis();
|
||||
hmsTable.setPartitionUpdateTime(lastPartitionUpdateTime);
|
||||
hmsTable.setEventUpdateTime(updateTime);
|
||||
ExternalObjectLog log = new ExternalObjectLog();
|
||||
log.setCatalogId(catalog.getId());
|
||||
log.setDbId(db.getId());
|
||||
log.setTableId(table.getId());
|
||||
log.setPartitionNames(partitionNames);
|
||||
log.setLastUpdateTime(lastPartitionUpdateTime);
|
||||
log.setLastUpdateTime(updateTime);
|
||||
Env.getCurrentEnv().getEditLog().logAddExternalPartitions(log);
|
||||
}
|
||||
|
||||
@ -993,7 +1033,7 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
try {
|
||||
Env.getCurrentEnv().getExtMetaCacheMgr()
|
||||
.addPartitionsCache(catalog.getId(), hmsTable, log.getPartitionNames());
|
||||
hmsTable.setPartitionUpdateTime(log.getLastUpdateTime());
|
||||
hmsTable.setEventUpdateTime(log.getLastUpdateTime());
|
||||
} catch (HMSClientException e) {
|
||||
LOG.warn("Network problem occurs or hms table has been deleted, fallback to invalidate table cache", e);
|
||||
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(catalog.getId(),
|
||||
@ -1001,8 +1041,8 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
}
|
||||
}
|
||||
|
||||
public void dropExternalPartitions(String catalogName, String dbName, String tableName, List<String> partitionNames,
|
||||
boolean ignoreIfNotExists)
|
||||
public void dropExternalPartitions(String catalogName, String dbName, String tableName,
|
||||
List<String> partitionNames, long updateTime, boolean ignoreIfNotExists)
|
||||
throws DdlException {
|
||||
CatalogIf catalog = nameToCatalog.get(catalogName);
|
||||
if (catalog == null) {
|
||||
@ -1032,7 +1072,7 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
log.setDbId(db.getId());
|
||||
log.setTableId(table.getId());
|
||||
log.setPartitionNames(partitionNames);
|
||||
log.setLastUpdateTime(System.currentTimeMillis());
|
||||
log.setLastUpdateTime(updateTime);
|
||||
replayDropExternalPartitions(log);
|
||||
Env.getCurrentEnv().getEditLog().logDropExternalPartitions(log);
|
||||
}
|
||||
@ -1062,11 +1102,11 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
HMSExternalTable hmsTable = (HMSExternalTable) table;
|
||||
Env.getCurrentEnv().getExtMetaCacheMgr()
|
||||
.dropPartitionsCache(catalog.getId(), hmsTable, log.getPartitionNames());
|
||||
hmsTable.setPartitionUpdateTime(log.getLastUpdateTime());
|
||||
hmsTable.setEventUpdateTime(log.getLastUpdateTime());
|
||||
}
|
||||
|
||||
public void refreshExternalPartitions(String catalogName, String dbName, String tableName,
|
||||
List<String> partitionNames, boolean ignoreIfNotExists)
|
||||
List<String> partitionNames, long updateTime, boolean ignoreIfNotExists)
|
||||
throws DdlException {
|
||||
CatalogIf catalog = nameToCatalog.get(catalogName);
|
||||
if (catalog == null) {
|
||||
@ -1099,7 +1139,7 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
log.setDbId(db.getId());
|
||||
log.setTableId(table.getId());
|
||||
log.setPartitionNames(partitionNames);
|
||||
log.setLastUpdateTime(System.currentTimeMillis());
|
||||
log.setLastUpdateTime(updateTime);
|
||||
replayRefreshExternalPartitions(log);
|
||||
Env.getCurrentEnv().getEditLog().logInvalidateExternalPartitions(log);
|
||||
}
|
||||
@ -1129,7 +1169,7 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
Env.getCurrentEnv().getExtMetaCacheMgr()
|
||||
.invalidatePartitionsCache(catalog.getId(), db.getFullName(), table.getName(),
|
||||
log.getPartitionNames());
|
||||
((HMSExternalTable) table).setPartitionUpdateTime(log.getLastUpdateTime());
|
||||
((HMSExternalTable) table).setEventUpdateTime(log.getLastUpdateTime());
|
||||
}
|
||||
|
||||
public void registerCatalogRefreshListener(Env env) {
|
||||
|
||||
@ -103,7 +103,7 @@ public class AddPartitionEvent extends MetastorePartitionEvent {
|
||||
return;
|
||||
}
|
||||
Env.getCurrentEnv().getCatalogMgr()
|
||||
.addExternalPartitions(catalogName, dbName, hmsTbl.getTableName(), partitionNames, true);
|
||||
.addExternalPartitions(catalogName, dbName, hmsTbl.getTableName(), partitionNames, eventTime, true);
|
||||
} catch (DdlException e) {
|
||||
throw new MetastoreNotificationException(
|
||||
debugString("Failed to process event"), e);
|
||||
|
||||
@ -114,14 +114,14 @@ public class AlterPartitionEvent extends MetastorePartitionEvent {
|
||||
if (isRename) {
|
||||
Env.getCurrentEnv().getCatalogMgr()
|
||||
.dropExternalPartitions(catalogName, dbName, tblName,
|
||||
Lists.newArrayList(partitionNameBefore), true);
|
||||
Lists.newArrayList(partitionNameBefore), eventTime, true);
|
||||
Env.getCurrentEnv().getCatalogMgr()
|
||||
.addExternalPartitions(catalogName, dbName, tblName,
|
||||
Lists.newArrayList(partitionNameAfter), true);
|
||||
Lists.newArrayList(partitionNameAfter), eventTime, true);
|
||||
} else {
|
||||
Env.getCurrentEnv().getCatalogMgr()
|
||||
.refreshExternalPartitions(catalogName, dbName, hmsTbl.getTableName(),
|
||||
Lists.newArrayList(partitionNameAfter), true);
|
||||
Lists.newArrayList(partitionNameAfter), eventTime, true);
|
||||
}
|
||||
} catch (DdlException e) {
|
||||
throw new MetastoreNotificationException(
|
||||
|
||||
@ -154,7 +154,8 @@ public class AlterTableEvent extends MetastoreTableEvent {
|
||||
}
|
||||
//The scope of refresh can be narrowed in the future
|
||||
Env.getCurrentEnv().getCatalogMgr()
|
||||
.refreshExternalTable(tableBefore.getDbName(), tableBefore.getTableName(), catalogName, true);
|
||||
.refreshExternalTableFromEvent(tableBefore.getDbName(), tableBefore.getTableName(),
|
||||
catalogName, eventTime, true);
|
||||
} catch (Exception e) {
|
||||
throw new MetastoreNotificationException(
|
||||
debugString("Failed to process event"), e);
|
||||
|
||||
@ -103,7 +103,8 @@ public class DropPartitionEvent extends MetastorePartitionEvent {
|
||||
return;
|
||||
}
|
||||
Env.getCurrentEnv().getCatalogMgr()
|
||||
.dropExternalPartitions(catalogName, dbName, hmsTbl.getTableName(), partitionNames, true);
|
||||
.dropExternalPartitions(catalogName, dbName, hmsTbl.getTableName(),
|
||||
partitionNames, eventTime, true);
|
||||
} catch (DdlException e) {
|
||||
throw new MetastoreNotificationException(
|
||||
debugString("Failed to process event"), e);
|
||||
|
||||
@ -83,7 +83,8 @@ public class InsertEvent extends MetastoreTableEvent {
|
||||
* the file cache of this table,
|
||||
* but <a href="https://github.com/apache/doris/pull/17932">this PR</a> has fixed it.
|
||||
*/
|
||||
Env.getCurrentEnv().getCatalogMgr().refreshExternalTable(dbName, tblName, catalogName, true);
|
||||
Env.getCurrentEnv().getCatalogMgr().refreshExternalTableFromEvent(dbName, tblName,
|
||||
catalogName, eventTime, true);
|
||||
} catch (DdlException e) {
|
||||
throw new MetastoreNotificationException(
|
||||
debugString("Failed to process event"), e);
|
||||
|
||||
@ -47,10 +47,13 @@ public abstract class MetastoreEvent {
|
||||
protected final String tblName;
|
||||
|
||||
// eventId of the event. Cached here instead of calling the getter on the event every time
|
||||
private final long eventId;
|
||||
protected final long eventId;
|
||||
|
||||
// eventTime of the event. Cached here instead of calling the getter on the event every time
|
||||
protected final long eventTime;
|
||||
|
||||
// eventType from the NotificationEvent
|
||||
private final MetastoreEventType eventType;
|
||||
protected final MetastoreEventType eventType;
|
||||
|
||||
// Actual notificationEvent object received from Metastore
|
||||
protected final NotificationEvent metastoreNotificationEvent;
|
||||
@ -61,6 +64,7 @@ public abstract class MetastoreEvent {
|
||||
protected MetastoreEvent(long eventId, String catalogName, String dbName,
|
||||
String tblName, MetastoreEventType eventType) {
|
||||
this.eventId = eventId;
|
||||
this.eventTime = -1L;
|
||||
this.catalogName = catalogName;
|
||||
this.dbName = dbName;
|
||||
this.tblName = tblName;
|
||||
@ -74,6 +78,7 @@ public abstract class MetastoreEvent {
|
||||
this.dbName = event.getDbName();
|
||||
this.tblName = event.getTableName();
|
||||
this.eventId = event.getEventId();
|
||||
this.eventTime = event.getEventTime() * 1000L;
|
||||
this.eventType = MetastoreEventType.from(event.getEventType());
|
||||
this.metastoreNotificationEvent = event;
|
||||
this.catalogName = catalogName;
|
||||
@ -163,8 +168,8 @@ public abstract class MetastoreEvent {
|
||||
*/
|
||||
private Object[] getLogFormatArgs(Object[] args) {
|
||||
Object[] formatArgs = new Object[args.length + 2];
|
||||
formatArgs[0] = getEventId();
|
||||
formatArgs[1] = getEventType();
|
||||
formatArgs[0] = eventId;
|
||||
formatArgs[1] = eventType;
|
||||
int i = 2;
|
||||
for (Object arg : args) {
|
||||
formatArgs[i] = arg;
|
||||
|
||||
@ -140,9 +140,8 @@ public class MetastoreEventsProcessor extends MasterDaemon {
|
||||
CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId);
|
||||
if (catalog instanceof HMSExternalCatalog) {
|
||||
HMSExternalCatalog hmsExternalCatalog = (HMSExternalCatalog) catalog;
|
||||
List<NotificationEvent> events = Collections.emptyList();
|
||||
try {
|
||||
events = getNextHMSEvents(hmsExternalCatalog);
|
||||
List<NotificationEvent> events = getNextHMSEvents(hmsExternalCatalog);
|
||||
if (!events.isEmpty()) {
|
||||
LOG.info("Events size are {} on catalog [{}]", events.size(),
|
||||
hmsExternalCatalog.getName());
|
||||
|
||||
@ -120,6 +120,8 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase {
|
||||
|
||||
Deencapsulation.setField(tbl, "objectCreated", true);
|
||||
Deencapsulation.setField(tbl, "rwLock", new ReentrantReadWriteLock(true));
|
||||
Deencapsulation.setField(tbl, "schemaUpdateTime", NOW);
|
||||
Deencapsulation.setField(tbl, "eventUpdateTime", 0);
|
||||
new Expectations(tbl) {
|
||||
{
|
||||
tbl.getId();
|
||||
@ -154,15 +156,16 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase {
|
||||
minTimes = 0;
|
||||
result = DLAType.HIVE;
|
||||
|
||||
tbl.getUpdateTime();
|
||||
// mock initSchemaAndUpdateTime and do nothing
|
||||
tbl.initSchemaAndUpdateTime();
|
||||
minTimes = 0;
|
||||
result = NOW;
|
||||
}
|
||||
};
|
||||
|
||||
Deencapsulation.setField(tbl2, "objectCreated", true);
|
||||
Deencapsulation.setField(tbl2, "rwLock", new ReentrantReadWriteLock(true));
|
||||
|
||||
Deencapsulation.setField(tbl2, "schemaUpdateTime", NOW);
|
||||
Deencapsulation.setField(tbl2, "eventUpdateTime", 0);
|
||||
new Expectations(tbl2) {
|
||||
{
|
||||
tbl2.getId();
|
||||
@ -197,8 +200,8 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase {
|
||||
minTimes = 0;
|
||||
result = DLAType.HIVE;
|
||||
|
||||
// mock init schema and do nothing
|
||||
tbl2.initSchema();
|
||||
// mock initSchemaAndUpdateTime and do nothing
|
||||
tbl2.initSchemaAndUpdateTime();
|
||||
minTimes = 0;
|
||||
}
|
||||
};
|
||||
@ -383,7 +386,7 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase {
|
||||
// do nothing
|
||||
}
|
||||
long later = System.currentTimeMillis();
|
||||
tbl2.setPartitionUpdateTime(later);
|
||||
tbl2.setEventUpdateTime(later);
|
||||
|
||||
// check cache mode again
|
||||
ca.checkCacheMode(System.currentTimeMillis() + Config.cache_last_version_interval_second * 1000L * 2);
|
||||
@ -431,7 +434,7 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase {
|
||||
// do nothing
|
||||
}
|
||||
long later = System.currentTimeMillis();
|
||||
tbl2.setPartitionUpdateTime(later);
|
||||
tbl2.setEventUpdateTime(later);
|
||||
|
||||
// check cache mode again
|
||||
ca.checkCacheModeForNereids(System.currentTimeMillis() + Config.cache_last_version_interval_second * 1000L * 2);
|
||||
|
||||
Reference in New Issue
Block a user