[Feature](multi-catalog) add refresh for catalog if it needs (#17884)

Currently, if the metadata behind a catalog is modified externally, Doris is not automatically aware of the change.
So, if a catalog is created with a refresh interval configured (`metadata_refresh_interval_sec`), a timer now refreshes that catalog periodically.
This commit is contained in:
q763562998
2023-04-08 15:49:50 +08:00
committed by GitHub
parent 0b8bc51b72
commit 5aa58f5ce5
6 changed files with 211 additions and 0 deletions

View File

@ -1430,6 +1430,7 @@ public class Env {
}
// start mtmv jobManager
mtmvJobManager.start();
getRefreshManager().start();
}
// start threads that should be running on all FEs

View File

@ -19,25 +19,38 @@ package org.apache.doris.catalog;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.DropTableStmt;
import org.apache.doris.analysis.RefreshCatalogStmt;
import org.apache.doris.analysis.RefreshDbStmt;
import org.apache.doris.analysis.RefreshTableStmt;
import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.external.ExternalDatabase;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalObjectLog;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.qe.DdlExecutor;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Map;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
// Manager for catalog, database and table refresh actions
public class RefreshManager {
private static final Logger LOG = LogManager.getLogger(RefreshManager.class);
// Scheduler that runs the periodic catalog refresh task registered in start().
// It is created with a pool-size argument of 1 and the name "catalog-refresh-timer-pool".
private ScheduledThreadPoolExecutor refreshScheduler = ThreadPoolManager.newDaemonScheduledThreadPool(1,
"catalog-refresh-timer-pool", true);
// Unit: SECONDS. Used both as the period of the scheduled refresh task and as the
// amount subtracted from each catalog's remaining time on every run of that task.
private static final int REFRESH_TIME_SEC = 5;
// Key is the id of a catalog. Value is an array of length 2:
//   [0] the original (configured) refresh time of the catalog
//   [1] the current remaining time until the catalog is refreshed again
private Map<Long, Integer[]> refreshMap = Maps.newConcurrentMap();
public void handleRefreshTable(RefreshTableStmt stmt) throws UserException {
String catalogName = stmt.getCtl();
@ -146,4 +159,48 @@ public class RefreshManager {
stmt.getTableName(), "ICEBERG", icebergProperties, "");
env.createTable(createTableStmt);
}
/**
 * Registers the refresh timing state of a catalog, replacing any state already
 * stored for the same catalog id.
 *
 * @param catalogId id of the catalog
 * @param sec timing state stored as-is for the catalog; the refresh task reads
 *            {@code sec[0]} as the original refresh time and {@code sec[1]} as the
 *            current remaining time
 */
public void addToRefreshMap(long catalogId, Integer[] sec) {
    final Long key = Long.valueOf(catalogId);
    refreshMap.put(key, sec);
}
/**
 * Drops the refresh timing state stored for a catalog, if there is any.
 *
 * @param catalogId id of the catalog whose entry is removed
 */
public void removeFromRefreshMap(long catalogId) {
    final Long key = Long.valueOf(catalogId);
    refreshMap.remove(key);
}
/**
 * Schedules the catalog refresh task on the refresh scheduler: first run without
 * delay, then at a fixed rate of {@code REFRESH_TIME_SEC} seconds.
 */
public void start() {
    final Runnable tick = new RefreshTask();
    refreshScheduler.scheduleAtFixedRate(tick, 0, REFRESH_TIME_SEC, TimeUnit.SECONDS);
}
/**
 * Timer task executed every {@code REFRESH_TIME_SEC} seconds. On each run it counts down
 * the remaining time of every catalog in {@code refreshMap}; once a countdown has expired
 * the catalog is refreshed and its countdown is reset to the original refresh time.
 */
private class RefreshTask implements Runnable {
    @Override
    public void run() {
        for (Map.Entry<Long, Integer[]> entry : refreshMap.entrySet()) {
            try {
                tick(entry.getKey(), entry.getValue());
            } catch (Exception e) {
                // An exception escaping run() makes the scheduler suppress every subsequent
                // execution, which would silently stop the refresh of all catalogs.
                LOG.warn("failed to process refresh timer of catalog {}", entry.getKey(), e);
            }
        }
    }

    /**
     * Advances the countdown of one catalog and refreshes the catalog when it has expired.
     *
     * @param catalogId id of the catalog
     * @param timeGroup the array stored in {@code refreshMap}: [0] original refresh time,
     *                  [1] current remaining time
     */
    private void tick(Long catalogId, Integer[] timeGroup) {
        // timeGroup is the very array held by refreshMap, so it is updated in place.
        // It is deliberately NOT written back with put(): that is redundant and could
        // re-insert an entry that removeFromRefreshMap() removed concurrently.
        Integer original = timeGroup[0];
        Integer current = timeGroup[1];
        if (current - REFRESH_TIME_SEC > 0) {
            timeGroup[1] = current - REFRESH_TIME_SEC;
            return;
        }

        CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId);
        if (catalog == null) {
            // The countdown is left expired, so the lookup is retried on the next run.
            return;
        }

