occupy position

This commit is contained in:
obdev 2024-06-21 09:21:54 +00:00 committed by ob-robot
parent 2724fb32a8
commit 3723b0f580
9 changed files with 142 additions and 25 deletions

View File

@ -831,6 +831,7 @@ ob_set_subtarget(ob_sql engine_px
engine/px/p2p_datahub/ob_runtime_filter_query_range.cpp
engine/px/p2p_datahub/ob_runtime_filter_msg.cpp
engine/px/p2p_datahub/ob_runtime_filter_vec_msg.cpp
engine/px/p2p_datahub/ob_pushdown_topn_filter_msg.cpp
)
ob_set_subtarget(ob_sql engine_set

View File

@ -52,7 +52,7 @@ ObPushdownFilterFactory::FilterExecutorAllocFunc ObPushdownFilterFactory::FILTER
};
ObDynamicFilterExecutor::PreparePushdownDataFunc ObDynamicFilterExecutor::PREPARE_PD_DATA_FUNCS
[PreparePushdownDataFuncType::MAX_PREPARE_DATA_FUNC_TYPE] = {
[DynamicFilterType::MAX_DYNAMIC_FILTER_TYPE] = {
ObExprJoinFilter::prepare_storage_white_filter_data,
};
@ -111,7 +111,7 @@ OB_DEF_SERIALIZE_SIZE(ObPushdownWhiteFilterNode)
}
OB_SERIALIZE_MEMBER((ObPushdownDynamicFilterNode, ObPushdownWhiteFilterNode), col_idx_,
is_first_child_, is_last_child_, val_meta_, prepare_data_func_type_);
is_first_child_, is_last_child_, val_meta_, dynamic_filter_type_);
int ObPushdownBlackFilterNode::merge(ObIArray<ObPushdownFilterNode*> &merged_node)
{
@ -227,11 +227,11 @@ int ObPushdownDynamicFilterNode::set_op_type(const ObRawExpr &raw_expr)
switch (type) {
case RANGE:
op_type_ = WHITE_OP_BT;
prepare_data_func_type_ = RUNTIME_FILTER_PREPARE_DATA;
dynamic_filter_type_ = JOIN_RUNTIME_FILTER;
break;
case IN:
op_type_ = WHITE_OP_IN;
prepare_data_func_type_ = RUNTIME_FILTER_PREPARE_DATA;
dynamic_filter_type_ = JOIN_RUNTIME_FILTER;
break;
default:
ret = OB_ERR_UNEXPECTED;
@ -2444,13 +2444,13 @@ int ObDynamicFilterExecutor::try_preparing_data()
{
int ret = OB_SUCCESS;
ObRuntimeFilterParams runtime_filter_params;
PreparePushdownDataFuncType prepare_data_func_type =
static_cast<ObPushdownDynamicFilterNode &>(filter_).get_prepare_data_func_type();
if (prepare_data_func_type >= PreparePushdownDataFuncType::MAX_PREPARE_DATA_FUNC_TYPE) {
DynamicFilterType dynamic_filter_type =
static_cast<ObPushdownDynamicFilterNode &>(filter_).get_dynamic_filter_type();
if (dynamic_filter_type >= DynamicFilterType::MAX_DYNAMIC_FILTER_TYPE) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid func type", K(ret), K(prepare_data_func_type));
LOG_WARN("invalid func type", K(ret), K(dynamic_filter_type));
} else {
ret = PREPARE_PD_DATA_FUNCS[prepare_data_func_type](
ret = PREPARE_PD_DATA_FUNCS[dynamic_filter_type](
*filter_.expr_, *this, op_.get_eval_ctx(), runtime_filter_params, is_data_prepared_);
}
if (OB_FAIL(ret)) {

View File

@ -93,10 +93,11 @@ enum DynamicFilterAction
PASS_ALL, // if the filter not ready or not active, all data are selected
};
enum PreparePushdownDataFuncType
enum DynamicFilterType
{
RUNTIME_FILTER_PREPARE_DATA = 0,
MAX_PREPARE_DATA_FUNC_TYPE
JOIN_RUNTIME_FILTER= 0,
PD_TOPN_FILTER = 1,
MAX_DYNAMIC_FILTER_TYPE
};
struct ObBoolMask
@ -384,7 +385,7 @@ public:
ObPushdownDynamicFilterNode(common::ObIAllocator &alloc)
: ObPushdownWhiteFilterNode(alloc), col_idx_(0), is_first_child_(false),
is_last_child_(false), val_meta_(),
prepare_data_func_type_(PreparePushdownDataFuncType::MAX_PREPARE_DATA_FUNC_TYPE)
dynamic_filter_type_(DynamicFilterType::MAX_DYNAMIC_FILTER_TYPE)
{
type_ = PushdownFilterType::DYNAMIC_FILTER;
}
@ -411,13 +412,13 @@ public:
UNUSED(arg_idx);
return val_meta_.get_type();
}
inline void set_prepare_data_func_type(PreparePushdownDataFuncType func_type)
inline void set_dynamic_filter_type(DynamicFilterType func_type)
{
prepare_data_func_type_ = func_type;
dynamic_filter_type_ = func_type;
}
inline PreparePushdownDataFuncType get_prepare_data_func_type()
inline DynamicFilterType get_dynamic_filter_type() const
{
return prepare_data_func_type_;
return dynamic_filter_type_;
}
private:
int64_t col_idx_; // mark which column for multi columns runtime filter
@ -432,7 +433,7 @@ private:
bool is_first_child_;
bool is_last_child_;
ObObjMeta val_meta_;
PreparePushdownDataFuncType prepare_data_func_type_;
DynamicFilterType dynamic_filter_type_;
};
class ObPushdownFilterExecutor;
@ -1047,8 +1048,7 @@ public:
ObEvalCtx &eval_ctx,
ObRuntimeFilterParams &params,
bool &is_data_prepared);
static PreparePushdownDataFunc
PREPARE_PD_DATA_FUNCS[PreparePushdownDataFuncType::MAX_PREPARE_DATA_FUNC_TYPE];
static PreparePushdownDataFunc PREPARE_PD_DATA_FUNCS[DynamicFilterType::MAX_DYNAMIC_FILTER_TYPE];
private:
int try_preparing_data();
void update_rf_slide_window();

View File

@ -0,0 +1,28 @@
/**
* Copyright (c) 2024 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SQL_ENG
#include "sql/engine/px/p2p_datahub/ob_pushdown_topn_filter_msg.h"
namespace oceanbase
{
namespace sql
{
OB_SERIALIZE_MEMBER(ObTopNFilterCmpMeta, ser_cmp_func_, obj_meta_);
OB_SERIALIZE_MEMBER(ObTopNFilterCompare, build_meta_, filter_meta_, is_ascending_, null_pos_);
OB_SERIALIZE_MEMBER(ObPushDownTopNFilterInfo, enabled_, p2p_dh_id_, effective_sk_cnt_,
total_sk_cnt_, cmp_metas_, dh_msg_type_, expr_ctx_id_, is_shared_, is_shuffle_,
max_batch_size_, adaptive_filter_ratio_);
} // end namespace sql
} // end namespace oceanbase

View File

@ -0,0 +1,81 @@
/**
* Copyright (c) 2024 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#pragma once
#include "lib/ob_define.h"
#include "lib/container/ob_array.h"
#include "share/datum/ob_datum.h"
#include "sql/engine/basic/ob_chunk_datum_store.h"
#include "sql/engine/px/p2p_datahub/ob_p2p_dh_msg.h"
namespace oceanbase
{
namespace sql
{
struct ObTopNFilterCmpMeta final
{
OB_UNIS_VERSION_V(1);
public:
union
{
NullSafeRowCmpFunc cmp_func_;
sql::serializable_function ser_cmp_func_;
};
ObObjMeta obj_meta_;
TO_STRING_KV(K(obj_meta_), K(ser_cmp_func_));
};
struct ObTopNFilterCompare final
{
OB_UNIS_VERSION_V(1);
public:
// in join scene, the join cond is T1.a=T2.b, sometimes a and b are not same type but not need to
// cast if the sql with order by T1.a, the topn filter can pushdown to T2.b, but the compare info
// is differnet in build stage and filter stage
ObTopNFilterCmpMeta build_meta_;
ObTopNFilterCmpMeta filter_meta_;
bool is_ascending_;
common::ObCmpNullPos null_pos_;
TO_STRING_KV(K(build_meta_), K(filter_meta_), K(is_ascending_), K(null_pos_));
};
typedef common::ObFixedArray<ObTopNFilterCompare, common::ObIAllocator> ObTopNFilterCompares;
struct ObPushDownTopNFilterInfo
{
OB_UNIS_VERSION(1);
public:
explicit ObPushDownTopNFilterInfo(common::ObIAllocator &alloc)
: enabled_(false), p2p_dh_id_(OB_INVALID), effective_sk_cnt_(0), total_sk_cnt_(0),
cmp_metas_(alloc), dh_msg_type_(ObP2PDatahubMsgBase::ObP2PDatahubMsgType::NOT_INIT),
expr_ctx_id_(UINT32_MAX /*INVALID_EXP_CTX_ID*/), is_shared_(false), is_shuffle_(false),
max_batch_size_(0), adaptive_filter_ratio_(0.5)
{}
public:
bool enabled_;
int64_t p2p_dh_id_;
int64_t effective_sk_cnt_;
int64_t total_sk_cnt_;
ObFixedArray<ObTopNFilterCmpMeta, common::ObIAllocator> cmp_metas_;
ObP2PDatahubMsgBase::ObP2PDatahubMsgType dh_msg_type_;
uint32_t expr_ctx_id_;
bool is_shared_; // whether the filter is shared in sql level
bool is_shuffle_; // whether need shuffle topn msg between differnet dfos
int64_t max_batch_size_;
double adaptive_filter_ratio_;
TO_STRING_KV(K(enabled_), K(p2p_dh_id_), K(dh_msg_type_), K(expr_ctx_id_), K(is_shared_),
K(is_shuffle_), K(max_batch_size_), K(adaptive_filter_ratio_));
};
} // end namespace sql
} // end namespace oceanbase

