[feature](rf) add filter info profile when rf run as expr (#31822)

This commit is contained in:
Mryange
2024-03-11 10:35:56 +08:00
committed by yiguolei
parent 2470634859
commit b0b7161ad0
7 changed files with 67 additions and 26 deletions

View File

@ -444,7 +444,8 @@ public:
}
Status get_push_exprs(std::list<vectorized::VExprContextSPtr>& probe_ctxs,
std::vector<vectorized::VExprSPtr>& push_exprs, const TExpr& probe_expr);
std::vector<vectorized::VRuntimeFilterPtr>& push_exprs,
const TExpr& probe_expr);
Status merge(const RuntimePredicateWrapper* wrapper) {
bool can_not_merge_in_or_bloom =
@ -1056,14 +1057,23 @@ Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr, bool opt_remo
}
// Appends this consumer-side filter's pushed-down expressions to `push_exprs`
// (and their probe contexts to `probe_ctxs`), then wires profile counters onto
// the expressions that this call added.
//
// probe_ctxs      - receives the probe expression contexts created by the wrapper.
// push_exprs      - receives the runtime-filter expressions; entries already present
//                   on entry are left untouched.
// is_late_arrival - passed (negated) to _set_push_down().
//
// Returns the error from _wrapper->get_push_exprs() if it fails, otherwise OK.
Status IRuntimeFilter::get_push_expr_ctxs(std::list<vectorized::VExprContextSPtr>& probe_ctxs,
std::vector<vectorized::VExprSPtr>& push_exprs,
std::vector<vectorized::VRuntimeFilterPtr>& push_exprs,
bool is_late_arrival) {
DCHECK(is_consumer());
// Remember how many expressions the caller already had so that counters are
// attached only to the ones appended below.
auto origin_size = push_exprs.size();
// An ignored wrapper contributes no expressions.
if (!_wrapper->is_ignored()) {
_set_push_down(!is_late_arrival);
RETURN_IF_ERROR(_wrapper->get_push_exprs(probe_ctxs, push_exprs, _probe_expr));
}
_profile->add_info_string("Info", _format_status());
// Register the filtering counters on this filter's profile. This happens even
// when the wrapper is ignored and nothing was appended.
auto* expr_filtered_rows_counter = ADD_COUNTER(_profile, "expr_filtered_rows", TUnit::UNIT);
auto* expr_input_rows_counter = ADD_COUNTER(_profile, "expr_input_rows", TUnit::UNIT);
auto* always_true_counter = ADD_COUNTER(_profile, "always_true", TUnit::UNIT);
// Every expression appended by this call shares the same three counters.
for (auto i = origin_size; i < push_exprs.size(); i++) {
push_exprs[i]->attach_profile_counter(expr_filtered_rows_counter, expr_input_rows_counter,
always_true_counter);
}
return Status::OK();
}
@ -1715,9 +1725,9 @@ void IRuntimeFilter::update_filter(RuntimePredicateWrapper* wrapper, int64_t mer
this->signal();
}
Status RuntimePredicateWrapper::get_push_exprs(std::list<vectorized::VExprContextSPtr>& probe_ctxs,
std::vector<vectorized::VExprSPtr>& container,
const TExpr& probe_expr) {
Status RuntimePredicateWrapper::get_push_exprs(
std::list<vectorized::VExprContextSPtr>& probe_ctxs,
std::vector<vectorized::VRuntimeFilterPtr>& container, const TExpr& probe_expr) {
vectorized::VExprContextSPtr probe_ctx;
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(probe_expr, probe_ctx));
probe_ctxs.push_back(probe_ctx);

View File

@ -233,7 +233,8 @@ public:
PrimitiveType column_type() const;
Status get_push_expr_ctxs(std::list<vectorized::VExprContextSPtr>& probe_ctxs,
std::vector<vectorized::VExprSPtr>& push_exprs, bool is_late_arrival);
std::vector<vectorized::VRuntimeFilterPtr>& push_exprs,
bool is_late_arrival);
bool is_broadcast_join() const { return _is_broadcast_join; }

View File

