[feature-wip](multi-catalog) Support runtime filter for file scan node (#11000)

* [feature-wip](multi-catalog) Support runtime filter for file scan node

Co-authored-by: morningman <morningman@apache.org>
This commit is contained in:
Mingyu Chen
2022-07-20 12:36:57 +08:00
committed by GitHub
parent a5a50726bf
commit 56e036e68b
15 changed files with 204 additions and 23 deletions

View File

@ -20,6 +20,7 @@
#include "common/config.h"
#include "gen_cpp/PlanNodes_types.h"
#include "runtime/mem_tracker.h"
#include "runtime/runtime_filter_mgr.h"
#include "runtime/runtime_state.h"
#include "runtime/string_value.h"
#include "runtime/tuple.h"
@ -30,6 +31,8 @@
#include "util/types.h"
#include "vec/exec/file_arrow_scanner.h"
#include "vec/exec/file_text_scanner.h"
#include "vec/exprs/vcompound_pred.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
namespace doris::vectorized {
@ -42,7 +45,10 @@ FileScanNode::FileScanNode(ObjectPool* pool, const TPlanNode& tnode, const Descr
_num_running_scanners(0),
_scan_finished(false),
_max_buffered_batches(32),
_wait_scanner_timer(nullptr) {}
_wait_scanner_timer(nullptr),
_runtime_filter_descs(tnode.runtime_filters) {
LOG(WARNING) << "file scan node runtime filter size=" << _runtime_filter_descs.size();
}
Status FileScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(ScanNode::init(tnode, state));
@ -52,6 +58,22 @@ Status FileScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
_pre_filter_texprs = file_scan_node.pre_filter_exprs;
}
int filter_size = _runtime_filter_descs.size();
_runtime_filter_ctxs.resize(filter_size);
_runtime_filter_ready_flag.resize(filter_size);
for (int i = 0; i < filter_size; ++i) {
IRuntimeFilter* runtime_filter = nullptr;
const auto& filter_desc = _runtime_filter_descs[i];
RETURN_IF_ERROR(state->runtime_filter_mgr()->regist_filter(
RuntimeFilterRole::CONSUMER, filter_desc, state->query_options(), id()));
RETURN_IF_ERROR(state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id,
&runtime_filter));
_runtime_filter_ctxs[i].runtimefilter = runtime_filter;
_runtime_filter_ready_flag[i] = false;
_rf_locks.push_back(std::make_unique<std::mutex>());
}
return Status::OK();
}
@ -80,6 +102,9 @@ Status FileScanNode::prepare(RuntimeState* state) {
// Profile
_wait_scanner_timer = ADD_TIMER(runtime_profile(), "WaitScannerTime");
_filter_timer = ADD_TIMER(runtime_profile(), "PredicateFilteredTime");
_num_rows_filtered = ADD_COUNTER(runtime_profile(), "PredicateFilteredRows", TUnit::UNIT);
_num_scanners = ADD_COUNTER(runtime_profile(), "NumScanners", TUnit::UNIT);
return Status::OK();
}
@ -90,11 +115,94 @@ Status FileScanNode::open(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::open(state));
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(_acquire_and_build_runtime_filter(state));
RETURN_IF_ERROR(start_scanners());
return Status::OK();
}
Status FileScanNode::_acquire_and_build_runtime_filter(RuntimeState* state) {
// acquire runtime filter
_runtime_filter_ctxs.resize(_runtime_filter_descs.size());
for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
auto& filter_desc = _runtime_filter_descs[i];
IRuntimeFilter* runtime_filter = nullptr;
state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id, &runtime_filter);
DCHECK(runtime_filter != nullptr);
if (runtime_filter == nullptr) {
continue;
}
bool ready = runtime_filter->is_ready();
if (!ready) {
ready = runtime_filter->await();
}
if (ready) {
_runtime_filter_ctxs[i].apply_mark = true;
_runtime_filter_ctxs[i].runtimefilter = runtime_filter;
// TODO: currently, after calling get_push_expr_ctxs(), the func ptr in runtime_filter
// will be released, and it will not be used again for building vexpr.
//
// std::list<ExprContext*> expr_context;
// RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(&expr_context));
// for (auto ctx : expr_context) {
// ctx->prepare(state, row_desc(), _expr_mem_tracker);
// ctx->open(state);
// int index = _conjunct_ctxs.size();
// _conjunct_ctxs.push_back(ctx);
// // it's safe to store address from a fix-resized vector
// _conjunctid_to_runtime_filter_ctxs[index] = &_runtime_filter_ctxs[i];
// }
}
}
// rebuild vexpr
for (int i = 0; i < _runtime_filter_ctxs.size(); ++i) {
if (!_runtime_filter_ctxs[i].apply_mark) {
continue;
}
IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtimefilter;
std::vector<VExpr*> vexprs;
runtime_filter->get_prepared_vexprs(&vexprs, row_desc(), _expr_mem_tracker);
if (vexprs.empty()) {
continue;
}
auto last_expr = _vconjunct_ctx_ptr ? (*_vconjunct_ctx_ptr)->root() : vexprs[0];
for (size_t j = _vconjunct_ctx_ptr ? 0 : 1; j < vexprs.size(); j++) {
TExprNode texpr_node;
texpr_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
texpr_node.__set_node_type(TExprNodeType::COMPOUND_PRED);
texpr_node.__set_opcode(TExprOpcode::COMPOUND_AND);
VExpr* new_node = _pool->add(new VcompoundPred(texpr_node));
new_node->add_child(last_expr);
new_node->add_child(vexprs[j]);
last_expr = new_node;
}
auto new_vconjunct_ctx_ptr = _pool->add(new VExprContext(last_expr));
auto expr_status = new_vconjunct_ctx_ptr->prepare(state, row_desc(), expr_mem_tracker());
if (UNLIKELY(!expr_status.OK())) {
LOG(WARNING) << "Something wrong for runtime filters: " << expr_status;
vexprs.clear();
break;
}
expr_status = new_vconjunct_ctx_ptr->open(state);
if (UNLIKELY(!expr_status.OK())) {
LOG(WARNING) << "Something wrong for runtime filters: " << expr_status;
vexprs.clear();
break;
}
if (_vconjunct_ctx_ptr) {
_stale_vexpr_ctxs.push_back(std::move(_vconjunct_ctx_ptr));
}
_vconjunct_ctx_ptr.reset(new doris::vectorized::VExprContext*);
*(_vconjunct_ctx_ptr.get()) = new_vconjunct_ctx_ptr;
_runtime_filter_ready_flag[i] = true;
}
return Status::OK();
}
Status FileScanNode::start_scanners() {
{
std::unique_lock<std::mutex> l(_batch_queue_lock);
@ -102,6 +210,7 @@ Status FileScanNode::start_scanners() {
}
_scanners_status.resize(_scan_ranges.size());
COUNTER_UPDATE(_num_scanners, _scan_ranges.size());
ThreadPoolToken* thread_token = _runtime_state->get_query_fragments_ctx()->get_token();
PriorityThreadPool* thread_pool = _runtime_state->exec_env()->scan_thread_pool();
for (int i = 0; i < _scan_ranges.size(); ++i) {
@ -245,6 +354,18 @@ Status FileScanNode::close(RuntimeState* state) {
}
// Close
_batch_queue.clear();
for (auto& filter_desc : _runtime_filter_descs) {
IRuntimeFilter* runtime_filter = nullptr;
state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id, &runtime_filter);
DCHECK(runtime_filter != nullptr);
runtime_filter->consumer_close();
}
for (auto& ctx : _stale_vexpr_ctxs) {
(*ctx)->close(state);
}
return ExecNode::close(state);
}
@ -266,8 +387,11 @@ Status FileScanNode::scanner_scan(const TFileScanRange& scan_range, ScannerCount
continue;
}
auto old_rows = block->rows();
RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, block.get(),
_tuple_desc->slots().size()));
{
SCOPED_TIMER(_filter_timer);
RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, block.get(),
_tuple_desc->slots().size()));
}
counter->num_rows_unselected += old_rows - block->rows();
if (block->rows() == 0) {
continue;
@ -318,6 +442,7 @@ void FileScanNode::scanner_worker(int start_idx, int length, std::promise<Status
// Update stats
_runtime_state->update_num_rows_load_filtered(counter.num_rows_filtered);
_runtime_state->update_num_rows_load_unselected(counter.num_rows_unselected);
COUNTER_UPDATE(_num_rows_filtered, counter.num_rows_unselected);
// scanner is going to finish
{