diff --git a/be/src/vec/exec/scan/new_es_scan_node.cpp b/be/src/vec/exec/scan/new_es_scan_node.cpp
index dcad57a418..6a57e917b4 100644
--- a/be/src/vec/exec/scan/new_es_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_es_scan_node.cpp
@@ -205,7 +205,7 @@ Status NewEsScanNode::_init_scanners(std::list<VScanner*>* scanners) {
                                               properties, _docvalue_context, doc_value_mode);
         _scanner_pool.add(scanner);
-        RETURN_IF_ERROR(scanner->prepare(_state));
+        RETURN_IF_ERROR(scanner->prepare(_state, _vconjunct_ctx_ptr.get()));
         scanners->push_back(static_cast<VScanner*>(scanner));
     }
     return Status::OK();
diff --git a/be/src/vec/exec/scan/new_es_scanner.cpp b/be/src/vec/exec/scan/new_es_scanner.cpp
index cfde06ab3c..358a226d1e 100644
--- a/be/src/vec/exec/scan/new_es_scanner.cpp
+++ b/be/src/vec/exec/scan/new_es_scanner.cpp
@@ -41,8 +41,12 @@ NewEsScanner::NewEsScanner(RuntimeState* state, NewEsScanNode* parent, int64_t l
           _docvalue_context(docvalue_context),
           _doc_value_mode(doc_value_mode) {}

-Status NewEsScanner::prepare(RuntimeState* state) {
+Status NewEsScanner::prepare(RuntimeState* state, VExprContext** vconjunct_ctx_ptr) {
     VLOG_CRITICAL << NEW_SCANNER_TYPE << "::prepare";
+    if (vconjunct_ctx_ptr != nullptr) {
+        // Copy vconjunct_ctx_ptr from scan node to this scanner's _vconjunct_ctx.
+        RETURN_IF_ERROR((*vconjunct_ctx_ptr)->clone(_state, &_vconjunct_ctx));
+    }

     if (_is_init) {
         return Status::OK();
diff --git a/be/src/vec/exec/scan/new_es_scanner.h b/be/src/vec/exec/scan/new_es_scanner.h
index be4d50448b..4e82d72af9 100644
--- a/be/src/vec/exec/scan/new_es_scanner.h
+++ b/be/src/vec/exec/scan/new_es_scanner.h
@@ -36,7 +36,7 @@ public:
     Status close(RuntimeState* state) override;

 public:
-    Status prepare(RuntimeState* state);
+    Status prepare(RuntimeState* state, VExprContext** vconjunct_ctx_ptr);

 protected:
     Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) override;
diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.cpp b/be/src/vec/exec/scan/new_jdbc_scanner.cpp
index 066074f16a..b91300bf47 100644
--- a/be/src/vec/exec/scan/new_jdbc_scanner.cpp
+++ b/be/src/vec/exec/scan/new_jdbc_scanner.cpp
@@ -19,7 +19,7 @@
 namespace doris::vectorized {

 NewJdbcScanner::NewJdbcScanner(RuntimeState* state, NewJdbcScanNode* parent, int64_t limit,
-                               TupleId tuple_id, std::string query_string)
+                               const TupleId& tuple_id, const std::string& query_string)
         : VScanner(state, static_cast<VScanNode*>(parent), limit),
           _is_init(false),
           _jdbc_eos(false),
diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.h b/be/src/vec/exec/scan/new_jdbc_scanner.h
index 6a45462db1..24c2649dfe 100644
--- a/be/src/vec/exec/scan/new_jdbc_scanner.h
+++ b/be/src/vec/exec/scan/new_jdbc_scanner.h
@@ -25,8 +25,8 @@ namespace doris {
 namespace vectorized {
 class NewJdbcScanner : public VScanner {
 public:
-    NewJdbcScanner(RuntimeState* state, NewJdbcScanNode* parent, int64_t limit, TupleId tuple_id,
-                   std::string query_string);
+    NewJdbcScanner(RuntimeState* state, NewJdbcScanNode* parent, int64_t limit,
+                   const TupleId& tuple_id, const std::string& query_string);

     Status open(RuntimeState* state) override;
     Status close(RuntimeState* state) override;
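Note: the ES scanner hunks above thread the scan node's conjunct context down into each scanner: prepare() now takes a VExprContext** and, when it is non-null, clone()s it into the scanner-local _vconjunct_ctx. The patch comment only says the context is copied; a plausible reading (not stated in the patch) is that each scanner needs its own private expression context because scanners may run concurrently. The JDBC hunks are mechanical cleanup: TupleId and the query string are now passed by const reference to avoid copies.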
"${ROOT}"/docker-compose/elasticsearch/es.yaml +sed -i "s/doris--/${CONTAINER_UID}/g" "${ROOT}"/docker-compose/elasticsearch/es.yaml sudo docker compose -f "${ROOT}"/docker-compose/elasticsearch/es.yaml --env-file "${ROOT}"/docker-compose/elasticsearch/es.env down sudo mkdir -p "${ROOT}"/docker-compose/elasticsearch/data/es6/ sudo rm -rf "${ROOT}"/docker-compose/elasticsearch/data/es6/* diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java index eb8c8972b1..e2efd6aae1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java @@ -49,7 +49,7 @@ public class EsExternalTable extends ExternalTable { super(id, name, catalog, dbName, TableType.ES_EXTERNAL_TABLE); } - public synchronized void makeSureInitialized() { + protected synchronized void makeSureInitialized() { if (!objectCreated) { esTable = toEsTable(); objectCreated = true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java index 87d1cccf5d..ffbbda4a46 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java @@ -107,7 +107,7 @@ public class ExternalDatabase implements DatabaseIf, // Forward to master and wait the journal to replay. MasterCatalogExecutor remoteExecutor = new MasterCatalogExecutor(); try { - remoteExecutor.forward(extCatalog.getId(), id, -1); + remoteExecutor.forward(extCatalog.getId(), id); } catch (Exception e) { Util.logAndThrowRuntimeException(LOG, String.format("failed to forward init external db %s operation to master", name), e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java index 36189c9a00..e97c5b1a26 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java @@ -100,7 +100,7 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable { return false; } - public void makeSureInitialized() { + protected void makeSureInitialized() { throw new NotImplementedException(); } @@ -213,7 +213,6 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable { @Override public List getFullSchema() { - makeSureInitialized(); ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog); return cache.getSchema(dbName, name); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java index 6e12f7e30a..168cf31a65 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java @@ -78,7 +78,7 @@ public class HMSExternalTable extends ExternalTable { return dlaType != DLAType.UNKNOWN; } - public synchronized void makeSureInitialized() { + protected synchronized void makeSureInitialized() { if (!objectCreated) { try { getRemoteTable(); @@ -98,10 +98,29 @@ public class HMSExternalTable extends ExternalTable { dlaType 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
index 6e12f7e30a..168cf31a65 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
@@ -78,7 +78,7 @@ public class HMSExternalTable extends ExternalTable {
         return dlaType != DLAType.UNKNOWN;
     }

-    public synchronized void makeSureInitialized() {
+    protected synchronized void makeSureInitialized() {
         if (!objectCreated) {
             try {
                 getRemoteTable();
@@ -98,10 +98,29 @@ public class HMSExternalTable extends ExternalTable {
                     dlaType = DLAType.UNKNOWN;
                 }
             }
+
+            initPartitionColumns();
             objectCreated = true;
         }
     }

+    private void initPartitionColumns() {
+        Set<String> partitionKeys = remoteTable.getPartitionKeys().stream().map(FieldSchema::getName)
+                .collect(Collectors.toSet());
+        partitionColumns = Lists.newArrayListWithCapacity(partitionKeys.size());
+        for (String partitionKey : partitionKeys) {
+            // Do not use "getColumn()", which will cause dead loop
+            List<Column> schema = getFullSchema();
+            for (Column column : schema) {
+                if (partitionKey.equals(column.getName())) {
+                    partitionColumns.add(column);
+                    break;
+                }
+            }
+        }
+        LOG.debug("get {} partition columns for table: {}", partitionColumns.size(), name);
+    }
+
     /**
      * Now we only support cow table in iceberg.
      */
@@ -161,13 +180,11 @@ public class HMSExternalTable extends ExternalTable {

     public List<Type> getPartitionColumnTypes() {
         makeSureInitialized();
-        initPartitionColumns();
         return partitionColumns.stream().map(c -> c.getType()).collect(Collectors.toList());
     }

     public List<Column> getPartitionColumns() {
         makeSureInitialized();
-        initPartitionColumns();
         return partitionColumns;
     }

@@ -268,30 +285,5 @@ public class HMSExternalTable extends ExternalTable {
     public Map<String, String> getS3Properties() {
         return catalog.getCatalogProperty().getS3Properties();
     }
-
-    private void initPartitionColumns() {
-        if (partitionColumns != null) {
-            return;
-        }
-        synchronized (this) {
-            if (partitionColumns != null) {
-                return;
-            }
-            Set<String> partitionKeys = remoteTable.getPartitionKeys().stream().map(FieldSchema::getName)
-                    .collect(Collectors.toSet());
-            partitionColumns = Lists.newArrayListWithCapacity(partitionKeys.size());
-            for (String partitionKey : partitionKeys) {
-                // Do not use "getColumn()", which will cause dead loop
-                List<Column> schema = getFullSchema();
-                for (Column column : schema) {
-                    if (partitionKey.equals(column.getName())) {
-                        partitionColumns.add(column);
-                        break;
-                    }
-                }
-            }
-            LOG.debug("get {} partition columns for table: {}", partitionColumns.size(), name);
-        }
-    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index 0663fed2e1..385c85fadf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -102,7 +102,7 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr
         // Forward to master and wait the journal to replay.
         MasterCatalogExecutor remoteExecutor = new MasterCatalogExecutor();
         try {
-            remoteExecutor.forward(id, -1, -1);
+            remoteExecutor.forward(id, -1);
         } catch (Exception e) {
             Util.logAndThrowRuntimeException(LOG,
                     String.format("failed to forward init catalog %s operation to master.", name), e);
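Note: partition-column discovery moves out of the deleted double-checked-locking initPartitionColumns() and into the already-synchronized makeSureInitialized(), so the getters shrink to a guard plus a field read. Relatedly, getFullSchema() no longer calls makeSureInitialized() (ExternalTable hunk above); since initPartitionColumns() now runs inside the initializer and itself calls getFullSchema(), that removal plausibly prevents re-entering initialization, in the same spirit as the in-code "dead loop" warning about getColumn(). A minimal sketch of the resulting getter path, using only names from this patch:

    // Sketch, not the full class: HMSExternalTable after this change.
    public List<Column> getPartitionColumns() {
        makeSureInitialized();    // synchronized; builds partitionColumns at most once
        return partitionColumns;  // fully populated before objectCreated is set to true
    }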
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 388c847559..ddcc12d5fc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -19,6 +19,7 @@ package org.apache.doris.datasource.hive;

 import org.apache.doris.analysis.PartitionValue;
 import org.apache.doris.catalog.ListPartitionItem;
+import org.apache.doris.catalog.PartitionItem;
 import org.apache.doris.catalog.PartitionKey;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
@@ -31,6 +32,9 @@ import org.apache.doris.metric.GaugeMetric;
 import org.apache.doris.metric.Metric;
 import org.apache.doris.metric.MetricLabel;
 import org.apache.doris.metric.MetricRepo;
+import org.apache.doris.planner.ColumnBound;
+import org.apache.doris.planner.ListPartitionPrunerV2;
+import org.apache.doris.planner.PartitionPrunerV2Base.UniqueId;

 import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheBuilder;
@@ -38,6 +42,9 @@ import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeMap;
 import lombok.Data;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -70,7 +77,7 @@ public class HiveMetaStoreCache {
     private HMSExternalCatalog catalog;

     // cache from <dbname-tblname> -> <partition values>
-    private LoadingCache<PartitionValueCacheKey, ImmutableList<ListPartitionItem>> partitionValuesCache;
+    private LoadingCache<PartitionValueCacheKey, HivePartitionValues> partitionValuesCache;
     // cache from <dbname-tblname-partition_values> -> <partition info>
     private LoadingCache<PartitionCacheKey, HivePartition> partitionCache;
     // cache from <location> -> <file list>
@@ -86,9 +93,9 @@ public class HiveMetaStoreCache {
         partitionValuesCache = CacheBuilder.newBuilder().maximumSize(Config.max_hive_partition_cache_num)
                 .expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES)
                 .build(CacheLoader.asyncReloading(
-                        new CacheLoader<PartitionValueCacheKey, ImmutableList<ListPartitionItem>>() {
+                        new CacheLoader<PartitionValueCacheKey, HivePartitionValues>() {
                             @Override
-                            public ImmutableList<ListPartitionItem> load(PartitionValueCacheKey key) throws Exception {
+                            public HivePartitionValues load(PartitionValueCacheKey key) throws Exception {
                                 return loadPartitionValues(key);
                             }
                         }, executor));
@@ -148,17 +155,31 @@ public class HiveMetaStoreCache {
         MetricRepo.DORIS_METRIC_REGISTER.addMetrics(fileCacheGauge);
     }

-    private ImmutableList<ListPartitionItem> loadPartitionValues(PartitionValueCacheKey key) {
+    private HivePartitionValues loadPartitionValues(PartitionValueCacheKey key) {
         // partition name format: nation=cn/city=beijing
         List<String> partitionNames = catalog.getClient().listPartitionNames(key.dbName, key.tblName);
         if (LOG.isDebugEnabled()) {
             LOG.debug("load #{} partitions for {} in catalog {}", partitionNames.size(), key, catalog.getName());
         }
-        List<ListPartitionItem> partitionValues = Lists.newArrayListWithExpectedSize(partitionNames.size());
+        Map<Long, PartitionItem> idToPartitionItem = Maps.newHashMapWithExpectedSize(partitionNames.size());
+        long idx = 0;
         for (String partitionName : partitionNames) {
-            partitionValues.add(toListPartitionItem(partitionName, key.types));
+            idToPartitionItem.put(idx++, toListPartitionItem(partitionName, key.types));
         }
-        return ImmutableList.copyOf(partitionValues);
+
+        Map<UniqueId, Range<PartitionKey>> uidToPartitionRange = null;
+        Map<Range<PartitionKey>, UniqueId> rangeToId = null;
+        RangeMap<ColumnBound, UniqueId> singleColumnRangeMap = null;
+        if (key.types.size() > 1) {
+            // uidToPartitionRange and rangeToId are only used for multi-column partition
+            uidToPartitionRange = ListPartitionPrunerV2.genUidToPartitionRange(idToPartitionItem);
+            rangeToId = ListPartitionPrunerV2.genRangeToId(uidToPartitionRange);
+        } else {
+            Preconditions.checkState(key.types.size() == 1, key.types);
+            // singleColumnRangeMap is only used for single-column partition
+            singleColumnRangeMap = ListPartitionPrunerV2.genSingleColumnRangeMap(idToPartitionItem);
+        }
+        return new HivePartitionValues(idToPartitionItem, uidToPartitionRange, rangeToId, singleColumnRangeMap);
     }

     private ListPartitionItem toListPartitionItem(String partitionName, List<Type> types) {
@@ -248,7 +269,7 @@ public class HiveMetaStoreCache {
         return configuration;
     }

-    public ImmutableList<ListPartitionItem> getPartitionValues(String dbName, String tblName, List<Type> types) {
+    public HivePartitionValues getPartitionValues(String dbName, String tblName, List<Type> types) {
         PartitionValueCacheKey key = new PartitionValueCacheKey(dbName, tblName, types);
         try {
             return partitionValuesCache.get(key);
@@ -420,4 +441,22 @@ public class HiveMetaStoreCache {
             return "FileCacheKey{" + "location='" + location + '\'' + ", inputFormat='" + inputFormat + '\'' + '}';
         }
     }
+
+    @Data
+    public static class HivePartitionValues {
+        private Map<Long, PartitionItem> idToPartitionItem = Maps.newHashMap();
+        private Map<UniqueId, Range<PartitionKey>> uidToPartitionRange;
+        private Map<Range<PartitionKey>, UniqueId> rangeToId;
+        private RangeMap<ColumnBound, UniqueId> singleColumnRangeMap;
+
+        public HivePartitionValues(Map<Long, PartitionItem> idToPartitionItem,
+                Map<UniqueId, Range<PartitionKey>> uidToPartitionRange,
+                Map<Range<PartitionKey>, UniqueId> rangeToId,
+                RangeMap<ColumnBound, UniqueId> singleColumnRangeMap) {
+            this.idToPartitionItem = idToPartitionItem;
+            this.uidToPartitionRange = uidToPartitionRange;
+            this.rangeToId = rangeToId;
+            this.singleColumnRangeMap = singleColumnRangeMap;
+        }
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java
index b8dd5f8d84..8a8055f72c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java
@@ -274,8 +274,11 @@ public class EsUtil {
                 notPushDownList.add(expr);
                 return null;
             }
-        } else {
+        } else if (leftExpr instanceof SlotRef) {
             column = ((SlotRef) leftExpr).getColumnName();
+        } else {
+            notPushDownList.add(expr);
+            return null;
         }
         // Replace col with col.keyword if mapping exist.
         column = fieldsContext.getOrDefault(column, column);
@@ -448,3 +451,4 @@ public class EsUtil {
     }

 }
+
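Note: the partition-values cache entry changes from a flat ImmutableList<ListPartitionItem> to HivePartitionValues, which bundles the id-to-item map with pruning indexes built once at load time: uidToPartitionRange/rangeToId for multi-column partition keys, or a single-column RangeMap otherwise. A hedged sketch of the consumer side (getters are generated by Lombok's @Data; the local variable names are illustrative):

    // Per-query consumers fetch prebuilt structures instead of rebuilding them.
    HivePartitionValues values = cache.getPartitionValues(dbName, tblName, partitionColumnTypes);
    Map<Long, PartitionItem> idToItem = values.getIdToPartitionItem();
    // Exactly one side is non-null, depending on the partition column count:
    RangeMap<ColumnBound, UniqueId> single = values.getSingleColumnRangeMap();
    Map<Range<PartitionKey>, UniqueId> multi = values.getRangeToId();

The EsUtil hunk is a separate hardening: only SlotRef left-hand expressions are pushed down to ES now; any other shape goes to notPushDownList instead of being cast unconditionally.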
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ListPartitionPrunerV2.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ListPartitionPrunerV2.java
index 6d45efc8dd..bbefea50e3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ListPartitionPrunerV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ListPartitionPrunerV2.java
@@ -24,11 +24,14 @@ import org.apache.doris.common.AnalysisException;

 import com.google.common.base.MoreObjects;
 import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Range;
 import com.google.common.collect.RangeMap;
 import com.google.common.collect.RangeSet;
 import com.google.common.collect.TreeRangeMap;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;

 import java.util.Collection;
 import java.util.Collections;
@@ -40,33 +43,62 @@ import java.util.stream.Collectors;

 /**
  * ListPartitionPrunerV2
+ *
  * @since 1.0
  */
 @SuppressWarnings("UnstableApiUsage")
 public class ListPartitionPrunerV2 extends PartitionPrunerV2Base {
-    private final Map<UniqueId, Range<PartitionKey>> uidToPartitionRange;
+    private static final Logger LOG = LogManager.getLogger(ListPartitionPrunerV2.class);
+
+    // `uidToPartitionRange` is only used for multiple columns partition.
+    private Map<UniqueId, Range<PartitionKey>> uidToPartitionRange;
+    private Map<Range<PartitionKey>, UniqueId> rangeToId;

     public ListPartitionPrunerV2(Map<Long, PartitionItem> idToPartitionItem,
-                                 List<Column> partitionColumns,
-                                 Map<String, ColumnRange> columnNameToRange) {
+            List<Column> partitionColumns,
+            Map<String, ColumnRange> columnNameToRange) {
         super(idToPartitionItem, partitionColumns, columnNameToRange);
         this.uidToPartitionRange = Maps.newHashMap();
         if (partitionColumns.size() > 1) {
-            // `uidToPartitionRange` is only used for multiple columns partition.
-            idToPartitionItem.forEach((id, item) -> {
-                List<PartitionKey> keys = item.getItems();
-                List<Range<PartitionKey>> ranges = keys.stream()
-                        .map(key -> Range.closed(key, key))
-                        .collect(Collectors.toList());
-                for (int i = 0; i < ranges.size(); i++) {
-                    uidToPartitionRange.put(new ListPartitionUniqueId(id, i), ranges.get(i));
-                }
-            });
+            this.uidToPartitionRange = genUidToPartitionRange(idToPartitionItem);
+            this.rangeToId = genRangeToId(uidToPartitionRange);
         }
     }

+    // Pass uidToPartitionRange and rangeToId from outside
+    public ListPartitionPrunerV2(Map<Long, PartitionItem> idToPartitionItem,
+            List<Column> partitionColumns,
+            Map<String, ColumnRange> columnNameToRange,
+            Map<UniqueId, Range<PartitionKey>> uidToPartitionRange,
+            Map<Range<PartitionKey>, UniqueId> rangeToId,
+            RangeMap<ColumnBound, UniqueId> singleColumnRangeMap) {
+        super(idToPartitionItem, partitionColumns, columnNameToRange, singleColumnRangeMap);
+        this.uidToPartitionRange = uidToPartitionRange;
+        this.rangeToId = rangeToId;
+    }
+
+    public static Map<UniqueId, Range<PartitionKey>> genUidToPartitionRange(
+            Map<Long, PartitionItem> idToPartitionItem) {
+        Map<UniqueId, Range<PartitionKey>> uidToPartitionRange = Maps.newHashMap();
+        idToPartitionItem.forEach((id, item) -> {
+            List<PartitionKey> keys = item.getItems();
+            List<Range<PartitionKey>> ranges = keys.stream()
+                    .map(key -> Range.closed(key, key))
+                    .collect(Collectors.toList());
+            for (int i = 0; i < ranges.size(); i++) {
+                uidToPartitionRange.put(new ListPartitionUniqueId(id, i), ranges.get(i));
+            }
+        });
+        return uidToPartitionRange;
+    }
+
     @Override
-    RangeMap<ColumnBound, UniqueId> getCandidateRangeMap() {
+    void genSingleColumnRangeMap() {
+        if (singleColumnRangeMap == null) {
+            singleColumnRangeMap = genSingleColumnRangeMap(idToPartitionItem);
+        }
+    }
+
+    public static RangeMap<ColumnBound, UniqueId> genSingleColumnRangeMap(Map<Long, PartitionItem> idToPartitionItem) {
         RangeMap<ColumnBound, UniqueId> candidate = TreeRangeMap.create();
         idToPartitionItem.forEach((id, item) -> {
             List<PartitionKey> keys = item.getItems();
@@ -75,7 +107,7 @@ public class ListPartitionPrunerV2 extends PartitionPrunerV2Base {
                     .collect(Collectors.toList());
             for (int i = 0; i < ranges.size(); i++) {
                 candidate.put(mapPartitionKeyRange(ranges.get(i), 0),
-                              new ListPartitionUniqueId(id, i));
+                        new ListPartitionUniqueId(id, i));
             }
         });
         return candidate;
@@ -86,7 +118,7 @@ public class ListPartitionPrunerV2 extends PartitionPrunerV2Base {
      */
     @Override
     FinalFilters getFinalFilters(ColumnRange columnRange,
-                                 Column column) throws AnalysisException {
+            Column column) throws AnalysisException {
         if (!columnRange.hasFilter()) {
             return FinalFilters.noFilters();
         }
@@ -107,19 +139,26 @@ public class ListPartitionPrunerV2 extends PartitionPrunerV2Base {
     @Override
     Collection<Long> pruneMultipleColumnPartition(
             Map<Column, FinalFilters> columnToFilters) throws AnalysisException {
-        Map<Range<PartitionKey>, UniqueId> rangeToId = Maps.newHashMap();
-        uidToPartitionRange.forEach((uid, range) -> rangeToId.put(range, uid));
+        Preconditions.checkNotNull(uidToPartitionRange);
+        Preconditions.checkNotNull(rangeToId);
         return doPruneMultiple(columnToFilters, rangeToId, 0);
     }

+    public static Map<Range<PartitionKey>, UniqueId> genRangeToId(
+            Map<UniqueId, Range<PartitionKey>> uidToPartitionRange) {
+        Map<Range<PartitionKey>, UniqueId> rangeToId = Maps.newHashMap();
+        uidToPartitionRange.forEach((uid, range) -> rangeToId.put(range, uid));
+        return rangeToId;
+    }
+
     private Collection<Long> doPruneMultiple(Map<Column, FinalFilters> columnToFilters,
-                                             Map<Range<PartitionKey>, UniqueId> partitionRangeToUid,
-                                             int columnIdx) {
+            Map<Range<PartitionKey>, UniqueId> partitionRangeToUid,
+            int columnIdx) {
         // No more partition column.
         if (columnIdx == partitionColumns.size()) {
             return partitionRangeToUid.values().stream()
-                    .map(UniqueId::getPartitionId)
-                    .collect(Collectors.toSet());
+                    .map(UniqueId::getPartitionId)
+                    .collect(Collectors.toSet());
         }

         FinalFilters finalFilters = columnToFilters.get(partitionColumns.get(columnIdx));
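Note: the index construction that previously lived in the constructor is factored into static helpers (genUidToPartitionRange, genRangeToId, genSingleColumnRangeMap) so HiveMetaStoreCache can precompute the structures once per cache entry and inject them through the new six-argument constructor. A sketch of the intended precompute-then-prune flow for the multi-column case (argument order as in this patch; a single-column table would pass the RangeMap as the last argument instead):

    // Build once, e.g. while loading a cache entry:
    Map<UniqueId, Range<PartitionKey>> uidToRange =
            ListPartitionPrunerV2.genUidToPartitionRange(idToPartitionItem);
    Map<Range<PartitionKey>, UniqueId> rangeToId = ListPartitionPrunerV2.genRangeToId(uidToRange);
    // Prune per query without rebuilding the maps:
    ListPartitionPrunerV2 pruner = new ListPartitionPrunerV2(idToPartitionItem, partitionColumns,
            columnNameToRange, uidToRange, rangeToId, /* singleColumnRangeMap */ null);
    Collection<Long> matchedIds = pruner.prune();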
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPrunerV2Base.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPrunerV2Base.java
index d2b169d3de..e1772509ee 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPrunerV2Base.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionPrunerV2Base.java
@@ -22,6 +22,7 @@ import org.apache.doris.catalog.PartitionItem;
 import org.apache.doris.catalog.PartitionKey;
 import org.apache.doris.common.AnalysisException;

+import com.google.common.base.Preconditions;
 import com.google.common.collect.BoundType;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -40,15 +41,28 @@ public abstract class PartitionPrunerV2Base implements PartitionPruner {
     protected final Map<Long, PartitionItem> idToPartitionItem;
     protected final List<Column> partitionColumns;
     protected final Map<String, ColumnRange> columnNameToRange;
+    // used for single column partition
+    protected RangeMap<ColumnBound, UniqueId> singleColumnRangeMap = null;

     public PartitionPrunerV2Base(Map<Long, PartitionItem> idToPartitionItem,
-                                 List<Column> partitionColumns,
-                                 Map<String, ColumnRange> columnNameToRange) {
+            List<Column> partitionColumns,
+            Map<String, ColumnRange> columnNameToRange) {
         this.idToPartitionItem = idToPartitionItem;
         this.partitionColumns = partitionColumns;
         this.columnNameToRange = columnNameToRange;
     }

+    // pass singleColumnRangeMap from outside
+    public PartitionPrunerV2Base(Map<Long, PartitionItem> idToPartitionItem,
+            List<Column> partitionColumns,
+            Map<String, ColumnRange> columnNameToRange,
+            RangeMap<ColumnBound, UniqueId> singleColumnRangeMap) {
+        this.idToPartitionItem = idToPartitionItem;
+        this.partitionColumns = partitionColumns;
+        this.columnNameToRange = columnNameToRange;
+        this.singleColumnRangeMap = singleColumnRangeMap;
+    }
+
     @Override
     public Collection<Long> prune() throws AnalysisException {
         Map<Column, ColumnRange> columnToFilters = Maps.newHashMap();
@@ -70,7 +84,7 @@ public abstract class PartitionPrunerV2Base implements PartitionPruner {
         }
     }

-    abstract RangeMap<ColumnBound, UniqueId> getCandidateRangeMap();
+    abstract void genSingleColumnRangeMap();

     /**
      * Handle conjunctive and disjunctive `is null` predicates.
@@ -107,29 +121,30 @@ public abstract class PartitionPrunerV2Base implements PartitionPruner {
             case CONSTANT_FALSE_FILTERS:
                 return Collections.emptyList();
             case HAVE_FILTERS:
-                RangeMap<ColumnBound, UniqueId> candidate = getCandidateRangeMap();
+                genSingleColumnRangeMap();
+                Preconditions.checkNotNull(singleColumnRangeMap);
                 return finalFilters.filters.stream()
-                        .map(filter -> {
-                            RangeMap<ColumnBound, UniqueId> filtered = candidate.subRangeMap(filter);
-                            return filtered.asMapOfRanges().values().stream()
-                                    .map(UniqueId::getPartitionId)
-                                    .collect(Collectors.toSet());
-                        })
-                        .flatMap(Set::stream)
-                        .collect(Collectors.toSet());
+                        .map(filter -> {
+                            RangeMap<ColumnBound, UniqueId> filtered = singleColumnRangeMap.subRangeMap(filter);
+                            return filtered.asMapOfRanges().values().stream()
+                                    .map(UniqueId::getPartitionId)
+                                    .collect(Collectors.toSet());
+                        })
+                        .flatMap(Set::stream)
+                        .collect(Collectors.toSet());
             case NO_FILTERS:
             default:
                 return idToPartitionItem.keySet();
         }
     }

-    protected Range<ColumnBound> mapPartitionKeyRange(Range<PartitionKey> fromRange,
-                                                      int columnIdx) {
+    protected static Range<ColumnBound> mapPartitionKeyRange(Range<PartitionKey> fromRange,
+            int columnIdx) {
         return mapRange(fromRange,
-                partitionKey -> ColumnBound.of(partitionKey.getKeys().get(columnIdx)));
+                partitionKey -> ColumnBound.of(partitionKey.getKeys().get(columnIdx)));
     }

-    protected <TO extends Comparable, FROM extends Comparable> Range<TO> mapRange(
+    private static <TO extends Comparable, FROM extends Comparable> Range<TO> mapRange(
             Range<FROM> range, Function<FROM, TO> mapper) {
         TO lower = range.hasLowerBound() ? mapper.apply(range.lowerEndpoint()) : null;
         TO upper = range.hasUpperBound() ? mapper.apply(range.upperEndpoint()) : null;
@@ -157,7 +172,7 @@ public abstract class PartitionPrunerV2Base implements PartitionPruner {
         }
     }

-    protected interface UniqueId {
+    public interface UniqueId {
         long getPartitionId();
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RangePartitionPrunerV2.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RangePartitionPrunerV2.java
index 256c5c11f4..4aa3ee41a5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/RangePartitionPrunerV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RangePartitionPrunerV2.java
@@ -43,13 +43,19 @@ import java.util.Set;
 public class RangePartitionPrunerV2 extends PartitionPrunerV2Base {

     public RangePartitionPrunerV2(Map<Long, PartitionItem> idToPartitionItem,
-                                  List<Column> partitionColumns,
-                                  Map<String, ColumnRange> columnNameToRange) {
+            List<Column> partitionColumns,
+            Map<String, ColumnRange> columnNameToRange) {
         super(idToPartitionItem, partitionColumns, columnNameToRange);
     }

     @Override
-    RangeMap<ColumnBound, UniqueId> getCandidateRangeMap() {
+    void genSingleColumnRangeMap() {
+        if (singleColumnRangeMap == null) {
+            singleColumnRangeMap = genSingleColumnRangeMap(idToPartitionItem);
+        }
+    }
+
+    public static RangeMap<ColumnBound, UniqueId> genSingleColumnRangeMap(Map<Long, PartitionItem> idToPartitionItem) {
         RangeMap<ColumnBound, UniqueId> candidate = TreeRangeMap.create();
         idToPartitionItem.forEach((id, item) -> {
             Range<PartitionKey> range = item.getItems();
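Note: getCandidateRangeMap() becomes genSingleColumnRangeMap(), which only fills the protected singleColumnRangeMap field when a precomputed map was not injected through the new base-class constructor. The single-column pruning itself is plain Guava RangeMap intersection; a self-contained toy example (deliberately not Doris types) of why subRangeMap keeps the per-filter work cheap:

    import com.google.common.collect.Range;
    import com.google.common.collect.RangeMap;
    import com.google.common.collect.TreeRangeMap;
    import java.util.Collection;

    // Two partitions keyed by the values 1 and 2.
    RangeMap<Integer, Long> candidates = TreeRangeMap.create();
    candidates.put(Range.closed(1, 1), 10L); // partition id 10
    candidates.put(Range.closed(2, 2), 20L); // partition id 20
    // A filter such as "col >= 2" keeps only partition 20:
    Collection<Long> kept = candidates.subRangeMap(Range.atLeast(2)).asMapOfRanges().values();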
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
index 0cf6661846..0dd425d917 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
@@ -121,6 +121,8 @@ public class ExternalFileScanNode extends ExternalScanNode {
     // For explain
     private long inputSplitsNum = 0;
     private long totalFileSize = 0;
+    private long totalPartitionNum = 0;
+    private long readPartitionNum = 0;

     /**
      * External file scan node for:
@@ -302,6 +304,10 @@ public class ExternalFileScanNode extends ExternalScanNode {
             createScanRangeLocations(context, scanProvider);
             this.inputSplitsNum += scanProvider.getInputSplitNum();
             this.totalFileSize += scanProvider.getInputFileSize();
+            if (scanProvider instanceof HiveScanProvider) {
+                this.totalPartitionNum = ((HiveScanProvider) scanProvider).getTotalPartitionNum();
+                this.readPartitionNum = ((HiveScanProvider) scanProvider).getReadPartitionNum();
+            }
         }
     }

@@ -524,6 +530,8 @@ public class ExternalFileScanNode extends ExternalScanNode {

         output.append(prefix).append("inputSplitNum=").append(inputSplitsNum).append(", totalFileSize=")
                 .append(totalFileSize).append(", scanRanges=").append(scanRangeLocations.size()).append("\n");
+        output.append(prefix).append("partition=").append(readPartitionNum).append("/").append(totalPartitionNum)
+                .append("\n");

         output.append(prefix);
         if (cardinality > 0) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
index 17e6d3417f..e257f96359 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
@@ -35,6 +35,7 @@ import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.HMSExternalCatalog;
 import org.apache.doris.datasource.hive.HiveMetaStoreCache;
+import org.apache.doris.datasource.hive.HiveMetaStoreCache.HivePartitionValues;
 import org.apache.doris.datasource.hive.HivePartition;
 import org.apache.doris.load.BrokerFileGroup;
 import org.apache.doris.planner.ColumnRange;
@@ -48,7 +49,6 @@ import org.apache.doris.thrift.TFileTextScanRangeParams;
 import org.apache.doris.thrift.TFileType;

 import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
@@ -76,11 +76,12 @@ public class HiveScanProvider extends HMSTableScanProvider {
     private static final String DEFAULT_LINE_DELIMITER = "\n";

     protected HMSExternalTable hmsTable;
-
     protected final TupleDescriptor desc;
-
     protected Map<String, ColumnRange> columnNameToRange;

+    protected int totalPartitionNum = 0;
+    protected int readPartitionNum = 0;
+
     public HiveScanProvider(HMSExternalTable hmsTable, TupleDescriptor desc,
             Map<String, ColumnRange> columnNameToRange) {
         this.hmsTable = hmsTable;
@@ -141,31 +142,32 @@ public class HiveScanProvider extends HMSTableScanProvider {
         HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
                 .getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog());
         // 1. get ListPartitionItems from cache
-        ImmutableList<ListPartitionItem> partitionItems;
+        HivePartitionValues hivePartitionValues = null;
         List<Type> partitionColumnTypes = hmsTable.getPartitionColumnTypes();
         if (!partitionColumnTypes.isEmpty()) {
-            partitionItems = cache.getPartitionValues(hmsTable.getDbName(), hmsTable.getName(),
+            hivePartitionValues = cache.getPartitionValues(hmsTable.getDbName(), hmsTable.getName(),
                     partitionColumnTypes);
-        } else {
-            partitionItems = ImmutableList.of();
         }

         List<InputSplit> allFiles = Lists.newArrayList();
-        if (!partitionItems.isEmpty()) {
+        if (hivePartitionValues != null) {
             // 2. prune partitions by expr
-            Map<Long, PartitionItem> keyItemMap = Maps.newHashMap();
-            long pid = 0;
-            for (ListPartitionItem partitionItem : partitionItems) {
-                keyItemMap.put(pid++, partitionItem);
-            }
-            ListPartitionPrunerV2 pruner = new ListPartitionPrunerV2(keyItemMap,
-                    hmsTable.getPartitionColumns(), columnNameToRange);
+            Map<Long, PartitionItem> idToPartitionItem = hivePartitionValues.getIdToPartitionItem();
+            this.totalPartitionNum = idToPartitionItem.size();
+            ListPartitionPrunerV2 pruner = new ListPartitionPrunerV2(idToPartitionItem,
+                    hmsTable.getPartitionColumns(), columnNameToRange,
+                    hivePartitionValues.getUidToPartitionRange(),
+                    hivePartitionValues.getRangeToId(),
+                    hivePartitionValues.getSingleColumnRangeMap());
             Collection<Long> filteredPartitionIds = pruner.prune();
+            this.readPartitionNum = filteredPartitionIds.size();
+            LOG.debug("hive partition fetch and prune for table {}.{} cost: {} ms",
+                    hmsTable.getDbName(), hmsTable.getName(), (System.currentTimeMillis() - start));
             // 3. get partitions from cache
             List<List<String>> partitionValuesList = Lists.newArrayListWithCapacity(filteredPartitionIds.size());
             for (Long id : filteredPartitionIds) {
-                ListPartitionItem listPartitionItem = (ListPartitionItem) keyItemMap.get(id);
+                ListPartitionItem listPartitionItem = (ListPartitionItem) idToPartitionItem.get(id);
                 partitionValuesList.add(listPartitionItem.getItems().get(0).getPartitionValuesAsStringList());
             }
             List<HivePartition> partitions = cache.getAllPartitions(hmsTable.getDbName(), hmsTable.getName(),
@@ -178,6 +180,8 @@ public class HiveScanProvider extends HMSTableScanProvider {
             HivePartition dummyPartition = new HivePartition(hmsTable.getRemoteTable().getSd().getInputFormat(),
                     hmsTable.getRemoteTable().getSd().getLocation(), null);
             getFileSplitByPartitions(cache, Lists.newArrayList(dummyPartition), allFiles);
+            this.totalPartitionNum = 1;
+            this.readPartitionNum = 1;
         }
         LOG.debug("get #{} files for table: {}.{}, cost: {} ms", allFiles.size(), hmsTable.getDbName(),
                 hmsTable.getName(), (System.currentTimeMillis() - start));
@@ -212,6 +216,14 @@ public class HiveScanProvider extends HMSTableScanProvider {
         return conf;
     }

+    public int getTotalPartitionNum() {
+        return totalPartitionNum;
+    }
+
+    public int getReadPartitionNum() {
+        return readPartitionNum;
+    }
+
     @Override
     public Table getRemoteHiveTable() throws DdlException, MetaNotFoundException {
         return hmsTable.getRemoteTable();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterCatalogExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterCatalogExecutor.java
index c3a08d6d50..a26c687e10 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterCatalogExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterCatalogExecutor.java
@@ -43,7 +43,7 @@ public class MasterCatalogExecutor {
         waitTimeoutMs = ctx.getSessionVariable().getQueryTimeoutS() * 1000;
     }

-    public void forward(long catalogId, long dbId, long tableId) throws Exception {
+    public void forward(long catalogId, long dbId) throws Exception {
         if (!ctx.getEnv().isReady()) {
             throw new Exception("Current catalog is not ready, please wait for a while.");
         }
@@ -62,9 +62,6 @@ public class MasterCatalogExecutor {
         if (dbId != -1) {
             request.setDbId(dbId);
         }
-        if (tableId != -1) {
-            request.setTableId(tableId);
-        }
         boolean isReturnToPool = false;
         try {
             TInitExternalCtlMetaResult result = client.initExternalCtlMeta(request);
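Note: with totalPartitionNum/readPartitionNum recorded during split generation, EXPLAIN for a Hive scan now prints a line of the form partition=<read>/<total>, e.g. partition=3/120 when pruning keeps 3 of 120 partitions (the numbers here are illustrative); the unpartitioned path reports partition=1/1. Dropping the tableId parameter from MasterCatalogExecutor.forward() pairs with the removal of table-level init forwarding from FrontendServiceImpl below, since tables now initialize themselves through the protected makeSureInitialized() path.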
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 57a7cb58d8..a7a128b567 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1194,9 +1194,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {

     @Override
     public TInitExternalCtlMetaResult initExternalCtlMeta(TInitExternalCtlMetaRequest request) throws TException {
-        if (request.isSetCatalogId() && request.isSetDbId() && request.isSetTableId()) {
-            return initTable(request.catalogId, request.dbId, request.tableId);
-        } else if (request.isSetCatalogId() && request.isSetDbId()) {
+        if (request.isSetCatalogId() && request.isSetDbId()) {
             return initDb(request.catalogId, request.dbId);
         } else if (request.isSetCatalogId()) {
             return initCatalog(request.catalogId);
@@ -1235,32 +1233,4 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         result.setStatus("OK");
         return result;
     }
-
-    private TInitExternalCtlMetaResult initTable(long catalogId, long dbId, long tableId)
-            throws TException {
-        CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId);
-        if (!(catalog instanceof ExternalCatalog)) {
-            throw new TException("Only support forward ExternalCatalog init operation.");
-        }
-        DatabaseIf db = catalog.getDbNullable(dbId);
-        if (db == null) {
-            throw new TException("database " + dbId + " is null");
-        }
-        if (!(db instanceof ExternalDatabase)) {
-            throw new TException("Only support forward ExternalDatabase init operation.");
-        }
-        TableIf table = db.getTableNullable(tableId);
-        if (table == null) {
-            throw new TException("table " + tableId + " is null");
-        }
-        if (!(table instanceof ExternalTable)) {
-            throw new TException("Only support forward ExternalTable init operation.");
-        }
-
-        ((ExternalTable) table).makeSureInitialized();
-        TInitExternalCtlMetaResult result = new TInitExternalCtlMetaResult();
-        result.setMaxJournalId(Env.getCurrentEnv().getMaxJournalId());
-        result.setStatus("OK");
-        return result;
-    }
 }
diff --git a/regression-test/data/es_p0/test_es_query.out b/regression-test/data/es_p0/test_es_query.out
index 651370b242..29768407a7 100644
--- a/regression-test/data/es_p0/test_es_query.out
+++ b/regression-test/data/es_p0/test_es_query.out
@@ -1,12 +1,31 @@
 -- This file is automatically generated. You should know what you did if you want to edit this
--- !sql1 --
+-- !sql61 --
 test1
 test2
 test2_20220808

--- !sql1 --
+-- !sql62 --
 2022-08-08	text#1	3.14	string1

--- !sql1 --
+-- !sql63 --
+2022-08-08	text#1	3.14	string1
+2022-08-08	text3_4*5	5.0	string3
+
+-- !sql64 --
+2022-09-08	text2	4.0	string2
+
+-- !sql71 --
+test1
+test2
+test2_20220808
+
+-- !sql72 --
 2022-08-08	text#1	3.14	string1

+-- !sql73 --
+2022-08-08	text#1	3.14	string1
+2022-08-08	text3_4*5	5.0	string3
+
+-- !sql74 --
+2022-09-08	text2	4.0	string2
+
diff --git a/regression-test/suites/es_p0/test_es_query.groovy b/regression-test/suites/es_p0/test_es_query.groovy
index 2e8ec3c392..04b939a51d 100644
--- a/regression-test/suites/es_p0/test_es_query.groovy
+++ b/regression-test/suites/es_p0/test_es_query.groovy
@@ -54,9 +54,18 @@ suite("test_es_query", "p0") {
             );
         """
         sql """switch es6"""
-        order_qt_sql1 """show tables"""
-        order_qt_sql1 """select * from test1 where test2='text#1'"""
-        sql """switch es8"""
-        order_qt_sql1 """select * from test1 where test2='text'"""
+        order_qt_sql61 """show tables"""
+        order_qt_sql62 """select * from test1 where test2='text#1'"""
+        order_qt_sql63 """select * from test2_20220808 where test4='2022-08-08'"""
+        order_qt_sql64 """select * from test2_20220808 where substring(test2, 2) = 'ext2'"""
+        sql """switch es7"""
+        order_qt_sql71 """show tables"""
+        order_qt_sql72 """select * from test1 where test2='text#1'"""
+        order_qt_sql73 """select * from test2_20220808 where test4='2022-08-08'"""
+        order_qt_sql74 """select * from test2_20220808 where substring(test2, 2) = 'ext2'"""
+        // es8 has some problem, need fix
+        // sql """switch es8"""
+        // order_qt_sql1 """select * from test1 where test2='text'"""
+        // order_qt_sql2 """select * from test2_20220808 where test4='2022-08-08'"""
     }
-}
\ No newline at end of file
+}