[Opt](rf) Opt broadcast join remote runtime filter merge and wait (#29439)

This commit is contained in:
HappenLee
2024-01-03 11:21:28 +08:00
committed by GitHub
parent 067a9a3a22
commit 14e7eb7624
8 changed files with 32 additions and 16 deletions

View File

@ -985,7 +985,7 @@ Status IRuntimeFilter::merge_local_filter(RuntimePredicateWrapper* wrapper, int*
return Status::OK();
}
Status IRuntimeFilter::publish() {
Status IRuntimeFilter::publish(bool publish_local) {
DCHECK(is_producer());
if (_is_global) {
std::vector<IRuntimeFilter*> filters;
@ -1010,13 +1010,17 @@ Status IRuntimeFilter::publish() {
filter->update_runtime_filter_type_to_profile();
filter->signal();
}
return Status::OK();
} else {
} else if (!publish_local) {
TNetworkAddress addr;
DCHECK(_state != nullptr);
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_merge_addr(&addr));
return push_to_remote(_state, &addr, _opt_remote_rf);
} else {
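// A remote broadcast join pushes its runtime filter only once, from the instance that
// builds the shared hash table. publish_local is set to true only by the instances that
// copy from the shared hash table, so they do not push to the remote merge node again.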
DCHECK(_is_broadcast_join);
}
return Status::OK();
}
Status IRuntimeFilter::get_push_expr_ctxs(std::list<vectorized::VExprContextSPtr>& probe_ctxs,

View File

@ -228,7 +228,7 @@ public:
// publish filter
// push filter to remote node or push down it to scan_node
Status publish();
Status publish(bool publish_local = false);
RuntimeFilterType type() const { return _runtime_filter_type; }

View File

@ -204,10 +204,10 @@ public:
}
// publish runtime filter
Status publish() {
Status publish(bool publish_local = false) {
for (auto& pair : _runtime_filters) {
for (auto filter : pair.second) {
RETURN_IF_ERROR(filter->publish());
for (auto& filter : pair.second) {
RETURN_IF_ERROR(filter->publish(publish_local));
}
}
return Status::OK();

View File

@ -568,7 +568,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
RETURN_IF_ERROR(
local_state._runtime_filter_slots->copy_from_shared_context(
_shared_hash_table_context));
RETURN_IF_ERROR(local_state._runtime_filter_slots->publish());
RETURN_IF_ERROR(local_state._runtime_filter_slots->publish(true));
return Status::OK();
}},
*local_state._shared_state->hash_table_variants);

View File

@ -328,6 +328,10 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
cntVal = iter->second.first;
{
std::lock_guard<std::mutex> l(*iter->second.second);
// Broadcast join: once one filter has already been recorded in arrive_id, skip any further ones.
if (cntVal->arrive_id.size() == 1 && cntVal->runtime_filter_desc.is_broadcast_join) {
return Status::OK();
}
MergeRuntimeFilterParams params(request, attach_data);
ObjectPool* pool = cntVal->pool.get();
RuntimeFilterWrapperHolder holder;
@ -339,11 +343,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
VLOG_ROW << "merge size:" << merged_size << ":" << cntVal->producer_size;
DCHECK_LE(merged_size, cntVal->producer_size);
cntVal->merge_time += (MonotonicMillis() - start_merge);
if (merged_size < cntVal->producer_size) {
return Status::OK();
} else {
merge_time = cntVal->merge_time;
}
merge_time = cntVal->merge_time;
}
if (merged_size == cntVal->producer_size) {

View File

@ -824,7 +824,7 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc
state, arg.hash_table->size()));
RETURN_IF_ERROR(_runtime_filter_slots->copy_from_shared_context(
_shared_hash_table_context));
RETURN_IF_ERROR(_runtime_filter_slots->publish());
RETURN_IF_ERROR(_runtime_filter_slots->publish(true));
return Status::OK();
}},
*_hash_table_variants);

View File

@ -604,6 +604,10 @@ public final class RuntimeFilter {
isBroadcastJoin = isBroadcast;
}
/** Returns the current value of the {@code isBroadcastJoin} flag of this runtime filter. */
public boolean isBroadcast() {
return isBroadcastJoin;
}
public void computeNdvEstimate() {
if (ndvEstimate < 0) {
ndvEstimate = builderNode.getChild(1).getCardinalityAfterFilter();

View File

@ -3627,6 +3627,9 @@ public class Coordinator implements CoordInterface {
params.params.setRuntimeFilterParams(new TRuntimeFilterParams());
params.params.runtime_filter_params.setRuntimeFilterMergeAddr(runtimeFilterMergeAddr);
if (instanceExecParam.instanceId.equals(runtimeFilterMergeInstanceId)) {
Set<Integer> broadCastRf = assignedRuntimeFilters.stream().filter(RuntimeFilter::isBroadcast)
.map(r -> r.getFilterId().asInt()).collect(Collectors.toSet());
for (RuntimeFilter rf : assignedRuntimeFilters) {
if (!ridToTargetParam.containsKey(rf.getFilterId())) {
continue;
@ -3668,7 +3671,8 @@ public class Coordinator implements CoordInterface {
}
for (Map.Entry<RuntimeFilterId, Integer> entry : ridToBuilderNum.entrySet()) {
params.params.runtime_filter_params.putToRuntimeFilterBuilderNum(
entry.getKey().asInt(), entry.getValue());
entry.getKey().asInt(), broadCastRf.contains(entry.getKey().asInt())
? 1 : entry.getValue());
}
for (RuntimeFilter rf : assignedRuntimeFilters) {
params.params.runtime_filter_params.putToRidToRuntimeFilter(
@ -3753,6 +3757,9 @@ public class Coordinator implements CoordInterface {
localParams.setRuntimeFilterParams(new TRuntimeFilterParams());
localParams.runtime_filter_params.setRuntimeFilterMergeAddr(runtimeFilterMergeAddr);
if (instanceExecParam.instanceId.equals(runtimeFilterMergeInstanceId)) {
Set<Integer> broadCastRf = assignedRuntimeFilters.stream().filter(RuntimeFilter::isBroadcast)
.map(r -> r.getFilterId().asInt()).collect(Collectors.toSet());
for (RuntimeFilter rf : assignedRuntimeFilters) {
if (!ridToTargetParam.containsKey(rf.getFilterId())) {
continue;
@ -3795,7 +3802,8 @@ public class Coordinator implements CoordInterface {
}
for (Map.Entry<RuntimeFilterId, Integer> entry : ridToBuilderNum.entrySet()) {
localParams.runtime_filter_params.putToRuntimeFilterBuilderNum(
entry.getKey().asInt(), entry.getValue());
entry.getKey().asInt(), broadCastRf.contains(entry.getKey().asInt()) ? 1 :
entry.getValue());
}
for (RuntimeFilter rf : assignedRuntimeFilters) {
localParams.runtime_filter_params.putToRidToRuntimeFilter(