[Bug](runtime filter) Fix bug for runtime filter in concurrent scanners (#11848)
This commit is contained in:
@ -103,7 +103,6 @@ Status VOlapScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
|
||||
|
||||
_runtime_filter_ctxs[i].runtimefilter = runtime_filter;
|
||||
_runtime_filter_ready_flag[i] = false;
|
||||
_rf_locks.push_back(std::make_unique<std::mutex>());
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
@ -235,11 +234,7 @@ Status VOlapScanNode::prepare(RuntimeState* state) {
|
||||
|
||||
_runtime_state = state;
|
||||
for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
|
||||
IRuntimeFilter* runtime_filter = nullptr;
|
||||
state->runtime_filter_mgr()->get_consume_filter(_runtime_filter_descs[i].filter_id,
|
||||
&runtime_filter);
|
||||
DCHECK(runtime_filter != nullptr);
|
||||
runtime_filter->init_profile(_runtime_profile.get());
|
||||
_runtime_filter_ctxs[i].runtimefilter->init_profile(_runtime_profile.get());
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
@ -259,13 +254,7 @@ Status VOlapScanNode::open(RuntimeState* state) {
|
||||
|
||||
std::vector<VExpr*> vexprs;
|
||||
for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
|
||||
auto& filter_desc = _runtime_filter_descs[i];
|
||||
IRuntimeFilter* runtime_filter = nullptr;
|
||||
state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id, &runtime_filter);
|
||||
DCHECK(runtime_filter != nullptr);
|
||||
if (runtime_filter == nullptr) {
|
||||
continue;
|
||||
}
|
||||
IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtimefilter;
|
||||
bool ready = runtime_filter->is_ready();
|
||||
if (!ready) {
|
||||
ready = runtime_filter->await();
|
||||
@ -431,16 +420,14 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
|
||||
/// `_runtime_filter_ready_flag` to ensure `_vconjunct_ctx_ptr` will be updated only
|
||||
/// once after any runtime_filters are ready.
|
||||
/// 3. finally, just copy this new VExprContext to scanner and use it to filter data.
|
||||
IRuntimeFilter* runtime_filter = nullptr;
|
||||
state->runtime_filter_mgr()->get_consume_filter(_runtime_filter_descs[i].filter_id,
|
||||
&runtime_filter);
|
||||
IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtimefilter;
|
||||
DCHECK(runtime_filter != nullptr);
|
||||
bool ready = runtime_filter->is_ready();
|
||||
if (ready) {
|
||||
runtime_filter->get_prepared_vexprs(&vexprs, row_desc());
|
||||
scanner_filter_apply_marks[i] = true;
|
||||
if (!_runtime_filter_ready_flag[i] && !vexprs.empty()) {
|
||||
std::unique_lock<std::mutex> l(*(_rf_locks[i]));
|
||||
std::lock_guard<std::shared_mutex> l(_rf_lock);
|
||||
if (!_runtime_filter_ready_flag[i]) {
|
||||
// Use all conjuncts and new arrival runtime filters to construct a new
|
||||
// expression tree here.
|
||||
@ -456,8 +443,11 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
|
||||
if (*scanner->vconjunct_ctx_ptr()) {
|
||||
scanner->discard_conjuncts();
|
||||
}
|
||||
WARN_IF_ERROR((*_vconjunct_ctx_ptr)->clone(state, scanner->vconjunct_ctx_ptr()),
|
||||
"Something wrong for runtime filters: ");
|
||||
{
|
||||
std::shared_lock<std::shared_mutex> l(_rf_lock);
|
||||
WARN_IF_ERROR((*_vconjunct_ctx_ptr)->clone(state, scanner->vconjunct_ctx_ptr()),
|
||||
"Something wrong for runtime filters: ");
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<Block*> blocks;
|
||||
@ -1002,11 +992,8 @@ Status VOlapScanNode::close(RuntimeState* state) {
|
||||
scanner->close(state);
|
||||
}
|
||||
|
||||
for (auto& filter_desc : _runtime_filter_descs) {
|
||||
IRuntimeFilter* runtime_filter = nullptr;
|
||||
state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id, &runtime_filter);
|
||||
DCHECK(runtime_filter != nullptr);
|
||||
runtime_filter->consumer_close();
|
||||
for (auto& filter_ctx : _runtime_filter_ctxs) {
|
||||
filter_ctx.runtimefilter->consumer_close();
|
||||
}
|
||||
|
||||
for (auto& ctx : _stale_vexpr_ctxs) {
|
||||
|
||||
Reference in New Issue
Block a user