[feature](ES Catalog)Support control scroll level by config #37180 (#37290)

## Proposed changes

backport #37180
This commit is contained in:
qiye
2024-07-15 16:41:38 +08:00
committed by GitHub
parent 8df2432e94
commit e5339a4014
9 changed files with 801 additions and 160 deletions

View File

@ -42,6 +42,7 @@ import org.apache.doris.datasource.es.QueryBuilders.QueryBuilder;
import org.apache.doris.planner.PartitionPruner;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.RangePartitionPrunerV2;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.statistics.query.StatsDelta;
import org.apache.doris.system.Backend;
@ -63,6 +64,7 @@ import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -216,8 +218,15 @@ public class EsScanNode extends ExternalScanNode {
String.join(",", unPartitionedIndices), String.join(",", partitionedIndices));
}
List<TScanRangeLocations> result = Lists.newArrayList();
boolean enableESParallelScroll = isEnableESParallelScroll();
for (EsShardPartitions indexState : selectedIndex) {
for (List<EsShardRouting> shardRouting : indexState.getShardRoutings().values()) {
// When disabling parallel scroll, only use the first shard routing.
// Because we only need plan a single scan range.
List<List<EsShardRouting>> shardRoutings = enableESParallelScroll
? new ArrayList<>(indexState.getShardRoutings().values()) :
Collections.singletonList(indexState.getShardRoutings().get(0));
for (List<EsShardRouting> shardRouting : shardRoutings) {
// get backends
List<TNetworkAddress> shardAllocations = new ArrayList<>();
List<String> preLocations = new ArrayList<>();
@ -229,7 +238,10 @@ public class EsScanNode extends ExternalScanNode {
FederationBackendPolicy backendPolicy = new FederationBackendPolicy();
backendPolicy.init(preLocations);
TScanRangeLocations locations = new TScanRangeLocations();
for (int i = 0; i < backendPolicy.numBackends(); ++i) {
// When disabling parallel scroll, only use the first backend.
// Because we only need plan a single query to one backend.
int numBackends = enableESParallelScroll ? backendPolicy.numBackends() : 1;
for (int i = 0; i < numBackends; ++i) {
TScanRangeLocation location = new TScanRangeLocation();
Backend be = backendPolicy.getNextBe();
location.setBackendId(be.getId());
@ -240,11 +252,18 @@ public class EsScanNode extends ExternalScanNode {
// Generate on es scan range
TEsScanRange esScanRange = new TEsScanRange();
esScanRange.setEsHosts(shardAllocations);
esScanRange.setIndex(shardRouting.get(0).getIndexName());
// When disabling parallel scroll, use the index state's index name to prevent the index aliases from
// being expanded.
// eg: index alias `log-20240501` may point to multiple indices,
// such as `log-20240501-1`/`log-20240501-2`.
// When we plan a single query, we should use the index alias instead of the real indices names.
esScanRange.setIndex(
enableESParallelScroll ? shardRouting.get(0).getIndexName() : indexState.getIndexName());
if (table.getType() != null) {
esScanRange.setType(table.getMappingType());
}
esScanRange.setShardId(shardRouting.get(0).getShardId());
// When disabling parallel scroll, set shard id to -1 to disable shard preference in query option.
esScanRange.setShardId(enableESParallelScroll ? shardRouting.get(0).getShardId() : -1);
// Scan range
TScanRange scanRange = new TScanRange();
scanRange.setEsScanRange(esScanRange);
@ -267,6 +286,11 @@ public class EsScanNode extends ExternalScanNode {
return result;
}
private boolean isEnableESParallelScroll() {
ConnectContext connectContext = ConnectContext.get();
return connectContext != null && connectContext.getSessionVariable().isEnableESParallelScroll();
}
/**
* if the index name is an alias or index pattern, then the es table is related
* with one or more indices some indices could be pruned by using partition info

View File

@ -574,6 +574,8 @@ public class SessionVariable implements Serializable, Writable {
public static final String USE_MAX_LENGTH_OF_VARCHAR_IN_CTAS = "use_max_length_of_varchar_in_ctas";
public static final String ENABLE_ES_PARALLEL_SCROLL = "enable_es_parallel_scroll";
public static final List<String> DEBUG_VARIABLES = ImmutableList.of(
SKIP_DELETE_PREDICATE,
SKIP_DELETE_BITMAP,
@ -1916,6 +1918,24 @@ public class SessionVariable implements Serializable, Writable {
})
public boolean useMaxLengthOfVarcharInCtas = true;
/**
* When enabling shard scroll, FE will plan scan ranges by shards of ES indices.
* Otherwise, FE will plan a single query to ES.
*/
@VariableMgr.VarAttr(name = ENABLE_ES_PARALLEL_SCROLL, description = {
"ES catalog 是否开启 shard 级别并发的 scroll 请求,默认开启。",
"Whether to enable shard-level parallel scroll requests for ES catalog, enabled by default."
})
public boolean enableESParallelScroll = true;
public void setEnableEsParallelScroll(boolean enableESParallelScroll) {
this.enableESParallelScroll = enableESParallelScroll;
}
public boolean isEnableESParallelScroll() {
return enableESParallelScroll;
}
public boolean isEnableJoinSpill() {
return enableJoinSpill;
}