View File

@ -39,7 +39,8 @@ ObSortSpec::ObSortSpec(common::ObIAllocator &alloc, const ObPhyOperatorType type
is_fetch_with_ties_(false),
prescan_enabled_(false),
enable_encode_sortkey_opt_(false),
part_cnt_(0)
part_cnt_(0),
pd_topn_filter_info_(alloc)
{}
OB_SERIALIZE_MEMBER((ObSortSpec, ObOpSpec),
@ -57,7 +58,8 @@ OB_SERIALIZE_MEMBER((ObSortSpec, ObOpSpec),
prescan_enabled_,
enable_encode_sortkey_opt_,
part_cnt_,
compress_type_);
compress_type_,
pd_topn_filter_info_);
ObSortOp::ObSortOp(ObExecContext &ctx_, const ObOpSpec &spec, ObOpInput *input)
: ObOperator(ctx_, spec, input),

View File

@ -20,6 +20,7 @@
#include "sql/engine/sort/ob_sort_basic_info.h"
#include "sql/engine/basic/chunk_store/ob_compact_store.h"
#include "sql/engine/basic/ob_chunk_datum_store.h"
#include "sql/engine/px/p2p_datahub/ob_pushdown_topn_filter_msg.h"
namespace oceanbase
{
@ -59,6 +60,7 @@ public:
bool enable_encode_sortkey_opt_;
// if use, all_exprs_ is : hash(part_by) + part_by + order_by.
int64_t part_cnt_;
ObPushDownTopNFilterInfo pd_topn_filter_info_;
};
class ObSortOp : public ObOperator

View File

@ -28,13 +28,14 @@ ObSortVecSpec::ObSortVecSpec(common::ObIAllocator &alloc, const ObPhyOperatorTyp
sk_exprs_(alloc), addon_exprs_(alloc), sk_collations_(alloc), addon_collations_(alloc),
minimum_row_count_(0), topk_precision_(0), prefix_pos_(0), is_local_merge_sort_(false),
is_fetch_with_ties_(false), prescan_enabled_(false), enable_encode_sortkey_opt_(false),
has_addon_(false), part_cnt_(0), compress_type_(NONE_COMPRESSOR)
has_addon_(false), part_cnt_(0), pd_topn_filter_info_(alloc)
{}
OB_SERIALIZE_MEMBER((ObSortVecSpec, ObOpSpec), topn_expr_, topk_limit_expr_, topk_offset_expr_,
sk_exprs_, addon_exprs_, sk_collations_, addon_collations_, minimum_row_count_,
topk_precision_, prefix_pos_, is_local_merge_sort_, is_fetch_with_ties_,
prescan_enabled_, enable_encode_sortkey_opt_, has_addon_, part_cnt_, compress_type_);
prescan_enabled_, enable_encode_sortkey_opt_, has_addon_, part_cnt_, compress_type_,
pd_topn_filter_info_);
ObSortVecOp::ObSortVecOp(ObExecContext &ctx_, const ObOpSpec &spec, ObOpInput *input) :
ObOperator(ctx_, spec, input), sort_op_provider_(op_monitor_info_), sort_row_count_(0),

View File

@ -19,6 +19,7 @@
#include "sql/engine/ob_operator.h"
#include "sql/engine/sort/ob_sort_basic_info.h"
#include "sql/engine/sort/ob_sort_vec_op_provider.h"
#include "sql/engine/px/p2p_datahub/ob_pushdown_topn_filter_msg.h"
namespace oceanbase
{
@ -57,7 +58,8 @@ public:
bool has_addon_;
// if use, all_exprs_ is : hash(part_by) + part_by + order_by.
int64_t part_cnt_;
ObCompressorType compress_type_;
// pushdown topn filter: pushdown the heap top data to table scan for filtering out data early.
ObPushDownTopNFilterInfo pd_topn_filter_info_;
};
class ObSortVecOp : public ObOperator