[Fix](multi catalog)Refresh table object while refresh external table. (#18592)
Refresh table object while refresh external table. Including: Refresh catalog, refresh database and refresh table. Before visiting database, need to guarantee catalog has been initialized. Before visiting table, need to guarantee catalog and database have been initialized.
This commit is contained in:
@ -99,6 +99,7 @@ public class EsExternalDatabase extends ExternalDatabase<EsExternalTable> implem
|
||||
tblId = tableNameToId.get(tableName);
|
||||
tmpTableNameToId.put(tableName, tblId);
|
||||
EsExternalTable table = idToTbl.get(tblId);
|
||||
table.unsetObjectCreated();
|
||||
tmpIdToTbl.put(tblId, table);
|
||||
initDatabaseLog.addRefreshTable(tblId);
|
||||
} else {
|
||||
|
||||
@ -52,6 +52,7 @@ public class EsExternalTable extends ExternalTable {
|
||||
}
|
||||
|
||||
protected synchronized void makeSureInitialized() {
|
||||
super.makeSureInitialized();
|
||||
if (!objectCreated) {
|
||||
esTable = toEsTable();
|
||||
objectCreated = true;
|
||||
|
||||
@ -107,6 +107,7 @@ public class ExternalDatabase<T extends ExternalTable> implements DatabaseIf<T>,
|
||||
}
|
||||
|
||||
public final synchronized void makeSureInitialized() {
|
||||
extCatalog.makeSureInitialized();
|
||||
if (!initialized) {
|
||||
if (!Env.getCurrentEnv().isMaster()) {
|
||||
// Forward to master and wait the journal to replay.
|
||||
|
||||
@ -21,10 +21,12 @@ import org.apache.doris.alter.AlterCancelException;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.common.io.Text;
|
||||
import org.apache.doris.common.io.Writable;
|
||||
import org.apache.doris.common.util.Util;
|
||||
import org.apache.doris.datasource.ExternalCatalog;
|
||||
import org.apache.doris.datasource.ExternalSchemaCache;
|
||||
import org.apache.doris.persist.gson.GsonPostProcessable;
|
||||
@ -103,7 +105,13 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable {
|
||||
}
|
||||
|
||||
protected void makeSureInitialized() {
|
||||
throw new NotImplementedException();
|
||||
try {
|
||||
// getDbOrAnalysisException will call makeSureInitialized in ExternalCatalog.
|
||||
ExternalDatabase db = catalog.getDbOrAnalysisException(dbName);
|
||||
db.makeSureInitialized();
|
||||
} catch (AnalysisException e) {
|
||||
Util.logAndThrowRuntimeException(LOG, String.format("Exception to get db %s", dbName), e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -320,6 +328,10 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable {
|
||||
throw new NotImplementedException("implement in sub class");
|
||||
}
|
||||
|
||||
public void unsetObjectCreated() {
|
||||
this.objectCreated = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
String json = GsonUtils.GSON.toJson(this);
|
||||
|
||||
@ -102,6 +102,7 @@ public class HMSExternalDatabase extends ExternalDatabase<HMSExternalTable> impl
|
||||
tblId = tableNameToId.get(tableName);
|
||||
tmpTableNameToId.put(tableName, tblId);
|
||||
HMSExternalTable table = idToTbl.get(tblId);
|
||||
table.unsetObjectCreated();
|
||||
tmpIdToTbl.put(tblId, table);
|
||||
initDatabaseLog.addRefreshTable(tblId);
|
||||
} else {
|
||||
|
||||
@ -20,7 +20,6 @@ package org.apache.doris.catalog.external;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.HiveMetaStoreClientHelper;
|
||||
import org.apache.doris.catalog.Type;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.datasource.HMSExternalCatalog;
|
||||
import org.apache.doris.datasource.hive.PooledHiveMetaStoreClient;
|
||||
import org.apache.doris.statistics.AnalysisTaskInfo;
|
||||
@ -89,12 +88,9 @@ public class HMSExternalTable extends ExternalTable {
|
||||
}
|
||||
|
||||
protected synchronized void makeSureInitialized() {
|
||||
super.makeSureInitialized();
|
||||
if (!objectCreated) {
|
||||
try {
|
||||
getRemoteTable();
|
||||
} catch (MetaNotFoundException e) {
|
||||
// CHECKSTYLE IGNORE THIS LINE
|
||||
}
|
||||
remoteTable = ((HMSExternalCatalog) catalog).getClient().getTable(dbName, name);
|
||||
if (remoteTable == null) {
|
||||
dlaType = DLAType.UNKNOWN;
|
||||
} else {
|
||||
@ -150,14 +146,8 @@ public class HMSExternalTable extends ExternalTable {
|
||||
/**
|
||||
* Get the related remote hive metastore table.
|
||||
*/
|
||||
public org.apache.hadoop.hive.metastore.api.Table getRemoteTable() throws MetaNotFoundException {
|
||||
if (remoteTable == null) {
|
||||
synchronized (this) {
|
||||
if (remoteTable == null) {
|
||||
remoteTable = ((HMSExternalCatalog) catalog).getClient().getTable(dbName, name);
|
||||
}
|
||||
}
|
||||
}
|
||||
public org.apache.hadoop.hive.metastore.api.Table getRemoteTable() {
|
||||
makeSureInitialized();
|
||||
return remoteTable;
|
||||
}
|
||||
|
||||
|
||||
@ -92,6 +92,7 @@ public class IcebergExternalDatabase extends ExternalDatabase<IcebergExternalTab
|
||||
tblId = tableNameToId.get(tableName);
|
||||
tmpTableNameToId.put(tableName, tblId);
|
||||
IcebergExternalTable table = idToTbl.get(tblId);
|
||||
table.unsetObjectCreated();
|
||||
tmpIdToTbl.put(tblId, table);
|
||||
initDatabaseLog.addRefreshTable(tblId);
|
||||
} else {
|
||||
|
||||
@ -47,6 +47,7 @@ public class IcebergExternalTable extends ExternalTable {
|
||||
}
|
||||
|
||||
protected synchronized void makeSureInitialized() {
|
||||
super.makeSureInitialized();
|
||||
if (!objectCreated) {
|
||||
objectCreated = true;
|
||||
}
|
||||
|
||||
@ -71,6 +71,7 @@ public class JdbcExternalDatabase extends ExternalDatabase<JdbcExternalTable> im
|
||||
tblId = tableNameToId.get(tableName);
|
||||
tmpTableNameToId.put(tableName, tblId);
|
||||
JdbcExternalTable table = idToTbl.get(tblId);
|
||||
table.unsetObjectCreated();
|
||||
tmpIdToTbl.put(tblId, table);
|
||||
initDatabaseLog.addRefreshTable(tblId);
|
||||
} else {
|
||||
|
||||
@ -49,6 +49,7 @@ public class JdbcExternalTable extends ExternalTable {
|
||||
|
||||
@Override
|
||||
protected synchronized void makeSureInitialized() {
|
||||
super.makeSureInitialized();
|
||||
if (!objectCreated) {
|
||||
jdbcTable = toJdbcTable();
|
||||
objectCreated = true;
|
||||
|
||||
@ -64,6 +64,7 @@ public class TestExternalDatabase extends ExternalDatabase<TestExternalTable> im
|
||||
tblId = tableNameToId.get(tableName);
|
||||
tmpTableNameToId.put(tableName, tblId);
|
||||
TestExternalTable table = idToTbl.get(tblId);
|
||||
table.unsetObjectCreated();
|
||||
tmpIdToTbl.put(tblId, table);
|
||||
initDatabaseLog.addRefreshTable(tblId);
|
||||
} else {
|
||||
|
||||
@ -38,8 +38,9 @@ public class TestExternalTable extends ExternalTable {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected synchronized void makeSureInitialized() {
|
||||
|
||||
public synchronized void makeSureInitialized() {
|
||||
super.makeSureInitialized();
|
||||
this.objectCreated = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -591,6 +591,9 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
if (table == null) {
|
||||
throw new DdlException("Table " + tableName + " does not exist in db " + dbName);
|
||||
}
|
||||
if (table instanceof ExternalTable) {
|
||||
((ExternalTable) table).unsetObjectCreated();
|
||||
}
|
||||
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(catalog.getId(), dbName, tableName);
|
||||
ExternalObjectLog log = new ExternalObjectLog();
|
||||
log.setCatalogId(catalog.getId());
|
||||
@ -615,6 +618,7 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
LOG.warn("No table found with id:[{}], it may have been dropped.", log.getTableId());
|
||||
return;
|
||||
}
|
||||
table.unsetObjectCreated();
|
||||
Env.getCurrentEnv().getExtMetaCacheMgr()
|
||||
.invalidateTableCache(catalog.getId(), db.getFullName(), table.getName());
|
||||
}
|
||||
|
||||
@ -150,6 +150,10 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isInitialized() {
|
||||
return this.initialized;
|
||||
}
|
||||
|
||||
// init some local objects such as:
|
||||
// hms client, read properties from hive-site.xml, es client
|
||||
protected abstract void initLocalObjectsImpl();
|
||||
|
||||
@ -1005,8 +1005,6 @@ public class ShowExecutor {
|
||||
? new ShowResultSet(showStmt.getMetaData(), rows)
|
||||
: new ShowResultSet(ShowCreateTableStmt.getMaterializedViewMetaData(), rows);
|
||||
}
|
||||
} catch (MetaNotFoundException e) {
|
||||
throw new AnalysisException(e.getMessage());
|
||||
} finally {
|
||||
table.readUnlock();
|
||||
}
|
||||
|
||||
@ -0,0 +1,179 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.catalog;
|
||||
|
||||
import org.apache.doris.analysis.CreateCatalogStmt;
|
||||
import org.apache.doris.analysis.DropCatalogStmt;
|
||||
import org.apache.doris.analysis.RefreshCatalogStmt;
|
||||
import org.apache.doris.analysis.RefreshDbStmt;
|
||||
import org.apache.doris.analysis.RefreshTableStmt;
|
||||
import org.apache.doris.analysis.TableName;
|
||||
import org.apache.doris.catalog.external.ExternalDatabase;
|
||||
import org.apache.doris.catalog.external.TestExternalTable;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.datasource.CatalogIf;
|
||||
import org.apache.doris.datasource.ExternalCatalog;
|
||||
import org.apache.doris.datasource.test.TestExternalCatalog;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.DdlExecutor;
|
||||
import org.apache.doris.utframe.TestWithFeService;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class RefreshTableTest extends TestWithFeService {
|
||||
private static Env env;
|
||||
private ConnectContext rootCtx;
|
||||
|
||||
@Override
|
||||
protected void runBeforeAll() throws Exception {
|
||||
FeConstants.runningUnitTest = true;
|
||||
rootCtx = createDefaultCtx();
|
||||
env = Env.getCurrentEnv();
|
||||
// 1. create test catalog
|
||||
CreateCatalogStmt testCatalog = (CreateCatalogStmt) parseAndAnalyzeStmt("create catalog test1 properties(\n"
|
||||
+ " \"type\" = \"test\",\n"
|
||||
+ " \"catalog_provider.class\" "
|
||||
+ "= \"org.apache.doris.catalog.RefreshTableTest$RefreshTableProvider\"\n"
|
||||
+ ");",
|
||||
rootCtx);
|
||||
env.getCatalogMgr().createCatalog(testCatalog);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void runAfterAll() throws Exception {
|
||||
super.runAfterAll();
|
||||
rootCtx.setThreadLocalInfo();
|
||||
DropCatalogStmt stmt = (DropCatalogStmt) parseAndAnalyzeStmt("drop catalog test1");
|
||||
env.getCatalogMgr().dropCatalog(stmt);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRefreshCatalog() throws Exception {
|
||||
CatalogIf test1 = env.getCatalogMgr().getCatalog("test1");
|
||||
TestExternalTable table = (TestExternalTable) test1.getDbNullable("db1").getTable("tbl11").get();
|
||||
Assertions.assertFalse(table.isObjectCreated());
|
||||
table.makeSureInitialized();
|
||||
Assertions.assertTrue(table.isObjectCreated());
|
||||
RefreshCatalogStmt refreshCatalogStmt = new RefreshCatalogStmt("test1", null);
|
||||
try {
|
||||
DdlExecutor.execute(Env.getCurrentEnv(), refreshCatalogStmt);
|
||||
} catch (Exception e) {
|
||||
// Do nothing
|
||||
}
|
||||
Assertions.assertTrue(table.isObjectCreated());
|
||||
test1.getDbNullable("db1").getTables();
|
||||
Assertions.assertFalse(table.isObjectCreated());
|
||||
try {
|
||||
DdlExecutor.execute(Env.getCurrentEnv(), refreshCatalogStmt);
|
||||
} catch (Exception e) {
|
||||
// Do nothing
|
||||
}
|
||||
Assertions.assertFalse(((ExternalCatalog) test1).isInitialized());
|
||||
table.makeSureInitialized();
|
||||
Assertions.assertTrue(((ExternalCatalog) test1).isInitialized());
|
||||
try {
|
||||
DdlExecutor.execute(Env.getCurrentEnv(), refreshCatalogStmt);
|
||||
} catch (Exception e) {
|
||||
// Do nothing
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRefreshDatabase() throws Exception {
|
||||
CatalogIf test1 = env.getCatalogMgr().getCatalog("test1");
|
||||
TestExternalTable table = (TestExternalTable) test1.getDbNullable("db1").getTable("tbl11").get();
|
||||
Assertions.assertFalse(table.isObjectCreated());
|
||||
table.makeSureInitialized();
|
||||
Assertions.assertTrue(table.isObjectCreated());
|
||||
RefreshDbStmt refreshDbStmt = new RefreshDbStmt("test1", "db1", null);
|
||||
try {
|
||||
DdlExecutor.execute(Env.getCurrentEnv(), refreshDbStmt);
|
||||
} catch (Exception e) {
|
||||
// Do nothing
|
||||
}
|
||||
Assertions.assertTrue(table.isObjectCreated());
|
||||
test1.getDbNullable("db1").getTables();
|
||||
Assertions.assertFalse(table.isObjectCreated());
|
||||
try {
|
||||
DdlExecutor.execute(Env.getCurrentEnv(), refreshDbStmt);
|
||||
} catch (Exception e) {
|
||||
// Do nothing
|
||||
}
|
||||
Assertions.assertFalse(((ExternalDatabase) test1.getDbNullable("db1")).isInitialized());
|
||||
table.makeSureInitialized();
|
||||
Assertions.assertTrue(((ExternalDatabase) test1.getDbNullable("db1")).isInitialized());
|
||||
try {
|
||||
DdlExecutor.execute(Env.getCurrentEnv(), refreshDbStmt);
|
||||
} catch (Exception e) {
|
||||
// Do nothing
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRefreshTable() throws Exception {
|
||||
CatalogIf test1 = env.getCatalogMgr().getCatalog("test1");
|
||||
TestExternalTable table = (TestExternalTable) test1.getDbNullable("db1").getTable("tbl11").get();
|
||||
Assertions.assertFalse(table.isObjectCreated());
|
||||
table.makeSureInitialized();
|
||||
Assertions.assertTrue(table.isObjectCreated());
|
||||
RefreshTableStmt refreshTableStmt = new RefreshTableStmt(new TableName("test1", "db1", "tbl11"));
|
||||
try {
|
||||
DdlExecutor.execute(Env.getCurrentEnv(), refreshTableStmt);
|
||||
} catch (Exception e) {
|
||||
// Do nothing
|
||||
}
|
||||
Assertions.assertFalse(table.isObjectCreated());
|
||||
}
|
||||
|
||||
public static class RefreshTableProvider implements TestExternalCatalog.TestCatalogProvider {
|
||||
public static final Map<String, Map<String, List<Column>>> MOCKED_META;
|
||||
|
||||
static {
|
||||
MOCKED_META = Maps.newHashMap();
|
||||
Map<String, List<Column>> tblSchemaMap1 = Maps.newHashMap();
|
||||
// db1
|
||||
tblSchemaMap1.put("tbl11", Lists.newArrayList(
|
||||
new Column("a11", PrimitiveType.BIGINT),
|
||||
new Column("a12", PrimitiveType.STRING),
|
||||
new Column("a13", PrimitiveType.FLOAT)));
|
||||
tblSchemaMap1.put("tbl12", Lists.newArrayList(
|
||||
new Column("b21", PrimitiveType.BIGINT),
|
||||
new Column("b22", PrimitiveType.STRING),
|
||||
new Column("b23", PrimitiveType.FLOAT)));
|
||||
MOCKED_META.put("db1", tblSchemaMap1);
|
||||
// db2
|
||||
Map<String, List<Column>> tblSchemaMap2 = Maps.newHashMap();
|
||||
tblSchemaMap2.put("tbl21", Lists.newArrayList(
|
||||
new Column("c11", PrimitiveType.BIGINT),
|
||||
new Column("c12", PrimitiveType.STRING),
|
||||
new Column("c13", PrimitiveType.FLOAT)));
|
||||
MOCKED_META.put("db2", tblSchemaMap2);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Map<String, List<Column>>> getMetadata() {
|
||||
return MOCKED_META;
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user