[fix](catalog) refactor the schema cache for external table (#34517) (#34599)

Backport of #34517.
Author: Mingyu Chen
Date: 2024-05-09 18:02:18 +08:00
Committed by: GitHub
Parent: 3ae3f9d6e1
Commit: 53332eb4ba
20 changed files with 309 additions and 94 deletions
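In short, this backport changes ExternalSchemaCache to store Optional<SchemaCacheValue> instead of ImmutableList<Column>: the new SchemaCacheValue class (with per-connector subclasses such as HMSSchemaCacheValue, MaxComputeSchemaCacheValue and PaimonSchemaCacheValue) carries everything that must be refreshed together with the schema, and a missing schema is now reported as Optional.empty() rather than the old UNSUPPORTED_COLUMN sentinel. A minimal sketch of the read path after this change, mirroring ExternalTable.getFullSchema() below (variable names are illustrative):

ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog);
Optional<SchemaCacheValue> value = cache.getSchemaValue(dbName, tblName);
// Callers now decide how to handle absence; no sentinel "unsupported" column is injected.
List<Column> schema = value.map(SchemaCacheValue::getSchema).orElse(null);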

ExternalCatalog.java

@@ -22,7 +22,6 @@ import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.DropDbStmt;
import org.apache.doris.analysis.DropTableStmt;
import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.InfoSchemaDb;
@@ -386,7 +385,7 @@ public abstract class ExternalCatalog
}
}
public final List<Column> getSchema(String dbName, String tblName) {
public final Optional<SchemaCacheValue> getSchema(String dbName, String tblName) {
makeSureInitialized();
Optional<ExternalDatabase<? extends ExternalTable>> db = getDb(dbName);
if (db.isPresent()) {
@@ -395,9 +394,7 @@ public abstract class ExternalCatalog
return table.get().initSchemaAndUpdateTime();
}
}
// return one column with unsupported type.
// not return empty to avoid some unexpected issue.
return Lists.newArrayList(Column.UNSUPPORTED_COLUMN);
return Optional.empty();
}
@Override
@@ -507,7 +504,7 @@ public abstract class ExternalCatalog
}
if (useMetaCache.get()) {
return metaCache.getMetaObjById(dbId).get();
return metaCache.getMetaObjById(dbId).orElse(null);
} else {
return idToDb.get(dbId);
}

ExternalDatabase.java

@@ -370,7 +370,7 @@ public abstract class ExternalDatabase<T extends ExternalTable>
public T getTableNullable(String tableName) {
makeSureInitialized();
if (extCatalog.getUseMetaCache().get()) {
return metaCache.getMetaObj(tableName).get();
return metaCache.getMetaObj(tableName).orElse(null);
} else {
if (!tableNameToId.containsKey(tableName)) {
return null;

ExternalSchemaCache.java

@@ -31,8 +31,8 @@ import lombok.Data;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ExecutorService;
@@ -42,7 +42,7 @@ public class ExternalSchemaCache {
private static final Logger LOG = LogManager.getLogger(ExternalSchemaCache.class);
private final ExternalCatalog catalog;
private LoadingCache<SchemaCacheKey, ImmutableList<Column>> schemaCache;
private LoadingCache<SchemaCacheKey, Optional<SchemaCacheValue>> schemaCache;
public ExternalSchemaCache(ExternalCatalog catalog, ExecutorService executor) {
this.catalog = catalog;
@@ -73,22 +73,22 @@ public class ExternalSchemaCache {
MetricRepo.DORIS_METRIC_REGISTER.addMetrics(schemaCacheGauge);
}
private ImmutableList<Column> loadSchema(SchemaCacheKey key) {
ImmutableList<Column> schema = ImmutableList.copyOf(catalog.getSchema(key.dbName, key.tblName));
private Optional<SchemaCacheValue> loadSchema(SchemaCacheKey key) {
Optional<SchemaCacheValue> schema = catalog.getSchema(key.dbName, key.tblName);
if (LOG.isDebugEnabled()) {
LOG.debug("load schema for {} in catalog {}", key, catalog.getName());
}
return schema;
}
public List<Column> getSchema(String dbName, String tblName) {
public Optional<SchemaCacheValue> getSchemaValue(String dbName, String tblName) {
SchemaCacheKey key = new SchemaCacheKey(dbName, tblName);
return schemaCache.get(key);
}
public void addSchemaForTest(String dbName, String tblName, ImmutableList<Column> schema) {
SchemaCacheKey key = new SchemaCacheKey(dbName, tblName);
schemaCache.put(key, schema);
schemaCache.put(key, Optional.of(new SchemaCacheValue(schema)));
}
public void invalidateTableCache(String dbName, String tblName) {

ExternalTable.java

@@ -143,7 +143,8 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable {
@Override
public List<Column> getFullSchema() {
ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog);
return cache.getSchema(dbName, name);
Optional<SchemaCacheValue> schemaCacheValue = cache.getSchemaValue(dbName, name);
return schemaCacheValue.map(SchemaCacheValue::getSchema).orElse(null);
}
@Override
@@ -161,7 +162,6 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable {
return getFullSchema();
}
@Override
public void setNewFullSchema(List<Column> newSchema) {
}
@@ -301,12 +301,12 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable {
*
* @return
*/
public List<Column> initSchemaAndUpdateTime() {
public Optional<SchemaCacheValue> initSchemaAndUpdateTime() {
schemaUpdateTime = System.currentTimeMillis();
return initSchema();
}
public List<Column> initSchema() {
public Optional<SchemaCacheValue> initSchema() {
throw new NotImplementedException("implement in sub class");
}
@@ -365,4 +365,9 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable {
public List<Long> getChunkSizes() {
throw new NotImplementedException("getChunkSized not implemented");
}
protected Optional<SchemaCacheValue> getSchemaCacheValue() {
ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog);
return cache.getSchemaValue(dbName, name);
}
}
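The two hooks above define the connector contract introduced by this patch: initSchema() builds the value to cache, and the protected getSchemaCacheValue() reads it back through ExternalSchemaCache. A hedged sketch of how a subclass is expected to plug in (the class name and loader method are hypothetical; the real implementations are the Es/HMS/Iceberg/JDBC/MaxCompute/Paimon tables later in this diff):

public class SomeExternalTable extends ExternalTable {
    // constructor omitted for brevity

    @Override
    public Optional<SchemaCacheValue> initSchema() {
        // Placeholder for the connector-specific metadata call.
        List<Column> columns = loadColumnsFromRemote();
        return Optional.of(new SchemaCacheValue(columns));
    }

    public List<Column> getCachedColumns() {
        // Goes through ExternalSchemaCache via the protected helper; empty when loading failed.
        return getSchemaCacheValue().map(SchemaCacheValue::getSchema).orElse(Collections.emptyList());
    }

    private List<Column> loadColumnsFromRemote() {
        return Collections.emptyList(); // stub
    }
}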

SchemaCacheValue.java (new file)

@@ -0,0 +1,40 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource;
import org.apache.doris.catalog.Column;
import java.util.List;
/**
* The cache value of ExternalSchemaCache.
* Different external table types have different schema cache values.
* For example, a Hive table has an HMSSchemaCacheValue and a Paimon table has a PaimonSchemaCacheValue.
* All objects that should be refreshed along with the schema should be put in this class.
*/
public class SchemaCacheValue {
protected List<Column> schema;
public SchemaCacheValue(List<Column> schema) {
this.schema = schema;
}
public List<Column> getSchema() {
return schema;
}
}
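Per the class comment, connector-specific state that must live and die with the cached schema belongs in a subclass of this value, and readers downcast the cached value to reach it. A small sketch of that bundle-and-downcast pattern (FooSchemaCacheValue and FooTableHandle are illustrative stand-ins for the HMS/MaxCompute/Paimon variants added later in this commit):

public class FooSchemaCacheValue extends SchemaCacheValue {
    private final FooTableHandle handle; // hypothetical connector-side table object

    public FooSchemaCacheValue(List<Column> schema, FooTableHandle handle) {
        super(schema);
        this.handle = handle;
    }

    public FooTableHandle getHandle() {
        return handle;
    }
}

// Read side, inside the matching ExternalTable subclass:
// getSchemaCacheValue().map(v -> ((FooSchemaCacheValue) v).getHandle()).orElse(null);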

EsExternalTable.java

@@ -20,11 +20,13 @@ package org.apache.doris.datasource.es;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.EsTable;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.thrift.TEsTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
import java.util.List;
import java.util.Optional;
/**
* Elasticsearch external table.
@@ -69,9 +71,11 @@ public class EsExternalTable extends ExternalTable {
}
@Override
public List<Column> initSchema() {
public Optional<SchemaCacheValue> initSchema() {
EsRestClient restClient = ((EsExternalCatalog) catalog).getEsRestClient();
return EsUtil.genColumnsFromEs(restClient, name, null, ((EsExternalCatalog) catalog).enableMappingEsId());
return Optional.of(new SchemaCacheValue(
EsUtil.genColumnsFromEs(restClient, name, null,
((EsExternalCatalog) catalog).enableMappingEsId())));
}
private EsTable toEsTable() {

HMSExternalTable.java

@@ -29,6 +29,7 @@ import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.datasource.hudi.HudiUtils;
import org.apache.doris.datasource.hudi.source.COWIncrementalRelation;
import org.apache.doris.datasource.hudi.source.IncrementalRelation;
@@ -159,7 +160,6 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
}
private volatile org.apache.hadoop.hive.metastore.api.Table remoteTable = null;
private List<Column> partitionColumns;
private DLAType dlaType = DLAType.UNKNOWN;
@@ -296,15 +296,17 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
public List<Type> getPartitionColumnTypes() {
makeSureInitialized();
getFullSchema();
return partitionColumns.stream().map(c -> c.getType()).collect(Collectors.toList());
Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
return schemaCacheValue.map(value -> ((HMSSchemaCacheValue) value).getPartitionColTypes())
.orElse(Collections.emptyList());
}
@Override
public List<Column> getPartitionColumns() {
makeSureInitialized();
getFullSchema();
return partitionColumns;
Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
return schemaCacheValue.map(value -> ((HMSSchemaCacheValue) value).getPartitionColumns())
.orElse(Collections.emptyList());
}
public TableScanParams getScanParams() {
@@ -532,7 +534,7 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
}
@Override
public List<Column> initSchemaAndUpdateTime() {
public Optional<SchemaCacheValue> initSchemaAndUpdateTime() {
org.apache.hadoop.hive.metastore.api.Table table = ((HMSExternalCatalog) catalog).getClient()
.getTable(dbName, name);
// try to use transient_lastDdlTime from hms client
@@ -554,7 +556,7 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
}
@Override
public List<Column> initSchema() {
public Optional<SchemaCacheValue> initSchema() {
makeSureInitialized();
List<Column> columns;
if (dlaType.equals(DLAType.ICEBERG)) {
@@ -564,8 +566,8 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
} else {
columns = getHiveSchema();
}
initPartitionColumns(columns);
return columns;
List<Column> partitionColumns = initPartitionColumns(columns);
return Optional.of(new HMSSchemaCacheValue(columns, partitionColumns));
}
private List<Column> getIcebergSchema() {
@@ -585,18 +587,16 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
private List<Column> getHiveSchema() {
HMSCachedClient client = ((HMSExternalCatalog) catalog).getClient();
List<Column> columns;
List<FieldSchema> schema = client.getSchema(dbName, name);
Map<String, String> colDefaultValues = client.getDefaultColumnValues(dbName, name);
List<Column> tmpSchema = Lists.newArrayListWithCapacity(schema.size());
List<Column> columns = Lists.newArrayListWithCapacity(schema.size());
for (FieldSchema field : schema) {
String fieldName = field.getName().toLowerCase(Locale.ROOT);
String defaultValue = colDefaultValues.getOrDefault(fieldName, null);
tmpSchema.add(new Column(fieldName,
columns.add(new Column(fieldName,
HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType()), true, null,
true, defaultValue, field.getComment(), true, -1));
}
columns = tmpSchema;
return columns;
}
@@ -613,10 +613,10 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
return rowCount;
}
private void initPartitionColumns(List<Column> schema) {
private List<Column> initPartitionColumns(List<Column> schema) {
List<String> partitionKeys = remoteTable.getPartitionKeys().stream().map(FieldSchema::getName)
.collect(Collectors.toList());
partitionColumns = Lists.newArrayListWithCapacity(partitionKeys.size());
List<Column> partitionColumns = Lists.newArrayListWithCapacity(partitionKeys.size());
for (String partitionKey : partitionKeys) {
// Do not use "getColumn()", which will cause dead loop
for (Column column : schema) {
@@ -636,6 +636,7 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
if (LOG.isDebugEnabled()) {
LOG.debug("get {} partition columns for table: {}", partitionColumns.size(), name);
}
return partitionColumns;
}
public boolean hasColumnStatistics(String colName) {

HMSSchemaCacheValue.java (new file)

@@ -0,0 +1,43 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.hive;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Type;
import org.apache.doris.datasource.SchemaCacheValue;
import java.util.List;
import java.util.stream.Collectors;
public class HMSSchemaCacheValue extends SchemaCacheValue {
private List<Column> partitionColumns;
public HMSSchemaCacheValue(List<Column> schema, List<Column> partitionColumns) {
super(schema);
this.partitionColumns = partitionColumns;
}
public List<Column> getPartitionColumns() {
return partitionColumns;
}
public List<Type> getPartitionColTypes() {
return partitionColumns.stream().map(Column::getType).collect(Collectors.toList());
}
}

IcebergExternalTable.java

@@ -19,6 +19,7 @@ package org.apache.doris.datasource.iceberg;
import org.apache.doris.catalog.Column;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.ExternalAnalysisTask;
@@ -29,6 +30,7 @@ import org.apache.doris.thrift.TTableType;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
public class IcebergExternalTable extends ExternalTable {
@@ -48,8 +50,8 @@ public class IcebergExternalTable extends ExternalTable {
}
@Override
public List<Column> initSchema() {
return IcebergUtils.getSchema(catalog, dbName, name);
public Optional<SchemaCacheValue> initSchema() {
return Optional.of(new SchemaCacheValue(IcebergUtils.getSchema(catalog, dbName, name)));
}
@Override

ExternalInfoSchemaTable.java

@@ -18,16 +18,16 @@
package org.apache.doris.datasource.infoschema;
import org.apache.doris.analysis.SchemaTableType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.InfoSchemaDb;
import org.apache.doris.catalog.SchemaTable;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.thrift.TSchemaTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
import java.util.List;
import java.util.Optional;
public class ExternalInfoSchemaTable extends ExternalTable {
@@ -36,10 +36,9 @@ public class ExternalInfoSchemaTable extends ExternalTable {
}
@Override
public List<Column> initSchema() {
public Optional<SchemaCacheValue> initSchema() {
makeSureInitialized();
List<Column> columns = SchemaTable.TABLE_MAP.get(name).getFullSchema();
return columns;
return Optional.of(new SchemaCacheValue(SchemaTable.TABLE_MAP.get(name).getFullSchema()));
}
@Override

ExternalMysqlTable.java

@@ -18,16 +18,16 @@
package org.apache.doris.datasource.infoschema;
import org.apache.doris.analysis.SchemaTableType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.MysqlDBTable;
import org.apache.doris.catalog.MysqlDb;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.thrift.TSchemaTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
import java.util.List;
import java.util.Optional;
public class ExternalMysqlTable extends ExternalTable {
public ExternalMysqlTable(long id, String name, ExternalCatalog catalog) {
@@ -35,9 +35,9 @@ public class ExternalMysqlTable extends ExternalTable {
}
@Override
public List<Column> initSchema() {
public Optional<SchemaCacheValue> initSchema() {
makeSureInitialized();
return MysqlDBTable.TABLE_MAP.get(name).getFullSchema();
return Optional.of(new SchemaCacheValue(MysqlDBTable.TABLE_MAP.get(name).getFullSchema()));
}
@Override

JdbcExternalTable.java

@@ -20,6 +20,7 @@ package org.apache.doris.datasource.jdbc;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.JdbcTable;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.JdbcAnalysisTask;
@@ -29,6 +30,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Optional;
/**
* JDBC external table.
@@ -71,8 +73,9 @@ public class JdbcExternalTable extends ExternalTable {
}
@Override
public List<Column> initSchema() {
return ((JdbcExternalCatalog) catalog).getJdbcClient().getColumnsFromJdbc(dbName, name);
public Optional<SchemaCacheValue> initSchema() {
return Optional.of(new SchemaCacheValue(((JdbcExternalCatalog) catalog).getJdbcClient()
.getColumnsFromJdbc(dbName, name)));
}
private JdbcTable toJdbcTable() {

MaxComputeExternalTable.java

@@ -26,6 +26,7 @@ import org.apache.doris.catalog.StructField;
import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.Type;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.datasource.TablePartitionValues;
import org.apache.doris.thrift.TMCTable;
import org.apache.doris.thrift.TTableDescriptor;
@@ -43,11 +44,13 @@ import com.aliyun.odps.type.TypeInfo;
import com.aliyun.odps.type.VarcharTypeInfo;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -55,12 +58,6 @@ import java.util.stream.Collectors;
* MaxCompute external table.
*/
public class MaxComputeExternalTable extends ExternalTable {
private Table odpsTable;
private List<String> partitionSpecs;
private Map<String, Column> partitionNameToColumns;
private List<Type> partitionTypes;
public MaxComputeExternalTable(long id, String name, String dbName, MaxComputeExternalCatalog catalog) {
super(id, name, catalog, dbName, TableType.MAX_COMPUTE_EXTERNAL_TABLE);
}
@@ -69,8 +66,6 @@ public class MaxComputeExternalTable extends ExternalTable {
protected synchronized void makeSureInitialized() {
super.makeSureInitialized();
if (!objectCreated) {
odpsTable = ((MaxComputeExternalCatalog) catalog).getClient().tables().get(name);
initTablePartitions();
objectCreated = true;
}
}
@@ -100,26 +95,37 @@ public class MaxComputeExternalTable extends ExternalTable {
@Override
public Set<String> getPartitionNames() {
makeSureInitialized();
return partitionNameToColumns.keySet();
Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
return schemaCacheValue.map(value -> ((MaxComputeSchemaCacheValue) value).getPartitionColNames())
.orElse(Collections.emptySet());
}
public List<Column> getPartitionColumns() {
makeSureInitialized();
return new ArrayList<>(partitionNameToColumns.values());
Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
return schemaCacheValue.map(value -> ((MaxComputeSchemaCacheValue) value).getPartitionColumns())
.orElse(Collections.emptyList());
}
public TablePartitionValues getPartitionValues() {
makeSureInitialized();
// Make sure to call it after initSchema() completes
Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
if (!schemaCacheValue.isPresent()) {
return new TablePartitionValues();
}
Table odpsTable = ((MaxComputeSchemaCacheValue) schemaCacheValue.get()).getOdpsTable();
String projectName = odpsTable.getProject();
String tableName = odpsTable.getName();
MaxComputeMetadataCache metadataCache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMaxComputeMetadataCache(catalog.getId());
return metadataCache.getCachedPartitionValues(
new MaxComputeCacheKey(projectName, tableName), key -> loadPartitionValues(key));
new MaxComputeCacheKey(projectName, tableName),
key -> loadPartitionValues((MaxComputeSchemaCacheValue) schemaCacheValue.get()));
}
private TablePartitionValues loadPartitionValues(MaxComputeCacheKey key) {
private TablePartitionValues loadPartitionValues(MaxComputeSchemaCacheValue schemaCacheValue) {
List<String> partitionSpecs = schemaCacheValue.getPartitionSpecs();
List<Type> partitionTypes = schemaCacheValue.getPartitionTypes();
TablePartitionValues partitionValues = new TablePartitionValues();
partitionValues.addPartitions(partitionSpecs,
partitionSpecs.stream()
@@ -154,21 +160,19 @@ public class MaxComputeExternalTable extends ExternalTable {
}
@Override
public List<Column> initSchema() {
public Optional<SchemaCacheValue> initSchema() {
// this method will be called at semantic parsing.
makeSureInitialized();
Table odpsTable = ((MaxComputeExternalCatalog) catalog).getClient().tables().get(name);
List<com.aliyun.odps.Column> columns = odpsTable.getSchema().getColumns();
List<Column> result = Lists.newArrayListWithCapacity(columns.size());
List<Column> schema = Lists.newArrayListWithCapacity(columns.size());
for (com.aliyun.odps.Column field : columns) {
result.add(new Column(field.getName(), mcTypeToDorisType(field.getTypeInfo()), true, null,
schema.add(new Column(field.getName(), mcTypeToDorisType(field.getTypeInfo()), true, null,
true, field.getComment(), true, -1));
}
result.addAll(partitionNameToColumns.values());
return result;
}
private void initTablePartitions() {
List<com.aliyun.odps.Column> partitionColumns = odpsTable.getSchema().getPartitionColumns();
List<String> partitionSpecs;
if (!partitionColumns.isEmpty()) {
partitionSpecs = odpsTable.getPartitions().stream()
.map(e -> e.getPartitionSpec().toString(false, true))
@@ -177,17 +181,21 @@ public class MaxComputeExternalTable extends ExternalTable {
partitionSpecs = ImmutableList.of();
}
// sort partition columns to align partitionTypes and partitionName.
partitionNameToColumns = new LinkedHashMap<>();
Map<String, Column> partitionNameToColumns = Maps.newHashMap();
for (com.aliyun.odps.Column partColumn : partitionColumns) {
Column dorisCol = new Column(partColumn.getName(),
mcTypeToDorisType(partColumn.getTypeInfo()), true, null,
true, partColumn.getComment(), true, -1);
partitionNameToColumns.put(dorisCol.getName(), dorisCol);
}
partitionTypes = partitionNameToColumns.values()
List<Type> partitionTypes = partitionNameToColumns.values()
.stream()
.map(Column::getType)
.collect(Collectors.toList());
schema.addAll(partitionNameToColumns.values());
return Optional.of(new MaxComputeSchemaCacheValue(schema, odpsTable, partitionSpecs, partitionNameToColumns,
partitionTypes));
}
private Type mcTypeToDorisType(TypeInfo typeInfo) {
@@ -295,6 +303,8 @@ public class MaxComputeExternalTable extends ExternalTable {
public Table getOdpsTable() {
makeSureInitialized();
return odpsTable;
Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
return schemaCacheValue.map(value -> ((MaxComputeSchemaCacheValue) value).getOdpsTable())
.orElse(null);
}
}

MaxComputeSchemaCacheValue.java (new file)

@@ -0,0 +1,57 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.maxcompute;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Type;
import org.apache.doris.datasource.SchemaCacheValue;
import com.aliyun.odps.Table;
import com.google.common.collect.Lists;
import lombok.Getter;
import lombok.Setter;
import java.util.List;
import java.util.Map;
import java.util.Set;
@Getter
@Setter
public class MaxComputeSchemaCacheValue extends SchemaCacheValue {
private Table odpsTable;
private List<String> partitionSpecs;
private Map<String, Column> partitionNameToColumns;
private List<Type> partitionTypes;
public MaxComputeSchemaCacheValue(List<Column> schema, Table odpsTable, List<String> partitionSpecs,
Map<String, Column> partitionNameToColumns, List<Type> partitionTypes) {
super(schema);
this.odpsTable = odpsTable;
this.partitionSpecs = partitionSpecs;
this.partitionNameToColumns = partitionNameToColumns;
this.partitionTypes = partitionTypes;
}
public Set<String> getPartitionColNames() {
return partitionNameToColumns.keySet();
}
public List<Column> getPartitionColumns() {
return Lists.newArrayList(partitionNameToColumns.values());
}
}

MetaCache.java

@@ -49,14 +49,26 @@ public class MetaCache<T> {
RemovalListener<String, Optional<T>> removalListener) {
this.name = name;
CacheFactory cacheFactory = new CacheFactory(
// ATTN:
// The refreshAfterWriteSec is only used for namesCache, not for metaObjCache.
// namesCache needs to be refreshed at an interval so that users can get the latest meta list.
// But metaObjCache does not need interval refreshing, because the object is not actually fetched
// from the remote datasource; it is just a locally generated object representing the meta info.
// So it only needs to expire after the specified duration.
CacheFactory namesCacheFactory = new CacheFactory(
expireAfterWriteSec,
refreshAfterWriteSec,
maxSize,
true,
null);
namesCache = cacheFactory.buildCache(namesCacheLoader, null, executor);
metaObjCache = cacheFactory.buildCache(metaObjCacheLoader, removalListener, executor);
CacheFactory objCacheFactory = new CacheFactory(
expireAfterWriteSec,
OptionalLong.empty(),
maxSize,
true,
null);
namesCache = namesCacheFactory.buildCache(namesCacheLoader, null, executor);
metaObjCache = objCacheFactory.buildCache(metaObjCacheLoader, removalListener, executor);
}
public List<String> listNames() {
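The ATTN note in the constructor above is the reason the single CacheFactory is split in two: the name list must be refreshed periodically from the remote source, while the locally built meta objects only need to expire. A sketch of the same policy written directly against Caffeine (this assumes CacheFactory wraps a Caffeine builder, which is not shown in this diff; the loader methods and the duration values are illustrative):

// (assumed imports: com.github.benmanes.caffeine.cache.Caffeine, LoadingCache, java.time.Duration, java.util.*)
// namesCache: expires AND refreshes in the background, so users eventually see new names.
LoadingCache<String, List<String>> namesCache = Caffeine.newBuilder()
        .expireAfterWrite(Duration.ofSeconds(86400))
        .refreshAfterWrite(Duration.ofSeconds(600))
        .maximumSize(1000)
        .build(key -> loadNamesFromRemote(key));          // hypothetical loader

// metaObjCache: expire only; the objects are generated locally, so periodic refresh adds nothing.
LoadingCache<String, Optional<Object>> metaObjCache = Caffeine.newBuilder()
        .expireAfterWrite(Duration.ofSeconds(86400))
        .maximumSize(1000)
        .build(key -> Optional.ofNullable(buildLocalMetaObj(key)));  // hypothetical loader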

PaimonExternalTable.java

@@ -21,6 +21,7 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.ExternalAnalysisTask;
@@ -44,14 +45,13 @@ import org.apache.paimon.types.RowType;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
public class PaimonExternalTable extends ExternalTable {
private static final Logger LOG = LogManager.getLogger(PaimonExternalTable.class);
private Table originTable = null;
public PaimonExternalTable(long id, String name, String dbName, PaimonExternalCatalog catalog) {
super(id, name, catalog, dbName, TableType.PAIMON_EXTERNAL_TABLE);
}
@@ -63,23 +63,20 @@ public class PaimonExternalTable extends ExternalTable {
protected synchronized void makeSureInitialized() {
super.makeSureInitialized();
if (!objectCreated) {
originTable = ((PaimonExternalCatalog) catalog).getPaimonTable(dbName, name);
schemaUpdateTime = System.currentTimeMillis();
objectCreated = true;
}
}
public Table getOriginTable() {
public Table getPaimonTable() {
makeSureInitialized();
return originTable;
Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
return schemaCacheValue.map(value -> ((PaimonSchemaCacheValue) value).getPaimonTable()).orElse(null);
}
@Override
public List<Column> initSchema() {
//init schema need update lastUpdateTime and get latest schema
objectCreated = false;
Table table = getOriginTable();
TableSchema schema = ((FileStoreTable) table).schema();
public Optional<SchemaCacheValue> initSchema() {
Table paimonTable = ((PaimonExternalCatalog) catalog).getPaimonTable(dbName, name);
TableSchema schema = ((FileStoreTable) paimonTable).schema();
List<DataField> columns = schema.fields();
List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size());
for (DataField field : columns) {
@@ -87,7 +84,7 @@ public class PaimonExternalTable extends ExternalTable {
paimonTypeToDorisType(field.type()), true, null, true, field.description(), true,
field.id()));
}
return tmpSchema;
return Optional.of(new PaimonSchemaCacheValue(tmpSchema, paimonTable));
}
private Type paimonPrimitiveTypeToDorisType(org.apache.paimon.types.DataType dataType) {
@@ -180,7 +177,13 @@ public class PaimonExternalTable extends ExternalTable {
makeSureInitialized();
try {
long rowCount = 0;
List<Split> splits = originTable.newReadBuilder().newScan().plan().splits();
Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
Table paimonTable = schemaCacheValue.map(value -> ((PaimonSchemaCacheValue) value).getPaimonTable())
.orElse(null);
if (paimonTable == null) {
return -1;
}
List<Split> splits = paimonTable.newReadBuilder().newScan().plan().splits();
for (Split split : splits) {
rowCount += split.rowCount();
}

PaimonSchemaCacheValue.java (new file)

@@ -0,0 +1,39 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.paimon;
import org.apache.doris.catalog.Column;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.paimon.table.Table;
import java.util.List;
public class PaimonSchemaCacheValue extends SchemaCacheValue {
private Table paimonTable;
public PaimonSchemaCacheValue(List<Column> schema, Table paimonTable) {
super(schema);
this.paimonTable = paimonTable;
}
public Table getPaimonTable() {
return paimonTable;
}
}

PaimonSource.java

@@ -39,7 +39,7 @@ public class PaimonSource {
public PaimonSource(PaimonExternalTable table, TupleDescriptor desc,
Map<String, ColumnRange> columnNameToRange) {
this.paimonExtTable = table;
this.originTable = paimonExtTable.getOriginTable();
this.originTable = paimonExtTable.getPaimonTable();
this.desc = desc;
}

TestExternalTable.java

@@ -17,15 +17,15 @@
package org.apache.doris.datasource.test;
import org.apache.doris.catalog.Column;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Optional;
/**
* TestExternalTable is a table for unit test.
@@ -53,7 +53,7 @@ public class TestExternalTable extends ExternalTable {
}
@Override
public List<Column> initSchema() {
return ((TestExternalCatalog) catalog).mockedSchema(dbName, name);
public Optional<SchemaCacheValue> initSchema() {
return Optional.of(new SchemaCacheValue(((TestExternalCatalog) catalog).mockedSchema(dbName, name)));
}
}

AcceptListener.java

@@ -101,8 +101,8 @@ public class AcceptListener implements ChannelListener<AcceptingChannel<StreamCo
}
context.setStartTime();
int userQueryTimeout = context.getEnv().getAuth().getQueryTimeout(context.getQualifiedUser());
if (userQueryTimeout <= 0) {
LOG.warn("Connection set query timeout to {}",
if (userQueryTimeout <= 0 && LOG.isDebugEnabled()) {
LOG.debug("Connection set query timeout to {}",
context.getSessionVariable().getQueryTimeoutS());
}
context.setUserQueryTimeout(userQueryTimeout);