[FEAT MERGE]reorder filter according to real-time statistics and row estimation optimization

Co-authored-by: DengzhiLiu <dengzhiliu@gmail.com>
Co-authored-by: chaser-ch <chaser.ch@antgroup.com>
Co-authored-by: mjhmllover <zdhmjh@163.com>
This commit is contained in:
neicun1024 2024-11-22 11:15:19 +00:00 committed by ob-robot
parent 812ec7c5ec
commit e64abec880
27 changed files with 564 additions and 374 deletions

View File

@ -156,7 +156,8 @@ TEST_F(TestIndexSSTableEstimator, estimate_major_sstable_range)
get_part_est(partial_sstable_, range, ddl_merge_part_est);
STORAGE_LOG(INFO, "part_est", K(part_est), K(ddl_kv_part_est), K(ddl_merge_part_est));
ASSERT_EQ(part_est, ddl_merge_part_est);
ASSERT_EQ(ddl_kv_part_est, ddl_merge_part_est);
ASSERT_TRUE(part_est.logical_row_count_ > ddl_merge_part_est.logical_row_count_);
}
TEST_F(TestIndexSSTableEstimator, estimate_major_sstable_left_range)
@ -204,7 +205,8 @@ TEST_F(TestIndexSSTableEstimator, estimate_major_sstable_middle_range)
get_part_est(partial_sstable_, range, ddl_merge_part_est);
STORAGE_LOG(INFO, "part_est", K(part_est), K(ddl_kv_part_est), K(ddl_merge_part_est));
ASSERT_EQ(part_est, ddl_merge_part_est);
ASSERT_EQ(ddl_kv_part_est, ddl_merge_part_est);
ASSERT_TRUE(part_est.logical_row_count_ > ddl_merge_part_est.logical_row_count_);
}
TEST_F(TestIndexSSTableEstimator, estimate_major_sstable_noexist_range)

View File

@ -35,6 +35,11 @@
"name": "direct_load_allow_fallback",
"value": 1,
"comment": "for simple OLTP workloads, fallback is allowed when direct load fails"
},
{
"name": "_enable_filter_reordering",
"value": false,
"comment": "enable filter reordering in storage engine"
}
]
}
@ -80,6 +85,11 @@
"name": "direct_load_allow_fallback",
"value": 1,
"comment": "for complex OLTP workloads, fallback is allowed when direct load fails"
},
{
"name": "_enable_filter_reordering",
"value": false,
"comment": "enable filter reordering in storage engine"
}
]
}
@ -175,6 +185,11 @@
"name": "log_transport_compress_all",
"value": 1,
"comment": "In scenarios with limited bandwidth, network bandwidth can be saved with a small amount of CPU overhead through RPC compression"
},
{
"name": "_enable_filter_reordering",
"value": false,
"comment": "enable filter reordering in storage engine"
}
]
}

View File

