[fix](multi-catalog) fix recursive get schema cache bug (#16415)

Mingyu Chen
2023-02-06 09:23:07 +08:00
committed by GitHub
parent d2044c2dc5
commit f940cf4cf6
13 changed files with 169 additions and 158 deletions

View File

@@ -63,6 +63,9 @@ public class Column implements Writable, GsonPostProcessable {
private static final String COLUMN_ARRAY_CHILDREN = "item";
public static final int COLUMN_UNIQUE_ID_INIT_VALUE = -1;
public static final Column UNSUPPORTED_COLUMN = new Column("unknown",
Type.UNSUPPORTED, true, null, true, null, "invalid", true, null, -1, null);
@SerializedName(value = "name")
private String name;
@SerializedName(value = "type")

View File

@@ -20,6 +20,8 @@ package org.apache.doris.catalog.external;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.EsTable;
import org.apache.doris.datasource.EsExternalCatalog;
import org.apache.doris.external.elasticsearch.EsRestClient;
import org.apache.doris.external.elasticsearch.EsUtil;
import org.apache.doris.thrift.TEsTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
@@ -76,6 +78,12 @@ public class EsExternalTable extends ExternalTable {
return tTableDescriptor;
}
@Override
public List<Column> initSchema() {
EsRestClient restClient = ((EsExternalCatalog) catalog).getEsRestClient();
return EsUtil.genColumnsFromEs(restClient, name, null, ((EsExternalCatalog) catalog).enableMappingEsId());
}
private EsTable toEsTable() {
List<Column> schema = getFullSchema();
EsExternalCatalog esCatalog = (EsExternalCatalog) catalog;

View File

@@ -310,6 +310,17 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable {
return 1;
}
/**
* Should only be called by ExternalCatalog's getSchema(),
* which is invoked from the schema cache.
* To get the schema of this table, use getFullSchema() instead.
*
* @return the schema fetched from the remote data source
*/
public List<Column> initSchema() {
throw new NotImplementedException("implement in subclass");
}
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);

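For context, a minimal sketch (not part of this commit) of the contract initSchema() establishes, using a hypothetical subclass and a hypothetical fetchColumnsFromRemote() helper; constructors are omitted for brevity:

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.external.ExternalTable;

import com.google.common.collect.Lists;

import java.util.List;

public class MyExternalTable extends ExternalTable {
    @Override
    public List<Column> initSchema() {
        // Invoked only from ExternalCatalog.getSchema(), i.e. by the schema
        // cache loader. It must not call getFullSchema() or getColumn() here,
        // since those read the cache and would re-enter getSchema().
        return fetchColumnsFromRemote(dbName, name);
    }

    // Hypothetical helper standing in for a real remote-metadata call.
    private List<Column> fetchColumnsFromRemote(String db, String tbl) {
        return Lists.newArrayList();
    }
}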
View File

@@ -18,6 +18,7 @@
package org.apache.doris.catalog.external;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.HiveMetaStoreClientHelper;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.datasource.HMSExternalCatalog;
@@ -36,6 +37,8 @@ import com.google.common.collect.Sets;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -106,29 +109,10 @@ public class HMSExternalTable extends ExternalTable {
dlaType = DLAType.UNKNOWN;
}
}
initPartitionColumns();
objectCreated = true;
}
}
private void initPartitionColumns() {
List<String> partitionKeys = remoteTable.getPartitionKeys().stream().map(FieldSchema::getName)
.collect(Collectors.toList());
partitionColumns = Lists.newArrayListWithCapacity(partitionKeys.size());
for (String partitionKey : partitionKeys) {
// Do not use "getColumn()", which would cause an infinite loop
List<Column> schema = getFullSchema();
for (Column column : schema) {
if (partitionKey.equals(column.getName())) {
partitionColumns.add(column);
break;
}
}
}
LOG.debug("get {} partition columns for table: {}", partitionColumns.size(), name);
}
/**
* Currently, only copy-on-write (COW) tables are supported for Iceberg.
*/
@@ -180,18 +164,16 @@ public class HMSExternalTable extends ExternalTable {
public List<Type> getPartitionColumnTypes() {
makeSureInitialized();
getFullSchema();
return partitionColumns.stream().map(c -> c.getType()).collect(Collectors.toList());
}
public List<Column> getPartitionColumns() {
makeSureInitialized();
getFullSchema();
return partitionColumns;
}
public List<String> getPartitionColumnNames() {
return getPartitionColumns().stream().map(c -> c.getName()).collect(Collectors.toList());
}
@Override
public boolean isView() {
makeSureInitialized();
@@ -309,5 +291,54 @@ public class HMSExternalTable extends ExternalTable {
PooledHiveMetaStoreClient client = ((HMSExternalCatalog) catalog).getClient();
return client.getPartition(dbName, name, partitionValues);
}
@Override
public List<Column> initSchema() {
List<Column> columns;
List<FieldSchema> schema = ((HMSExternalCatalog) catalog).getClient().getSchema(dbName, name);
if (dlaType.equals(DLAType.ICEBERG)) {
columns = getIcebergSchema(schema);
} else {
List<Column> tmpSchema = Lists.newArrayListWithCapacity(schema.size());
for (FieldSchema field : schema) {
tmpSchema.add(new Column(field.getName(),
HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType()), true, null,
true, field.getComment(), true, -1));
}
columns = tmpSchema;
}
initPartitionColumns(columns);
return columns;
}
private List<Column> getIcebergSchema(List<FieldSchema> hmsSchema) {
Table icebergTable = HiveMetaStoreClientHelper.getIcebergTable(this);
Schema schema = icebergTable.schema();
List<Column> tmpSchema = Lists.newArrayListWithCapacity(hmsSchema.size());
for (FieldSchema field : hmsSchema) {
tmpSchema.add(new Column(field.getName(),
HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType()), true, null,
true, null, field.getComment(), true, null,
schema.caseInsensitiveFindField(field.getName()).fieldId(), null));
}
return tmpSchema;
}
private void initPartitionColumns(List<Column> schema) {
List<String> partitionKeys = remoteTable.getPartitionKeys().stream().map(FieldSchema::getName)
.collect(Collectors.toList());
partitionColumns = Lists.newArrayListWithCapacity(partitionKeys.size());
for (String partitionKey : partitionKeys) {
// Do not use "getColumn()", which would cause an infinite loop
for (Column column : schema) {
if (partitionKey.equals(column.getName())) {
partitionColumns.add(column);
break;
}
}
}
LOG.debug("get {} partition columns for table: {}", partitionColumns.size(), name);
}
}
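To make the removed recursion concrete, here is a sketch (reconstructed from the diff, not committed code) of the call chain the old code could enter, assuming the schema cache loads entries via ExternalCatalog.getSchema():

// Old call chain that could recurse:
// SchemaCache load(db, tbl)
//   -> HMSExternalCatalog.getSchema(db, tbl)
//     -> HMSExternalTable.getDlaType() -> makeSureInitialized()
//       -> initPartitionColumns() -> getFullSchema()
//         -> SchemaCache load(db, tbl) -> getSchema() ...  // loop
//
// New flow: initSchema() fetches the columns itself and hands them to
// initPartitionColumns(List<Column>), so loading a cache entry never
// performs a schema-cache lookup of its own.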

View File

@@ -17,13 +17,19 @@
package org.apache.doris.catalog.external;
import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.thrift.THiveTable;
import org.apache.doris.thrift.TIcebergTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
import com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import java.util.HashMap;
import java.util.List;
@@ -43,6 +49,68 @@ public class IcebergExternalTable extends ExternalTable {
}
}
@Override
public List<Column> initSchema() {
List<Types.NestedField> columns = ((IcebergExternalCatalog) catalog).getIcebergTable(dbName, name).schema()
.columns();
List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size());
for (Types.NestedField field : columns) {
tmpSchema.add(new Column(field.name(),
icebergTypeToDorisType(field.type()), true, null,
true, field.doc(), true, -1));
}
return tmpSchema;
}
private Type icebergPrimitiveTypeToDorisType(org.apache.iceberg.types.Type.PrimitiveType primitive) {
switch (primitive.typeId()) {
case BOOLEAN:
return Type.BOOLEAN;
case INTEGER:
return Type.INT;
case LONG:
return Type.BIGINT;
case FLOAT:
return Type.FLOAT;
case DOUBLE:
return Type.DOUBLE;
case STRING:
case BINARY:
case UUID:
return Type.STRING;
case FIXED:
Types.FixedType fixed = (Types.FixedType) primitive;
return ScalarType.createCharType(fixed.length());
case DECIMAL:
Types.DecimalType decimal = (Types.DecimalType) primitive;
return ScalarType.createDecimalType(decimal.precision(), decimal.scale());
case DATE:
return ScalarType.createDateV2Type();
case TIMESTAMP:
return ScalarType.createDatetimeV2Type(0);
case TIME:
return Type.UNSUPPORTED;
default:
throw new IllegalArgumentException("Cannot transform unknown type: " + primitive);
}
}
protected Type icebergTypeToDorisType(org.apache.iceberg.types.Type type) {
if (type.isPrimitiveType()) {
return icebergPrimitiveTypeToDorisType((org.apache.iceberg.types.Type.PrimitiveType) type);
}
switch (type.typeId()) {
case LIST:
Types.ListType list = (Types.ListType) type;
return ArrayType.create(icebergTypeToDorisType(list.elementType()), true);
case MAP:
case STRUCT:
return Type.UNSUPPORTED;
default:
throw new IllegalArgumentException("Cannot transform unknown type: " + type);
}
}
@Override
public TTableDescriptor toThrift() {
List<Column> schema = getFullSchema();

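As an illustrative check of the type mapping above (a sketch, not committed code; it assumes the standard org.apache.iceberg.types.Types factory methods):

// Expected results of icebergTypeToDorisType, per the switch above:
Type t1 = icebergTypeToDorisType(Types.IntegerType.get());        // Type.INT
Type t2 = icebergTypeToDorisType(Types.DecimalType.of(18, 4));    // ScalarType.createDecimalType(18, 4)
Type t3 = icebergTypeToDorisType(
        Types.ListType.ofOptional(1, Types.StringType.get()));    // ARRAY of STRING
Type t4 = icebergTypeToDorisType(
        Types.MapType.ofOptional(2, 3,
                Types.StringType.get(), Types.LongType.get()));   // Type.UNSUPPORTED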
View File

@@ -70,6 +70,11 @@ public class JdbcExternalTable extends ExternalTable {
return jdbcTable.toThrift();
}
@Override
public List<Column> initSchema() {
return ((JdbcExternalCatalog) catalog).getJdbcClient().getColumnsFromJdbc(dbName, name);
}
private JdbcTable toJdbcTable() {
List<Column> schema = getFullSchema();
JdbcExternalCatalog jdbcCatalog = (JdbcExternalCatalog) catalog;
@@ -85,5 +90,4 @@ public class JdbcExternalTable extends ExternalTable {
jdbcTable.setCheckSum(jdbcCatalog.getCheckSum());
return jdbcTable;
}
}

View File

@@ -17,12 +17,10 @@
package org.apache.doris.datasource;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.EsResource;
import org.apache.doris.catalog.external.EsExternalDatabase;
import org.apache.doris.external.elasticsearch.EsRestClient;
import org.apache.doris.external.elasticsearch.EsUtil;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -156,10 +154,4 @@ public class EsExternalCatalog extends ExternalCatalog {
public boolean tableExist(SessionContext ctx, String dbName, String tblName) {
return esRestClient.existIndex(this.esRestClient.getClient(), tblName);
}
@Override
public List<Column> getSchema(String dbName, String tblName) {
makeSureInitialized();
return EsUtil.genColumnsFromEs(getEsRestClient(), tblName, null, enableMappingEsId());
}
}

View File

@@ -21,6 +21,7 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.external.EsExternalDatabase;
import org.apache.doris.catalog.external.ExternalDatabase;
import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.catalog.external.HMSExternalDatabase;
import org.apache.doris.catalog.external.IcebergExternalDatabase;
import org.apache.doris.catalog.external.JdbcExternalDatabase;
@@ -46,6 +47,7 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
* The abstract class for all types of external catalogs.
@@ -165,7 +167,19 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr
return idToDb.get(dbId);
}
public abstract List<Column> getSchema(String dbName, String tblName);
public final List<Column> getSchema(String dbName, String tblName) {
makeSureInitialized();
Optional<ExternalDatabase> db = getDb(dbName);
if (db.isPresent()) {
Optional table = db.get().getTable(tblName);
if (table.isPresent()) {
return ((ExternalTable) table.get()).initSchema();
}
}
// Return one column with an unsupported type
// rather than an empty list, to avoid unexpected downstream issues.
return Lists.newArrayList(Column.UNSUPPORTED_COLUMN);
}
@Override
public long getId() {

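A small usage sketch (not part of the commit) of the new fall-back behavior, assuming a hypothetical someCatalog reference:

// When the database or table cannot be resolved, callers receive a
// single sentinel column instead of an empty list (see
// Column.UNSUPPORTED_COLUMN in the first hunk of this commit):
List<Column> schema = someCatalog.getSchema("no_such_db", "no_such_tbl");
// schema.size() == 1 && schema.get(0) == Column.UNSUPPORTED_COLUMN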
View File

@@ -18,14 +18,11 @@
package org.apache.doris.datasource;
import org.apache.doris.catalog.AuthType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HMSResource;
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.catalog.HiveMetaStoreClientHelper;
import org.apache.doris.catalog.external.ExternalDatabase;
import org.apache.doris.catalog.external.HMSExternalDatabase;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.Config;
import org.apache.doris.datasource.hive.PooledHiveMetaStoreClient;
import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException;
@@ -35,18 +32,14 @@ import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
* External catalog for hive metastore compatible data sources.
@@ -174,42 +167,6 @@ public class HMSExternalCatalog extends ExternalCatalog {
return client;
}
@Override
public List<Column> getSchema(String dbName, String tblName) {
makeSureInitialized();
List<FieldSchema> schema = getClient().getSchema(dbName, tblName);
Optional<ExternalDatabase> db = getDb(dbName);
if (db.isPresent()) {
Optional table = db.get().getTable(tblName);
if (table.isPresent()) {
HMSExternalTable hmsTable = (HMSExternalTable) table.get();
if (hmsTable.getDlaType().equals(HMSExternalTable.DLAType.ICEBERG)) {
return getIcebergSchema(hmsTable, schema);
}
}
}
List<Column> tmpSchema = Lists.newArrayListWithCapacity(schema.size());
for (FieldSchema field : schema) {
tmpSchema.add(new Column(field.getName(),
HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType()), true, null,
true, field.getComment(), true, -1));
}
return tmpSchema;
}
private List<Column> getIcebergSchema(HMSExternalTable table, List<FieldSchema> hmsSchema) {
Table icebergTable = HiveMetaStoreClientHelper.getIcebergTable(table);
Schema schema = icebergTable.schema();
List<Column> tmpSchema = Lists.newArrayListWithCapacity(hmsSchema.size());
for (FieldSchema field : hmsSchema) {
tmpSchema.add(new Column(field.getName(),
HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType()), true, null,
true, field.getComment(), true,
schema.caseInsensitiveFindField(field.getName()).fieldId()));
}
return tmpSchema;
}
public void setLastSyncedEventId(long lastSyncedEventId) {
this.lastSyncedEventId = lastSyncedEventId;
}

View File

@@ -17,7 +17,6 @@
package org.apache.doris.datasource;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.JdbcResource;
import org.apache.doris.catalog.external.ExternalDatabase;
@@ -163,10 +162,4 @@ public class JdbcExternalCatalog extends ExternalCatalog {
makeSureInitialized();
return jdbcClient.isTableExist(dbName, tblName);
}
@Override
public List<Column> getSchema(String dbName, String tblName) {
makeSureInitialized();
return jdbcClient.getColumnsFromJdbc(dbName, tblName);
}
}

View File

@@ -17,11 +17,7 @@
package org.apache.doris.datasource.iceberg;
import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import org.apache.doris.catalog.external.ExternalDatabase;
import org.apache.doris.catalog.external.IcebergExternalDatabase;
import org.apache.doris.common.AnalysisException;
@@ -31,16 +27,13 @@ import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.SessionContext;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.types.Types;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -103,55 +96,6 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog {
return conf;
}
protected Type icebergTypeToDorisType(org.apache.iceberg.types.Type type) {
if (type.isPrimitiveType()) {
return icebergPrimitiveTypeToDorisType((org.apache.iceberg.types.Type.PrimitiveType) type);
}
switch (type.typeId()) {
case LIST:
Types.ListType list = (Types.ListType) type;
return ArrayType.create(icebergTypeToDorisType(list.elementType()), true);
case MAP:
case STRUCT:
return Type.UNSUPPORTED;
default:
throw new IllegalArgumentException("Cannot transform unknown type: " + type);
}
}
private Type icebergPrimitiveTypeToDorisType(org.apache.iceberg.types.Type.PrimitiveType primitive) {
switch (primitive.typeId()) {
case BOOLEAN:
return Type.BOOLEAN;
case INTEGER:
return Type.INT;
case LONG:
return Type.BIGINT;
case FLOAT:
return Type.FLOAT;
case DOUBLE:
return Type.DOUBLE;
case STRING:
case BINARY:
case UUID:
return Type.STRING;
case FIXED:
Types.FixedType fixed = (Types.FixedType) primitive;
return ScalarType.createCharType(fixed.length());
case DECIMAL:
Types.DecimalType decimal = (Types.DecimalType) primitive;
return ScalarType.createDecimalType(decimal.precision(), decimal.scale());
case DATE:
return ScalarType.createDateV2Type();
case TIMESTAMP:
return ScalarType.createDatetimeV2Type(0);
case TIME:
return Type.UNSUPPORTED;
default:
throw new IllegalArgumentException("Cannot transform unknown type: " + primitive);
}
}
public Catalog getCatalog() {
makeSureInitialized();
return catalog;
@@ -188,20 +132,6 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog {
return new ArrayList<>(dbNameToId.keySet());
}
@Override
public List<Column> getSchema(String dbName, String tblName) {
makeSureInitialized();
Schema schema = getIcebergTable(dbName, tblName).schema();
List<Types.NestedField> columns = schema.columns();
List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size());
for (Types.NestedField field : columns) {
tmpSchema.add(new Column(field.name(),
icebergTypeToDorisType(field.type()), true, null,
true, field.doc(), true, schema.caseInsensitiveFindField(field.name()).fieldId()));
}
return tmpSchema;
}
@Override
public boolean tableExist(SessionContext ctx, String dbName, String tblName) {
makeSureInitialized();

View File

@@ -1925,8 +1925,8 @@ gin-gonic/gin 3246
flutter/flutter 3160
facebook/rocksdb 3156
antirez/redis 3101
istio/istio 3094
ansible/ansible 3094
istio/istio 3094
torvalds/linux 3092
minio/minio 3068
cockroachdb/cockroach 3006
@@ -1958,8 +1958,8 @@ tensorflow/models 2355
getsentry/sentry 2354
dgraph-io/dgraph 2338
TheAlgorithms/Python 2320
mholt/caddy 2317
astaxie/build-web-application-with-golang 2317
mholt/caddy 2317
-- !46 --
facebook/react 121976
@@ -4327,8 +4327,8 @@ gin-gonic/gin 3246
flutter/flutter 3160
facebook/rocksdb 3156
antirez/redis 3101
istio/istio 3094
ansible/ansible 3094
istio/istio 3094
torvalds/linux 3092
minio/minio 3068
cockroachdb/cockroach 3006
@@ -4360,8 +4360,8 @@ tensorflow/models 2355
getsentry/sentry 2354
dgraph-io/dgraph 2338
TheAlgorithms/Python 2320
mholt/caddy 2317
astaxie/build-web-application-with-golang 2317
mholt/caddy 2317
-- !46 --
facebook/react 121976

View File

@@ -431,7 +431,7 @@ suite("test_external_github", "p2") {
WHERE (event_type = 'WatchEvent') AND (repo_name IN ('ClickHouse/ClickHouse', 'yandex/ClickHouse'))
)) AND (repo_name NOT IN ('ClickHouse/ClickHouse', 'yandex/ClickHouse'))
GROUP BY repo_name
ORDER BY stars DESC
ORDER BY stars DESC, repo_name ASC
LIMIT 50"""
def starsFromHeavyGithubUsers1 = """SELECT /*+SET_VAR(exec_mem_limit=21474836480, query_timeout=600) */
repo_name,