From 1da39771e320cb0491fd0ad05bda6c467e68bb4f Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 18 Aug 2022 14:47:08 +0800 Subject: [PATCH] [Bug](runtime filter) Fix bug for runtime filter in concurrent scanners (#11848) --- be/src/vec/exec/volap_scan_node.cpp | 35 +++++++++-------------------- be/src/vec/exec/volap_scan_node.h | 2 +- 2 files changed, 12 insertions(+), 25 deletions(-) diff --git a/be/src/vec/exec/volap_scan_node.cpp b/be/src/vec/exec/volap_scan_node.cpp index 16c41043dc..ab2f9eda38 100644 --- a/be/src/vec/exec/volap_scan_node.cpp +++ b/be/src/vec/exec/volap_scan_node.cpp @@ -103,7 +103,6 @@ Status VOlapScanNode::init(const TPlanNode& tnode, RuntimeState* state) { _runtime_filter_ctxs[i].runtimefilter = runtime_filter; _runtime_filter_ready_flag[i] = false; - _rf_locks.push_back(std::make_unique()); } return Status::OK(); @@ -235,11 +234,7 @@ Status VOlapScanNode::prepare(RuntimeState* state) { _runtime_state = state; for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) { - IRuntimeFilter* runtime_filter = nullptr; - state->runtime_filter_mgr()->get_consume_filter(_runtime_filter_descs[i].filter_id, - &runtime_filter); - DCHECK(runtime_filter != nullptr); - runtime_filter->init_profile(_runtime_profile.get()); + _runtime_filter_ctxs[i].runtimefilter->init_profile(_runtime_profile.get()); } return Status::OK(); } @@ -259,13 +254,7 @@ Status VOlapScanNode::open(RuntimeState* state) { std::vector vexprs; for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) { - auto& filter_desc = _runtime_filter_descs[i]; - IRuntimeFilter* runtime_filter = nullptr; - state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id, &runtime_filter); - DCHECK(runtime_filter != nullptr); - if (runtime_filter == nullptr) { - continue; - } + IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtimefilter; bool ready = runtime_filter->is_ready(); if (!ready) { ready = runtime_filter->await(); @@ -431,16 +420,14 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) { /// `_runtime_filter_ready_flag` to ensure `_vconjunct_ctx_ptr` will be updated only /// once after any runtime_filters are ready. /// 3. finally, just copy this new VExprContext to scanner and use it to filter data. - IRuntimeFilter* runtime_filter = nullptr; - state->runtime_filter_mgr()->get_consume_filter(_runtime_filter_descs[i].filter_id, - &runtime_filter); + IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtimefilter; DCHECK(runtime_filter != nullptr); bool ready = runtime_filter->is_ready(); if (ready) { runtime_filter->get_prepared_vexprs(&vexprs, row_desc()); scanner_filter_apply_marks[i] = true; if (!_runtime_filter_ready_flag[i] && !vexprs.empty()) { - std::unique_lock l(*(_rf_locks[i])); + std::lock_guard l(_rf_lock); if (!_runtime_filter_ready_flag[i]) { // Use all conjuncts and new arrival runtime filters to construct a new // expression tree here. @@ -456,8 +443,11 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) { if (*scanner->vconjunct_ctx_ptr()) { scanner->discard_conjuncts(); } - WARN_IF_ERROR((*_vconjunct_ctx_ptr)->clone(state, scanner->vconjunct_ctx_ptr()), - "Something wrong for runtime filters: "); + { + std::shared_lock l(_rf_lock); + WARN_IF_ERROR((*_vconjunct_ctx_ptr)->clone(state, scanner->vconjunct_ctx_ptr()), + "Something wrong for runtime filters: "); + } } std::vector blocks; @@ -1002,11 +992,8 @@ Status VOlapScanNode::close(RuntimeState* state) { scanner->close(state); } - for (auto& filter_desc : _runtime_filter_descs) { - IRuntimeFilter* runtime_filter = nullptr; - state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id, &runtime_filter); - DCHECK(runtime_filter != nullptr); - runtime_filter->consumer_close(); + for (auto& filter_ctx : _runtime_filter_ctxs) { + filter_ctx.runtimefilter->consumer_close(); } for (auto& ctx : _stale_vexpr_ctxs) { diff --git a/be/src/vec/exec/volap_scan_node.h b/be/src/vec/exec/volap_scan_node.h index 7b3f383b62..e83b1c0e48 100644 --- a/be/src/vec/exec/volap_scan_node.h +++ b/be/src/vec/exec/volap_scan_node.h @@ -244,7 +244,7 @@ private: std::vector _runtime_filter_descs; std::vector _runtime_filter_ctxs; std::vector _runtime_filter_ready_flag; - std::vector> _rf_locks; + std::shared_mutex _rf_lock; std::map _conjunctid_to_runtime_filter_ctxs; std::unique_ptr _scanner_profile;