[Bug](runtimefilter) Fix waiting for runtime filter (#20155)
This commit is contained in:
@ -105,6 +105,8 @@ public final class RuntimeFilter {
|
||||
|
||||
private boolean bitmapFilterNotIn = false;
|
||||
|
||||
private boolean useRemoteRfOpt = true;
|
||||
|
||||
/**
|
||||
* Internal representation of a runtime filter target.
|
||||
*/
|
||||
@ -184,6 +186,17 @@ public final class RuntimeFilter {
|
||||
this.bitmapFilterNotIn = bitmapFilterNotIn;
|
||||
}
|
||||
|
||||
public void computeUseRemoteRfOpt() {
|
||||
for (RuntimeFilterTarget target : targets) {
|
||||
useRemoteRfOpt = useRemoteRfOpt && hasRemoteTargets && runtimeFilterType == TRuntimeFilterType.BLOOM
|
||||
&& target.expr instanceof SlotRef;
|
||||
}
|
||||
}
|
||||
|
||||
public boolean getUseRemoteRfOpt() {
|
||||
return useRemoteRfOpt;
|
||||
}
|
||||
|
||||
/**
|
||||
* Serializes a runtime filter to Thrift.
|
||||
*/
|
||||
|
||||
@ -100,6 +100,7 @@ import org.apache.doris.thrift.TQueryType;
|
||||
import org.apache.doris.thrift.TReportExecStatusParams;
|
||||
import org.apache.doris.thrift.TResourceLimit;
|
||||
import org.apache.doris.thrift.TRuntimeFilterParams;
|
||||
import org.apache.doris.thrift.TRuntimeFilterTargetParams;
|
||||
import org.apache.doris.thrift.TRuntimeFilterTargetParamsV2;
|
||||
import org.apache.doris.thrift.TScanRange;
|
||||
import org.apache.doris.thrift.TScanRangeLocation;
|
||||
@ -3138,26 +3139,41 @@ public class Coordinator {
|
||||
params.params.setRuntimeFilterParams(new TRuntimeFilterParams());
|
||||
params.params.runtime_filter_params.setRuntimeFilterMergeAddr(runtimeFilterMergeAddr);
|
||||
if (instanceExecParam.instanceId.equals(runtimeFilterMergeInstanceId)) {
|
||||
for (Map.Entry<RuntimeFilterId, List<FRuntimeFilterTargetParam>> entry
|
||||
: ridToTargetParam.entrySet()) {
|
||||
Map<TNetworkAddress, TRuntimeFilterTargetParamsV2> targetParams = new HashMap<>();
|
||||
for (FRuntimeFilterTargetParam targetParam : entry.getValue()) {
|
||||
if (targetParams.containsKey(targetParam.targetFragmentInstanceAddr)) {
|
||||
targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_ids
|
||||
.add(targetParam.targetFragmentInstanceId);
|
||||
} else {
|
||||
targetParams.put(targetParam.targetFragmentInstanceAddr,
|
||||
new TRuntimeFilterTargetParamsV2());
|
||||
targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_addr
|
||||
= targetParam.targetFragmentInstanceAddr;
|
||||
targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_ids
|
||||
= new ArrayList<>();
|
||||
targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_ids
|
||||
.add(targetParam.targetFragmentInstanceId);
|
||||
for (RuntimeFilter rf : assignedRuntimeFilters) {
|
||||
List<FRuntimeFilterTargetParam> fParams = ridToTargetParam.get(rf.getFilterId());
|
||||
rf.computeUseRemoteRfOpt();
|
||||
if (rf.getUseRemoteRfOpt()) {
|
||||
Map<TNetworkAddress, TRuntimeFilterTargetParamsV2> targetParamsV2 = new HashMap<>();
|
||||
for (FRuntimeFilterTargetParam targetParam : fParams) {
|
||||
if (targetParamsV2.containsKey(targetParam.targetFragmentInstanceAddr)) {
|
||||
targetParamsV2.get(targetParam.targetFragmentInstanceAddr)
|
||||
.target_fragment_instance_ids
|
||||
.add(targetParam.targetFragmentInstanceId);
|
||||
} else {
|
||||
targetParamsV2.put(targetParam.targetFragmentInstanceAddr,
|
||||
new TRuntimeFilterTargetParamsV2());
|
||||
targetParamsV2.get(targetParam.targetFragmentInstanceAddr)
|
||||
.target_fragment_instance_addr
|
||||
= targetParam.targetFragmentInstanceAddr;
|
||||
targetParamsV2.get(targetParam.targetFragmentInstanceAddr)
|
||||
.target_fragment_instance_ids
|
||||
= new ArrayList<>();
|
||||
targetParamsV2.get(targetParam.targetFragmentInstanceAddr)
|
||||
.target_fragment_instance_ids
|
||||
.add(targetParam.targetFragmentInstanceId);
|
||||
}
|
||||
}
|
||||
params.params.runtime_filter_params.putToRidToTargetParamv2(rf.getFilterId().asInt(),
|
||||
new ArrayList<TRuntimeFilterTargetParamsV2>(targetParamsV2.values()));
|
||||
} else {
|
||||
List<TRuntimeFilterTargetParams> targetParams = Lists.newArrayList();
|
||||
for (FRuntimeFilterTargetParam targetParam : fParams) {
|
||||
targetParams.add(new TRuntimeFilterTargetParams(targetParam.targetFragmentInstanceId,
|
||||
targetParam.targetFragmentInstanceAddr));
|
||||
}
|
||||
params.params.runtime_filter_params.putToRidToTargetParam(rf.getFilterId().asInt(),
|
||||
targetParams);
|
||||
}
|
||||
params.params.runtime_filter_params.putToRidToTargetParamv2(entry.getKey().asInt(),
|
||||
new ArrayList<TRuntimeFilterTargetParamsV2>(targetParams.values()));
|
||||
}
|
||||
for (Map.Entry<RuntimeFilterId, Integer> entry : ridToBuilderNum.entrySet()) {
|
||||
params.params.runtime_filter_params.putToRuntimeFilterBuilderNum(
|
||||
@ -3226,27 +3242,42 @@ public class Coordinator {
|
||||
localParams.setRuntimeFilterParams(new TRuntimeFilterParams());
|
||||
localParams.runtime_filter_params.setRuntimeFilterMergeAddr(runtimeFilterMergeAddr);
|
||||
if (instanceExecParam.instanceId.equals(runtimeFilterMergeInstanceId)) {
|
||||
for (Map.Entry<RuntimeFilterId, List<FRuntimeFilterTargetParam>> entry
|
||||
: ridToTargetParam.entrySet()) {
|
||||
Map<TNetworkAddress, TRuntimeFilterTargetParamsV2> targetParams = new HashMap<>();
|
||||
for (FRuntimeFilterTargetParam targetParam : entry.getValue()) {
|
||||
if (targetParams.containsKey(targetParam.targetFragmentInstanceAddr)) {
|
||||
targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_ids
|
||||
.add(targetParam.targetFragmentInstanceId);
|
||||
} else {
|
||||
targetParams.put(targetParam.targetFragmentInstanceAddr,
|
||||
new TRuntimeFilterTargetParamsV2());
|
||||
targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_addr
|
||||
= targetParam.targetFragmentInstanceAddr;
|
||||
targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_ids
|
||||
= new ArrayList<>();
|
||||
targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_ids
|
||||
.add(targetParam.targetFragmentInstanceId);
|
||||
for (RuntimeFilter rf : assignedRuntimeFilters) {
|
||||
List<FRuntimeFilterTargetParam> fParams = ridToTargetParam.get(rf.getFilterId());
|
||||
rf.computeUseRemoteRfOpt();
|
||||
if (rf.getUseRemoteRfOpt()) {
|
||||
Map<TNetworkAddress, TRuntimeFilterTargetParamsV2> targetParamsV2 = new HashMap<>();
|
||||
for (FRuntimeFilterTargetParam targetParam : fParams) {
|
||||
if (targetParamsV2.containsKey(targetParam.targetFragmentInstanceAddr)) {
|
||||
targetParamsV2.get(targetParam.targetFragmentInstanceAddr)
|
||||
.target_fragment_instance_ids
|
||||
.add(targetParam.targetFragmentInstanceId);
|
||||
} else {
|
||||
targetParamsV2.put(targetParam.targetFragmentInstanceAddr,
|
||||
new TRuntimeFilterTargetParamsV2());
|
||||
targetParamsV2.get(targetParam.targetFragmentInstanceAddr)
|
||||
.target_fragment_instance_addr
|
||||
= targetParam.targetFragmentInstanceAddr;
|
||||
targetParamsV2.get(targetParam.targetFragmentInstanceAddr)
|
||||
.target_fragment_instance_ids
|
||||
= new ArrayList<>();
|
||||
targetParamsV2.get(targetParam.targetFragmentInstanceAddr)
|
||||
.target_fragment_instance_ids
|
||||
.add(targetParam.targetFragmentInstanceId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
localParams.runtime_filter_params.putToRidToTargetParamv2(entry.getKey().asInt(),
|
||||
new ArrayList<TRuntimeFilterTargetParamsV2>(targetParams.values()));
|
||||
localParams.runtime_filter_params.putToRidToTargetParamv2(rf.getFilterId().asInt(),
|
||||
new ArrayList<TRuntimeFilterTargetParamsV2>(targetParamsV2.values()));
|
||||
} else {
|
||||
List<TRuntimeFilterTargetParams> targetParams = Lists.newArrayList();
|
||||
for (FRuntimeFilterTargetParam targetParam : fParams) {
|
||||
targetParams.add(new TRuntimeFilterTargetParams(targetParam.targetFragmentInstanceId,
|
||||
targetParam.targetFragmentInstanceAddr));
|
||||
}
|
||||
localParams.runtime_filter_params.putToRidToTargetParam(rf.getFilterId().asInt(),
|
||||
targetParams);
|
||||
}
|
||||
}
|
||||
for (Map.Entry<RuntimeFilterId, Integer> entry : ridToBuilderNum.entrySet()) {
|
||||
localParams.runtime_filter_params.putToRuntimeFilterBuilderNum(
|
||||
|
||||
Reference in New Issue
Block a user