@ -85,7 +85,7 @@ void RuntimeFilterConsumer::init_runtime_filter_dependency(
Status RuntimeFilterConsumer::_acquire_runtime_filter() {
SCOPED_TIMER(_acquire_runtime_filter_timer);
VExprSPtrs vexprs;
std::vector<vectorized::VRuntimeFilterPtr> vexprs;
for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
bool ready = runtime_filter->is_ready();
@ -111,12 +111,13 @@ Status RuntimeFilterConsumer::_acquire_runtime_filter() {
return Status::OK();
}
Status RuntimeFilterConsumer::_append_rf_into_conjuncts(const VExprSPtrs& vexprs) {
Status RuntimeFilterConsumer::_append_rf_into_conjuncts(
const std::vector<vectorized::VRuntimeFilterPtr>& vexprs) {
if (vexprs.empty()) {
return Status::OK();
}
for (auto& expr : vexprs) {
for (const auto& expr : vexprs) {
VExprContextSPtr conjunct = VExprContext::create_shared(expr);
RETURN_IF_ERROR(conjunct->prepare(_state, _row_descriptor_ref));
RETURN_IF_ERROR(conjunct->open(_state));
@ -142,7 +143,7 @@ Status RuntimeFilterConsumer::try_append_late_arrival_runtime_filter(int* arrive
}
// 1. Check whether any runtime filters are ready but not yet applied.
VExprSPtrs exprs;
std::vector<vectorized::VRuntimeFilterPtr> exprs;
int current_arrived_rf_num = 0;
for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
if (_runtime_filter_ctxs[i].apply_mark) {

View File

@ -46,7 +46,7 @@ protected:
// Get all arrived runtime filters at Open phase.
Status _acquire_runtime_filter();
// Append late-arrival runtime filters to the vconjunct_ctx.
Status _append_rf_into_conjuncts(const VExprSPtrs& vexprs);
Status _append_rf_into_conjuncts(const std::vector<vectorized::VRuntimeFilterPtr>& vexprs);
void _init_profile(RuntimeProfile* profile);
@ -54,9 +54,9 @@ protected:
// For runtime filters
// Pairs a runtime filter pointer with a flag recording whether that filter
// has already been applied.
struct RuntimeFilterContext {
RuntimeFilterContext(IRuntimeFilter* rf) : apply_mark(false), runtime_filter(rf) {}
RuntimeFilterContext(IRuntimeFilter* rf) : runtime_filter(rf) {}
// Set to true once this runtime filter has been applied to vconjunct_ctx_ptr.
bool apply_mark;
bool apply_mark = false;
// The filter this context tracks; stored as the raw pointer passed to the
// constructor (ownership is not visible here — TODO confirm with the owner).
IRuntimeFilter* runtime_filter = nullptr;
};

View File

@ -757,8 +757,8 @@ void NewOlapScanNode::add_filter_info(int id, const PredicateFilterInfo& update_
filter_name += std::to_string(id);
std::string info_str;
info_str += "type = " + type_to_string(static_cast<PredicateType>(info.type)) + ", ";
info_str += "input = " + std::to_string(info.input_row) + ", ";
info_str += "filtered = " + std::to_string(info.filtered_row);
info_str += "predicate input = " + std::to_string(info.input_row) + ", ";
info_str += "predicate filtered = " + std::to_string(info.filtered_row);
info_str = "[" + info_str + "]";
// add info

View File

@ -20,9 +20,12 @@
#include <fmt/format.h>
#include <stddef.h>
#include <cstdint>
#include <memory>
#include <utility>
#include "util/defer_op.h"
#include "util/runtime_profile.h"
#include "util/simd/bits.h"
#include "vec/columns/column.h"
#include "vec/columns/column_const.h"
@ -84,7 +87,19 @@ Status VRuntimeFilterWrapper::execute(VExprContext* context, Block* block, int*
*result_column_id = num_columns_without_result;
return Status::OK();
} else {
_scan_rows += block->rows();
int64_t input_rows = 0, filter_rows = 0;
Defer statistic_filter_info {[&]() {
if (_expr_filtered_rows_counter) {
COUNTER_UPDATE(_expr_filtered_rows_counter, filter_rows);
}
if (_expr_input_rows_counter) {
COUNTER_UPDATE(_expr_input_rows_counter, input_rows);
}
if (_always_true_counter) {
COUNTER_SET(_always_true_counter, (int64_t)_always_true);
}
}};
input_rows += block->rows();
if (_getting_const_col) {
_impl->set_getting_const_col(true);
@ -99,28 +114,29 @@ Status VRuntimeFilterWrapper::execute(VExprContext* context, Block* block, int*
if (is_column_const(*result_column.column)) {
auto* constant_val = const_cast<char*>(result_column.column->get_data_at(0).data);
if (constant_val == nullptr || !*reinterpret_cast<bool*>(constant_val)) {
_filtered_rows += block->rows();
filter_rows += block->rows();
}
} else if (const auto* nullable =
check_and_get_column<ColumnNullable>(*result_column.column)) {
data = ((ColumnVector<UInt8>*)nullable->get_nested_column_ptr().get())
->get_data()
.data();
_filtered_rows += doris::simd::count_zero_num(reinterpret_cast<const int8_t*>(data),
nullable->get_null_map_data().data(),
block->rows());
filter_rows += doris::simd::count_zero_num(reinterpret_cast<const int8_t*>(data),
nullable->get_null_map_data().data(),
block->rows());
} else if (const auto* res_col =
check_and_get_column<ColumnVector<UInt8>>(*result_column.column)) {
data = const_cast<uint8_t*>(res_col->get_data().data());
_filtered_rows += doris::simd::count_zero_num(reinterpret_cast<const int8_t*>(data),
block->rows());
filter_rows += doris::simd::count_zero_num(reinterpret_cast<const int8_t*>(data),
block->rows());
} else {
return Status::InternalError(
"Invalid type for runtime filters!, and _expr_name is: {}. _data_type is: {}. "
"result_column_id is: {}. block structure: {}.",
_expr_name, _data_type->get_name(), *result_column_id, block->dump_structure());
}
_filtered_rows += filter_rows;
_scan_rows += input_rows;
calculate_filter(VRuntimeFilterWrapper::EXPECTED_FILTER_RATE, _filtered_rows, _scan_rows,
_has_calculate_filter, _always_true);
return Status::OK();

View File

@ -17,9 +17,8 @@
#pragma once
#include <stdint.h>
#include <atomic>
#include <cstdint>
#include <string>
#include <vector>
@ -27,6 +26,7 @@
#include "common/object_pool.h"
#include "common/status.h"
#include "udf/udf.h"
#include "util/runtime_profile.h"
#include "vec/exprs/vexpr.h"
namespace doris {
@ -57,6 +57,14 @@ public:
const VExprSPtr get_impl() const override { return _impl; }
// Stores the profile counters that this wrapper reports into.
//
// expr_filtered_rows_counter - receives the number of rows filtered out.
// expr_input_rows_counter    - receives the number of rows fed into the filter.
// always_true_counter        - set to the current value of _always_true.
//
// The pointers are stored as-is, with no ownership transfer visible here.
// Each one may be null: the members default to nullptr and execute() checks
// every counter for null before touching it.
void attach_profile_counter(RuntimeProfile::Counter* expr_filtered_rows_counter,
RuntimeProfile::Counter* expr_input_rows_counter,
RuntimeProfile::Counter* always_true_counter) {
_expr_filtered_rows_counter = expr_filtered_rows_counter;
_expr_input_rows_counter = expr_input_rows_counter;
_always_true_counter = always_true_counter;
}
// If the filter rate is less than this, the bloom filter will be set to always true.
constexpr static double EXPECTED_FILTER_RATE = 0.4;
@ -74,12 +82,17 @@ private:
VExprSPtr _impl;
bool _always_true;
/// TODO: statistic filter rate in the profile
std::atomic<int64_t> _filtered_rows;
std::atomic<int64_t> _scan_rows;
RuntimeProfile::Counter* _expr_filtered_rows_counter = nullptr;
RuntimeProfile::Counter* _expr_input_rows_counter = nullptr;
RuntimeProfile::Counter* _always_true_counter = nullptr;
bool _has_calculate_filter = false;
std::string _expr_name;
};
using VRuntimeFilterPtr = std::shared_ptr<VRuntimeFilterWrapper>;
} // namespace doris::vectorized