From 7182f1464518b868c92bb37c19455242b17b171e Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Thu, 17 Nov 2022 08:30:03 +0800 Subject: [PATCH] [improvement][fix](multi-catalog) speed up list partition prune (#14268) In the previous implementation, when doing list partition prune, we needed to generate `rangeToId` every time we pruned. But `rangeToId` is actually static data that should be created once and used everywhere. So for hive partition, I created the `rangeToId` and all other necessary data structures for partition pruning in partition cache, so that they can be used directly. In my test, the cost of partition prune for 10000 partitions dropped from 8s to 0.2s. Also add "partition" info in explain string for hive table. ``` | 0:VEXTERNAL_FILE_SCAN_NODE | | predicates: `nation` = '0024c95b' | | inputSplitNum=1, totalFileSize=4750, scanRanges=1 | | partition=1/10000 | | numNodes=1 | | limit: 10 | ``` Bug fix: 1. Fix bug that the es scan node cannot filter data 2. Fix bug that querying es with a predicate like `where substring(test2,2) = "ext2";` will fail at planner phase. `Unexpected exception: org.apache.doris.analysis.FunctionCallExpr cannot be cast to org.apache.doris.analysis.SlotRef` TODO: 1. Some problem when querying es version 8: ` Unexpected exception: Index: 0, Size: 0`, will be fixed later. 
--- be/src/vec/exec/scan/new_es_scan_node.cpp | 2 +- be/src/vec/exec/scan/new_es_scanner.cpp | 6 +- be/src/vec/exec/scan/new_es_scanner.h | 2 +- be/src/vec/exec/scan/new_jdbc_scanner.cpp | 2 +- be/src/vec/exec/scan/new_jdbc_scanner.h | 4 +- .../thirdparties/start-thirdparties-docker.sh | 2 +- .../catalog/external/EsExternalTable.java | 2 +- .../catalog/external/ExternalDatabase.java | 2 +- .../doris/catalog/external/ExternalTable.java | 3 +- .../catalog/external/HMSExternalTable.java | 48 +++++------ .../doris/datasource/ExternalCatalog.java | 2 +- .../datasource/hive/HiveMetaStoreCache.java | 55 ++++++++++-- .../doris/external/elasticsearch/EsUtil.java | 6 +- .../doris/planner/ListPartitionPrunerV2.java | 83 ++++++++++++++----- .../doris/planner/PartitionPrunerV2Base.java | 49 +++++++---- .../doris/planner/RangePartitionPrunerV2.java | 12 ++- .../external/ExternalFileScanNode.java | 8 ++ .../planner/external/HiveScanProvider.java | 44 ++++++---- .../doris/qe/MasterCatalogExecutor.java | 5 +- .../doris/service/FrontendServiceImpl.java | 32 +------ regression-test/data/es_p0/test_es_query.out | 25 +++++- .../suites/es_p0/test_es_query.groovy | 19 +++-- 22 files changed, 263 insertions(+), 150 deletions(-) diff --git a/be/src/vec/exec/scan/new_es_scan_node.cpp b/be/src/vec/exec/scan/new_es_scan_node.cpp index dcad57a418..6a57e917b4 100644 --- a/be/src/vec/exec/scan/new_es_scan_node.cpp +++ b/be/src/vec/exec/scan/new_es_scan_node.cpp @@ -205,7 +205,7 @@ Status NewEsScanNode::_init_scanners(std::list* scanners) { properties, _docvalue_context, doc_value_mode); _scanner_pool.add(scanner); - RETURN_IF_ERROR(scanner->prepare(_state)); + RETURN_IF_ERROR(scanner->prepare(_state, _vconjunct_ctx_ptr.get())); scanners->push_back(static_cast(scanner)); } return Status::OK(); diff --git a/be/src/vec/exec/scan/new_es_scanner.cpp b/be/src/vec/exec/scan/new_es_scanner.cpp index cfde06ab3c..358a226d1e 100644 --- a/be/src/vec/exec/scan/new_es_scanner.cpp +++ 
b/be/src/vec/exec/scan/new_es_scanner.cpp @@ -41,8 +41,12 @@ NewEsScanner::NewEsScanner(RuntimeState* state, NewEsScanNode* parent, int64_t l _docvalue_context(docvalue_context), _doc_value_mode(doc_value_mode) {} -Status NewEsScanner::prepare(RuntimeState* state) { +Status NewEsScanner::prepare(RuntimeState* state, VExprContext** vconjunct_ctx_ptr) { VLOG_CRITICAL << NEW_SCANNER_TYPE << "::prepare"; + if (vconjunct_ctx_ptr != nullptr) { + // Copy vconjunct_ctx_ptr from scan node to this scanner's _vconjunct_ctx. + RETURN_IF_ERROR((*vconjunct_ctx_ptr)->clone(_state, &_vconjunct_ctx)); + } if (_is_init) { return Status::OK(); diff --git a/be/src/vec/exec/scan/new_es_scanner.h b/be/src/vec/exec/scan/new_es_scanner.h index be4d50448b..4e82d72af9 100644 --- a/be/src/vec/exec/scan/new_es_scanner.h +++ b/be/src/vec/exec/scan/new_es_scanner.h @@ -36,7 +36,7 @@ public: Status close(RuntimeState* state) override; public: - Status prepare(RuntimeState* state); + Status prepare(RuntimeState* state, VExprContext** vconjunct_ctx_ptr); protected: Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) override; diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.cpp b/be/src/vec/exec/scan/new_jdbc_scanner.cpp index 066074f16a..b91300bf47 100644 --- a/be/src/vec/exec/scan/new_jdbc_scanner.cpp +++ b/be/src/vec/exec/scan/new_jdbc_scanner.cpp @@ -19,7 +19,7 @@ namespace doris::vectorized { NewJdbcScanner::NewJdbcScanner(RuntimeState* state, NewJdbcScanNode* parent, int64_t limit, - TupleId tuple_id, std::string query_string) + const TupleId& tuple_id, const std::string& query_string) : VScanner(state, static_cast(parent), limit), _is_init(false), _jdbc_eos(false), diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.h b/be/src/vec/exec/scan/new_jdbc_scanner.h index 6a45462db1..24c2649dfe 100644 --- a/be/src/vec/exec/scan/new_jdbc_scanner.h +++ b/be/src/vec/exec/scan/new_jdbc_scanner.h @@ -25,8 +25,8 @@ namespace doris { namespace vectorized { class NewJdbcScanner : public 
VScanner { public: - NewJdbcScanner(RuntimeState* state, NewJdbcScanNode* parent, int64_t limit, TupleId tuple_id, - std::string query_string); + NewJdbcScanner(RuntimeState* state, NewJdbcScanNode* parent, int64_t limit, + const TupleId& tuple_id, const std::string& query_string); Status open(RuntimeState* state) override; Status close(RuntimeState* state) override; diff --git a/docker/thirdparties/start-thirdparties-docker.sh b/docker/thirdparties/start-thirdparties-docker.sh index 8f1a577d9b..986d04da7a 100755 --- a/docker/thirdparties/start-thirdparties-docker.sh +++ b/docker/thirdparties/start-thirdparties-docker.sh @@ -31,7 +31,7 @@ ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd)" CONTAINER_UID="doris--" # elasticsearch -sed -i "" "s/doris--/${CONTAINER_UID}/g" "${ROOT}"/docker-compose/elasticsearch/es.yaml +sed -i "s/doris--/${CONTAINER_UID}/g" "${ROOT}"/docker-compose/elasticsearch/es.yaml sudo docker compose -f "${ROOT}"/docker-compose/elasticsearch/es.yaml --env-file "${ROOT}"/docker-compose/elasticsearch/es.env down sudo mkdir -p "${ROOT}"/docker-compose/elasticsearch/data/es6/ sudo rm -rf "${ROOT}"/docker-compose/elasticsearch/data/es6/* diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java index eb8c8972b1..e2efd6aae1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java @@ -49,7 +49,7 @@ public class EsExternalTable extends ExternalTable { super(id, name, catalog, dbName, TableType.ES_EXTERNAL_TABLE); } - public synchronized void makeSureInitialized() { + protected synchronized void makeSureInitialized() { if (!objectCreated) { esTable = toEsTable(); objectCreated = true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java index 87d1cccf5d..ffbbda4a46 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java @@ -107,7 +107,7 @@ public class ExternalDatabase implements DatabaseIf, // Forward to master and wait the journal to replay. MasterCatalogExecutor remoteExecutor = new MasterCatalogExecutor(); try { - remoteExecutor.forward(extCatalog.getId(), id, -1); + remoteExecutor.forward(extCatalog.getId(), id); } catch (Exception e) { Util.logAndThrowRuntimeException(LOG, String.format("failed to forward init external db %s operation to master", name), e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java index 36189c9a00..e97c5b1a26 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java @@ -100,7 +100,7 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable { return false; } - public void makeSureInitialized() { + protected void makeSureInitialized() { throw new NotImplementedException(); } @@ -213,7 +213,6 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable { @Override public List getFullSchema() { - makeSureInitialized(); ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog); return cache.getSchema(dbName, name); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java index 6e12f7e30a..168cf31a65 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java @@ -78,7 +78,7 @@ public class HMSExternalTable extends ExternalTable { return dlaType != DLAType.UNKNOWN; } - public synchronized void makeSureInitialized() { + protected synchronized void makeSureInitialized() { if (!objectCreated) { try { getRemoteTable(); @@ -98,10 +98,29 @@ public class HMSExternalTable extends ExternalTable { dlaType = DLAType.UNKNOWN; } } + + initPartitionColumns(); objectCreated = true; } } + private void initPartitionColumns() { + Set partitionKeys = remoteTable.getPartitionKeys().stream().map(FieldSchema::getName) + .collect(Collectors.toSet()); + partitionColumns = Lists.newArrayListWithCapacity(partitionKeys.size()); + for (String partitionKey : partitionKeys) { + // Do not use "getColumn()", which will cause dead loop + List schema = getFullSchema(); + for (Column column : schema) { + if (partitionKey.equals(column.getName())) { + partitionColumns.add(column); + break; + } + } + } + LOG.debug("get {} partition columns for table: {}", partitionColumns.size(), name); + } + /** * Now we only support cow table in iceberg. 
*/ @@ -161,13 +180,11 @@ public class HMSExternalTable extends ExternalTable { public List getPartitionColumnTypes() { makeSureInitialized(); - initPartitionColumns(); return partitionColumns.stream().map(c -> c.getType()).collect(Collectors.toList()); } public List getPartitionColumns() { makeSureInitialized(); - initPartitionColumns(); return partitionColumns; } @@ -268,30 +285,5 @@ public class HMSExternalTable extends ExternalTable { public Map getS3Properties() { return catalog.getCatalogProperty().getS3Properties(); } - - private void initPartitionColumns() { - if (partitionColumns != null) { - return; - } - synchronized (this) { - if (partitionColumns != null) { - return; - } - Set partitionKeys = remoteTable.getPartitionKeys().stream().map(FieldSchema::getName) - .collect(Collectors.toSet()); - partitionColumns = Lists.newArrayListWithCapacity(partitionKeys.size()); - for (String partitionKey : partitionKeys) { - // Do not use "getColumn()", which will cause dead loop - List schema = getFullSchema(); - for (Column column : schema) { - if (partitionKey.equals(column.getName())) { - partitionColumns.add(column); - break; - } - } - } - LOG.debug("get {} partition columns for table: {}", partitionColumns.size(), name); - } - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index 0663fed2e1..385c85fadf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -102,7 +102,7 @@ public abstract class ExternalCatalog implements CatalogIf, Wr // Forward to master and wait the journal to replay. 
MasterCatalogExecutor remoteExecutor = new MasterCatalogExecutor(); try { - remoteExecutor.forward(id, -1, -1); + remoteExecutor.forward(id, -1); } catch (Exception e) { Util.logAndThrowRuntimeException(LOG, String.format("failed to forward init catalog %s operation to master.", name), e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java index 388c847559..ddcc12d5fc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java @@ -19,6 +19,7 @@ package org.apache.doris.datasource.hive; import org.apache.doris.analysis.PartitionValue; import org.apache.doris.catalog.ListPartitionItem; +import org.apache.doris.catalog.PartitionItem; import org.apache.doris.catalog.PartitionKey; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; @@ -31,6 +32,9 @@ import org.apache.doris.metric.GaugeMetric; import org.apache.doris.metric.Metric; import org.apache.doris.metric.MetricLabel; import org.apache.doris.metric.MetricRepo; +import org.apache.doris.planner.ColumnBound; +import org.apache.doris.planner.ListPartitionPrunerV2; +import org.apache.doris.planner.PartitionPrunerV2Base.UniqueId; import com.google.common.base.Preconditions; import com.google.common.cache.CacheBuilder; @@ -38,6 +42,9 @@ import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Range; +import com.google.common.collect.RangeMap; import lombok.Data; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.HdfsConfiguration; @@ -70,7 +77,7 @@ public class HiveMetaStoreCache { private HMSExternalCatalog catalog; // cache 
from -> - private LoadingCache> partitionValuesCache; + private LoadingCache partitionValuesCache; // cache from -> private LoadingCache partitionCache; // cache from -> @@ -86,9 +93,9 @@ public class HiveMetaStoreCache { partitionValuesCache = CacheBuilder.newBuilder().maximumSize(Config.max_hive_partition_cache_num) .expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES) .build(CacheLoader.asyncReloading( - new CacheLoader>() { + new CacheLoader() { @Override - public ImmutableList load(PartitionValueCacheKey key) throws Exception { + public HivePartitionValues load(PartitionValueCacheKey key) throws Exception { return loadPartitionValues(key); } }, executor)); @@ -148,17 +155,31 @@ public class HiveMetaStoreCache { MetricRepo.DORIS_METRIC_REGISTER.addMetrics(fileCacheGauge); } - private ImmutableList loadPartitionValues(PartitionValueCacheKey key) { + private HivePartitionValues loadPartitionValues(PartitionValueCacheKey key) { // partition name format: nation=cn/city=beijing List partitionNames = catalog.getClient().listPartitionNames(key.dbName, key.tblName); if (LOG.isDebugEnabled()) { LOG.debug("load #{} partitions for {} in catalog {}", partitionNames.size(), key, catalog.getName()); } - List partitionValues = Lists.newArrayListWithExpectedSize(partitionNames.size()); + Map idToPartitionItem = Maps.newHashMapWithExpectedSize(partitionNames.size()); + long idx = 0; for (String partitionName : partitionNames) { - partitionValues.add(toListPartitionItem(partitionName, key.types)); + idToPartitionItem.put(idx++, toListPartitionItem(partitionName, key.types)); } - return ImmutableList.copyOf(partitionValues); + + Map> uidToPartitionRange = null; + Map, UniqueId> rangeToId = null; + RangeMap singleColumnRangeMap = null; + if (key.types.size() > 1) { + // uidToPartitionRange and rangeToId are only used for multi-column partition + uidToPartitionRange = ListPartitionPrunerV2.genUidToPartitionRange(idToPartitionItem); + rangeToId 
= ListPartitionPrunerV2.genRangeToId(uidToPartitionRange); + } else { + Preconditions.checkState(key.types.size() == 1, key.types); + // singleColumnRangeMap is only used for single-column partition + singleColumnRangeMap = ListPartitionPrunerV2.genSingleColumnRangeMap(idToPartitionItem); + } + return new HivePartitionValues(idToPartitionItem, uidToPartitionRange, rangeToId, singleColumnRangeMap); } private ListPartitionItem toListPartitionItem(String partitionName, List types) { @@ -248,7 +269,7 @@ public class HiveMetaStoreCache { return configuration; } - public ImmutableList getPartitionValues(String dbName, String tblName, List types) { + public HivePartitionValues getPartitionValues(String dbName, String tblName, List types) { PartitionValueCacheKey key = new PartitionValueCacheKey(dbName, tblName, types); try { return partitionValuesCache.get(key); @@ -420,4 +441,22 @@ public class HiveMetaStoreCache { return "FileCacheKey{" + "location='" + location + '\'' + ", inputFormat='" + inputFormat + '\'' + '}'; } } + + @Data + public static class HivePartitionValues { + private Map idToPartitionItem = Maps.newHashMap(); + private Map> uidToPartitionRange; + private Map, UniqueId> rangeToId; + private RangeMap singleColumnRangeMap; + + public HivePartitionValues(Map idToPartitionItem, + Map> uidToPartitionRange, + Map, UniqueId> rangeToId, + RangeMap singleColumnRangeMap) { + this.idToPartitionItem = idToPartitionItem; + this.uidToPartitionRange = uidToPartitionRange; + this.rangeToId = rangeToId; + this.singleColumnRangeMap = singleColumnRangeMap; + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java index b8dd5f8d84..8a8055f72c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java @@ -274,8 +274,11 @@ public class EsUtil { 
notPushDownList.add(expr); return null; } - } else { + } else if (leftExpr instanceof SlotRef) { column = ((SlotRef) leftExpr).getColumnName(); + } else { + notPushDownList.add(expr); + return null; } // Replace col with col.keyword if mapping exist. column = fieldsContext.getOrDefault(column, column); @@ -448,3 +451,4 @@ public class EsUtil { } } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ListPartitionPrunerV2.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ListPartitionPrunerV2.java index 6d45efc8dd..bbefea50e3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ListPartitionPrunerV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ListPartitionPrunerV2.java @@ -24,11 +24,14 @@ import org.apache.doris.common.AnalysisException; import com.google.common.base.MoreObjects; import com.google.common.base.Objects; +import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import com.google.common.collect.Range; import com.google.common.collect.RangeMap; import com.google.common.collect.RangeSet; import com.google.common.collect.TreeRangeMap; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.Collection; import java.util.Collections; @@ -40,33 +43,62 @@ import java.util.stream.Collectors; /** * ListPartitionPrunerV2 + * * @since 1.0 */ @SuppressWarnings("UnstableApiUsage") public class ListPartitionPrunerV2 extends PartitionPrunerV2Base { - private final Map> uidToPartitionRange; + private static final Logger LOG = LogManager.getLogger(ListPartitionPrunerV2.class); + // `uidToPartitionRange` is only used for multiple columns partition. 
+ private Map> uidToPartitionRange; + private Map, UniqueId> rangeToId; public ListPartitionPrunerV2(Map idToPartitionItem, - List partitionColumns, - Map columnNameToRange) { + List partitionColumns, + Map columnNameToRange) { super(idToPartitionItem, partitionColumns, columnNameToRange); this.uidToPartitionRange = Maps.newHashMap(); if (partitionColumns.size() > 1) { - // `uidToPartitionRange` is only used for multiple columns partition. - idToPartitionItem.forEach((id, item) -> { - List keys = item.getItems(); - List> ranges = keys.stream() - .map(key -> Range.closed(key, key)) - .collect(Collectors.toList()); - for (int i = 0; i < ranges.size(); i++) { - uidToPartitionRange.put(new ListPartitionUniqueId(id, i), ranges.get(i)); - } - }); + this.uidToPartitionRange = genUidToPartitionRange(idToPartitionItem); + this.rangeToId = genRangeToId(uidToPartitionRange); } } + // Pass uidToPartitionRange and rangeToId from outside + public ListPartitionPrunerV2(Map idToPartitionItem, + List partitionColumns, + Map columnNameToRange, + Map> uidToPartitionRange, + Map, UniqueId> rangeToId, + RangeMap singleColumnRangeMap) { + super(idToPartitionItem, partitionColumns, columnNameToRange, singleColumnRangeMap); + this.uidToPartitionRange = uidToPartitionRange; + this.rangeToId = rangeToId; + } + + public static Map> genUidToPartitionRange( + Map idToPartitionItem) { + Map> uidToPartitionRange = Maps.newHashMap(); + idToPartitionItem.forEach((id, item) -> { + List keys = item.getItems(); + List> ranges = keys.stream() + .map(key -> Range.closed(key, key)) + .collect(Collectors.toList()); + for (int i = 0; i < ranges.size(); i++) { + uidToPartitionRange.put(new ListPartitionUniqueId(id, i), ranges.get(i)); + } + }); + return uidToPartitionRange; + } + @Override - RangeMap getCandidateRangeMap() { + void genSingleColumnRangeMap() { + if (singleColumnRangeMap == null) { + singleColumnRangeMap = genSingleColumnRangeMap(idToPartitionItem); + } + } + + public static RangeMap 
genSingleColumnRangeMap(Map idToPartitionItem) { RangeMap candidate = TreeRangeMap.create(); idToPartitionItem.forEach((id, item) -> { List keys = item.getItems(); @@ -75,7 +107,7 @@ public class ListPartitionPrunerV2 extends PartitionPrunerV2Base { .collect(Collectors.toList()); for (int i = 0; i < ranges.size(); i++) { candidate.put(mapPartitionKeyRange(ranges.get(i), 0), - new ListPartitionUniqueId(id, i)); + new ListPartitionUniqueId(id, i)); } }); return candidate; @@ -86,7 +118,7 @@ public class ListPartitionPrunerV2 extends PartitionPrunerV2Base { */ @Override FinalFilters getFinalFilters(ColumnRange columnRange, - Column column) throws AnalysisException { + Column column) throws AnalysisException { if (!columnRange.hasFilter()) { return FinalFilters.noFilters(); } @@ -107,19 +139,26 @@ public class ListPartitionPrunerV2 extends PartitionPrunerV2Base { @Override Collection pruneMultipleColumnPartition( Map columnToFilters) throws AnalysisException { - Map, UniqueId> rangeToId = Maps.newHashMap(); - uidToPartitionRange.forEach((uid, range) -> rangeToId.put(range, uid)); + Preconditions.checkNotNull(uidToPartitionRange); + Preconditions.checkNotNull(rangeToId); return doPruneMultiple(columnToFilters, rangeToId, 0); } + public static Map, UniqueId> genRangeToId( + Map> uidToPartitionRange) { + Map, UniqueId> rangeToId = Maps.newHashMap(); + uidToPartitionRange.forEach((uid, range) -> rangeToId.put(range, uid)); + return rangeToId; + } + private Collection doPruneMultiple(Map columnToFilters, - Map, UniqueId> partitionRangeToUid, - int columnIdx) { + Map, UniqueId> partitionRangeToUid, + int columnIdx) { // No more partition column. 
if (columnIdx == partitionColumns.size()) { return partitionRangeToUid.values().stream() - .map(UniqueId::getPartitionId) - .collect(Collectors.toSet()); + .map(UniqueId::getPartitionId) + .collect(Collectors.toSet()); } FinalFilters finalFilters = columnToFilters.get(partitionColumns.get(columnIdx)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPrunerV2Base.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPrunerV2Base.java index d2b169d3de..e1772509ee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPrunerV2Base.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPrunerV2Base.java @@ -22,6 +22,7 @@ import org.apache.doris.catalog.PartitionItem; import org.apache.doris.catalog.PartitionKey; import org.apache.doris.common.AnalysisException; +import com.google.common.base.Preconditions; import com.google.common.collect.BoundType; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -40,15 +41,28 @@ public abstract class PartitionPrunerV2Base implements PartitionPruner { protected final Map idToPartitionItem; protected final List partitionColumns; protected final Map columnNameToRange; + // used for single column partition + protected RangeMap singleColumnRangeMap = null; public PartitionPrunerV2Base(Map idToPartitionItem, - List partitionColumns, - Map columnNameToRange) { + List partitionColumns, + Map columnNameToRange) { this.idToPartitionItem = idToPartitionItem; this.partitionColumns = partitionColumns; this.columnNameToRange = columnNameToRange; } + // pass singleColumnRangeMap from outside + public PartitionPrunerV2Base(Map idToPartitionItem, + List partitionColumns, + Map columnNameToRange, + RangeMap singleColumnRangeMap) { + this.idToPartitionItem = idToPartitionItem; + this.partitionColumns = partitionColumns; + this.columnNameToRange = columnNameToRange; + this.singleColumnRangeMap = singleColumnRangeMap; + } + @Override public Collection 
prune() throws AnalysisException { Map columnToFilters = Maps.newHashMap(); @@ -70,7 +84,7 @@ public abstract class PartitionPrunerV2Base implements PartitionPruner { } } - abstract RangeMap getCandidateRangeMap(); + abstract void genSingleColumnRangeMap(); /** * Handle conjunctive and disjunctive `is null` predicates. @@ -107,29 +121,30 @@ public abstract class PartitionPrunerV2Base implements PartitionPruner { case CONSTANT_FALSE_FILTERS: return Collections.emptyList(); case HAVE_FILTERS: - RangeMap candidate = getCandidateRangeMap(); + genSingleColumnRangeMap(); + Preconditions.checkNotNull(singleColumnRangeMap); return finalFilters.filters.stream() - .map(filter -> { - RangeMap filtered = candidate.subRangeMap(filter); - return filtered.asMapOfRanges().values().stream() - .map(UniqueId::getPartitionId) - .collect(Collectors.toSet()); - }) - .flatMap(Set::stream) - .collect(Collectors.toSet()); + .map(filter -> { + RangeMap filtered = singleColumnRangeMap.subRangeMap(filter); + return filtered.asMapOfRanges().values().stream() + .map(UniqueId::getPartitionId) + .collect(Collectors.toSet()); + }) + .flatMap(Set::stream) + .collect(Collectors.toSet()); case NO_FILTERS: default: return idToPartitionItem.keySet(); } } - protected Range mapPartitionKeyRange(Range fromRange, - int columnIdx) { + protected static Range mapPartitionKeyRange(Range fromRange, + int columnIdx) { return mapRange(fromRange, - partitionKey -> ColumnBound.of(partitionKey.getKeys().get(columnIdx))); + partitionKey -> ColumnBound.of(partitionKey.getKeys().get(columnIdx))); } - protected Range mapRange( + private static Range mapRange( Range range, Function mapper) { TO lower = range.hasLowerBound() ? mapper.apply(range.lowerEndpoint()) : null; TO upper = range.hasUpperBound() ? 
mapper.apply(range.upperEndpoint()) : null; @@ -157,7 +172,7 @@ public abstract class PartitionPrunerV2Base implements PartitionPruner { } } - protected interface UniqueId { + public interface UniqueId { long getPartitionId(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RangePartitionPrunerV2.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RangePartitionPrunerV2.java index 256c5c11f4..4aa3ee41a5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/RangePartitionPrunerV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RangePartitionPrunerV2.java @@ -43,13 +43,19 @@ import java.util.Set; public class RangePartitionPrunerV2 extends PartitionPrunerV2Base { public RangePartitionPrunerV2(Map idToPartitionItem, - List partitionColumns, - Map columnNameToRange) { + List partitionColumns, + Map columnNameToRange) { super(idToPartitionItem, partitionColumns, columnNameToRange); } @Override - RangeMap getCandidateRangeMap() { + void genSingleColumnRangeMap() { + if (singleColumnRangeMap == null) { + singleColumnRangeMap = genSingleColumnRangeMap(idToPartitionItem); + } + } + + public static RangeMap genSingleColumnRangeMap(Map idToPartitionItem) { RangeMap candidate = TreeRangeMap.create(); idToPartitionItem.forEach((id, item) -> { Range range = item.getItems(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java index 0cf6661846..0dd425d917 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java @@ -121,6 +121,8 @@ public class ExternalFileScanNode extends ExternalScanNode { // For explain private long inputSplitsNum = 0; private long totalFileSize = 0; + private long totalPartitionNum = 0; + private long readPartitionNum = 0; /** * External file scan node for: 
@@ -302,6 +304,10 @@ public class ExternalFileScanNode extends ExternalScanNode { createScanRangeLocations(context, scanProvider); this.inputSplitsNum += scanProvider.getInputSplitNum(); this.totalFileSize += scanProvider.getInputFileSize(); + if (scanProvider instanceof HiveScanProvider) { + this.totalPartitionNum = ((HiveScanProvider) scanProvider).getTotalPartitionNum(); + this.readPartitionNum = ((HiveScanProvider) scanProvider).getReadPartitionNum(); + } } } @@ -524,6 +530,8 @@ public class ExternalFileScanNode extends ExternalScanNode { output.append(prefix).append("inputSplitNum=").append(inputSplitsNum).append(", totalFileSize=") .append(totalFileSize).append(", scanRanges=").append(scanRangeLocations.size()).append("\n"); + output.append(prefix).append("partition=").append(readPartitionNum).append("/").append(totalPartitionNum) + .append("\n"); output.append(prefix); if (cardinality > 0) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java index 17e6d3417f..e257f96359 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java @@ -35,6 +35,7 @@ import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; import org.apache.doris.datasource.HMSExternalCatalog; import org.apache.doris.datasource.hive.HiveMetaStoreCache; +import org.apache.doris.datasource.hive.HiveMetaStoreCache.HivePartitionValues; import org.apache.doris.datasource.hive.HivePartition; import org.apache.doris.load.BrokerFileGroup; import org.apache.doris.planner.ColumnRange; @@ -48,7 +49,6 @@ import org.apache.doris.thrift.TFileTextScanRangeParams; import org.apache.doris.thrift.TFileType; import com.google.common.base.Joiner; -import com.google.common.collect.ImmutableList; import 
com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.hadoop.conf.Configuration; @@ -76,11 +76,12 @@ public class HiveScanProvider extends HMSTableScanProvider { private static final String DEFAULT_LINE_DELIMITER = "\n"; protected HMSExternalTable hmsTable; - protected final TupleDescriptor desc; - protected Map columnNameToRange; + protected int totalPartitionNum = 0; + protected int readPartitionNum = 0; + public HiveScanProvider(HMSExternalTable hmsTable, TupleDescriptor desc, Map columnNameToRange) { this.hmsTable = hmsTable; @@ -141,31 +142,32 @@ public class HiveScanProvider extends HMSTableScanProvider { HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() .getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog()); // 1. get ListPartitionItems from cache - ImmutableList partitionItems; + HivePartitionValues hivePartitionValues = null; List partitionColumnTypes = hmsTable.getPartitionColumnTypes(); if (!partitionColumnTypes.isEmpty()) { - partitionItems = cache.getPartitionValues(hmsTable.getDbName(), hmsTable.getName(), + hivePartitionValues = cache.getPartitionValues(hmsTable.getDbName(), hmsTable.getName(), partitionColumnTypes); - } else { - partitionItems = ImmutableList.of(); } List allFiles = Lists.newArrayList(); - if (!partitionItems.isEmpty()) { + if (hivePartitionValues != null) { // 2. 
prune partitions by expr - Map keyItemMap = Maps.newHashMap(); - long pid = 0; - for (ListPartitionItem partitionItem : partitionItems) { - keyItemMap.put(pid++, partitionItem); - } - ListPartitionPrunerV2 pruner = new ListPartitionPrunerV2(keyItemMap, - hmsTable.getPartitionColumns(), columnNameToRange); + Map idToPartitionItem = hivePartitionValues.getIdToPartitionItem(); + this.totalPartitionNum = idToPartitionItem.size(); + ListPartitionPrunerV2 pruner = new ListPartitionPrunerV2(idToPartitionItem, + hmsTable.getPartitionColumns(), columnNameToRange, + hivePartitionValues.getUidToPartitionRange(), + hivePartitionValues.getRangeToId(), + hivePartitionValues.getSingleColumnRangeMap()); Collection filteredPartitionIds = pruner.prune(); + this.readPartitionNum = filteredPartitionIds.size(); + LOG.debug("hive partition fetch and prune for table {}.{} cost: {} ms", + hmsTable.getDbName(), hmsTable.getName(), (System.currentTimeMillis() - start)); // 3. get partitions from cache List> partitionValuesList = Lists.newArrayListWithCapacity(filteredPartitionIds.size()); for (Long id : filteredPartitionIds) { - ListPartitionItem listPartitionItem = (ListPartitionItem) keyItemMap.get(id); + ListPartitionItem listPartitionItem = (ListPartitionItem) idToPartitionItem.get(id); partitionValuesList.add(listPartitionItem.getItems().get(0).getPartitionValuesAsStringList()); } List partitions = cache.getAllPartitions(hmsTable.getDbName(), hmsTable.getName(), @@ -178,6 +180,8 @@ public class HiveScanProvider extends HMSTableScanProvider { HivePartition dummyPartition = new HivePartition(hmsTable.getRemoteTable().getSd().getInputFormat(), hmsTable.getRemoteTable().getSd().getLocation(), null); getFileSplitByPartitions(cache, Lists.newArrayList(dummyPartition), allFiles); + this.totalPartitionNum = 1; + this.readPartitionNum = 1; } LOG.debug("get #{} files for table: {}.{}, cost: {} ms", allFiles.size(), hmsTable.getDbName(), hmsTable.getName(), (System.currentTimeMillis() - start)); 
@@ -212,6 +216,14 @@ public class HiveScanProvider extends HMSTableScanProvider { return conf; } + public int getTotalPartitionNum() { + return totalPartitionNum; + } + + public int getReadPartitionNum() { + return readPartitionNum; + } + @Override public Table getRemoteHiveTable() throws DdlException, MetaNotFoundException { return hmsTable.getRemoteTable(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterCatalogExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterCatalogExecutor.java index c3a08d6d50..a26c687e10 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterCatalogExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterCatalogExecutor.java @@ -43,7 +43,7 @@ public class MasterCatalogExecutor { waitTimeoutMs = ctx.getSessionVariable().getQueryTimeoutS() * 1000; } - public void forward(long catalogId, long dbId, long tableId) throws Exception { + public void forward(long catalogId, long dbId) throws Exception { if (!ctx.getEnv().isReady()) { throw new Exception("Current catalog is not ready, please wait for a while."); } @@ -62,9 +62,6 @@ public class MasterCatalogExecutor { if (dbId != -1) { request.setDbId(dbId); } - if (tableId != -1) { - request.setTableId(tableId); - } boolean isReturnToPool = false; try { TInitExternalCtlMetaResult result = client.initExternalCtlMeta(request); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 57a7cb58d8..a7a128b567 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -1194,9 +1194,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { @Override public TInitExternalCtlMetaResult initExternalCtlMeta(TInitExternalCtlMetaRequest request) throws TException { - if (request.isSetCatalogId() && request.isSetDbId() && 
request.isSetTableId()) { - return initTable(request.catalogId, request.dbId, request.tableId); - } else if (request.isSetCatalogId() && request.isSetDbId()) { + if (request.isSetCatalogId() && request.isSetDbId()) { return initDb(request.catalogId, request.dbId); } else if (request.isSetCatalogId()) { return initCatalog(request.catalogId); @@ -1235,32 +1233,4 @@ public class FrontendServiceImpl implements FrontendService.Iface { result.setStatus("OK"); return result; } - - private TInitExternalCtlMetaResult initTable(long catalogId, long dbId, long tableId) - throws TException { - CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId); - if (!(catalog instanceof ExternalCatalog)) { - throw new TException("Only support forward ExternalCatalog init operation."); - } - DatabaseIf db = catalog.getDbNullable(dbId); - if (db == null) { - throw new TException("database " + dbId + " is null"); - } - if (!(db instanceof ExternalDatabase)) { - throw new TException("Only support forward ExternalDatabase init operation."); - } - TableIf table = db.getTableNullable(tableId); - if (table == null) { - throw new TException("table " + tableId + " is null"); - } - if (!(table instanceof ExternalTable)) { - throw new TException("Only support forward ExternalTable init operation."); - } - - ((ExternalTable) table).makeSureInitialized(); - TInitExternalCtlMetaResult result = new TInitExternalCtlMetaResult(); - result.setMaxJournalId(Env.getCurrentEnv().getMaxJournalId()); - result.setStatus("OK"); - return result; - } } diff --git a/regression-test/data/es_p0/test_es_query.out b/regression-test/data/es_p0/test_es_query.out index 651370b242..29768407a7 100644 --- a/regression-test/data/es_p0/test_es_query.out +++ b/regression-test/data/es_p0/test_es_query.out @@ -1,12 +1,31 @@ -- This file is automatically generated. 
You should know what you did if you want to edit this --- !sql1 -- +-- !sql61 -- test1 test2 test2_20220808 --- !sql1 -- +-- !sql62 -- 2022-08-08 text#1 3.14 string1 --- !sql1 -- +-- !sql63 -- +2022-08-08 text#1 3.14 string1 +2022-08-08 text3_4*5 5.0 string3 + +-- !sql64 -- +2022-09-08 text2 4.0 string2 + +-- !sql71 -- +test1 +test2 +test2_20220808 + +-- !sql72 -- 2022-08-08 text#1 3.14 string1 +-- !sql73 -- +2022-08-08 text#1 3.14 string1 +2022-08-08 text3_4*5 5.0 string3 + +-- !sql74 -- +2022-09-08 text2 4.0 string2 + diff --git a/regression-test/suites/es_p0/test_es_query.groovy b/regression-test/suites/es_p0/test_es_query.groovy index 2e8ec3c392..04b939a51d 100644 --- a/regression-test/suites/es_p0/test_es_query.groovy +++ b/regression-test/suites/es_p0/test_es_query.groovy @@ -54,9 +54,18 @@ suite("test_es_query", "p0") { ); """ sql """switch es6""" - order_qt_sql1 """show tables""" - order_qt_sql1 """select * from test1 where test2='text#1'""" - sql """switch es8""" - order_qt_sql1 """select * from test1 where test2='text'""" + order_qt_sql61 """show tables""" + order_qt_sql62 """select * from test1 where test2='text#1'""" + order_qt_sql63 """select * from test2_20220808 where test4='2022-08-08'""" + order_qt_sql64 """select * from test2_20220808 where substring(test2, 2) = 'ext2'""" + sql """switch es7""" + order_qt_sql71 """show tables""" + order_qt_sql72 """select * from test1 where test2='text#1'""" + order_qt_sql73 """select * from test2_20220808 where test4='2022-08-08'""" + order_qt_sql74 """select * from test2_20220808 where substring(test2, 2) = 'ext2'""" + // es8 has some problem, need fix + // sql """switch es8""" + // order_qt_sql1 """select * from test1 where test2='text'""" + // order_qt_sql2 """select * from test2_20220808 where test4='2022-08-08'""" } -} \ No newline at end of file +}