disable invalid filter pushdown to storage layer

This commit is contained in:
obdev 2024-09-18 09:32:14 +00:00 committed by ob-robot
parent 35e7cb158a
commit f062d160be
5 changed files with 45 additions and 119 deletions

View File

@ -963,57 +963,6 @@ int ObPushdownFilterFactory::alloc(
return ret;
}
int ObPushdownFilterFactory::convert_white_filter_to_black(ObPushdownFilterExecutor *&filter)
{
int ret = OB_SUCCESS;
ObPushdownFilterNode *new_node = nullptr;
ObPushdownFilterExecutor *new_filter = nullptr;
if (OB_UNLIKELY(nullptr == filter || !filter->is_filter_white_node() || nullptr == alloc_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("Invalid filter", K(ret), KP_(alloc), KPC(filter));
} else if (OB_FAIL(alloc(BLACK_FILTER, 0, new_node))) {
LOG_WARN("Failed to alloc pushdown black filter node", K(ret));
} else if (OB_FAIL(alloc(BLACK_FILTER_EXECUTOR, 0, *new_node, new_filter, filter->get_op()))) {
LOG_WARN("Failed to alloc pushdown black filter executor", K(ret));
} else {
ObWhiteFilterExecutor *white_filter = static_cast<ObWhiteFilterExecutor *>(filter);
ObPushdownWhiteFilterNode &white_node = white_filter->get_filter_node();
ObPushdownBlackFilterNode &black_node = *static_cast<ObPushdownBlackFilterNode *>(new_node);
if (OB_FAIL(black_node.col_ids_.init(white_node.col_ids_.count()))) {
LOG_WARN("Failed to init col id array", K(ret), K(white_node.col_ids_.count()));
} else if (OB_FAIL(black_node.col_ids_.assign(white_node.col_ids_))) {
LOG_WARN("Failed to assign col id array", K(ret));
} else if (OB_FAIL(black_node.column_exprs_.init(white_node.column_exprs_.count()))) {
LOG_WARN("Failed to init column expr array", K(ret), K(white_node.column_exprs_.count()));
} else if (OB_FAIL(black_node.column_exprs_.assign(white_node.column_exprs_))) {
LOG_WARN("Failed to assign column expr array", K(ret));
} else if (OB_FAIL(black_node.filter_exprs_.init(1))) {
LOG_WARN("Failed to init filter expr array", K(ret));
} else if (OB_FAIL(black_node.filter_exprs_.push_back(white_node.expr_))) {
LOG_WARN("Failed to push back expr", K(ret));
} else {
white_filter->clear_in_datums();
filter = new_filter;
LOG_TRACE("convert white filter to black filter", K(ret), KPC(filter), KPC(white_filter));
}
}
if (OB_FAIL(ret)) {
if (OB_NOT_NULL(alloc_)) {
if (nullptr != new_filter) {
new_filter->~ObPushdownFilterExecutor();
alloc_->free(new_filter);
new_filter = nullptr;
}
if (nullptr != new_node) {
new_node->~ObPushdownFilterNode();
alloc_->free(new_node);
new_node = nullptr;
}
}
}
return ret;
}
int ObPushdownFilter::serialize_pushdown_filter(
char *buf,
int64_t buf_len,
@ -1744,39 +1693,25 @@ int ObPushdownFilterExecutor::prepare_skip_filter()
}
// 初始化需要被清理的标记
int ObAndFilterExecutor::init_evaluated_datums(common::ObIAllocator *allocator, bool &need_convert)
int ObAndFilterExecutor::init_evaluated_datums(bool &is_valid)
{
int ret = OB_SUCCESS;
for (uint32_t i = 0; i < n_child_ && OB_SUCC(ret); ++i) {
need_convert = false;
if (OB_FAIL(childs_[i]->init_evaluated_datums(allocator, need_convert))) {
LOG_WARN("failed to filter child", K(ret));
} else if (OB_UNLIKELY(need_convert)) {
ObPushdownFilterFactory filter_factory(allocator);
if (OB_FAIL(filter_factory.convert_white_filter_to_black(childs_[i]))) {
LOG_WARN("Failed to convert white filter to black filter", K(ret), KPC(childs_[i]));
}
for (uint32_t i = 0; OB_SUCC(ret) && OB_LIKELY(is_valid) && i < n_child_; ++i) {
if (OB_FAIL(childs_[i]->init_evaluated_datums(is_valid))) {
LOG_WARN("failed to filter child", K(ret), K(i));
}
}
need_convert = false;
return ret;
}
int ObOrFilterExecutor::init_evaluated_datums(common::ObIAllocator *allocator, bool &need_convert)
int ObOrFilterExecutor::init_evaluated_datums(bool &is_valid)
{
int ret = OB_SUCCESS;
for (uint32_t i = 0; i < n_child_ && OB_SUCC(ret); ++i) {
need_convert = false;
if (OB_FAIL(childs_[i]->init_evaluated_datums(allocator, need_convert))) {
LOG_WARN("failed to filter child", K(ret));
} else if (OB_UNLIKELY(need_convert)) {
ObPushdownFilterFactory filter_factory(allocator);
if (OB_FAIL(filter_factory.convert_white_filter_to_black(childs_[i]))) {
LOG_WARN("Failed to convert white filter to black filter", K(ret), KPC(childs_[i]));
}
for (uint32_t i = 0; OB_SUCC(ret) && OB_LIKELY(is_valid) && i < n_child_; ++i) {
if (OB_FAIL(childs_[i]->init_evaluated_datums(is_valid))) {
LOG_WARN("failed to filter child", K(ret), K(i));
}
}
need_convert = false;
return ret;
}
@ -1817,11 +1752,10 @@ int ObPhysicalFilterExecutor::filter(blocksstable::ObStorageDatum *datums, int64
// 根据calc expr来设置每个列(空集)对应的清理Datum
// 这里将clear的datum放在filter node是为了更精准处理,其实只有涉及到的表达式清理即可,其他不需要清理
// 还有类似空集需要清理
int ObPhysicalFilterExecutor::init_evaluated_datums(common::ObIAllocator *allocator, bool &need_convert)
int ObPhysicalFilterExecutor::init_evaluated_datums(bool &is_valid)
{
UNUSED(allocator);
int ret = OB_SUCCESS;
need_convert = false;
is_valid = true;
const int32_t cur_eval_info_cnt = n_eval_infos_;
n_eval_infos_ = 0;
n_datum_eval_flags_ = 0;
@ -1908,38 +1842,36 @@ void ObPhysicalFilterExecutor::clear_evaluated_infos()
}
}
int ObWhiteFilterExecutor::init_evaluated_datums(common::ObIAllocator *allocator, bool &need_convert)
int ObWhiteFilterExecutor::init_evaluated_datums(bool &is_valid)
{
UNUSED(allocator);
int ret = OB_SUCCESS;
need_convert = false;
is_valid = true;
if (OB_ISNULL(filter_.expr_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("Unexpected filter expr", K(ret), KPC(filter_.expr_));
} else if (WHITE_OP_IN == filter_.get_op_type()) {
if (OB_FAIL(init_in_eval_datums(need_convert))) {
if (OB_UNLIKELY(need_convert)) {
if (OB_FAIL(init_in_eval_datums(is_valid))) {
if (OB_UNLIKELY(!is_valid)) {
ret = OB_SUCCESS;
} else {
LOG_WARN("Failed to init eval datums for WHITE_OP_IN filter", K(ret));
}
}
} else if (OB_FAIL(init_compare_eval_datums(need_convert))) {
if (OB_UNLIKELY(need_convert)) {
} else if (OB_FAIL(init_compare_eval_datums(is_valid))) {
if (OB_UNLIKELY(!is_valid)) {
ret = OB_SUCCESS;
} else {
LOG_WARN("Failed to init eval datums for compare white filter", K(ret));
}
}
LOG_DEBUG("[PUSHDOWN], white pushdown filter inited datum params", K(need_convert), K(datum_params_));
LOG_DEBUG("[PUSHDOWN], white pushdown filter inited datum params", K(is_valid), K(datum_params_));
return ret;
}
// In oracle mode, when the values in one column are all null,
// the result should be empty set even though the expr.eval() is not valid (e.g., c1 < 1/0).
// We do not handle this situation at the storage layer,
// but convert it into a black filter and hand it over to the sql layer to process.
int ObWhiteFilterExecutor::init_compare_eval_datums(bool &need_convert)
// We do not pushdown filter to the storage layer in this situation.
int ObWhiteFilterExecutor::init_compare_eval_datums(bool &is_valid)
{
int ret = OB_SUCCESS;
ObEvalCtx &eval_ctx = op_.get_eval_ctx();
@ -1949,7 +1881,7 @@ int ObWhiteFilterExecutor::init_compare_eval_datums(bool &need_convert)
if (OB_UNLIKELY(filter_.expr_->arg_cnt_ < 2)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("Unexpected filter expr", K(ret), KPC(filter_.expr_));
} else if (OB_FAIL(ObPhysicalFilterExecutor::init_evaluated_datums(nullptr, need_convert))) {
} else if (OB_FAIL(ObPhysicalFilterExecutor::init_evaluated_datums(is_valid))) {
LOG_WARN("Failed to init evaluated datums", K(ret));
} else if (OB_FAIL(init_array_param(datum_params_, filter_.expr_->arg_cnt_))) {
LOG_WARN("Failed to alloc params", K(ret));
@ -1968,7 +1900,7 @@ int ObWhiteFilterExecutor::init_compare_eval_datums(bool &need_convert)
ObDatum *datum = NULL;
if (OB_FAIL(filter_.expr_->args_[i]->eval(eval_ctx, datum))) {
if (lib::is_oracle_mode()) {
need_convert = true;
is_valid = false;
} else {
LOG_WARN("evaluate filter arg expr failed", K(ret), K(i));
}
@ -1994,7 +1926,7 @@ int ObWhiteFilterExecutor::init_compare_eval_datums(bool &need_convert)
return ret;
}
int ObWhiteFilterExecutor::init_in_eval_datums(bool &need_convert)
int ObWhiteFilterExecutor::init_in_eval_datums(bool &is_valid)
{
int ret = OB_SUCCESS;
ObEvalCtx &eval_ctx = op_.get_eval_ctx();
@ -2009,7 +1941,7 @@ int ObWhiteFilterExecutor::init_in_eval_datums(bool &need_convert)
0 >= filter_.expr_->inner_func_cnt_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("Unexpected filter expr", K(ret), KPC(filter_.expr_), KP(filter_.expr_->args_[0]), KP(filter_.expr_->args_[1]));
} else if (OB_FAIL(ObPhysicalFilterExecutor::init_evaluated_datums(nullptr, need_convert))) {
} else if (OB_FAIL(ObPhysicalFilterExecutor::init_evaluated_datums(is_valid))) {
LOG_WARN("Failed to init evaluated datums", K(ret));
} else if (OB_FAIL(init_array_param(datum_params_, filter_.expr_->inner_func_cnt_))) {
LOG_WARN("Failed to alloc params", K(ret));
@ -2030,7 +1962,8 @@ int ObWhiteFilterExecutor::init_in_eval_datums(bool &need_convert)
if (OB_SUCC(ret)) {
if (OB_FAIL(cur_arg->eval(eval_ctx, datum))) {
if (lib::is_oracle_mode()) {
need_convert = true;
is_valid = false;
clear_in_datums();
} else {
LOG_WARN("Evaluate filter arg expr failed", K(ret), K(i));
}
@ -2466,11 +2399,10 @@ void ObDynamicFilterExecutor::filter_on_success(ObPushdownFilterExecutor* parent
}
}
int ObDynamicFilterExecutor::init_evaluated_datums(common::ObIAllocator *allocator, bool &need_convert)
int ObDynamicFilterExecutor::init_evaluated_datums(bool &is_valid)
{
UNUSED(allocator);
int ret = OB_SUCCESS;
need_convert = false;
is_valid = true;
if (is_data_prepared_ && OB_NOT_NULL(runtime_filter_ctx_)
&& runtime_filter_ctx_->need_reset_in_rescan()) {
is_data_prepared_ = false;

View File

@ -460,7 +460,6 @@ public:
ObPushdownFilterNode &filter_node,
ObPushdownFilterExecutor *&filter_executor,
ObPushdownOperator &op);
int convert_white_filter_to_black(ObPushdownFilterExecutor *&filter);
private:
// pushdown filter
@ -696,7 +695,7 @@ public:
int pull_up_common_node(
const common::ObIArray<uint32_t> &filter_indexes,
ObPushdownFilterExecutor *&common_filter_executor);
virtual int init_evaluated_datums(common::ObIAllocator *allocator, bool &need_convert) { return common::OB_NOT_SUPPORTED; }
virtual int init_evaluated_datums(bool &is_valid) { return common::OB_NOT_SUPPORTED; }
int execute(
ObPushdownFilterExecutor *parent,
PushdownFilterInfo &filter_info,
@ -762,7 +761,7 @@ public:
{}
virtual ~ObPhysicalFilterExecutor();
int filter(blocksstable::ObStorageDatum *datums, int64_t col_cnt, const sql::ObBitVector &skip_bit, bool &ret_val);
virtual int init_evaluated_datums(common::ObIAllocator *allocator, bool &need_convert) override;
virtual int init_evaluated_datums(bool &is_valid) override;
virtual int filter(ObEvalCtx &eval_ctx, const sql::ObBitVector &skip_bit, bool &filtered) = 0;
INHERIT_TO_STRING_KV("ObPhysicalFilterExecutor", ObPushdownFilterExecutor,
K_(n_eval_infos), KP_(eval_infos));
@ -1000,7 +999,7 @@ public:
OB_INLINE virtual common::ObIArray<uint64_t> &get_col_ids() override
{ return filter_.get_col_ids(); }
virtual const common::ObIArray<ObExpr *> *get_cg_col_exprs() const override { return &filter_.column_exprs_; }
virtual int init_evaluated_datums(common::ObIAllocator *allocator, bool &need_convert) override;
virtual int init_evaluated_datums(bool &is_valid) override;
OB_INLINE const common::ObIArray<common::ObDatum> &get_datums() const
{ return datum_params_; }
OB_INLINE const common::ObDatum &get_min_param() const
@ -1054,7 +1053,7 @@ public:
OB_INLINE ObPushdownAndFilterNode &get_filter_node() { return filter_; }
OB_INLINE virtual common::ObIArray<uint64_t> &get_col_ids() override
{ return filter_.get_col_ids(); }
virtual int init_evaluated_datums(common::ObIAllocator *allocator, bool &need_convert) override;
virtual int init_evaluated_datums(bool &is_valid) override;
INHERIT_TO_STRING_KV("ObPushdownAndFilterExecutor", ObPushdownFilterExecutor, K_(filter));
private:
ObPushdownAndFilterNode &filter_;
@ -1072,7 +1071,7 @@ public:
OB_INLINE ObPushdownOrFilterNode &get_filter_node() { return filter_; }
OB_INLINE virtual common::ObIArray<uint64_t> &get_col_ids() override
{ return filter_.get_col_ids(); }
virtual int init_evaluated_datums(common::ObIAllocator *allocator, bool &need_convert) override;
virtual int init_evaluated_datums(bool &is_valid) override;
INHERIT_TO_STRING_KV("ObPushdownOrFilterExecutor", ObPushdownFilterExecutor, K_(filter));
private:
ObPushdownOrFilterNode &filter_;
@ -1104,7 +1103,7 @@ public:
{
return static_cast<const ObPushdownDynamicFilterNode &>(filter_);
}
virtual int init_evaluated_datums(common::ObIAllocator *allocator, bool &need_convert) override;
virtual int init_evaluated_datums(bool &is_valid) override;
int check_runtime_filter(ObPushdownFilterExecutor* parent_filter, bool &is_needed);
void filter_on_bypass(ObPushdownFilterExecutor* parent_filter);
void filter_on_success(ObPushdownFilterExecutor* parent_filter);

View File

@ -38,7 +38,8 @@ ObBlockRowStore::ObBlockRowStore(ObTableAccessContext &context)
can_blockscan_(false),
filter_applied_(false),
disabled_(false),
is_aggregated_in_prefetch_(false)
is_aggregated_in_prefetch_(false),
filter_valid_(true)
{}
ObBlockRowStore::~ObBlockRowStore()
@ -54,6 +55,7 @@ void ObBlockRowStore::reset()
disabled_ = false;
is_aggregated_in_prefetch_ = false;
iter_param_ = nullptr;
filter_valid_ = true;
}
void ObBlockRowStore::reuse()
@ -151,7 +153,6 @@ int ObBlockRowStore::open(ObTableIterParam &iter_param)
{
int ret = OB_SUCCESS;
const bool need_padding = is_pad_char_to_full_length(context_.sql_mode_);
bool need_convert = false;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("Not init", K(ret));
@ -162,19 +163,14 @@ int ObBlockRowStore::open(ObTableIterParam &iter_param)
LOG_WARN("Invalid argument to init store pushdown filter", K(ret), K(iter_param));
} else if (nullptr == pd_filter_info_.filter_) {
// nothing to do
} else if (OB_FAIL(pd_filter_info_.filter_->init_evaluated_datums(&pd_filter_info_.filter_->get_allocator(), need_convert))) {
} else if (OB_FAIL(pd_filter_info_.filter_->init_evaluated_datums(filter_valid_))) {
LOG_WARN("Failed to init pushdown filter evaluated datums", K(ret));
} else {
if (OB_UNLIKELY(need_convert)) {
sql::ObPushdownFilterFactory filter_factory(&pd_filter_info_.filter_->get_allocator());
if (OB_FAIL(filter_factory.convert_white_filter_to_black(pd_filter_info_.filter_))) {
LOG_WARN("Failed to convert white filter to black filter", K(ret), KPC_(pd_filter_info_.filter));
} else {
iter_param.pushdown_filter_ = pd_filter_info_.filter_;
}
if (OB_UNLIKELY(!filter_valid_)) {
iter_param.pd_storage_flag_.set_filter_pushdown(false);
pd_filter_info_.is_pd_filter_ = false;
}
if (OB_FAIL(ret)) {
} else if (iter_param.is_use_column_store()) {
if (iter_param.is_use_column_store()) {
if (OB_FAIL(pd_filter_info_.filter_->init_co_filter_param(iter_param, need_padding))) {
LOG_WARN("Failed to init pushdown filter executor", K(ret));
}

View File

@ -81,7 +81,7 @@ public:
virtual bool is_empty() const { return true; }
OB_INLINE bool is_vec2() const { return is_vec2_; } // need to remove after statistical info pushdown support vec 2.0
VIRTUAL_TO_STRING_KV(K_(is_inited), K_(can_blockscan), K_(filter_applied),
K_(disabled), K_(is_aggregated_in_prefetch), K_(is_vec2));
K_(disabled), K_(is_aggregated_in_prefetch), K_(is_vec2), K_(filter_valid));
protected:
int filter_micro_block(
const int64_t row_count,
@ -98,7 +98,7 @@ private:
bool filter_applied_;
bool disabled_;
bool is_aggregated_in_prefetch_;
bool filter_valid_;
};
}

View File

@ -234,10 +234,9 @@ public:
}
}
OB_INLINE common::ObIAllocator *get_allocator() { return allocator_; }
OB_INLINE int init_evaluated_datums(common::ObIAllocator *allocator, bool &need_convert)
OB_INLINE int init_evaluated_datums(bool &is_valid) override
{
UNUSED(allocator);
need_convert = false;
is_valid = true;
return OB_SUCCESS;
}
TO_STRING_KV(K_(is_inited), K_(is_reverse_scan), K_(row_num), K_(interval_infos),