@ -563,6 +563,9 @@ DEF_INT(_storage_meta_memory_limit_percentage, OB_TENANT_PARAMETER, "20", "[0, 5
DEF_INT(_max_tablet_cnt_per_gb, OB_TENANT_PARAMETER, "20000", "[1000, 50000)",
"The maximum number of tablets supported per 1GB of memory by tenant unit. Range: [1000, 50000)",
ObParameterAttr(Section::TENANT, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_BOOL(_enable_filter_reordering, OB_TENANT_PARAMETER, "True",
"enable filter reordering in storage engine",
ObParameterAttr(Section::TENANT, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
//// rootservice config
DEF_TIME(lease_time, OB_CLUSTER_PARAMETER, "10s", "[1s, 5m]",
"Lease for current heartbeat. If the root server does not received any heartbeat "

View File

@ -5520,6 +5520,7 @@ int ObStaticEngineCG::generate_tsc_flags(ObLogTableScan &op, ObTableScanSpec &sp
bool enable_skip_index = false;
bool enable_prefetch_limit = false;
bool enable_column_store = false;
bool enable_filter_reordering = false;
ObBasicSessionInfo *session_info = NULL;
ObLogPlan *log_plan = op.get_plan();
if (OB_ISNULL(log_plan)) {
@ -5574,14 +5575,15 @@ int ObStaticEngineCG::generate_tsc_flags(ObLogTableScan &op, ObTableScanSpec &sp
enable_column_store = op.use_column_store();
ObDASScanCtDef &scan_ctdef = spec.tsc_ctdef_.scan_ctdef_;
ObDASScanCtDef *lookup_ctdef = spec.tsc_ctdef_.lookup_ctdef_;
enable_filter_reordering = tenant_config->_enable_filter_reordering;
scan_ctdef.pd_expr_spec_.pd_storage_flag_.set_flags(pd_blockscan, pd_filter, enable_skip_index,
enable_column_store, enable_prefetch_limit);
enable_column_store, enable_prefetch_limit, enable_filter_reordering);
scan_ctdef.table_scan_opt_.io_read_batch_size_ = io_read_batch_size;
scan_ctdef.table_scan_opt_.io_read_gap_size_ = io_read_gap_size;
scan_ctdef.table_scan_opt_.storage_rowsets_size_ = tenant_config->storage_rowsets_size;
if (nullptr != lookup_ctdef) {
lookup_ctdef->pd_expr_spec_.pd_storage_flag_.set_flags(pd_blockscan, pd_filter, enable_skip_index,
enable_column_store, enable_prefetch_limit);
enable_column_store, enable_prefetch_limit, enable_filter_reordering);
lookup_ctdef->table_scan_opt_.io_read_batch_size_ = io_read_batch_size;
lookup_ctdef->table_scan_opt_.io_read_gap_size_ = io_read_gap_size;
lookup_ctdef->table_scan_opt_.storage_rowsets_size_ = tenant_config->storage_rowsets_size;

View File

@ -1388,7 +1388,7 @@ int ObPushdownFilterExecutor::execute(
} else if (OB_ISNULL(result)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("Unexpected null filter bitmap", K(ret));
} else if (nullptr != parent && OB_FAIL(parent->prepare_skip_filter())) {
} else if (nullptr != parent && OB_FAIL(parent->prepare_skip_filter(filter_info.disable_bypass_))) {
LOG_WARN("Failed to check parent blockscan", K(ret));
} else if (is_filter_node()) {
if (OB_FAIL(do_filter(parent, filter_info, micro_scanner, use_vectorize, *result))) {
@ -1501,6 +1501,10 @@ int ObPushdownFilterExecutor::do_filter(
{
int ret = OB_SUCCESS;
bool is_needed_to_do_filter = check_sstable_index_filter();
uint64_t start_time = 0;
if (parent && parent->is_enable_reorder() && filter_info.disable_bypass_) {
start_time = rdtsc();
}
if (!is_needed_to_do_filter) {
} else if (is_filter_dynamic_node()
&& OB_FAIL(static_cast<ObDynamicFilterExecutor *>(this)->check_runtime_filter(
@ -1527,6 +1531,16 @@ int ObPushdownFilterExecutor::do_filter(
0, filter_info.count_, result_bitmap)))) {
LOG_WARN("failed to filter batch", K(ret));
}
if (OB_SUCC(ret) && parent && parent->is_enable_reorder() && filter_info.disable_bypass_) {
uint64_t popcnt = result_bitmap.popcnt();
filter_realtime_statistics_.add_filter_cost_time(rdtsc() - start_time + 1);
if (parent->is_logic_and_node()) { // If parent is logic and, then bitmap is initialized to 1, calculate # of 0 as filtered row count.
filter_realtime_statistics_.add_filtered_row_cnt(result_bitmap.size() - popcnt);
} else if (parent->is_logic_or_node()) { // If parent is logic or, then bitmap is initialized to 0, calculate # of 1 as filtered row count.
filter_realtime_statistics_.add_filtered_row_cnt(popcnt);
}
LOG_DEBUG("collet filter real-time statistics", K(parent->get_type()), K(result_bitmap.size()), K(popcnt), K(filter_realtime_statistics_.get_filtered_row_cnt()), K(filter_realtime_statistics_.get_filter_cost_time()));
}
return ret;
}
@ -1651,7 +1665,8 @@ ObPushdownFilterExecutor::ObPushdownFilterExecutor(common::ObIAllocator &alloc,
: type_(type), need_check_row_filter_(false), filter_tree_status_(ObCommonFilterTreeStatus::NONE_FILTER),
n_cols_(0), n_child_(0), cg_iter_idx_(INVALID_CG_ITER_IDX), skipped_rows_(0), childs_(nullptr),
filter_bitmap_(nullptr), col_params_(alloc), col_offsets_(alloc), cg_col_offsets_(alloc), default_datums_(alloc),
cg_idxs_(alloc), cg_col_exprs_(alloc), allocator_(alloc), op_(op), is_rewrited_(false), filter_bool_mask_()
cg_idxs_(alloc), cg_col_exprs_(alloc), allocator_(alloc), op_(op), is_rewrited_(false), filter_bool_mask_(),
enable_reorder_(false), ref_cnt_(0), filter_realtime_statistics_()
{}
ObPushdownFilterExecutor::~ObPushdownFilterExecutor()
@ -1677,6 +1692,9 @@ ObPushdownFilterExecutor::~ObPushdownFilterExecutor()
cg_iter_idx_ = INVALID_CG_ITER_IDX;
need_check_row_filter_ = false;
is_rewrited_ = false;
enable_reorder_ = false;
ref_cnt_ = 0;
filter_realtime_statistics_.reset();
}
DEF_TO_STRING(ObPushdownFilterExecutor)
@ -1692,13 +1710,15 @@ DEF_TO_STRING(ObPushdownFilterExecutor)
return pos;
}
int ObPushdownFilterExecutor::prepare_skip_filter()
int ObPushdownFilterExecutor::prepare_skip_filter(bool disable_bypass)
{
int ret = OB_SUCCESS;
need_check_row_filter_ = false;
if (OB_ISNULL(filter_bitmap_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("Unexpected null filter bitmap", K(ret));
} else if (enable_reorder_ && disable_bypass) {
need_check_row_filter_ = false;
} else if (PushdownExecutorType::AND_FILTER_EXECUTOR == type_) {
need_check_row_filter_ = !filter_bitmap_->is_all_true();
} else if (PushdownExecutorType::OR_FILTER_EXECUTOR == type_) {
@ -2976,6 +2996,8 @@ void PushdownFilterInfo::reset()
col_capacity_ = 0;
batch_size_ = 0;
col_datum_buf_.reset();
disable_bypass_ = false;
first_batch_ = false;
}
void PushdownFilterInfo::reuse()
@ -2986,6 +3008,8 @@ void PushdownFilterInfo::reuse()
context_ = nullptr;
start_ = -1;
count_ = 0;
disable_bypass_ = false;
first_batch_ = false;
}
int PushdownFilterInfo::init(const storage::ObTableIterParam &iter_param, common::ObIAllocator &alloc)

View File

@ -197,7 +197,7 @@ public:
OB_INLINE void set_enable_prefetch_limiting(const bool enable_limit) { enable_prefetch_limiting_ = enable_limit; }
OB_INLINE void set_use_global_iter_pool(const bool use_iter_mgr) { use_global_iter_pool_ = use_iter_mgr; }
OB_INLINE void set_flags(const bool block_scan, const bool filter, const bool skip_index,
const bool use_cs, const bool enable_limit, const bool filter_reorder = true)
const bool use_cs, const bool enable_limit, const bool filter_reorder = false)
{
set_blockscan_pushdown(block_scan);
set_filter_pushdown(filter);
@ -580,6 +580,32 @@ enum ObCommonFilterTreeStatus : uint8_t
// 即一个是编译器的接口,一个是运行时接口
class ObPushdownFilterExecutor
{
private:
struct FilterRealTimeStatistics {
FilterRealTimeStatistics()
: filter_cost_time_(0)
, filtered_row_cnt_(0)
, skip_index_skip_mb_cnt_(0)
{};
void reset() {
filter_cost_time_ = 0;
filtered_row_cnt_ = 0;
skip_index_skip_mb_cnt_ = 0;
}
OB_INLINE uint64_t get_filter_cost_time() const {return filter_cost_time_;}
OB_INLINE void add_filter_cost_time(uint64_t filter_cost_time) { filter_cost_time_ += filter_cost_time; };
OB_INLINE uint64_t get_filtered_row_cnt() {return filtered_row_cnt_; }
OB_INLINE void add_filtered_row_cnt(uint64_t filtered_row_cnt) { filtered_row_cnt_ += filtered_row_cnt; };
OB_INLINE uint64_t get_skip_index_skip_mb_cnt() { return skip_index_skip_mb_cnt_; };
OB_INLINE void add_skip_index_skip_mb_cnt(uint64_t skip_index_skip_mb_cnt) { skip_index_skip_mb_cnt_ += skip_index_skip_mb_cnt; }
TO_STRING_KV(K_(filter_cost_time), K_(filtered_row_cnt), K_(skip_index_skip_mb_cnt));
uint64_t filter_cost_time_;
uint64_t filtered_row_cnt_;
uint64_t skip_index_skip_mb_cnt_; // # of micro block skipped by skip index.
};
public:
static const int64_t INVALID_CG_ITER_IDX = -1;
public:
@ -601,7 +627,7 @@ public:
virtual OB_INLINE bool is_logic_op_node() const { return is_logic_and_node() || is_logic_or_node(); }
OB_INLINE bool is_filter_dynamic_node() const { return type_ == DYNAMIC_FILTER_EXECUTOR; }
virtual OB_INLINE bool filter_can_continuous_filter() const { return true; }
int prepare_skip_filter();
int prepare_skip_filter(bool disable_bypass);
OB_INLINE bool can_skip_filter(int64_t row) const
{
bool fast_skip = false;
@ -677,6 +703,9 @@ public:
}
OB_INLINE void clear_skipped_rows() { skipped_rows_ = 0; }
OB_INLINE common::ObIAllocator &get_allocator() { return allocator_; }
OB_INLINE bool is_enable_reorder() { return enable_reorder_; }
OB_INLINE void set_enable_reorder(bool enable_reorder) { enable_reorder_ = enable_reorder; }
OB_INLINE FilterRealTimeStatistics &get_filter_realtime_statistics() { return filter_realtime_statistics_; }
inline int get_child(uint32_t nth_child, ObPushdownFilterExecutor *&filter_executor)
{
int ret = common::OB_SUCCESS;
@ -717,6 +746,9 @@ public:
const bool use_vectorize);
int execute_skipping_filter(ObBoolMask &bm);
virtual void clear(); // release array and set memory used by WHITE_OP_IN filter.
void inc_ref() { ++ref_cnt_; }
void dec_ref() { --ref_cnt_; }
int64_t get_ref() { return ref_cnt_; }
DECLARE_VIRTUAL_TO_STRING;
protected:
int find_evaluated_datums(
@ -759,6 +791,9 @@ protected:
private:
bool is_rewrited_;
ObBoolMask filter_bool_mask_;
bool enable_reorder_;
int64_t ref_cnt_;
FilterRealTimeStatistics filter_realtime_statistics_;
};
class ObPhysicalFilterExecutor : public ObPushdownFilterExecutor
@ -1339,7 +1374,9 @@ struct PushdownFilterInfo
col_datum_buf_(),
allocator_(nullptr),
param_(nullptr),
context_(nullptr)
context_(nullptr),
disable_bypass_(false),
first_batch_(false)
{}
~PushdownFilterInfo();
void reset();
@ -1396,6 +1433,8 @@ struct PushdownFilterInfo
common::ObIAllocator *allocator_;
const storage::ObTableIterParam *param_;
storage::ObTableAccessContext *context_;
bool disable_bypass_;
bool first_batch_;
};
}

View File

@ -550,7 +550,6 @@ ob_set_subtarget(ob_storage column_store
column_store/ob_co_sstable_row_scanner.cpp
column_store/ob_co_sstable_row_multi_scanner.cpp
column_store/ob_co_sstable_rows_filter.cpp
column_store/ob_co_where_optimizer.cpp
column_store/ob_cg_bitmap.cpp
column_store/ob_cg_iter_param_pool.cpp
column_store/ob_cg_sstable_row_getter.cpp
@ -614,6 +613,7 @@ ob_set_subtarget(ob_storage access
access/ob_pushdown_aggregate_vec.cpp
access/ob_empty_read_bucket.cpp
access/ob_global_iterator_pool.cpp
access/ob_where_optimizer.cpp
)
ob_set_subtarget(ob_storage ddl

View File

@ -40,7 +40,8 @@ ObBlockRowStore::ObBlockRowStore(ObTableAccessContext &context)
can_blockscan_(false),
filter_applied_(false),
disabled_(false),
is_aggregated_in_prefetch_(false)
is_aggregated_in_prefetch_(false),
where_optimizer_(nullptr)
{}
ObBlockRowStore::~ObBlockRowStore()
@ -56,6 +57,11 @@ void ObBlockRowStore::reset()
disabled_ = false;
is_aggregated_in_prefetch_ = false;
iter_param_ = nullptr;
if (where_optimizer_ != nullptr) {
where_optimizer_->~ObWhereOptimizer();
context_.stmt_allocator_->free(where_optimizer_);
where_optimizer_ = nullptr;
}
}
void ObBlockRowStore::reuse()
@ -81,11 +87,21 @@ int ObBlockRowStore::init(const ObTableAccessParam &param, common::hash::ObHashS
} else if (nullptr != context_.sample_filter_
&& OB_FAIL(context_.sample_filter_->combine_to_filter_tree(pd_filter_info_.filter_))) {
LOG_WARN("Failed to combine sample filter to filter tree", K(ret), K_(pd_filter_info), KP_(context_.sample_filter));
} else {
} else if (nullptr != pd_filter_info_.filter_ && !param.iter_param_.is_use_column_store() && param.iter_param_.enable_pd_filter_reorder()) {
if (OB_UNLIKELY(nullptr != where_optimizer_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("Unexpected where optimizer", K(ret), KP_(where_optimizer));
} else if (OB_ISNULL(where_optimizer_ = OB_NEWx(ObWhereOptimizer, context_.stmt_allocator_))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("Failed to alloc memory for ObWhereOptimizer", K(ret));
} else if (OB_FAIL(where_optimizer_->init(&param.iter_param_, pd_filter_info_.filter_))) {
LOG_WARN("Failed to init where optimizer", K(ret), K(param.iter_param_), K(pd_filter_info_.filter_));
}
}
if (OB_SUCC(ret)) {
is_inited_ = true;
iter_param_ = &param.iter_param_;
}
if (IS_NOT_INIT) {
} else {
reset();
}
return ret;
@ -109,6 +125,8 @@ int ObBlockRowStore::apply_blockscan(
} else if (nullptr == pd_filter_info_.filter_) {
// nothing to do
filter_applied_ = true;
} else if (nullptr != where_optimizer_ && OB_FAIL(where_optimizer_->reorder_row_filter())){
LOG_WARN("Fail to reorder filter", K(ret), KPC(pd_filter_info_.filter_));
} else if (OB_FAIL(micro_scanner.filter_micro_block_in_blockscan(pd_filter_info_))) {
LOG_WARN("Failed to apply pushdown filter in block reader", K(ret), K(*this));
} else {

View File

@ -16,6 +16,7 @@
#include "common/object/ob_object.h"
#include "lib/container/ob_bitmap.h"
#include "sql/engine/basic/ob_pushdown_filter.h"
#include "storage/access/ob_where_optimizer.h"
namespace oceanbase
{
@ -77,6 +78,8 @@ public:
int get_filter_result(ObFilterResult &res);
OB_INLINE sql::ObPushdownFilterExecutor *get_pd_filter()
{ return pd_filter_info_.filter_; }
OB_INLINE ObWhereOptimizer *get_where_optimizer()
{ return where_optimizer_; }
virtual bool is_end() const { return false; }
virtual bool is_empty() const { return true; }
OB_INLINE bool is_vec2() const { return is_vec2_; } // need to remove after statistical info pushdown support vec 2.0
@ -98,6 +101,7 @@ private:
bool filter_applied_;
bool disabled_;
bool is_aggregated_in_prefetch_;
ObWhereOptimizer *where_optimizer_;
};
}

View File

@ -201,16 +201,17 @@ int ObIndexBlockScanEstimator::cal_total_estimate_result(
if (datum_range.is_whole_range()) {
} else {
const bool is_multi_version_minor = sstable.is_multi_version_minor_sstable();
const bool is_major = sstable.is_major_sstable();
if (!datum_range.get_start_key().is_min_rowkey()) {
if (OB_FAIL(estimate_excluded_border_result(
is_multi_version_minor, datum_range, true, result))) {
is_multi_version_minor, is_major, datum_range, true, result))) {
STORAGE_LOG(WARN, "Failed to estimate left excluded row count", K(ret));
}
}
if (OB_SUCC(ret) && !datum_range.get_end_key().is_max_rowkey()) {
level_ = 0;
if (OB_FAIL(estimate_excluded_border_result(
is_multi_version_minor, datum_range, false, result))) {
is_multi_version_minor, is_major, datum_range, false, result))) {
STORAGE_LOG(WARN, "Failed to estimate right excluded row count", K(ret));
}
}
@ -256,7 +257,8 @@ int ObIndexBlockScanEstimator::cal_total_estimate_result_for_ddl(ObSSTable &ssta
}
int ObIndexBlockScanEstimator::estimate_excluded_border_result(const bool is_multi_version_minor,
const blocksstable::ObDatumRange &datum_range,
const bool is_major,
const blocksstable::ObDatumRange &datum_range,
bool is_left,
ObEstimatedResult &result)
{
@ -322,10 +324,19 @@ int ObIndexBlockScanEstimator::estimate_excluded_border_result(const bool is_mul
}
if (OB_ITER_END == ret && idx > 0) {
if (OB_FAIL(goto_next_level(excluded_range, border_micro_index_info, is_multi_version_minor, result))) {
if (OB_ITER_END != ret) {
STORAGE_LOG(WARN, "Failed to go to next level", K(ret),
K(border_micro_index_info), K(index_block_row_scanner_));
int64_t ratio = 0;
if (0 == border_micro_index_info.get_row_count()) {
ret = common::OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "Border micro index row count should not be 0", K(ret));
} else if (is_major) {
ratio = (result.total_row_count_ - result.excluded_row_count_) / border_micro_index_info.get_row_count();
}
if (OB_ITER_END == ret && ratio < RANGE_ROWS_IN_AND_BORDER_RATIO_THRESHOLD) {
if (OB_FAIL(goto_next_level(excluded_range, border_micro_index_info, is_multi_version_minor, result))) {
if (OB_ITER_END != ret) {
STORAGE_LOG(WARN, "Failed to go to next level", K(ret),
K(border_micro_index_info), K(index_block_row_scanner_));
}
}
}
}

View File

@ -107,6 +107,7 @@ private:
const blocksstable::ObDatumRange &datum_range,
ObEstimatedResult &result);
int estimate_excluded_border_result(const bool is_multi_version_minor,
const bool is_major,
const blocksstable::ObDatumRange &datum_range,
const bool is_left,
ObEstimatedResult &result);
@ -132,6 +133,7 @@ private:
return micro_handles_[level_++ % DEFAULT_GET_MICRO_DATA_HANDLE_CNT];
}
static const int64_t DEFAULT_GET_MICRO_DATA_HANDLE_CNT = 2;
static const int64_t RANGE_ROWS_IN_AND_BORDER_RATIO_THRESHOLD = 1000;
uint64_t tenant_id_;
blocksstable::ObMicroBlockData root_index_block_;
blocksstable::ObIndexBlockRowScanner index_block_row_scanner_;

View File

@ -0,0 +1,215 @@
/**
* Copyright (c) 2023 OceanBase
* OceanBase is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX STORAGE
#include "ob_where_optimizer.h"
namespace oceanbase
{
namespace storage
{
#define REORDER_FILTER_INTERVAL 32
ObWhereOptimizer::ObWhereOptimizer()
: iter_param_(nullptr)
, filter_(nullptr)
, filter_iters_(nullptr)
, iter_filter_node_(nullptr)
, batch_num_(0)
, reorder_filter_times_(0)
, reorder_filter_interval_(1)
, disable_bypass_(false)
, is_inited_(false)
{
}
int ObWhereOptimizer::init(
const ObTableIterParam *iter_param,
sql::ObPushdownFilterExecutor *filter,
ObSEArray<ObICGIterator*, 4> *filter_iters,
ObSEArray<sql::ObPushdownFilterExecutor*, 4> *iter_filter_node)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
LOG_WARN("ObWhereOptimizer init twice", K(ret));
} else if (OB_ISNULL(iter_param_ = iter_param) || OB_ISNULL(filter_ = filter)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("iter param or filter is null", K(ret), K(iter_param_), K(filter_));
} else {
if (filter_iters != nullptr) {
filter_->inc_ref();
}
filter_iters_ = filter_iters;
iter_filter_node_ = iter_filter_node;
filter_conditions_.reset();
batch_num_ = 0;
reorder_filter_times_ = 0;
reorder_filter_interval_ = 1;
disable_bypass_ = false;
judge_filter_whether_enable_reorder(filter);
is_inited_ = true;
}
return ret;
}
void ObWhereOptimizer::reset()
{
if (filter_iters_ != nullptr) {
filter_->dec_ref();
}
iter_param_ = nullptr;
filter_ = nullptr;
filter_iters_ = nullptr;
iter_filter_node_ = nullptr;
filter_conditions_.reset();
batch_num_ = 0;
reorder_filter_times_ = 0;
reorder_filter_interval_ = 1;
disable_bypass_ = false;
is_inited_ = false;
}
int ObWhereOptimizer::analyze(bool &reordered)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
} else {
ret = analyze_impl(*filter_, reordered);
}
return ret;
}
int ObWhereOptimizer::analyze_impl(sql::ObPushdownFilterExecutor &filter, bool &reordered)
{
int ret = OB_SUCCESS;
sql::ObPushdownFilterExecutor **children = filter.get_childs();
const int child_cnt = filter.get_child_count();
if (filter.is_enable_reorder()) {
if (OB_FAIL(filter_conditions_.prepare_allocate(child_cnt))) {
LOG_WARN("Failed to prepare allocate filter conditions", K(ret), K(child_cnt));
} else {
for (int i = 0; i < child_cnt; ++i) {
filter_conditions_.at(i).idx_ = i;
collect_filter_info(*children[i], filter_conditions_.at(i));
}
lib::ob_sort(&filter_conditions_.at(0), &filter_conditions_.at(0) + child_cnt);
bool need_reorder = false;
for (int i=0; i<child_cnt; ++i) {
if (i != filter_conditions_.at(i).idx_) {
need_reorder = true;
break;
}
}
if (need_reorder) {
int cg_iter_idxs[child_cnt];
for (int i=0; i<child_cnt; ++i) {
cg_iter_idxs[i] = children[i]->get_cg_iter_idx();
}
for (int i=0; i<child_cnt; ++i) {
children[i] = filter_conditions_.at(i).filter_;
children[i]->set_cg_iter_idx(cg_iter_idxs[i]);
if (filter_iters_ != nullptr && iter_filter_node_ != nullptr && cg_iter_idxs[i] != -1) {
(*filter_iters_).at(cg_iter_idxs[i]) = filter_conditions_.at(i).filter_iter_;
(*iter_filter_node_).at(cg_iter_idxs[i]) = filter_conditions_.at(i).filter_node_;
}
}
reordered = true;
}
}
} else if (filter.is_logic_op_node()) {
for (int i = 0; OB_SUCC(ret) && i < child_cnt; ++i) {
if (OB_FAIL(analyze_impl(*children[i], reordered))) {
LOG_WARN("Failed to analyze filter tree", K(ret), K(i), KP(children[i]));
}
}
}
return ret;
}
void ObWhereOptimizer::collect_filter_info(
sql::ObPushdownFilterExecutor &filter,
ObFilterCondition &filter_condition)
{
filter_condition.filter_cost_time_ = filter.get_filter_realtime_statistics().get_filter_cost_time();
filter_condition.filtered_row_cnt_ = filter.get_filter_realtime_statistics().get_filtered_row_cnt();
filter_condition.skip_index_skip_mb_cnt_ = filter.get_filter_realtime_statistics().get_skip_index_skip_mb_cnt();
filter.get_filter_realtime_statistics().reset();
filter_condition.filter_ = &filter;
if (filter_iters_ != nullptr && iter_filter_node_ != nullptr && filter.get_cg_iter_idx() != -1) {
filter_condition.filter_iter_ = (*filter_iters_).at(filter.get_cg_iter_idx());
filter_condition.filter_node_ = (*iter_filter_node_).at(filter.get_cg_iter_idx());
}
}
void ObWhereOptimizer::judge_filter_whether_enable_reorder(sql::ObPushdownFilterExecutor *filter) {
if (filter == nullptr) {
// do nothing
} else if (filter->is_logic_op_node()) {
bool enable_reorder = true;
for (int i=0; i<filter->get_child_count(); ++i) { // enable reorder of this filter if all childs are not logic op nodes
sql::ObPushdownFilterExecutor *child = filter->get_childs()[i];
if (child->is_logic_op_node()) {
enable_reorder = false;
judge_filter_whether_enable_reorder(child);
}
}
filter->set_enable_reorder(enable_reorder);
}
}
int ObWhereOptimizer::reorder_co_filter()
{
int ret = OB_SUCCESS;
bool reordered = false;
++batch_num_;
if (filter_->get_ref() != 1 || !filter_->is_logic_op_node()) {
/* If there is only one node in the filter tree, do nothing. */
} else if (reorder_filter_times_ >= reorder_filter_interval_) {
if (OB_FAIL(analyze(reordered))) { // reordered is used to ajust the reorder interval adaptively in the future.
LOG_WARN("Failed to analyze in co where optimzier", K(ret));
} else {
reorder_filter_times_ = 0;
reorder_filter_interval_ = REORDER_FILTER_INTERVAL;
}
} else {
++reorder_filter_times_;
}
LOG_DEBUG("end of reorder co filter", K(this), K(batch_num_), K(filter_), K(filter_->get_ref()), K(filter_->get_type()), K(reorder_filter_times_), K(reorder_filter_interval_), K(reordered), K(filter_->get_filter_realtime_statistics()));
return ret;
}
int ObWhereOptimizer::reorder_row_filter() {
int ret = OB_SUCCESS;
bool reordered = false;
++batch_num_;
if (!filter_->is_logic_op_node() || iter_param_->is_use_column_store()) {
/* If there is only one node in the filter tree, do nothing. If the column store filter reaches here, do nothing. */
} else if (reorder_filter_times_ >= reorder_filter_interval_) {
if (OB_FAIL(analyze(reordered))) { // reordered is used to ajust the reorder interval adaptively in the future.
LOG_WARN("Failed to analyze in row where optimzier", K(ret));
} else {
reorder_filter_times_ = 0;
reorder_filter_interval_ = REORDER_FILTER_INTERVAL;
}
} else {
++reorder_filter_times_;
}
LOG_DEBUG("end of reorder row filter", K(this), K(batch_num_), K(filter_), K(filter_->get_type()), K(iter_param_->is_use_column_store()), K(reorder_filter_times_), K(reorder_filter_interval_), K(reordered), K(filter_->get_filter_realtime_statistics()));
return ret;
}
}
}

View File

@ -0,0 +1,87 @@
/**
* Copyright (c) 2023 OceanBase
* OceanBase is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OB_STORAGE_COLUMN_STORE_OB_WHERE_OPTIMIZER_H_
#define OB_STORAGE_COLUMN_STORE_OB_WHERE_OPTIMIZER_H_
#include "sql/engine/basic/ob_pushdown_filter.h"
#include "storage/column_store/ob_i_cg_iterator.h"
namespace oceanbase
{
namespace storage
{
class ObWhereOptimizer
{
public:
ObWhereOptimizer();
virtual ~ObWhereOptimizer() { reset(); };
int init(
const ObTableIterParam *iter_param,
sql::ObPushdownFilterExecutor *filter,
ObSEArray<ObICGIterator*, 4> *filter_iters = nullptr,
ObSEArray<sql::ObPushdownFilterExecutor*, 4> *iter_filter_node = nullptr);
void reset();
OB_INLINE bool is_disable_bypass() // Disable bypass in this batch and collect real-time statistics, then reorder filter before next batch.
{ return reorder_filter_times_ == reorder_filter_interval_; }
OB_INLINE bool is_first_batch()
{ return batch_num_ == 1; }
int reorder_co_filter();
int reorder_row_filter();
private:
struct ObFilterCondition
{
uint64_t idx_;
uint64_t filter_cost_time_;
uint64_t filtered_row_cnt_;
uint64_t skip_index_skip_mb_cnt_;
sql::ObPushdownFilterExecutor *filter_;
ObICGIterator *filter_iter_;
sql::ObPushdownFilterExecutor *filter_node_;
bool operator< (const ObFilterCondition &filter_condition) const {
bool ret = false;
if (skip_index_skip_mb_cnt_ != filter_condition.skip_index_skip_mb_cnt_) {
ret = skip_index_skip_mb_cnt_ > filter_condition.skip_index_skip_mb_cnt_;
} else if (filter_cost_time_ == 0 || filter_condition.filter_cost_time_ == 0) {
ret = !(filter_cost_time_ == 0);
} else {
float rank1 = - 1.0 * filtered_row_cnt_ / filter_cost_time_;
float rank2 = - 1.0 * filter_condition.filtered_row_cnt_ / filter_condition.filter_cost_time_;
ret = rank1 < rank2;
}
return ret;
}
TO_STRING_KV(K_(idx), K_(filter_cost_time), K_(filtered_row_cnt), K_(skip_index_skip_mb_cnt));
};
int analyze(bool &reordered);
int analyze_impl(sql::ObPushdownFilterExecutor &filter, bool &reordered);
void collect_filter_info(sql::ObPushdownFilterExecutor &filter, ObFilterCondition &filter_condition);
void judge_filter_whether_enable_reorder(sql::ObPushdownFilterExecutor *filter);
private:
const ObTableIterParam *iter_param_;
sql::ObPushdownFilterExecutor *filter_;
ObSEArray<ObICGIterator*, 4> *filter_iters_;
ObSEArray<sql::ObPushdownFilterExecutor*, 4> *iter_filter_node_;
ObSEArray<ObFilterCondition, 4> filter_conditions_;
uint32_t batch_num_;
uint32_t reorder_filter_times_;
uint32_t reorder_filter_interval_;
bool disable_bypass_;
bool is_inited_;
};
}
}
#endif

View File

@ -612,6 +612,33 @@ int ObMicroBlockReader::get_row_header(
return ret;
}
int ObMicroBlockReader::get_logical_row_cnt(
const int64_t last,
int64_t &row_idx,
int64_t &row_cnt) const
{
int ret = OB_SUCCESS;
const ObRowHeader *row_header = nullptr;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("reader not init", K(ret));
} else if (OB_UNLIKELY(nullptr == header_ || last >= header_->row_count_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(row_idx), K(last), KPC_(header));
} else {
while (OB_SUCC(ret) && row_idx <= last) {
if (OB_ISNULL(row_header = reinterpret_cast<const ObRowHeader*>(data_begin_ + index_data_[row_idx]))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("row_header is NULL", K(ret), K(row_idx), KP(data_begin_), KP(index_data_));
} else if (row_header->get_row_multi_version_flag().is_first_multi_version_row()) {
row_cnt += row_header->get_row_flag().get_delta();
}
row_idx++;
}
}
return ret;
}
int ObMicroBlockReader::get_row_count(int64_t &row_count)
{
int ret = OB_SUCCESS;

View File

@ -78,6 +78,10 @@ public:
virtual int get_row_header(
const int64_t row_idx,
const ObRowHeader *&row_header) override;
int get_logical_row_cnt(
const int64_t last,
int64_t &row_idx,
int64_t &row_cnt) const;
virtual int get_row_count(int64_t &row_count) override;
int get_multi_version_info(
const int64_t row_idx,
@ -152,6 +156,7 @@ public:
const int64_t begin_idx,
int64_t &row_idx) override;
OB_INLINE bool single_version_rows() { return nullptr != header_ && header_->single_version_rows_; }
OB_INLINE bool committed_single_version_rows() { return single_version_rows() && !header_->contain_uncommitted_rows(); }
// For column store
virtual int find_bound(

View File

@ -104,7 +104,8 @@ int ObIMicroBlockRowScanner::init(
LOG_WARN("null columns info", K(ret), K(param), K(context.use_fuse_row_cache_), KPC_(read_info));
} else if (OB_FAIL(row_.init(allocator_, param.get_buffered_out_col_cnt()))) {
STORAGE_LOG(WARN, "Failed to init datum row", K(ret));
} else {
}
if (OB_SUCC(ret)) {
param_ = &param;
context_ = &context;
sstable_ = sstable;
@ -314,24 +315,6 @@ int ObIMicroBlockRowScanner::inner_get_next_row(const ObDatumRow *&row)
return ret;
}
int ObIMicroBlockRowScanner::inner_get_row_header(const ObRowHeader *&row_header)
{
int ret = OB_SUCCESS;
row_header = nullptr;
if (OB_FAIL(end_of_block())) {
if (OB_UNLIKELY(OB_ITER_END != ret)) {
LOG_WARN("fail to judge end of block or not", K(ret));
}
} else {
if (OB_FAIL(reader_->get_row_header(current_, row_header))) {
LOG_WARN("micro block reader fail to get row.", K(ret), K_(macro_id));
} else {
current_ += step_;
}
}
return ret;
}
int ObIMicroBlockRowScanner::inner_get_next_row_blockscan(const ObDatumRow *&row)
{
int ret = OB_SUCCESS;
@ -856,6 +839,7 @@ int ObIMicroBlockRowScanner::filter_micro_block_in_blockscan(sql::PushdownFilter
pd_filter_info.is_pd_to_cg_ = false;
pd_filter_info.param_ = param_;
pd_filter_info.context_ = context_;
pd_filter_info.disable_bypass_ = block_row_store_->get_where_optimizer() != nullptr ? block_row_store_->get_where_optimizer()->is_disable_bypass() : false;
if (pd_filter_info.filter_->is_filter_constant()) {
common::ObBitmap *result = nullptr;
if (OB_FAIL(pd_filter_info.filter_->init_bitmap(pd_filter_info.count_, result))) {
@ -1168,16 +1152,11 @@ int ObMicroBlockRowScanner::estimate_row_count(
}
est.logical_row_count_ = est.physical_row_count_;
if (est.physical_row_count_ > 0 && consider_multi_version) {
const ObRowHeader *row_header;
ObMicroBlockReader *reader = static_cast<ObMicroBlockReader *>(reader_);
if (est.physical_row_count_ > 0 && consider_multi_version && !reader->committed_single_version_rows()) {
est.logical_row_count_ = 0;
while (OB_SUCC(inner_get_row_header(row_header))) {
if (row_header->get_row_multi_version_flag().is_first_multi_version_row()) {
est.logical_row_count_ += row_header->get_row_flag().get_delta();
}
}
if (OB_ITER_END == ret || OB_BEYOND_THE_RANGE == ret) {
ret = OB_SUCCESS;
if (OB_FAIL(reader->get_logical_row_cnt(last_, current_, est.logical_row_count_))) {
LOG_WARN("failed to get logical row count", K(ret));
}
}
}

View File

@ -156,7 +156,6 @@ public:
VIRTUAL_TO_STRING_KV(K_(can_ignore_multi_version));
protected:
virtual int inner_get_next_row(const ObDatumRow *&row);
int inner_get_row_header(const ObRowHeader *&row_header);
int set_reader(const ObRowStoreType store_type);
int set_base_scan_param(const bool is_left_bound_block,
const bool is_right_bound_block);

View File

@ -229,10 +229,10 @@ public:
max_filter_constant_id_ = is_reverse_scan_ ? start_row_id_ : start_row_id_ + bitmap_.size() - 1;
}
OB_INLINE int set_bitmap_batch(ObCSRowId start, ObCSRowId end, const bool value)
OB_INLINE int set_bitmap_batch(ObCSRowId start, ObCSRowId end, const bool value, int64_t &count)
{
int64_t offset = MAX(start - start_row_id_, 0);
int64_t count = MIN(end - start + 1, bitmap_.size() - offset);
count = MIN(end - start + 1, bitmap_.size() - offset);
filter_constant_type_.set_uncertain();
return bitmap_.set_bitmap_batch(offset, count, value);
}

View File

@ -288,7 +288,7 @@ int ObCGScanner::apply_filter(
(nullptr != parent && nullptr == parent_bitmap))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("Invalid argument", K(ret), K(row_count), K(result_bitmap.size()), KP(filter_info.filter_),
KP(parent), KP(parent_bitmap));
KP(parent), KP(parent_bitmap), K(prefetcher_.sstable_index_filter_));
} else if (end_of_scan() || row_count_left() < row_count) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("Unexpected scanner", K(ret), K(row_count), KPC(this));
@ -324,6 +324,7 @@ int ObCGScanner::apply_filter(
}
int ObCGScanner::get_next_valid_block(sql::ObPushdownFilterExecutor *parent,
sql::PushdownFilterInfo &filter_info,
const ObCGBitmap *parent_bitmap,
ObCGBitmap &result_bitmap)
{
@ -338,6 +339,7 @@ int ObCGScanner::get_next_valid_block(sql::ObPushdownFilterExecutor *parent,
result_bitmap.set_filter_uncertain();
ObCSRowId prefetch_constant_id = prefetcher_.get_max_filter_constant_id();
sql::ObBoolMask prefetch_constant_type = prefetcher_.get_filter_constant_type();
int64_t count;
// reset bitmap when bits not consistent with filter constant type after set by prefetcher
// filter info
// bitmap set value could be decided by the logic op type of filter root in the iter param
@ -349,14 +351,16 @@ int ObCGScanner::get_next_valid_block(sql::ObPushdownFilterExecutor *parent,
if (OB_FAIL(result_bitmap.set_bitmap_batch(
MAX(query_index_range_.start_row_id_, prefetch_constant_id),
query_index_range_.end_row_id_,
prefetch_constant_type.is_always_true()))) {
prefetch_constant_type.is_always_true(),
count))) {
LOG_WARN("Fail to set bitmap batch", K(ret), K_(query_index_range), K(prefetch_constant_id));
}
} else {
if (OB_FAIL(result_bitmap.set_bitmap_batch(
query_index_range_.start_row_id_,
MIN(query_index_range_.end_row_id_, prefetch_constant_id),
prefetch_constant_type.is_always_true()))) {
prefetch_constant_type.is_always_true(),
count))) {
LOG_WARN("Fail to set bitmap batch", K(ret), K_(query_index_range), K(prefetch_constant_id));
}
}
@ -370,21 +374,30 @@ int ObCGScanner::get_next_valid_block(sql::ObPushdownFilterExecutor *parent,
++prefetcher_.cur_micro_data_read_idx_;
const ObMicroIndexInfo &index_info = prefetcher_.current_micro_info();
const ObCSRange &row_range = index_info.get_row_range();
int64_t count;
if (index_info.is_filter_always_false()) {
if (OB_FAIL(result_bitmap.set_bitmap_batch(
MAX(query_index_range_.start_row_id_, row_range.start_row_id_),
MIN(query_index_range_.end_row_id_, row_range.end_row_id_),
false))) {
MAX(query_index_range_.start_row_id_, row_range.start_row_id_),
MIN(query_index_range_.end_row_id_, row_range.end_row_id_),
false,
count))) {
LOG_WARN("Fail to set bitmap batch", K(ret), K(row_range));
} else if (parent && parent->is_enable_reorder() && filter_info.disable_bypass_) {
filter_info.filter_->get_filter_realtime_statistics().add_filtered_row_cnt(count);
filter_info.filter_->get_filter_realtime_statistics().add_skip_index_skip_mb_cnt(1);
}
} else if (index_info.is_filter_always_true()) {
if (OB_FAIL(result_bitmap.set_bitmap_batch(
MAX(query_index_range_.start_row_id_, row_range.start_row_id_),
MIN(query_index_range_.end_row_id_, row_range.end_row_id_),
true))) {
MAX(query_index_range_.start_row_id_, row_range.start_row_id_),
MIN(query_index_range_.end_row_id_, row_range.end_row_id_),
true,
count))) {
LOG_WARN("Fail to set bitmap batch", K(ret), K(row_range));
} else if (parent && parent->is_enable_reorder() && filter_info.disable_bypass_) {
filter_info.filter_->get_filter_realtime_statistics().add_filtered_row_cnt(count);
filter_info.filter_->get_filter_realtime_statistics().add_skip_index_skip_mb_cnt(1);
}
} else if (nullptr != parent && ObCGScanner::can_skip_filter(
} else if (nullptr != parent && (!parent->is_enable_reorder() || !filter_info.first_batch_) && ObCGScanner::can_skip_filter(
*parent, *parent_bitmap, prefetcher_.current_micro_info().get_row_range())) {
continue;
} else {
@ -426,7 +439,7 @@ int ObCGScanner::inner_filter(
LOG_DEBUG("Set constant filter info", K(ret), K(prefetch_constant_type), K(prefetch_constant_id));
} else {
if (is_new_range_ || OB_ITER_END == micro_scanner_->end_of_block()) {
if (OB_FAIL(get_next_valid_block(parent, parent_bitmap, result_bitmap))) {
if (OB_FAIL(get_next_valid_block(parent, filter_info, parent_bitmap, result_bitmap))) {
if (OB_UNLIKELY(OB_ITER_END != ret)) {
LOG_WARN("Fail to get next valid index", K(ret));
}

View File

@ -70,6 +70,7 @@ public:
const ObCGBitmap &parent_bitmap,
const ObCSRange &row_range);
int get_next_valid_block(sql::ObPushdownFilterExecutor *parent,
sql::PushdownFilterInfo &filter_info,
const ObCGBitmap *parent_bitmap,
ObCGBitmap &result_bitmap);
int build_index_filter(sql::ObPushdownFilterExecutor &filter);

View File

@ -604,7 +604,7 @@ int ObCOSSTableRowScanner::filter_rows(BlockScanState &blockscan_state)
{
int ret = OB_SUCCESS;
LOG_TRACE("[COLUMNSTORE] COScanner filter_rows [start]", K(ret), K_(state), K_(blockscan_state),
K_(current), K_(group_size), K_(end));
K_(current), K_(group_size), K_(end), K(this), K(access_ctx_->limit_param_));
if (iter_param_->has_lob_column_out()) {
access_ctx_->reuse_lob_locator_helper();
}
@ -630,7 +630,7 @@ int ObCOSSTableRowScanner::filter_rows_with_limit(BlockScanState &blockscan_stat
{
int ret = OB_SUCCESS;
LOG_DEBUG("COScanner filter_rows_with_limit begin", K(ret), K_(state), K_(blockscan_state),
K_(current), K_(group_size), K_(end));
K_(current), K_(group_size), K_(end), K(this));
while (OB_SUCC(ret) && !is_limit_end_) {
ObCSRowId begin = current_;
const ObCGBitmap* result_bitmap = nullptr;
@ -670,8 +670,8 @@ int ObCOSSTableRowScanner::filter_rows_without_limit(BlockScanState &blockscan_s
ObCSRowId current_start_row_id = current_;
ObCSRowId continuous_end_row_id = OB_INVALID_CS_ROW_ID;
const ObCGBitmap* result_bitmap = nullptr;
LOG_DEBUG("COScanner filter_rows_with_limit begin", K(ret), K_(state), K_(blockscan_state),
K_(current), K_(group_size), K_(end));
LOG_DEBUG("COScanner filter_rows_without_limit begin", K(ret), K_(state), K_(blockscan_state),
K_(current), K_(group_size), K_(end), K(this));
while (OB_SUCC(ret) && need_do_filter) {
int64_t current_group_size = 0;
if (OB_INVALID_CS_ROW_ID != pending_end_row_id_) {
@ -1228,4 +1228,4 @@ int ObCOSSTableRowScanner::push_group_by_processor(ObICGIterator *cg_iterator)
}
}
}
}

View File

@ -14,7 +14,7 @@
#include "ob_cg_tile_scanner.h"
#include "ob_co_sstable_rows_filter.h"
#include "ob_column_oriented_sstable.h"
#include "ob_co_where_optimizer.h"
#include "storage/access/ob_where_optimizer.h"
#include "storage/access/ob_block_row_store.h"
#include "storage/access/ob_table_access_context.h"
#include "common/ob_smart_call.h"
@ -34,12 +34,13 @@ ObCOSSTableRowsFilter::ObCOSSTableRowsFilter()
access_ctx_(nullptr),
co_sstable_(nullptr),
allocator_(nullptr),
bitmap_buffer_(),
pd_filter_info_(),
can_continuous_filter_(true),
filter_(nullptr),
filter_iters_(),
iter_filter_node_(),
bitmap_buffer_(),
pd_filter_info_(),
can_continuous_filter_(true)
where_optimizer_(nullptr)
{
}
@ -82,10 +83,21 @@ int ObCOSSTableRowsFilter::init(
LOG_WARN("Failed to init bitmap buffer", K(ret), K(depth));
} else if (OB_FAIL(filter_tree_can_continuous_filter(filter_, can_continuous_filter_))) {
LOG_WARN("failed to filter_tree_can_continuous_filter", K(ret));
} else {
is_inited_ = true;
} else if (nullptr != param.pushdown_filter_ && param.enable_pd_filter_reorder()) {
if (OB_UNLIKELY(nullptr != where_optimizer_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("Unexpected where optimizer", K(ret), KP_(where_optimizer));
} else if (OB_ISNULL(where_optimizer_ = OB_NEWx(ObWhereOptimizer, allocator_))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("Failed to alloc memory for ObWhereOptimizer", K(ret));
} else if (OB_FAIL(where_optimizer_->init(iter_param_, filter_, &filter_iters_, &iter_filter_node_))) {
LOG_WARN("Failed to init where optimizer", K(ret), K(iter_param_), KPC_(filter));
}
}
}
if OB_SUCC(ret) {
is_inited_ = true;
}
return ret;
}
@ -101,10 +113,7 @@ int ObCOSSTableRowsFilter::rewrite_filter(uint32_t &depth)
// or retry scanning in DAS.
// TODO: reorder pushdown filter by filter ratio, io cost, runtime filter(runtime filter
// should keep last), etc.
ObCOWhereOptimizer where_optimizer(*co_sstable_, *filter_);
if (iter_param_->enable_pd_filter_reorder() && OB_FAIL(where_optimizer.analyze())) {
LOG_WARN("Failed to analyze in where optimzier", K(ret));
} else if (OB_FAIL(judge_whether_use_common_cg_iter(filter_))) {
if (OB_FAIL(judge_whether_use_common_cg_iter(filter_))) {
LOG_WARN("Failed to judge where use common column group iterator", K(ret), KPC_(filter));
}
}
@ -160,6 +169,14 @@ int ObCOSSTableRowsFilter::switch_context(
LOG_WARN("Failed to construct skip filter", K(ret), KPC(filter));
}
}
if (OB_SUCC(ret) && nullptr != param.pushdown_filter_ && param.enable_pd_filter_reorder()) {
if (nullptr == where_optimizer_ && OB_ISNULL(where_optimizer_ = OB_NEWx(ObWhereOptimizer, allocator_))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("Failed to alloc memory for ObWhereOptimizer", K(ret));
} else if (OB_FAIL(where_optimizer_->init(iter_param_, param.pushdown_filter_, &filter_iters_, &iter_filter_node_))) {
LOG_WARN("Failed to init where optimizer", K(ret), K(iter_param_), KPC_(filter));
}
}
}
return ret;
}
@ -176,6 +193,11 @@ void ObCOSSTableRowsFilter::reset()
co_sstable_ = nullptr;
clear_filter_state(filter_);
filter_ = nullptr;
if (where_optimizer_ != nullptr) {
where_optimizer_->~ObWhereOptimizer();
allocator_->free(where_optimizer_);
where_optimizer_ = nullptr;
}
for (int64_t i = 0; i < filter_iters_.count(); ++i) {
ObICGIterator* cg_iter = filter_iters_[i];
cg_iter->~ObICGIterator();
@ -195,6 +217,9 @@ void ObCOSSTableRowsFilter::reset()
void ObCOSSTableRowsFilter::reuse()
{
if (where_optimizer_ != nullptr) {
where_optimizer_->reset();
}
for (int64_t i = 0; i < filter_iters_.count(); ++i) {
filter_iters_[i]->reuse();
}
@ -240,6 +265,8 @@ int ObCOSSTableRowsFilter::prepare_apply(const ObCSRange &range)
} else if (OB_UNLIKELY(!range.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("Invalid argument", K(range));
} else if (nullptr != where_optimizer_ && OB_FAIL(where_optimizer_->reorder_co_filter())) {
LOG_WARN("Fail to reorder co filter", K(ret));
} else if (FALSE_IT(subtree_filter_iter_to_locate_ = 0)) {
} else if (FALSE_IT(subtree_filter_iter_to_filter_ = 0)) {
} else if (OB_FAIL(try_locating_cg_iter(0, range))) {
@ -284,6 +311,8 @@ int ObCOSSTableRowsFilter::apply_filter(
// to avoid skip filter because of pruning when apply_filter.
pd_filter_info_.reuse();
pd_filter_info_.filter_ = filter;
pd_filter_info_.disable_bypass_ = where_optimizer_ != nullptr ? where_optimizer_->is_disable_bypass() : false;
pd_filter_info_.first_batch_ = where_optimizer_ != nullptr ? where_optimizer_->is_first_batch() : false;
if (OB_FAIL(try_locating_cg_iter(iter_idx, range))) {
LOG_WARN("Failed to locate", K(ret), K(range), K(iter_idx));
} else if (OB_UNLIKELY(iter_idx >= subtree_filter_iter_to_locate_)) {
@ -322,7 +351,7 @@ int ObCOSSTableRowsFilter::apply_filter(
is_skip))) {
LOG_WARN("Failed to post apply filter", K(ret), KP(result),
KP(child_result));
} else if (!is_skip) {
} else if ((filter->is_enable_reorder() && where_optimizer_ != nullptr && where_optimizer_->is_disable_bypass()) || !is_skip) {
if (OB_FAIL(try_locating_cg_iter(subtree_filter_iter_to_filter_, range))) {
LOG_WARN("Failed to locate", K(ret), K(range), K_(subtree_filter_iter_to_filter));
}

View File

@ -18,6 +18,7 @@
#include "ob_i_cg_iterator.h"
#include "ob_cg_iter_param_pool.h"
#include "ob_cg_bitmap.h"
#include "storage/access/ob_where_optimizer.h"
namespace oceanbase
{
@ -137,12 +138,14 @@ private:
ObTableAccessContext* access_ctx_;
ObCOSSTableV2* co_sstable_;
common::ObIAllocator *allocator_;
sql::ObPushdownFilterExecutor *filter_;
ObSEArray<ObICGIterator*, 4> filter_iters_;
ObSEArray<sql::ObPushdownFilterExecutor*, 4> iter_filter_node_;
ObSEArray<ObCGBitmap*, 4> bitmap_buffer_;
sql::PushdownFilterInfo pd_filter_info_;
bool can_continuous_filter_;
public:
sql::ObPushdownFilterExecutor *filter_;
ObSEArray<ObICGIterator*, 4> filter_iters_;
ObSEArray<sql::ObPushdownFilterExecutor*, 4> iter_filter_node_;
ObWhereOptimizer *where_optimizer_;
};
}
}

View File

@ -1,211 +0,0 @@
/**
* Copyright (c) 2023 OceanBase
* OceanBase is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX STORAGE
#include "ob_co_where_optimizer.h"
#include "ob_column_oriented_sstable.h"
#include "ob_column_store_util.h"
#include "sql/engine/basic/ob_pushdown_filter.h"
namespace oceanbase
{
namespace storage
{
ObCOWhereOptimizer::ObCOWhereOptimizer(
ObCOSSTableV2 &co_sstable,
sql::ObPushdownFilterExecutor &filter)
: co_sstable_(co_sstable)
, filter_(filter)
, filter_conditions_()
{
}
int ObCOWhereOptimizer::analyze()
{
return analyze_impl(filter_);
}
int ObCOWhereOptimizer::analyze_impl(sql::ObPushdownFilterExecutor &filter)
{
int ret = OB_SUCCESS;
sql::ObPushdownFilterExecutor **children = filter.get_childs();
const uint32_t child_cnt = filter.get_child_count();
if (filter.is_logic_op_node()) {
for (uint32_t i = 0; OB_SUCC(ret) && i < child_cnt; ++i) {
if (OB_FAIL(analyze_impl(*children[i]))) {
LOG_WARN("Failed to analyze filter tree", K(ret), K(i), KP(children[i]));
}
}
}
if (OB_SUCC(ret) && filter.is_logic_and_node()) {
bool reorder = true;
for (uint32_t i = 0; reorder && i < child_cnt; ++i) {
sql::ObPushdownFilterExecutor &child_filter = *children[i];
// if (child_filter.is_filter_dynamic_node()) {
// reorder = false;
// break;
// }
if (!child_filter.is_filter_node()) {
reorder = false;
break;
}
}
if (reorder) {
sql::ObPushdownFilterExecutor *best_filter = nullptr;
if (OB_FAIL(filter_conditions_.prepare_allocate(child_cnt))) {
LOG_WARN("Failed to prepare allocate filter conditions", K(ret), K(child_cnt));
} else {
for (uint32_t i = 0; OB_SUCC(ret) && i < child_cnt; ++i) {
sql::ObPushdownFilterExecutor &child_filter = *children[i];
ObFilterCondition &filter_condition = filter_conditions_[i];
filter_condition.idx_ = i;
if (OB_FAIL(collect_filter_info(child_filter, filter_condition))) {
LOG_WARN("Failed to collect filter into", K(ret));
}
}
if (OB_SUCC(ret)) {
lib::ob_sort(&filter_conditions_[0], &filter_conditions_[0] + child_cnt);
const uint64_t best_filter_idx = filter_conditions_[0].idx_;
best_filter = children[best_filter_idx];
if (0 == best_filter_idx ||
!can_choose_best_filter(&filter_conditions_[0], *best_filter, filter)) {
} else {
for (uint32_t i = best_filter_idx; i > 0; i--) {
children[i] = children[i - 1];
}
children[0] = best_filter;
}
}
}
}
}
return ret;
}
int ObCOWhereOptimizer::collect_filter_info(
sql::ObPushdownFilterExecutor &filter,
ObFilterCondition &filter_condition)
{
int ret = OB_SUCCESS;
const common::ObIArray<uint32_t> &cg_idxes = filter.get_cg_idxs();
const int64_t column_cnt = cg_idxes.count();
filter_condition.columns_cnt_ = column_cnt;
const ObCOSSTableMeta &cs_meta = co_sstable_.get_cs_meta();
uint64_t columns_size = 0;
for (int64_t i = 0; OB_SUCC(ret) && i < column_cnt; ++i)
{
const uint32_t cg_idx = cg_idxes.at(i);
ObSSTableWrapper cg_table_wrapper;
if (is_virtual_cg(cg_idx) || cg_idx >= cs_meta.column_group_cnt_) {
} else if (OB_FAIL(co_sstable_.fetch_cg_sstable(cg_idx, cg_table_wrapper))) {
LOG_WARN("Failed to fetch cg sstable", K(ret));
} else {
columns_size += cg_table_wrapper.get_sstable()->get_total_macro_block_count();
}
}
if (OB_SUCC(ret)) {
filter_condition.columns_size_ = columns_size;
if (0 == columns_size) {
filter_condition.columns_cnt_ = 0;
}
filter_condition.execution_cost_ = estimate_execution_cost(filter);
}
return ret;
}
uint64_t ObCOWhereOptimizer::estimate_execution_cost(sql::ObPushdownFilterExecutor &filter)
{
using namespace sql;
uint64_t execution_cost;
if (!filter.is_filter_white_node()) {
execution_cost = UINT64_MAX;
} else if (filter.is_filter_dynamic_node()) {
ObDynamicFilterExecutor &dynamic_filter = static_cast<ObDynamicFilterExecutor &>(filter);
if (dynamic_filter.get_filter_node().get_dynamic_filter_type()
== DynamicFilterType::PD_TOPN_FILTER) {
execution_cost = 1;
} else {
execution_cost = 1;
}
} else {
ObWhiteFilterExecutor &white_filter = static_cast<ObWhiteFilterExecutor &>(filter);
switch (white_filter.get_op_type()) {
case WHITE_OP_EQ:
case WHITE_OP_LE:
case WHITE_OP_LT:
case WHITE_OP_GE:
case WHITE_OP_GT:
case WHITE_OP_NE:
if (!is_lob_storage(white_filter.get_filter_node().column_exprs_.at(0)->obj_meta_.get_type())
&& white_filter.get_datums().at(0).len_ <= sizeof(uint64_t)) {
execution_cost = 1;
} else {
execution_cost = UINT64_MAX;
}
break;
case WHITE_OP_NU:
case WHITE_OP_NN:
execution_cost = 1;
break;
default:
execution_cost = UINT64_MAX;
break;
}
}
return execution_cost;
}
bool ObCOWhereOptimizer::can_choose_best_filter(
ObFilterCondition *best_filter_condition,
sql::ObPushdownFilterExecutor &best_filter,
sql::ObPushdownFilterExecutor &parent_filter)
{
bool ret = true;
if (OB_UNLIKELY(!best_filter.is_cg_param_valid())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("Unexpected filter cg param", K(ret), K(best_filter));
} else {
sql::ObPushdownFilterExecutor **children = parent_filter.get_childs();
const uint32_t child_cnt = parent_filter.get_child_count();
const uint32_t best_cg_idx = best_filter.get_cg_idxs().at(0);
ObFilterCondition *second_best_filter_condition = best_filter_condition + 1;
if (best_filter_condition->execution_cost_ < second_best_filter_condition->execution_cost_
|| best_filter_condition->columns_size_ * 2 < second_best_filter_condition->columns_size_
|| best_filter_condition->columns_cnt_ * 2 < second_best_filter_condition->columns_cnt_) {
for (uint32_t i = 1; i < child_cnt; ++i) {
sql::ObPushdownFilterExecutor *filter = children[best_filter_condition[i].idx_];
const common::ObIArray<uint32_t> &cg_idxes = filter->get_cg_idxs();
if (is_contain(cg_idxes, best_cg_idx)) {
ret = false;
break;
}
}
} else {
ret = false;
}
}
return ret;
}
}
}

View File

@ -1,79 +0,0 @@
/**
* Copyright (c) 2023 OceanBase
* OceanBase is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OB_STORAGE_COLUMN_STORE_OB_CO_WHERE_OPTIMIZER_H_
#define OB_STORAGE_COLUMN_STORE_OB_CO_WHERE_OPTIMIZER_H_
#include "sql/engine/basic/ob_pushdown_filter.h"
namespace oceanbase
{
namespace storage
{
class ObCOSSTableV2;
class ObCOWhereOptimizer
{
public:
ObCOWhereOptimizer(
ObCOSSTableV2 &co_sstable,
sql::ObPushdownFilterExecutor &filter);
~ObCOWhereOptimizer() = default;
int analyze();
private:
struct ObFilterCondition
{
uint64_t idx_;
uint64_t columns_size_;
uint64_t columns_cnt_;
uint64_t execution_cost_;
bool operator< (const ObFilterCondition &filter_condition) const {
bool ret = false;
if (this->execution_cost_ < filter_condition.execution_cost_) {
ret = true;
} else if (this->execution_cost_ > filter_condition.execution_cost_) {
ret = false;
} else if (this->columns_size_ < filter_condition.columns_size_) {
ret = true;
} else if (this->columns_size_ > filter_condition.columns_size_) {
ret = false;
} else {
ret = this->columns_cnt_ < filter_condition.columns_cnt_;
}
return ret;
}
TO_STRING_KV(K_(idx), K_(columns_size), K_(columns_cnt), K_(execution_cost));
};
using ObFilterConditions = common::ObSEArray<ObFilterCondition, 4>;
int analyze_impl(sql::ObPushdownFilterExecutor &filter);
int collect_filter_info(
sql::ObPushdownFilterExecutor &filter,
ObFilterCondition &filter_condition);
uint64_t estimate_execution_cost(sql::ObPushdownFilterExecutor &filter);
bool can_choose_best_filter(
ObFilterCondition *best_filter_condition,
sql::ObPushdownFilterExecutor &best_filter,
sql::ObPushdownFilterExecutor &parent_filter);
private:
ObCOSSTableV2 &co_sstable_;
sql::ObPushdownFilterExecutor &filter_;
ObFilterConditions filter_conditions_;
};
}
}
#endif

View File

@ -342,6 +342,7 @@ _enable_defensive_check
_enable_easy_keepalive
_enable_enhanced_cursor_validation
_enable_enum_set_subschema
_enable_filter_reordering
_enable_hash_join_hasher
_enable_hash_join_processor
_enable_hgby_llc_ndv_adaptive

View File

@ -96,7 +96,8 @@ TEST_F(TestCGBitmap, set_batch)
ASSERT_EQ(true, bitmap.is_all_true());
ObCSRange cs_range(0, 200);
bitmap.set_bitmap_batch(cs_range.start_row_id_, cs_range.end_row_id_, 0);
int64_t count;
bitmap.set_bitmap_batch(cs_range.start_row_id_, cs_range.end_row_id_, 0, count);
ASSERT_EQ(false, bitmap.is_all_true());
ASSERT_EQ(true, bitmap.get_filter_constant_type().is_uncertain());
ASSERT_EQ(true, bitmap.is_all_false(cs_range));