        String catalogName = catalog.getName();
        RefreshCatalogStmt refreshCatalogStmt = new RefreshCatalogStmt(catalogName, null);
        try {
            DdlExecutor.execute(Env.getCurrentEnv(), refreshCatalogStmt);
        } catch (Exception e) {
            LOG.warn("failed to refresh catalog {}", catalogName, e);
        }
        // reset the countdown whether or not the refresh succeeded
        timeGroup[1] = original;
    }
}
}

View File

@ -18,6 +18,7 @@
package org.apache.doris.datasource;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
@ -140,5 +141,6 @@ public interface CatalogIf<T extends DatabaseIf> {
/**
 * Called when catalog is dropped. The default implementation removes this catalog's
 * entry from the refresh manager of the current Env.
 */
default void onClose() {
    final Env env = Env.getCurrentEnv();
    env.getRefreshManager().removeFromRefreshMap(getId());
}
}

View File

@ -78,6 +78,7 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
public static final String ACCESS_CONTROLLER_CLASS_PROP = "access_controller.class";
public static final String ACCESS_CONTROLLER_PROPERTY_PREFIX_PROP = "access_controller.properties.";
public static final String METADATA_REFRESH_INTERVAL_SEC = "metadata_refresh_interval_sec";
public static final String CATALOG_TYPE_PROP = "type";
private static final String YES = "yes";
@ -468,6 +469,14 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
if (!isReplay && catalog instanceof ExternalCatalog) {
((ExternalCatalog) catalog).checkProperties();
}
Map<String, String> props = log.getProps();
if (props.containsKey(METADATA_REFRESH_INTERVAL_SEC)) {
// need refresh
long catalogId = log.getCatalogId();
Integer metadataRefreshIntervalSec = Integer.valueOf(props.get(METADATA_REFRESH_INTERVAL_SEC));
Integer[] sec = {metadataRefreshIntervalSec, metadataRefreshIntervalSec};
Env.getCurrentEnv().getRefreshManager().addToRefreshMap(catalogId, sec);
}
addCatalog(catalog);
return catalog;
} finally {
@ -964,6 +973,12 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
/**
 * Gson post-processing hook: rebuilds the name-to-catalog index from {@code idToCatalog},
 * registers every catalog that carries {@code METADATA_REFRESH_INTERVAL_SEC} with the
 * refresh manager, and caches the internal catalog.
 *
 * @throws IOException declared by the hook signature; not thrown by this implementation
 */
public void gsonPostProcess() throws IOException {
    for (CatalogIf catalog : idToCatalog.values()) {
        nameToCatalog.put(catalog.getName(), catalog);
        Map<?, ?> properties = catalog.getProperties();
        if (properties.containsKey(METADATA_REFRESH_INTERVAL_SEC)) {
            // Catalog properties are stored as strings, so the value must be parsed;
            // casting it directly to Integer fails with a ClassCastException.
            Integer metadataRefreshIntervalSec = Integer.valueOf(
                    (String) properties.get(METADATA_REFRESH_INTERVAL_SEC));
            // {original refresh time, current remaining time}
            Integer[] sec = {metadataRefreshIntervalSec, metadataRefreshIntervalSec};
            Env.getCurrentEnv().getRefreshManager().addToRefreshMap(catalog.getId(), sec);
        }
    }
    internalCatalog = (InternalCatalog) idToCatalog.get(InternalCatalog.INTERNAL_CATALOG_ID);
}

View File

@ -156,6 +156,15 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr
/**
 * Checks the catalog properties when creating a catalog. This base implementation
 * validates the refresh parameter {@link CatalogMgr#METADATA_REFRESH_INTERVAL_SEC}.
 *
 * @throws DdlException if the refresh interval is present but is not a non-negative integer
 */
public void checkProperties() throws DdlException {
    // check refresh parameter of catalog
    Map<String, String> properties = getCatalogProperty().getProperties();
    if (properties.containsKey(CatalogMgr.METADATA_REFRESH_INTERVAL_SEC)) {
        int metadataRefreshIntervalSec;
        try {
            metadataRefreshIntervalSec = Integer.parseInt(
                    properties.get(CatalogMgr.METADATA_REFRESH_INTERVAL_SEC));
        } catch (NumberFormatException e) {
            throw new DdlException("Invalid properties: " + CatalogMgr.METADATA_REFRESH_INTERVAL_SEC);
        }
        // A negative interval is meaningless and would make the timer refresh on every tick.
        if (metadataRefreshIntervalSec < 0) {
            throw new DdlException("Invalid properties: " + CatalogMgr.METADATA_REFRESH_INTERVAL_SEC);
        }
    }
}
/**

View File

@ -0,0 +1,127 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource;
import org.apache.doris.analysis.CreateCatalogStmt;
import org.apache.doris.analysis.DropCatalogStmt;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.common.FeConstants;
import org.apache.doris.datasource.test.TestExternalCatalog;
import org.apache.doris.mysql.privilege.Auth;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.utframe.TestWithFeService;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
 * Verifies that a catalog created with {@code metadata_refresh_interval_sec} is refreshed
 * by the background refresh timer, so that metadata added externally becomes visible.
 */
public class RefreshCatalogTest extends TestWithFeService {
    // Upper bound on how long the test waits for the background refresh to happen.
    private static final long REFRESH_WAIT_TIMEOUT_MS = 15_000L;
    // Pause between two checks while waiting for the refresh.
    private static final long REFRESH_POLL_INTERVAL_MS = 200L;

    private static Auth auth;
    private static Env env;
    private CatalogMgr mgr;
    private ConnectContext rootCtx;

    @Override
    protected void runBeforeAll() throws Exception {
        FeConstants.runningUnitTest = true;
        mgr = Env.getCurrentEnv().getCatalogMgr();
        rootCtx = createDefaultCtx();
        env = Env.getCurrentEnv();
        auth = env.getAuth();
        // 1. create test catalog
        CreateCatalogStmt testCatalog = (CreateCatalogStmt) parseAndAnalyzeStmt(
                "create catalog test1 properties(\n"
                        + " \"type\" = \"test\",\n"
                        + " \"metadata_refresh_interval_sec\" = \"1\",\n"
                        + " \"catalog_provider.class\" "
                        + "= \"org.apache.doris.datasource.RefreshCatalogTest$RefreshCatalogProvider\"\n"
                        + ");",
                rootCtx);
        env.getCatalogMgr().createCatalog(testCatalog);
    }

    @Override
    protected void runAfterAll() throws Exception {
        super.runAfterAll();
        rootCtx.setThreadLocalInfo();
        DropCatalogStmt stmt = (DropCatalogStmt) parseAndAnalyzeStmt("drop catalog test1");
        env.getCatalogMgr().dropCatalog(stmt);
    }

    @Test
    public void testRefreshCatalog() throws Exception {
        CatalogIf test1 = env.getCatalogMgr().getCatalog("test1");
        List<String> dbNames1 = test1.getDbNames();
        // there are test1.db1 , test1.db2
        Assertions.assertEquals(2, dbNames1.size());
        // 1. simulate the external catalog gaining a new database
        RefreshCatalogProvider.addData();
        // 2. wait for the catalog to be refreshed. Poll with a deadline instead of sleeping
        // for a fixed time: a fixed sleep equal to the timer period leaves no margin.
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(REFRESH_WAIT_TIMEOUT_MS);
        List<String> dbNames2 = test1.getDbNames();
        while (dbNames2.size() != 3 && System.nanoTime() - deadline < 0) {
            Thread.sleep(REFRESH_POLL_INTERVAL_MS);
            dbNames2 = test1.getDbNames();
        }
        // there are test1.db1 , test1.db2 , test1.db3
        Assertions.assertEquals(3, dbNames2.size());
    }

    /**
     * Metadata provider for the test catalog. It initially exposes db1 and db2;
     * {@link #addData()} adds db3.
     */
    public static class RefreshCatalogProvider implements TestExternalCatalog.TestCatalogProvider {
        public static final Map<String, Map<String, List<Column>>> MOCKED_META;

        static {
            MOCKED_META = Maps.newHashMap();
            Map<String, List<Column>> tblSchemaMap1 = Maps.newHashMap();
            // db1
            tblSchemaMap1.put("tbl11", Lists.newArrayList(
                    new Column("a11", PrimitiveType.BIGINT),
                    new Column("a12", PrimitiveType.STRING),
                    new Column("a13", PrimitiveType.FLOAT)));
            tblSchemaMap1.put("tbl12", Lists.newArrayList(
                    new Column("b21", PrimitiveType.BIGINT),
                    new Column("b22", PrimitiveType.STRING),
                    new Column("b23", PrimitiveType.FLOAT)));
            MOCKED_META.put("db1", tblSchemaMap1);
            // db2
            Map<String, List<Column>> tblSchemaMap2 = Maps.newHashMap();
            tblSchemaMap2.put("tbl21", Lists.newArrayList(
                    new Column("c11", PrimitiveType.BIGINT),
                    new Column("c12", PrimitiveType.STRING),
                    new Column("c13", PrimitiveType.FLOAT)));
            MOCKED_META.put("db2", tblSchemaMap2);
        }

        @Override
        public Map<String, Map<String, List<Column>>> getMetadata() {
            return MOCKED_META;
        }

        /** Adds database db3, containing table tbl31, to the mocked metadata. */
        public static void addData() {
            // db3
            Map<String, List<Column>> tblSchemaMap3 = Maps.newHashMap();
            tblSchemaMap3.put("tbl31", Lists.newArrayList(
                    new Column("c11", PrimitiveType.BIGINT),
                    new Column("c12", PrimitiveType.STRING),
                    new Column("c13", PrimitiveType.FLOAT)));
            MOCKED_META.put("db3", tblSchemaMap3);
        }
    }
}