diff --git a/src/sql/CMakeLists.txt b/src/sql/CMakeLists.txt index f64414dde..a799e874f 100644 --- a/src/sql/CMakeLists.txt +++ b/src/sql/CMakeLists.txt @@ -831,6 +831,7 @@ ob_set_subtarget(ob_sql engine_px engine/px/p2p_datahub/ob_runtime_filter_query_range.cpp engine/px/p2p_datahub/ob_runtime_filter_msg.cpp engine/px/p2p_datahub/ob_runtime_filter_vec_msg.cpp + engine/px/p2p_datahub/ob_pushdown_topn_filter_msg.cpp ) ob_set_subtarget(ob_sql engine_set diff --git a/src/sql/engine/basic/ob_pushdown_filter.cpp b/src/sql/engine/basic/ob_pushdown_filter.cpp index fb772c032..bbce63d31 100644 --- a/src/sql/engine/basic/ob_pushdown_filter.cpp +++ b/src/sql/engine/basic/ob_pushdown_filter.cpp @@ -52,7 +52,7 @@ ObPushdownFilterFactory::FilterExecutorAllocFunc ObPushdownFilterFactory::FILTER }; ObDynamicFilterExecutor::PreparePushdownDataFunc ObDynamicFilterExecutor::PREPARE_PD_DATA_FUNCS - [PreparePushdownDataFuncType::MAX_PREPARE_DATA_FUNC_TYPE] = { + [DynamicFilterType::MAX_DYNAMIC_FILTER_TYPE] = { ObExprJoinFilter::prepare_storage_white_filter_data, }; @@ -111,7 +111,7 @@ OB_DEF_SERIALIZE_SIZE(ObPushdownWhiteFilterNode) } OB_SERIALIZE_MEMBER((ObPushdownDynamicFilterNode, ObPushdownWhiteFilterNode), col_idx_, - is_first_child_, is_last_child_, val_meta_, prepare_data_func_type_); + is_first_child_, is_last_child_, val_meta_, dynamic_filter_type_); int ObPushdownBlackFilterNode::merge(ObIArray &merged_node) { @@ -227,11 +227,11 @@ int ObPushdownDynamicFilterNode::set_op_type(const ObRawExpr &raw_expr) switch (type) { case RANGE: op_type_ = WHITE_OP_BT; - prepare_data_func_type_ = RUNTIME_FILTER_PREPARE_DATA; + dynamic_filter_type_ = JOIN_RUNTIME_FILTER; break; case IN: op_type_ = WHITE_OP_IN; - prepare_data_func_type_ = RUNTIME_FILTER_PREPARE_DATA; + dynamic_filter_type_ = JOIN_RUNTIME_FILTER; break; default: ret = OB_ERR_UNEXPECTED; @@ -2444,13 +2444,13 @@ int ObDynamicFilterExecutor::try_preparing_data() { int ret = OB_SUCCESS; ObRuntimeFilterParams runtime_filter_params; - PreparePushdownDataFuncType prepare_data_func_type = - static_cast(filter_).get_prepare_data_func_type(); - if (prepare_data_func_type >= PreparePushdownDataFuncType::MAX_PREPARE_DATA_FUNC_TYPE) { + DynamicFilterType dynamic_filter_type = + static_cast(filter_).get_dynamic_filter_type(); + if (dynamic_filter_type >= DynamicFilterType::MAX_DYNAMIC_FILTER_TYPE) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("invalid func type", K(ret), K(prepare_data_func_type)); + LOG_WARN("invalid func type", K(ret), K(dynamic_filter_type)); } else { - ret = PREPARE_PD_DATA_FUNCS[prepare_data_func_type]( + ret = PREPARE_PD_DATA_FUNCS[dynamic_filter_type]( *filter_.expr_, *this, op_.get_eval_ctx(), runtime_filter_params, is_data_prepared_); } if (OB_FAIL(ret)) { diff --git a/src/sql/engine/basic/ob_pushdown_filter.h b/src/sql/engine/basic/ob_pushdown_filter.h index 068d9eb3c..5ce922888 100644 --- a/src/sql/engine/basic/ob_pushdown_filter.h +++ b/src/sql/engine/basic/ob_pushdown_filter.h @@ -93,10 +93,11 @@ enum DynamicFilterAction PASS_ALL, // if the filter not ready or not active, all data are selected }; -enum PreparePushdownDataFuncType +enum DynamicFilterType { - RUNTIME_FILTER_PREPARE_DATA = 0, - MAX_PREPARE_DATA_FUNC_TYPE + JOIN_RUNTIME_FILTER= 0, + PD_TOPN_FILTER = 1, + MAX_DYNAMIC_FILTER_TYPE }; struct ObBoolMask @@ -384,7 +385,7 @@ public: ObPushdownDynamicFilterNode(common::ObIAllocator &alloc) : ObPushdownWhiteFilterNode(alloc), col_idx_(0), is_first_child_(false), is_last_child_(false), val_meta_(), - prepare_data_func_type_(PreparePushdownDataFuncType::MAX_PREPARE_DATA_FUNC_TYPE) + dynamic_filter_type_(DynamicFilterType::MAX_DYNAMIC_FILTER_TYPE) { type_ = PushdownFilterType::DYNAMIC_FILTER; } @@ -411,13 +412,13 @@ public: UNUSED(arg_idx); return val_meta_.get_type(); } - inline void set_prepare_data_func_type(PreparePushdownDataFuncType func_type) + inline void set_dynamic_filter_type(DynamicFilterType func_type) { - prepare_data_func_type_ = func_type; + dynamic_filter_type_ = func_type; } - inline PreparePushdownDataFuncType get_prepare_data_func_type() + inline DynamicFilterType get_dynamic_filter_type() const { - return prepare_data_func_type_; + return dynamic_filter_type_; } private: int64_t col_idx_; // mark which column for multi columns runtime filter @@ -432,7 +433,7 @@ private: bool is_first_child_; bool is_last_child_; ObObjMeta val_meta_; - PreparePushdownDataFuncType prepare_data_func_type_; + DynamicFilterType dynamic_filter_type_; }; class ObPushdownFilterExecutor; @@ -1047,8 +1048,7 @@ public: ObEvalCtx &eval_ctx, ObRuntimeFilterParams ¶ms, bool &is_data_prepared); - static PreparePushdownDataFunc - PREPARE_PD_DATA_FUNCS[PreparePushdownDataFuncType::MAX_PREPARE_DATA_FUNC_TYPE]; + static PreparePushdownDataFunc PREPARE_PD_DATA_FUNCS[DynamicFilterType::MAX_DYNAMIC_FILTER_TYPE]; private: int try_preparing_data(); void update_rf_slide_window(); diff --git a/src/sql/engine/px/p2p_datahub/ob_pushdown_topn_filter_msg.cpp b/src/sql/engine/px/p2p_datahub/ob_pushdown_topn_filter_msg.cpp new file mode 100644 index 000000000..2d4af7841 --- /dev/null +++ b/src/sql/engine/px/p2p_datahub/ob_pushdown_topn_filter_msg.cpp @@ -0,0 +1,28 @@ +/** + * Copyright (c) 2024 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#define USING_LOG_PREFIX SQL_ENG + +#include "sql/engine/px/p2p_datahub/ob_pushdown_topn_filter_msg.h" + +namespace oceanbase +{ +namespace sql +{ + +OB_SERIALIZE_MEMBER(ObTopNFilterCmpMeta, ser_cmp_func_, obj_meta_); +OB_SERIALIZE_MEMBER(ObTopNFilterCompare, build_meta_, filter_meta_, is_ascending_, null_pos_); +OB_SERIALIZE_MEMBER(ObPushDownTopNFilterInfo, enabled_, p2p_dh_id_, effective_sk_cnt_, + total_sk_cnt_, cmp_metas_, dh_msg_type_, expr_ctx_id_, is_shared_, is_shuffle_, + max_batch_size_, adaptive_filter_ratio_); +} // end namespace sql +} // end namespace oceanbase diff --git a/src/sql/engine/px/p2p_datahub/ob_pushdown_topn_filter_msg.h b/src/sql/engine/px/p2p_datahub/ob_pushdown_topn_filter_msg.h new file mode 100644 index 000000000..c472d5688 --- /dev/null +++ b/src/sql/engine/px/p2p_datahub/ob_pushdown_topn_filter_msg.h @@ -0,0 +1,81 @@ +/** + * Copyright (c) 2024 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#pragma once +#include "lib/ob_define.h" +#include "lib/container/ob_array.h" +#include "share/datum/ob_datum.h" +#include "sql/engine/basic/ob_chunk_datum_store.h" +#include "sql/engine/px/p2p_datahub/ob_p2p_dh_msg.h" + +namespace oceanbase +{ +namespace sql +{ + +struct ObTopNFilterCmpMeta final +{ + OB_UNIS_VERSION_V(1); +public: + union + { + NullSafeRowCmpFunc cmp_func_; + sql::serializable_function ser_cmp_func_; + }; + ObObjMeta obj_meta_; + TO_STRING_KV(K(obj_meta_), K(ser_cmp_func_)); +}; + +struct ObTopNFilterCompare final +{ + OB_UNIS_VERSION_V(1); +public: + // in join scene, the join cond is T1.a=T2.b, sometimes a and b are not same type but not need to + // cast if the sql with order by T1.a, the topn filter can pushdown to T2.b, but the compare info + // is differnet in build stage and filter stage + ObTopNFilterCmpMeta build_meta_; + ObTopNFilterCmpMeta filter_meta_; + bool is_ascending_; + common::ObCmpNullPos null_pos_; + TO_STRING_KV(K(build_meta_), K(filter_meta_), K(is_ascending_), K(null_pos_)); +}; +typedef common::ObFixedArray ObTopNFilterCompares; + +struct ObPushDownTopNFilterInfo +{ + OB_UNIS_VERSION(1); + +public: + explicit ObPushDownTopNFilterInfo(common::ObIAllocator &alloc) + : enabled_(false), p2p_dh_id_(OB_INVALID), effective_sk_cnt_(0), total_sk_cnt_(0), + cmp_metas_(alloc), dh_msg_type_(ObP2PDatahubMsgBase::ObP2PDatahubMsgType::NOT_INIT), + expr_ctx_id_(UINT32_MAX /*INVALID_EXP_CTX_ID*/), is_shared_(false), is_shuffle_(false), + max_batch_size_(0), adaptive_filter_ratio_(0.5) + {} +public: + bool enabled_; + int64_t p2p_dh_id_; + int64_t effective_sk_cnt_; + int64_t total_sk_cnt_; + ObFixedArray cmp_metas_; + ObP2PDatahubMsgBase::ObP2PDatahubMsgType dh_msg_type_; + uint32_t expr_ctx_id_; + bool is_shared_; // whether the filter is shared in sql level + bool is_shuffle_; // whether need shuffle topn msg between differnet dfos + int64_t max_batch_size_; + double adaptive_filter_ratio_; + TO_STRING_KV(K(enabled_), K(p2p_dh_id_), K(dh_msg_type_), K(expr_ctx_id_), K(is_shared_), + K(is_shuffle_), K(max_batch_size_), K(adaptive_filter_ratio_)); +}; + +} // end namespace sql +} // end namespace oceanbase diff --git a/src/sql/engine/sort/ob_sort_op.cpp b/src/sql/engine/sort/ob_sort_op.cpp index aa17a5536..cab8c6dd2 100644 --- a/src/sql/engine/sort/ob_sort_op.cpp +++ b/src/sql/engine/sort/ob_sort_op.cpp @@ -39,7 +39,8 @@ ObSortSpec::ObSortSpec(common::ObIAllocator &alloc, const ObPhyOperatorType type is_fetch_with_ties_(false), prescan_enabled_(false), enable_encode_sortkey_opt_(false), - part_cnt_(0) + part_cnt_(0), + pd_topn_filter_info_(alloc) {} OB_SERIALIZE_MEMBER((ObSortSpec, ObOpSpec), @@ -57,7 +58,8 @@ OB_SERIALIZE_MEMBER((ObSortSpec, ObOpSpec), prescan_enabled_, enable_encode_sortkey_opt_, part_cnt_, - compress_type_); + compress_type_, + pd_topn_filter_info_); ObSortOp::ObSortOp(ObExecContext &ctx_, const ObOpSpec &spec, ObOpInput *input) : ObOperator(ctx_, spec, input), diff --git a/src/sql/engine/sort/ob_sort_op.h b/src/sql/engine/sort/ob_sort_op.h index d8526a5b2..54a8ecb1c 100644 --- a/src/sql/engine/sort/ob_sort_op.h +++ b/src/sql/engine/sort/ob_sort_op.h @@ -20,6 +20,7 @@ #include "sql/engine/sort/ob_sort_basic_info.h" #include "sql/engine/basic/chunk_store/ob_compact_store.h" #include "sql/engine/basic/ob_chunk_datum_store.h" +#include "sql/engine/px/p2p_datahub/ob_pushdown_topn_filter_msg.h" namespace oceanbase { @@ -59,6 +60,7 @@ public: bool enable_encode_sortkey_opt_; // if use, all_exprs_ is : hash(part_by) + part_by + order_by. int64_t part_cnt_; + ObPushDownTopNFilterInfo pd_topn_filter_info_; }; class ObSortOp : public ObOperator diff --git a/src/sql/engine/sort/ob_sort_vec_op.cpp b/src/sql/engine/sort/ob_sort_vec_op.cpp index 72aaac3f4..b86a53e36 100644 --- a/src/sql/engine/sort/ob_sort_vec_op.cpp +++ b/src/sql/engine/sort/ob_sort_vec_op.cpp @@ -28,13 +28,14 @@ ObSortVecSpec::ObSortVecSpec(common::ObIAllocator &alloc, const ObPhyOperatorTyp sk_exprs_(alloc), addon_exprs_(alloc), sk_collations_(alloc), addon_collations_(alloc), minimum_row_count_(0), topk_precision_(0), prefix_pos_(0), is_local_merge_sort_(false), is_fetch_with_ties_(false), prescan_enabled_(false), enable_encode_sortkey_opt_(false), - has_addon_(false), part_cnt_(0), compress_type_(NONE_COMPRESSOR) + has_addon_(false), part_cnt_(0), pd_topn_filter_info_(alloc) {} OB_SERIALIZE_MEMBER((ObSortVecSpec, ObOpSpec), topn_expr_, topk_limit_expr_, topk_offset_expr_, sk_exprs_, addon_exprs_, sk_collations_, addon_collations_, minimum_row_count_, topk_precision_, prefix_pos_, is_local_merge_sort_, is_fetch_with_ties_, - prescan_enabled_, enable_encode_sortkey_opt_, has_addon_, part_cnt_, compress_type_); + prescan_enabled_, enable_encode_sortkey_opt_, has_addon_, part_cnt_, compress_type_, + pd_topn_filter_info_); ObSortVecOp::ObSortVecOp(ObExecContext &ctx_, const ObOpSpec &spec, ObOpInput *input) : ObOperator(ctx_, spec, input), sort_op_provider_(op_monitor_info_), sort_row_count_(0), diff --git a/src/sql/engine/sort/ob_sort_vec_op.h b/src/sql/engine/sort/ob_sort_vec_op.h index 44ddf0d32..e1909a5cd 100644 --- a/src/sql/engine/sort/ob_sort_vec_op.h +++ b/src/sql/engine/sort/ob_sort_vec_op.h @@ -19,6 +19,7 @@ #include "sql/engine/ob_operator.h" #include "sql/engine/sort/ob_sort_basic_info.h" #include "sql/engine/sort/ob_sort_vec_op_provider.h" +#include "sql/engine/px/p2p_datahub/ob_pushdown_topn_filter_msg.h" namespace oceanbase { @@ -57,7 +58,8 @@ public: bool has_addon_; // if use, all_exprs_ is : hash(part_by) + part_by + order_by. int64_t part_cnt_; - ObCompressorType compress_type_; + // pushdown topn filter: pushdown the heap top data to table scan for filtering out data early. + ObPushDownTopNFilterInfo pd_topn_filter_info_; }; class ObSortVecOp : public ObOperator