[improvement](multi-catalog)(cache) invalidate catalog cache when refresh (#14342)
Invalidate catalog/db/table cache when doing refresh catalog/db/table. Tested table with 10000 partitions. The refresh operation will cost about 10-20 ms.
This commit is contained in:
@ -1,6 +1,6 @@
|
||||
---
|
||||
{
|
||||
"title": "REFRESH-CATALOG",
|
||||
"title": "REFRESH",
|
||||
"language": "en"
|
||||
}
|
||||
---
|
||||
@ -24,22 +24,28 @@ specific language governing permissions and limitations
|
||||
under the License.
|
||||
-->
|
||||
|
||||
## REFRESH-CATALOG
|
||||
## REFRESH
|
||||
|
||||
### Name
|
||||
|
||||
REFRESH CATALOG
|
||||
REFRESH
|
||||
|
||||
### Description
|
||||
|
||||
This statement is used to refresh the metadata of specific catalog.
|
||||
This statement refreshes the metadata of the specified Catalog/Database/Table.
|
||||
|
||||
Syntax:
|
||||
syntax:
|
||||
|
||||
```sql
|
||||
REFRESH CATALOG catalog_name
|
||||
REFRESH CATALOG catalog_name;
|
||||
REFRESH DATABASE [catalog_name.]database_name;
|
||||
REFRESH TABLE [catalog_name.][database_name.]table_name;
|
||||
```
|
||||
|
||||
When the Catalog is refreshed, the object-related Cache is forced to be invalidated.
|
||||
|
||||
Including Partition Cache, Schema Cache, File Cache, etc.
|
||||
|
||||
### Example
|
||||
|
||||
1. Refresh hive catalog
|
||||
@ -48,9 +54,24 @@ REFRESH CATALOG catalog_name
|
||||
REFRESH CATALOG hive;
|
||||
```
|
||||
|
||||
2. Refresh database1
|
||||
|
||||
```sql
|
||||
REFRESH DATABASE ctl.database1;
|
||||
REFRESH DATABASE database1;
|
||||
```
|
||||
|
||||
3. Refresh table1
|
||||
|
||||
```sql
|
||||
REFRESH TABLE ctl.db.table1;
|
||||
REFRESH TABLE db.table1;
|
||||
REFRESH TABLE table1;
|
||||
```
|
||||
|
||||
### Keywords
|
||||
|
||||
REFRESH, CATALOG
|
||||
REFRESH, CATALOG, DATABASE, TABLE
|
||||
|
||||
### Best Practice
|
||||
|
||||
@ -913,7 +913,7 @@
|
||||
"sql-manual/sql-reference/Utility-Statements/USE",
|
||||
"sql-manual/sql-reference/Utility-Statements/DESCRIBE",
|
||||
"sql-manual/sql-reference/Utility-Statements/SWITCH",
|
||||
"sql-manual/sql-reference/Utility-Statements/REFRESH-CATALOG"
|
||||
"sql-manual/sql-reference/Utility-Statements/REFRESH"
|
||||
]
|
||||
}
|
||||
]
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
---
|
||||
{
|
||||
"title": "REFRESH-CATALOG",
|
||||
"title": "REFRESH",
|
||||
"language": "zh-CN"
|
||||
}
|
||||
---
|
||||
@ -24,22 +24,28 @@ specific language governing permissions and limitations
|
||||
under the License.
|
||||
-->
|
||||
|
||||
## REFRESH-CATALOG
|
||||
## REFRESH
|
||||
|
||||
### Name
|
||||
|
||||
REFRESH CATALOG
|
||||
REFRESH
|
||||
|
||||
### Description
|
||||
|
||||
该语句用于刷新指定 Catalog 的元数据。
|
||||
该语句用于刷新指定 Catalog/Database/Table 的元数据。
|
||||
|
||||
语法:
|
||||
|
||||
```sql
|
||||
REFRESH CATALOG catalog_name
|
||||
REFRESH CATALOG catalog_name;
|
||||
REFRESH DATABASE [catalog_name.]database_name;
|
||||
REFRESH TABLE [catalog_name.][database_name.]table_name;
|
||||
```
|
||||
|
||||
刷新Catalog的同时,会强制使对象相关的 Cache 失效。
|
||||
|
||||
包括Partition Cache、Schema Cache、File Cache等。
|
||||
|
||||
### Example
|
||||
|
||||
1. 刷新 hive catalog
|
||||
@ -48,9 +54,24 @@ REFRESH CATALOG catalog_name
|
||||
REFRESH CATALOG hive;
|
||||
```
|
||||
|
||||
2. 刷新 database1
|
||||
|
||||
```sql
|
||||
REFRESH DATABASE ctl.database1;
|
||||
REFRESH DATABASE database1;
|
||||
```
|
||||
|
||||
3. 刷新 table1
|
||||
|
||||
```sql
|
||||
REFRESH TABLE ctl.db.table1;
|
||||
REFRESH TABLE db.table1;
|
||||
REFRESH TABLE table1;
|
||||
```
|
||||
|
||||
### Keywords
|
||||
|
||||
REFRESH, CATALOG
|
||||
REFRESH, CATALOG, DATABASE, TABLE
|
||||
|
||||
### Best Practice
|
||||
|
||||
@ -159,7 +159,7 @@ public class RefreshManager {
|
||||
if (table == null) {
|
||||
throw new DdlException("Table " + tableName + " does not exist in db " + dbName);
|
||||
}
|
||||
Env.getCurrentEnv().getExtMetaCacheMgr().removeCache(catalog.getId(), dbName, tableName);
|
||||
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(catalog.getId(), dbName, tableName);
|
||||
ExternalObjectLog log = new ExternalObjectLog();
|
||||
log.setCatalogId(catalog.getId());
|
||||
log.setDbId(db.getId());
|
||||
|
||||
@ -95,6 +95,7 @@ public class ExternalDatabase<T extends ExternalTable> implements DatabaseIf<T>,
|
||||
|
||||
public void setUnInitialized() {
|
||||
this.initialized = false;
|
||||
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateDbCache(extCatalog.getId(), name);
|
||||
}
|
||||
|
||||
public boolean isInitialized() {
|
||||
|
||||
@ -439,8 +439,8 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId());
|
||||
ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
|
||||
ExternalTable table = db.getTableForReplay(log.getTableId());
|
||||
Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog)
|
||||
.invalidateCache(db.getFullName(), table.getName());
|
||||
Env.getCurrentEnv().getExtMetaCacheMgr()
|
||||
.invalidateTableCache(catalog.getId(), db.getFullName(), table.getName());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -130,6 +130,7 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr
|
||||
|
||||
public void setUninitialized() {
|
||||
this.initialized = false;
|
||||
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateCatalogCache(id);
|
||||
}
|
||||
|
||||
public ExternalDatabase getDbForReplay(long dbId) {
|
||||
|
||||
@ -17,6 +17,7 @@
|
||||
|
||||
package org.apache.doris.datasource;
|
||||
|
||||
import org.apache.doris.cluster.ClusterNamespace;
|
||||
import org.apache.doris.common.ThreadPoolManager;
|
||||
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
|
||||
|
||||
@ -80,16 +81,41 @@ public class ExternalMetaCacheMgr {
|
||||
}
|
||||
}
|
||||
|
||||
public void removeCache(long catalogId, String dbName, String tblName) {
|
||||
public void invalidateTableCache(long catalogId, String dbName, String tblName) {
|
||||
dbName = ClusterNamespace.getNameFromFullName(dbName);
|
||||
ExternalSchemaCache schemaCache = schemaCacheMap.get(catalogId);
|
||||
if (schemaCache != null) {
|
||||
schemaCache.invalidateCache(dbName, tblName);
|
||||
LOG.debug("invalid schema cache for {}.{} in catalog {}", dbName, tblName, catalogId);
|
||||
schemaCache.invalidateTableCache(dbName, tblName);
|
||||
}
|
||||
HiveMetaStoreCache metaCache = cacheMap.get(catalogId);
|
||||
if (metaCache != null) {
|
||||
metaCache.invalidateCache(dbName, tblName);
|
||||
LOG.debug("invalid meta cache for {}.{} in catalog {}", dbName, tblName, catalogId);
|
||||
metaCache.invalidateTableCache(dbName, tblName);
|
||||
}
|
||||
LOG.debug("invalid table cache for {}.{} in catalog {}", dbName, tblName, catalogId);
|
||||
}
|
||||
|
||||
public void invalidateDbCache(long catalogId, String dbName) {
|
||||
dbName = ClusterNamespace.getNameFromFullName(dbName);
|
||||
ExternalSchemaCache schemaCache = schemaCacheMap.get(catalogId);
|
||||
if (schemaCache != null) {
|
||||
schemaCache.invalidateDbCache(dbName);
|
||||
}
|
||||
HiveMetaStoreCache metaCache = cacheMap.get(catalogId);
|
||||
if (metaCache != null) {
|
||||
metaCache.invalidateDbCache(dbName);
|
||||
}
|
||||
LOG.debug("invalid db cache for {} in catalog {}", dbName, catalogId);
|
||||
}
|
||||
|
||||
public void invalidateCatalogCache(long catalogId) {
|
||||
ExternalSchemaCache schemaCache = schemaCacheMap.get(catalogId);
|
||||
if (schemaCache != null) {
|
||||
schemaCache.invalidateAll();
|
||||
}
|
||||
HiveMetaStoreCache metaCache = cacheMap.get(catalogId);
|
||||
if (metaCache != null) {
|
||||
metaCache.invalidateAll();
|
||||
}
|
||||
LOG.debug("invalid catalog cache for {}", catalogId);
|
||||
}
|
||||
}
|
||||
|
||||
@ -34,6 +34,7 @@ import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@ -92,9 +93,27 @@ public class ExternalSchemaCache {
|
||||
}
|
||||
}
|
||||
|
||||
public void invalidateCache(String dbName, String tblName) {
|
||||
public void invalidateTableCache(String dbName, String tblName) {
|
||||
SchemaCacheKey key = new SchemaCacheKey(dbName, tblName);
|
||||
schemaCache.invalidate(key);
|
||||
LOG.debug("invalid schema cache for {}.{} in catalog {}", dbName, tblName, catalog.getName());
|
||||
}
|
||||
|
||||
public void invalidateDbCache(String dbName) {
|
||||
long start = System.currentTimeMillis();
|
||||
Set<SchemaCacheKey> keys = schemaCache.asMap().keySet();
|
||||
for (SchemaCacheKey key : keys) {
|
||||
if (key.dbName.equals(dbName)) {
|
||||
schemaCache.invalidate(key);
|
||||
}
|
||||
}
|
||||
LOG.debug("invalid schema cache for db {} in catalog {} cost: {} ms", dbName, catalog.getName(),
|
||||
(System.currentTimeMillis() - start));
|
||||
}
|
||||
|
||||
public void invalidateAll() {
|
||||
schemaCache.invalidateAll();
|
||||
LOG.debug("invalid all schema cache in catalog {}", catalog.getName());
|
||||
}
|
||||
|
||||
@Data
|
||||
|
||||
@ -60,6 +60,7 @@ import org.apache.logging.log4j.Logger;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@ -327,10 +328,43 @@ public class HiveMetaStoreCache {
|
||||
return partitions;
|
||||
}
|
||||
|
||||
public void invalidateCache(String dbName, String tblName) {
|
||||
public void invalidateTableCache(String dbName, String tblName) {
|
||||
PartitionValueCacheKey key = new PartitionValueCacheKey(dbName, tblName, null);
|
||||
partitionValuesCache.invalidate(key);
|
||||
// TODO: find a way to invalidate partitionCache and fileCache
|
||||
HivePartitionValues partitionValues = partitionValuesCache.getIfPresent(key);
|
||||
if (partitionValues != null) {
|
||||
long start = System.currentTimeMillis();
|
||||
for (List<String> values : partitionValues.partitionValuesMap.values()) {
|
||||
PartitionCacheKey partKey = new PartitionCacheKey(dbName, tblName, values);
|
||||
HivePartition partition = partitionCache.getIfPresent(partKey);
|
||||
if (partition != null) {
|
||||
fileCache.invalidate(new FileCacheKey(partition.getPath(), null));
|
||||
partitionCache.invalidate(partKey);
|
||||
}
|
||||
}
|
||||
partitionValuesCache.invalidate(key);
|
||||
LOG.debug("invalid table cache for {}.{} in catalog {}, cache num: {}, cost: {} ms",
|
||||
dbName, tblName, catalog.getName(), partitionValues.partitionValuesMap.size(),
|
||||
(System.currentTimeMillis() - start));
|
||||
}
|
||||
}
|
||||
|
||||
public void invalidateDbCache(String dbName) {
|
||||
long start = System.currentTimeMillis();
|
||||
Set<PartitionValueCacheKey> keys = partitionValuesCache.asMap().keySet();
|
||||
for (PartitionValueCacheKey key : keys) {
|
||||
if (key.dbName.equals(dbName)) {
|
||||
invalidateTableCache(dbName, key.tblName);
|
||||
}
|
||||
}
|
||||
LOG.debug("invalid db cache for {} in catalog {}, cache num: {}, cost: {} ms", dbName, catalog.getName(),
|
||||
keys.size(), (System.currentTimeMillis() - start));
|
||||
}
|
||||
|
||||
public void invalidateAll() {
|
||||
partitionValuesCache.invalidateAll();
|
||||
partitionCache.invalidateAll();
|
||||
fileCache.invalidateAll();
|
||||
LOG.debug("invalid all meta cache in catalog {}", catalog.getName());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -444,7 +478,8 @@ public class HiveMetaStoreCache {
|
||||
|
||||
@Data
|
||||
public static class HivePartitionValues {
|
||||
private Map<Long, PartitionItem> idToPartitionItem = Maps.newHashMap();
|
||||
private Map<Long, PartitionItem> idToPartitionItem;
|
||||
private Map<Long, List<String>> partitionValuesMap = Maps.newHashMap();
|
||||
private Map<UniqueId, Range<PartitionKey>> uidToPartitionRange;
|
||||
private Map<Range<PartitionKey>, UniqueId> rangeToId;
|
||||
private RangeMap<ColumnBound, UniqueId> singleColumnRangeMap;
|
||||
@ -454,6 +489,10 @@ public class HiveMetaStoreCache {
|
||||
Map<Range<PartitionKey>, UniqueId> rangeToId,
|
||||
RangeMap<ColumnBound, UniqueId> singleColumnRangeMap) {
|
||||
this.idToPartitionItem = idToPartitionItem;
|
||||
for (Map.Entry<Long, PartitionItem> entry : this.idToPartitionItem.entrySet()) {
|
||||
partitionValuesMap.put(entry.getKey(),
|
||||
((ListPartitionItem) entry.getValue()).getItems().get(0).getPartitionValuesAsStringList());
|
||||
}
|
||||
this.uidToPartitionRange = uidToPartitionRange;
|
||||
this.rangeToId = rangeToId;
|
||||
this.singleColumnRangeMap = singleColumnRangeMap;
|
||||
|
||||
Reference in New Issue
Block a user