diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 579bc51677..d753af373a 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -1430,6 +1430,7 @@ public class Env { } // start mtmv jobManager mtmvJobManager.start(); + getRefreshManager().start(); } // start threads that should running on all FE diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java index ef95b3e5f8..62a692c6ad 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java @@ -19,25 +19,38 @@ package org.apache.doris.catalog; import org.apache.doris.analysis.CreateTableStmt; import org.apache.doris.analysis.DropTableStmt; +import org.apache.doris.analysis.RefreshCatalogStmt; import org.apache.doris.analysis.RefreshDbStmt; import org.apache.doris.analysis.RefreshTableStmt; import org.apache.doris.analysis.TableName; import org.apache.doris.catalog.external.ExternalDatabase; import org.apache.doris.common.DdlException; +import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.UserException; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.ExternalObjectLog; import org.apache.doris.datasource.InternalCatalog; +import org.apache.doris.qe.DdlExecutor; +import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.Map; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; // Manager for refresh database and table action public class RefreshManager { private static final Logger LOG = LogManager.getLogger(RefreshManager.class); + private ScheduledThreadPoolExecutor refreshScheduler = ThreadPoolManager.newDaemonScheduledThreadPool(1, + "catalog-refresh-timer-pool", true); + // Unit:SECONDS + private static final int REFRESH_TIME_SEC = 5; + // key is the id of a catalog, value is an array of length 2, used to store + // the original refresh time and the current remaining time of the catalog + private Map refreshMap = Maps.newConcurrentMap(); public void handleRefreshTable(RefreshTableStmt stmt) throws UserException { String catalogName = stmt.getCtl(); @@ -146,4 +159,48 @@ public class RefreshManager { stmt.getTableName(), "ICEBERG", icebergProperties, ""); env.createTable(createTableStmt); } + + public void addToRefreshMap(long catalogId, Integer[] sec) { + refreshMap.put(catalogId, sec); + } + + public void removeFromRefreshMap(long catalogId) { + refreshMap.remove(catalogId); + } + + public void start() { + RefreshTask refreshTask = new RefreshTask(); + this.refreshScheduler.scheduleAtFixedRate(refreshTask, 0, REFRESH_TIME_SEC, + TimeUnit.SECONDS); + } + + private class RefreshTask implements Runnable { + @Override + public void run() { + for (Map.Entry entry : refreshMap.entrySet()) { + Long catalogId = entry.getKey(); + Integer[] timeGroup = entry.getValue(); + Integer original = timeGroup[0]; + Integer current = timeGroup[1]; + if (current - REFRESH_TIME_SEC > 0) { + timeGroup[1] = current - REFRESH_TIME_SEC; + refreshMap.put(catalogId, timeGroup); + } else { + CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId); + if (catalog != null) { + String catalogName = catalog.getName(); + RefreshCatalogStmt refreshCatalogStmt = new RefreshCatalogStmt(catalogName, null); + try { + DdlExecutor.execute(Env.getCurrentEnv(), refreshCatalogStmt); + } catch (Exception e) { + LOG.warn("failed to refresh catalog {}", catalogName, e); + } + // reset + timeGroup[1] = original; + refreshMap.put(catalogId, timeGroup); + } + } + } + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java index 2dbe6dbe3c..7b9ea7468a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java @@ -18,6 +18,7 @@ package org.apache.doris.datasource; import org.apache.doris.catalog.DatabaseIf; +import org.apache.doris.catalog.Env; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; @@ -140,5 +141,6 @@ public interface CatalogIf { // Called when catalog is dropped default void onClose() { + Env.getCurrentEnv().getRefreshManager().removeFromRefreshMap(getId()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java index 7b3ec75940..a116f0a64e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java @@ -78,6 +78,7 @@ public class CatalogMgr implements Writable, GsonPostProcessable { public static final String ACCESS_CONTROLLER_CLASS_PROP = "access_controller.class"; public static final String ACCESS_CONTROLLER_PROPERTY_PREFIX_PROP = "access_controller.properties."; + public static final String METADATA_REFRESH_INTERVAL_SEC = "metadata_refresh_interval_sec"; public static final String CATALOG_TYPE_PROP = "type"; private static final String YES = "yes"; @@ -468,6 +469,14 @@ public class CatalogMgr implements Writable, GsonPostProcessable { if (!isReplay && catalog instanceof ExternalCatalog) { ((ExternalCatalog) catalog).checkProperties(); } + Map props = log.getProps(); + if (props.containsKey(METADATA_REFRESH_INTERVAL_SEC)) { + // need refresh + long catalogId = log.getCatalogId(); + Integer metadataRefreshIntervalSec = Integer.valueOf(props.get(METADATA_REFRESH_INTERVAL_SEC)); + Integer[] sec = {metadataRefreshIntervalSec, metadataRefreshIntervalSec}; + Env.getCurrentEnv().getRefreshManager().addToRefreshMap(catalogId, sec); + } addCatalog(catalog); return catalog; } finally { @@ -964,6 +973,12 @@ public class CatalogMgr implements Writable, GsonPostProcessable { public void gsonPostProcess() throws IOException { for (CatalogIf catalog : idToCatalog.values()) { nameToCatalog.put(catalog.getName(), catalog); + Map properties = catalog.getProperties(); + if (properties.containsKey(METADATA_REFRESH_INTERVAL_SEC)) { + Integer metadataRefreshIntervalSec = (Integer) properties.get(METADATA_REFRESH_INTERVAL_SEC); + Integer[] sec = {metadataRefreshIntervalSec, metadataRefreshIntervalSec}; + Env.getCurrentEnv().getRefreshManager().addToRefreshMap(catalog.getId(), sec); + } } internalCatalog = (InternalCatalog) idToCatalog.get(InternalCatalog.INTERNAL_CATALOG_ID); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index 1fdb374ef9..ed900fde02 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -156,6 +156,15 @@ public abstract class ExternalCatalog implements CatalogIf, Wr // check if all required properties are set when creating catalog public void checkProperties() throws DdlException { + // check refresh parameter of catalog + Map properties = getCatalogProperty().getProperties(); + if (properties.containsKey(CatalogMgr.METADATA_REFRESH_INTERVAL_SEC)) { + try { + Integer.valueOf(properties.get(CatalogMgr.METADATA_REFRESH_INTERVAL_SEC)); + } catch (NumberFormatException e) { + throw new DdlException("Invalid properties: " + CatalogMgr.METADATA_REFRESH_INTERVAL_SEC); + } + } } /** diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java new file mode 100644 index 0000000000..8b64df074e --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java @@ -0,0 +1,127 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource; + +import org.apache.doris.analysis.CreateCatalogStmt; +import org.apache.doris.analysis.DropCatalogStmt; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.common.FeConstants; +import org.apache.doris.datasource.test.TestExternalCatalog; +import org.apache.doris.mysql.privilege.Auth; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.utframe.TestWithFeService; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Map; + +public class RefreshCatalogTest extends TestWithFeService { + private static Auth auth; + private static Env env; + private CatalogMgr mgr; + private ConnectContext rootCtx; + + @Override + protected void runBeforeAll() throws Exception { + FeConstants.runningUnitTest = true; + mgr = Env.getCurrentEnv().getCatalogMgr(); + rootCtx = createDefaultCtx(); + env = Env.getCurrentEnv(); + auth = env.getAuth(); + // 1. create test catalog + CreateCatalogStmt testCatalog = (CreateCatalogStmt) parseAndAnalyzeStmt( + "create catalog test1 properties(\n" + + " \"type\" = \"test\",\n" + + " \"metadata_refresh_interval_sec\" = \"1\",\n" + + " \"catalog_provider.class\" " + + "= \"org.apache.doris.datasource.RefreshCatalogTest$RefreshCatalogProvider\"\n" + + ");", + rootCtx); + env.getCatalogMgr().createCatalog(testCatalog); + } + + @Override + protected void runAfterAll() throws Exception { + super.runAfterAll(); + rootCtx.setThreadLocalInfo(); + DropCatalogStmt stmt = (DropCatalogStmt) parseAndAnalyzeStmt("drop catalog test1"); + env.getCatalogMgr().dropCatalog(stmt); + } + + @Test + public void testRefreshCatalog() throws Exception { + CatalogIf test1 = env.getCatalogMgr().getCatalog("test1"); + List dbNames1 = test1.getDbNames(); + // there are test1.db1 , test1.db2 + Assertions.assertEquals(2, dbNames1.size()); + // 1.simulate ExternalCatalog adds a new table + RefreshCatalogProvider.addData(); + // 2.wait for the refresh time of the catalog + Thread.sleep(5000); + // there are test1.db1 , test1.db2 , test1.db3 + List dbNames2 = test1.getDbNames(); + Assertions.assertEquals(3, dbNames2.size()); + } + + public static class RefreshCatalogProvider implements TestExternalCatalog.TestCatalogProvider { + public static final Map>> MOCKED_META; + + static { + MOCKED_META = Maps.newHashMap(); + Map> tblSchemaMap1 = Maps.newHashMap(); + // db1 + tblSchemaMap1.put("tbl11", Lists.newArrayList( + new Column("a11", PrimitiveType.BIGINT), + new Column("a12", PrimitiveType.STRING), + new Column("a13", PrimitiveType.FLOAT))); + tblSchemaMap1.put("tbl12", Lists.newArrayList( + new Column("b21", PrimitiveType.BIGINT), + new Column("b22", PrimitiveType.STRING), + new Column("b23", PrimitiveType.FLOAT))); + MOCKED_META.put("db1", tblSchemaMap1); + // db2 + Map> tblSchemaMap2 = Maps.newHashMap(); + tblSchemaMap2.put("tbl21", Lists.newArrayList( + new Column("c11", PrimitiveType.BIGINT), + new Column("c12", PrimitiveType.STRING), + new Column("c13", PrimitiveType.FLOAT))); + MOCKED_META.put("db2", tblSchemaMap2); + } + + @Override + public Map>> getMetadata() { + return MOCKED_META; + } + + public static void addData() { + // db3 + Map> tblSchemaMap3 = Maps.newHashMap(); + tblSchemaMap3.put("tbl31", Lists.newArrayList( + new Column("c11", PrimitiveType.BIGINT), + new Column("c12", PrimitiveType.STRING), + new Column("c13", PrimitiveType.FLOAT))); + MOCKED_META.put("db3", tblSchemaMap3); + } + } +}