[bugfix](runtimefilter) runtime filter is shared between multi instances with same node id, should not cache exprs (#22114)

runtime filter is shared among multi instances.
in the past, we cached pushdown expr(runtime filter generated)
every scannode[runtime filter consumer] will try to call prepare expr
but the expr may generated with different fn_context_id
---------

Co-authored-by: yiguolei <yiguolei@gmail.com>
This commit is contained in:
yiguolei
2023-07-23 13:04:33 +08:00
committed by GitHub
parent 256051a965
commit 2c16fe0da9
3 changed files with 9 additions and 32 deletions

View File

@ -1152,35 +1152,17 @@ Status IRuntimeFilter::publish() {
}
}
Status IRuntimeFilter::get_push_expr_ctxs(std::vector<vectorized::VExprSPtr>* push_exprs) {
Status IRuntimeFilter::get_push_expr_ctxs(std::vector<vectorized::VExprSPtr>* push_exprs,
bool is_late_arrival) {
DCHECK(is_consumer());
if (!_is_ignored) {
_set_push_down();
_profile->add_info_string("Info", _format_status());
return _wrapper->get_push_exprs(push_exprs, _vprobe_ctx);
} else {
_profile->add_info_string("Info", _format_status());
return Status::OK();
}
}
Status IRuntimeFilter::get_prepared_exprs(std::vector<vectorized::VExprSPtr>* vexprs,
const RowDescriptor& desc, RuntimeState* state) {
_profile->add_info_string("Info", _format_status());
if (_is_ignored) {
return Status::OK();
}
DCHECK((!_enable_pipeline_exec && _rf_state == RuntimeFilterState::READY) ||
(_enable_pipeline_exec &&
_rf_state_atomic.load(std::memory_order_acquire) == RuntimeFilterState::READY));
DCHECK(is_consumer());
std::lock_guard guard(_inner_mutex);
if (_push_down_vexprs.empty()) {
RETURN_IF_ERROR(_wrapper->get_push_exprs(&_push_down_vexprs, _vprobe_ctx));
if (!is_late_arrival) {
_set_push_down();
}
vexprs->insert(vexprs->end(), _push_down_vexprs.begin(), _push_down_vexprs.end());
return Status::OK();
return _wrapper->get_push_exprs(push_exprs, _vprobe_ctx);
}
bool IRuntimeFilter::await() {

View File

@ -221,10 +221,7 @@ public:
RuntimeFilterType type() const { return _runtime_filter_type; }
Status get_push_expr_ctxs(std::vector<vectorized::VExprSPtr>* push_exprs);
Status get_prepared_exprs(std::vector<doris::vectorized::VExprSPtr>* push_exprs,
const RowDescriptor& desc, RuntimeState* state);
Status get_push_expr_ctxs(std::vector<vectorized::VExprSPtr>* push_exprs, bool is_late_arrival);
bool is_broadcast_join() const { return _is_broadcast_join; }
@ -385,8 +382,6 @@ protected:
bool _is_ignored;
std::string _ignored_msg;
std::vector<doris::vectorized::VExprSPtr> _push_down_vexprs;
struct RPCContext;
std::shared_ptr<RPCContext> _rpc_context;

View File

@ -95,7 +95,7 @@ Status RuntimeFilterConsumer::_acquire_runtime_filter() {
ready = runtime_filter->await();
}
if (ready && !_runtime_filter_ctxs[i].apply_mark) {
RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(&vexprs));
RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(&vexprs, false));
_runtime_filter_ctxs[i].apply_mark = true;
} else if (runtime_filter->current_state() == RuntimeFilterState::NOT_READY &&
!_runtime_filter_ctxs[i].apply_mark) {
@ -151,8 +151,8 @@ Status RuntimeFilterConsumer::try_append_late_arrival_runtime_filter(int* arrive
++current_arrived_rf_num;
continue;
} else if (_runtime_filter_ctxs[i].runtime_filter->is_ready()) {
RETURN_IF_ERROR(_runtime_filter_ctxs[i].runtime_filter->get_prepared_exprs(
&exprs, _row_descriptor_ref, _state));
RETURN_IF_ERROR(
_runtime_filter_ctxs[i].runtime_filter->get_push_expr_ctxs(&exprs, true));
++current_arrived_rf_num;
_runtime_filter_ctxs[i].apply_mark = true;
}