[feature](nereids) Support multi target rf #20714
Support multi target runtime filter, mainly for set operation, such as union/intersect/except.
This commit is contained in:
@ -94,8 +94,7 @@ static bool ignore_cast(SlotDescriptor* slot, VExpr* expr) {
|
||||
}
|
||||
|
||||
Status VScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
|
||||
RETURN_IF_ERROR(ExecNode::init(tnode, state));
|
||||
_state = state;
|
||||
RETURN_IF_ERROR(RuntimeFilterConsumerNode::init(tnode, state));
|
||||
_is_pipeline_scan = state->enable_pipeline_exec();
|
||||
|
||||
const TQueryOptions& query_options = state->query_options();
|
||||
@ -109,9 +108,6 @@ Status VScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
|
||||
} else {
|
||||
_max_pushdown_conditions_per_column = config::max_pushdown_conditions_per_column;
|
||||
}
|
||||
|
||||
RETURN_IF_ERROR(_register_runtime_filter());
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -149,7 +145,8 @@ Status VScanNode::prepare(RuntimeState* state) {
|
||||
// if you want to add some profile in scan node, even it have not new VScanner object
|
||||
// could add here, not in the _init_profile() function
|
||||
_get_next_timer = ADD_TIMER(_runtime_profile, "GetNextTime");
|
||||
_acquire_runtime_filter_timer = ADD_TIMER(_runtime_profile, "AcuireRuntimeFilterTime");
|
||||
|
||||
_prepare_rf_timer(_runtime_profile.get());
|
||||
|
||||
_open_timer = ADD_TIMER(_runtime_profile, "OpenTime");
|
||||
_alloc_resource_timer = ADD_TIMER(_runtime_profile, "AllocateResourceTime");
|
||||
@ -311,95 +308,6 @@ Status VScanNode::_start_scanners(const std::list<VScannerSPtr>& scanners) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status VScanNode::_register_runtime_filter() {
|
||||
int filter_size = _runtime_filter_descs.size();
|
||||
_runtime_filter_ctxs.reserve(filter_size);
|
||||
_runtime_filter_ready_flag.reserve(filter_size);
|
||||
for (int i = 0; i < filter_size; ++i) {
|
||||
IRuntimeFilter* runtime_filter = nullptr;
|
||||
const auto& filter_desc = _runtime_filter_descs[i];
|
||||
if (filter_desc.__isset.opt_remote_rf && filter_desc.opt_remote_rf) {
|
||||
DCHECK(filter_desc.type == TRuntimeFilterType::BLOOM && filter_desc.has_remote_targets);
|
||||
// Optimize merging phase iff:
|
||||
// 1. All BE and FE has been upgraded (e.g. opt_remote_rf)
|
||||
// 2. This filter is bloom filter (only bloom filter should be used for merging)
|
||||
RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->register_filter(
|
||||
RuntimeFilterRole::CONSUMER, filter_desc, _state->query_options(), id(),
|
||||
false));
|
||||
RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->get_consume_filter(
|
||||
filter_desc.filter_id, &runtime_filter));
|
||||
} else {
|
||||
RETURN_IF_ERROR(_state->runtime_filter_mgr()->register_filter(
|
||||
RuntimeFilterRole::CONSUMER, filter_desc, _state->query_options(), id(),
|
||||
false));
|
||||
RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id,
|
||||
&runtime_filter));
|
||||
}
|
||||
_runtime_filter_ctxs.emplace_back(runtime_filter);
|
||||
_runtime_filter_ready_flag.emplace_back(false);
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
bool VScanNode::runtime_filters_are_ready_or_timeout() {
|
||||
if (!_blocked_by_rf) {
|
||||
return true;
|
||||
}
|
||||
for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
|
||||
IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
|
||||
if (!runtime_filter->is_ready_or_timeout()) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
_blocked_by_rf = false;
|
||||
return true;
|
||||
}
|
||||
|
||||
Status VScanNode::_acquire_runtime_filter(bool wait) {
|
||||
SCOPED_TIMER(_acquire_runtime_filter_timer);
|
||||
VExprSPtrs vexprs;
|
||||
for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
|
||||
IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
|
||||
bool ready = runtime_filter->is_ready();
|
||||
if (!ready && wait) {
|
||||
ready = runtime_filter->await();
|
||||
}
|
||||
if (ready && !_runtime_filter_ctxs[i].apply_mark) {
|
||||
RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(&vexprs));
|
||||
_runtime_filter_ctxs[i].apply_mark = true;
|
||||
} else if ((wait || !runtime_filter->is_ready_or_timeout()) &&
|
||||
runtime_filter->current_state() == RuntimeFilterState::NOT_READY &&
|
||||
!_runtime_filter_ctxs[i].apply_mark) {
|
||||
_blocked_by_rf = true;
|
||||
} else if (!_runtime_filter_ctxs[i].apply_mark) {
|
||||
DCHECK(runtime_filter->current_state() != RuntimeFilterState::NOT_READY);
|
||||
_is_all_rf_applied = false;
|
||||
}
|
||||
}
|
||||
RETURN_IF_ERROR(_append_rf_into_conjuncts(vexprs));
|
||||
if (_blocked_by_rf) {
|
||||
return Status::WaitForRf("Runtime filters are neither not ready nor timeout");
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status VScanNode::_append_rf_into_conjuncts(const VExprSPtrs& vexprs) {
|
||||
if (vexprs.empty()) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
for (auto& expr : vexprs) {
|
||||
VExprContextSPtr conjunct = VExprContext::create_shared(expr);
|
||||
RETURN_IF_ERROR(conjunct->prepare(_state, _row_descriptor));
|
||||
RETURN_IF_ERROR(conjunct->open(_state));
|
||||
_rf_vexpr_set.insert(expr);
|
||||
_conjuncts.emplace_back(conjunct);
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status VScanNode::close(RuntimeState* state) {
|
||||
if (is_closed()) {
|
||||
return Status::OK();
|
||||
@ -1280,46 +1188,6 @@ Status VScanNode::_change_value_range(ColumnValueRange<PrimitiveType>& temp_rang
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status VScanNode::try_append_late_arrival_runtime_filter(int* arrived_rf_num) {
|
||||
if (_is_all_rf_applied) {
|
||||
*arrived_rf_num = _runtime_filter_descs.size();
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
// This method will be called in scanner thread.
|
||||
// So need to add lock
|
||||
std::unique_lock l(_rf_locks);
|
||||
if (_is_all_rf_applied) {
|
||||
*arrived_rf_num = _runtime_filter_descs.size();
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
// 1. Check if are runtime filter ready but not applied.
|
||||
VExprSPtrs exprs;
|
||||
int current_arrived_rf_num = 0;
|
||||
for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
|
||||
if (_runtime_filter_ctxs[i].apply_mark) {
|
||||
++current_arrived_rf_num;
|
||||
continue;
|
||||
} else if (_runtime_filter_ctxs[i].runtime_filter->is_ready()) {
|
||||
RETURN_IF_ERROR(_runtime_filter_ctxs[i].runtime_filter->get_prepared_exprs(
|
||||
&exprs, _row_descriptor, _state));
|
||||
++current_arrived_rf_num;
|
||||
_runtime_filter_ctxs[i].apply_mark = true;
|
||||
}
|
||||
}
|
||||
// 2. Append unapplied runtime filters to vconjunct_ctx_ptr
|
||||
if (!exprs.empty()) {
|
||||
RETURN_IF_ERROR(_append_rf_into_conjuncts(exprs));
|
||||
}
|
||||
if (current_arrived_rf_num == _runtime_filter_descs.size()) {
|
||||
_is_all_rf_applied = true;
|
||||
}
|
||||
|
||||
*arrived_rf_num = current_arrived_rf_num;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status VScanNode::clone_conjunct_ctxs(VExprContextSPtrs& conjuncts) {
|
||||
if (!_conjuncts.empty()) {
|
||||
std::unique_lock l(_rf_locks);
|
||||
|
||||
Reference in New Issue
Block a user