branch-2.1: [fix](catalog) synchronize reset methods in catalog classes and remove Lombok annotations (#53168)
pick (#51787)
This commit is contained in:
@ -71,7 +71,6 @@ import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
import lombok.Data;
|
||||
import org.apache.commons.lang3.NotImplementedException;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
@ -98,7 +97,6 @@ import java.util.stream.Collectors;
|
||||
/**
|
||||
* The abstract class for all types of external catalogs.
|
||||
*/
|
||||
@Data
|
||||
public abstract class ExternalCatalog
|
||||
implements CatalogIf<ExternalDatabase<? extends ExternalTable>>, Writable, GsonPostProcessable {
|
||||
private static final Logger LOG = LogManager.getLogger(ExternalCatalog.class);
|
||||
@ -337,7 +335,7 @@ public abstract class ExternalCatalog
|
||||
// check if all required properties are set when creating catalog
|
||||
public void checkProperties() throws DdlException {
|
||||
// check refresh parameter of catalog
|
||||
Map<String, String> properties = getCatalogProperty().getProperties();
|
||||
Map<String, String> properties = catalogProperty.getProperties();
|
||||
if (properties.containsKey(CatalogMgr.METADATA_REFRESH_INTERVAL_SEC)) {
|
||||
try {
|
||||
Integer metadataRefreshIntervalSec = Integer.valueOf(
|
||||
@ -368,7 +366,7 @@ public abstract class ExternalCatalog
|
||||
* isDryRun: if true, it will try to create the custom access controller, but will not add it to the access manager.
|
||||
*/
|
||||
public void initAccessController(boolean isDryRun) {
|
||||
Map<String, String> properties = getCatalogProperty().getProperties();
|
||||
Map<String, String> properties = catalogProperty.getProperties();
|
||||
// 1. get access controller class name
|
||||
String className = properties.getOrDefault(CatalogMgr.ACCESS_CONTROLLER_CLASS_PROP, "");
|
||||
if (Strings.isNullOrEmpty(className)) {
|
||||
@ -534,7 +532,7 @@ public abstract class ExternalCatalog
|
||||
* @param invalidCache if {@code true}, the catalog cache will be invalidated
|
||||
* and reloaded during the refresh process.
|
||||
*/
|
||||
public void resetToUninitialized(boolean invalidCache) {
|
||||
public synchronized void resetToUninitialized(boolean invalidCache) {
|
||||
this.objectCreated = false;
|
||||
this.initialized = false;
|
||||
synchronized (this.propLock) {
|
||||
@ -986,6 +984,14 @@ public abstract class ExternalCatalog
|
||||
dbNameToId.put(ClusterNamespace.getNameFromFullName(db.getFullName()), db.getId());
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the initialized status for testing purposes only.
|
||||
* This method should only be used in test cases.
|
||||
*/
|
||||
public void setInitializedForTest(boolean initialized) {
|
||||
this.initialized = initialized;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createDb(CreateDbStmt stmt) throws DdlException {
|
||||
makeSureInitialized();
|
||||
@ -1182,4 +1188,24 @@ public abstract class ExternalCatalog
|
||||
public ThreadPoolExecutor getThreadPoolExecutor() {
|
||||
return threadPoolWithPreAuth;
|
||||
}
|
||||
|
||||
public CatalogProperty getCatalogProperty() {
|
||||
return catalogProperty;
|
||||
}
|
||||
|
||||
public Optional<Boolean> getUseMetaCache() {
|
||||
return useMetaCache;
|
||||
}
|
||||
|
||||
public Map<Pair<String, String>, String> getTableAutoAnalyzePolicy() {
|
||||
return tableAutoAnalyzePolicy;
|
||||
}
|
||||
|
||||
public TransactionManager getTransactionManager() {
|
||||
return transactionManager;
|
||||
}
|
||||
|
||||
public ThreadPoolExecutor getThreadPoolWithPreAuth() {
|
||||
return threadPoolWithPreAuth;
|
||||
}
|
||||
}
|
||||
|
||||
@ -132,7 +132,7 @@ public abstract class ExternalDatabase<T extends ExternalTable>
|
||||
}
|
||||
}
|
||||
|
||||
public void setUnInitialized(boolean invalidCache) {
|
||||
public synchronized void setUnInitialized(boolean invalidCache) {
|
||||
this.initialized = false;
|
||||
this.invalidCacheInInit = invalidCache;
|
||||
if (extCatalog.getUseMetaCache().isPresent()) {
|
||||
|
||||
@ -45,7 +45,6 @@ import org.apache.doris.thrift.TTableDescriptor;
|
||||
import com.google.common.base.Objects;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
import lombok.Getter;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.commons.lang3.NotImplementedException;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
@ -65,7 +64,6 @@ import java.util.stream.Collectors;
|
||||
* External table represent tables that are not self-managed by Doris.
|
||||
* Such as tables from hive, iceberg, es, etc.
|
||||
*/
|
||||
@Getter
|
||||
public class ExternalTable implements TableIf, Writable, GsonPostProcessable {
|
||||
private static final Logger LOG = LogManager.getLogger(ExternalTable.class);
|
||||
|
||||
@ -479,4 +477,36 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable {
|
||||
public int hashCode() {
|
||||
return Objects.hashCode(name, db);
|
||||
}
|
||||
|
||||
public long getSchemaUpdateTime() {
|
||||
return schemaUpdateTime;
|
||||
}
|
||||
|
||||
public long getDbId() {
|
||||
return dbId;
|
||||
}
|
||||
|
||||
public boolean isObjectCreated() {
|
||||
return objectCreated;
|
||||
}
|
||||
|
||||
public ExternalCatalog getCatalog() {
|
||||
return catalog;
|
||||
}
|
||||
|
||||
public ExternalDatabase getDb() {
|
||||
return db;
|
||||
}
|
||||
|
||||
public long getTimestamp() {
|
||||
return timestamp;
|
||||
}
|
||||
|
||||
public String getDbName() {
|
||||
return dbName;
|
||||
}
|
||||
|
||||
public TableAttributes getTableAttributes() {
|
||||
return tableAttributes;
|
||||
}
|
||||
}
|
||||
|
||||
@ -213,7 +213,7 @@ public class HMSExternalCatalog extends ExternalCatalog {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resetToUninitialized(boolean invalidCache) {
|
||||
public synchronized void resetToUninitialized(boolean invalidCache) {
|
||||
super.resetToUninitialized(invalidCache);
|
||||
if (metadataOps != null) {
|
||||
metadataOps.close();
|
||||
|
||||
@ -49,7 +49,6 @@ import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.protobuf.ByteString;
|
||||
import lombok.Getter;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
@ -62,7 +61,6 @@ import java.util.Map;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
@Getter
|
||||
public class JdbcExternalCatalog extends ExternalCatalog {
|
||||
private static final Logger LOG = LogManager.getLogger(JdbcExternalCatalog.class);
|
||||
|
||||
@ -124,7 +122,7 @@ public class JdbcExternalCatalog extends ExternalCatalog {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resetToUninitialized(boolean invalidCache) {
|
||||
public synchronized void resetToUninitialized(boolean invalidCache) {
|
||||
super.resetToUninitialized(invalidCache);
|
||||
this.identifierMapping = new JdbcIdentifierMapping(
|
||||
(Env.isTableNamesCaseInsensitive() || Env.isStoredTableNamesLowerCase()),
|
||||
@ -297,6 +295,10 @@ public class JdbcExternalCatalog extends ExternalCatalog {
|
||||
return jdbcClient.getColumnsFromJdbc(remoteDbName, remoteTblName);
|
||||
}
|
||||
|
||||
public IdentifierMapping getIdentifierMapping() {
|
||||
return identifierMapping;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void checkWhenCreating() throws DdlException {
|
||||
super.checkWhenCreating();
|
||||
|
||||
@ -72,7 +72,7 @@ public class CreateIcebergTableTest {
|
||||
if (icebergCatalog.getUseMetaCache().get()) {
|
||||
icebergCatalog.makeSureInitialized();
|
||||
} else {
|
||||
icebergCatalog.setInitialized(true);
|
||||
icebergCatalog.setInitializedForTest(true);
|
||||
}
|
||||
|
||||
// create db
|
||||
@ -82,7 +82,7 @@ public class CreateIcebergTableTest {
|
||||
if (icebergCatalog.getUseMetaCache().get()) {
|
||||
icebergCatalog.makeSureInitialized();
|
||||
} else {
|
||||
icebergCatalog.setInitialized(true);
|
||||
icebergCatalog.setInitializedForTest(true);
|
||||
}
|
||||
IcebergExternalDatabase db = new IcebergExternalDatabase(icebergCatalog, 1L, dbName, dbName);
|
||||
icebergCatalog.addDatabaseForTest(db);
|
||||
|
||||
@ -31,7 +31,7 @@ public class PaimonExternalCatalogTest {
|
||||
HashMap<String, String> props = new HashMap<>();
|
||||
props.put("warehouse", "not_exist");
|
||||
PaimonExternalCatalog catalog = new PaimonFileExternalCatalog(1, "name", "resource", props, "comment");
|
||||
catalog.setInitialized(true);
|
||||
catalog.setInitializedForTest(true);
|
||||
|
||||
try {
|
||||
catalog.getPaimonTable("dbName", "tblName");
|
||||
|
||||
@ -88,7 +88,7 @@ public class HiveTableSinkTest {
|
||||
mockDifferLocationTable(location);
|
||||
|
||||
HMSExternalCatalog hmsExternalCatalog = new HMSExternalCatalog();
|
||||
hmsExternalCatalog.setInitialized(true);
|
||||
hmsExternalCatalog.setInitializedForTest(true);
|
||||
HMSExternalDatabase db = new HMSExternalDatabase(hmsExternalCatalog, 10000, "hive_db1", "hive_db1");
|
||||
HMSExternalTable tbl = new HMSExternalTable(10001, "hive_tbl1", "hive_db1", hmsExternalCatalog, db);
|
||||
HiveTableSink hiveTableSink = new HiveTableSink(tbl);
|
||||
|
||||
Reference in New Issue
Block a user