diff --git a/src/sql/CMakeLists.txt b/src/sql/CMakeLists.txt index c77758bcf..c5cedfd8e 100644 --- a/src/sql/CMakeLists.txt +++ b/src/sql/CMakeLists.txt @@ -62,6 +62,7 @@ ob_set_subtarget(ob_sql das das/ob_das_retry_ctrl.cpp das/ob_das_simple_op.cpp das/ob_text_retrieval_op.cpp + das/ob_das_attach_define.cpp ) ob_set_subtarget(ob_sql dtl diff --git a/src/sql/das/ob_das_attach_define.cpp b/src/sql/das/ob_das_attach_define.cpp new file mode 100644 index 000000000..1dbc7cfdf --- /dev/null +++ b/src/sql/das/ob_das_attach_define.cpp @@ -0,0 +1,49 @@ +/** + * Copyright (c) 2022 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + * + * ob_das_attach_define.cpp + * + * Author: yuming<> + */ +#define USING_LOG_PREFIX SQL_DAS +#include "sql/das/ob_das_attach_define.h" +#include "sql/das/ob_das_factory.h" +namespace oceanbase +{ +namespace sql +{ + +OB_DEF_SERIALIZE(ObDASAttachSpec) +{ + int ret = OB_SUCCESS; + bool has_attach_ctdef = false; + OB_UNIS_ENCODE(has_attach_ctdef); + return ret; +} + +OB_DEF_DESERIALIZE(ObDASAttachSpec) +{ + int ret = OB_SUCCESS; + bool has_attach_ctdef = false; + OB_UNIS_DECODE(has_attach_ctdef); + return ret; +} + +OB_DEF_SERIALIZE_SIZE(ObDASAttachSpec) +{ + int64_t len = 0; + bool has_attach_ctdef = false; + OB_UNIS_ADD_LEN(has_attach_ctdef); + return len; +} + +} +} diff --git a/src/sql/das/ob_das_attach_define.h b/src/sql/das/ob_das_attach_define.h new file mode 100644 index 000000000..529ef3bce --- /dev/null +++ b/src/sql/das/ob_das_attach_define.h @@ -0,0 +1,50 @@ +/** + * Copyright (c) 2022 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + * + * ob_das_attach_define.h + * + * Author: yuming<> + */ +#ifndef OBDEV_SRC_SQL_DAS_OB_DAS_ATTACH_DEFINE_H_ +#define OBDEV_SRC_SQL_DAS_OB_DAS_ATTACH_DEFINE_H_ +#include "sql/das/ob_das_define.h" +#include "share/ob_define.h" +#include "sql/engine/expr/ob_expr.h" + +namespace oceanbase +{ +namespace sql +{ + +struct ObDASAttachSpec +{ + OB_UNIS_VERSION(1); +public: + ObDASAttachSpec(common::ObIAllocator &alloc, ObDASBaseCtDef *scan_ctdef) + : attach_loc_metas_(alloc), + scan_ctdef_(nullptr), + allocator_(alloc), + attach_ctdef_(nullptr) + { + } + common::ObList attach_loc_metas_; + ObDASBaseCtDef *scan_ctdef_; //This ctdef represents the main task information executed by the DAS Task. + common::ObIAllocator &allocator_; + ObDASBaseCtDef *attach_ctdef_; //The attach_ctdef represents the task information that is bound to and executed on the DAS Task. + + TO_STRING_KV(K_(attach_loc_metas), + K_(attach_ctdef)); +}; + +} +} + +#endif \ No newline at end of file diff --git a/src/sql/das/ob_das_scan_op.cpp b/src/sql/das/ob_das_scan_op.cpp index e5f2967af..006feb7a3 100644 --- a/src/sql/das/ob_das_scan_op.cpp +++ b/src/sql/das/ob_das_scan_op.cpp @@ -171,7 +171,7 @@ ObDASScanOp::ObDASScanOp(ObIAllocator &op_alloc) result_(nullptr), remain_row_cnt_(0), retry_alloc_(nullptr), - ir_param_(op_alloc) + ir_param_() { } @@ -744,69 +744,7 @@ OB_SERIALIZE_MEMBER((ObDASScanOp, ObIDASTaskOp), scan_param_.ss_key_ranges_, ir_param_); -OB_DEF_SERIALIZE(ObDASIRParam) -{ - int ret = OB_SUCCESS; - const bool ir_scan = is_ir_scan(); - LST_DO_CODE(OB_UNIS_ENCODE, ir_scan); - if (OB_SUCC(ret) && ir_scan) { - if (OB_ISNULL(ctdef_) || OB_ISNULL(rtdef_)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected nullptr", K(ret)); - } else { - LST_DO_CODE(OB_UNIS_ENCODE, - *ctdef_, - *rtdef_, - ls_id_, - inv_idx_tablet_id_, - doc_id_idx_tablet_id_); - } - } - return ret; -} - -OB_DEF_DESERIALIZE(ObDASIRParam) -{ - int ret = OB_SUCCESS; - bool ir_scan = false; - LST_DO_CODE(OB_UNIS_DECODE, ir_scan); - if (OB_SUCC(ret) && ir_scan) { - if (OB_ISNULL(ctdef_ = OB_NEWx(ObDASIRCtDef, &allocator_, allocator_))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("failed to allocate memory for ir ctdef", K(ret)); - } else if (OB_ISNULL(rtdef_ = OB_NEWx(ObDASIRRtDef, &allocator_, allocator_))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("failed to allocate memory for ir rtdef", K(ret)); - } else { - LST_DO_CODE(OB_UNIS_DECODE, - *ctdef_, - *rtdef_, - ls_id_, - inv_idx_tablet_id_, - doc_id_idx_tablet_id_); - } - } - return ret; -} - -OB_DEF_SERIALIZE_SIZE(ObDASIRParam) -{ - int64_t len = 0; - const bool ir_scan = is_ir_scan(); - LST_DO_CODE(OB_UNIS_ADD_LEN, ir_scan); - if (ir_scan) { - if (nullptr != ctdef_ && nullptr != rtdef_) { - LST_DO_CODE(OB_UNIS_ADD_LEN, - *ctdef_, - *rtdef_); - } - LST_DO_CODE(OB_UNIS_ADD_LEN, - ls_id_, - inv_idx_tablet_id_, - doc_id_idx_tablet_id_); - } - return len; -} +OB_SERIALIZE_MEMBER(ObDASObsoletedObj, flag_); ObDASScanResult::ObDASScanResult() : ObIDASTaskResult(), diff --git a/src/sql/das/ob_das_scan_op.h b/src/sql/das/ob_das_scan_op.h index 96597c4cf..76485c172 100644 --- a/src/sql/das/ob_das_scan_op.h +++ b/src/sql/das/ob_das_scan_op.h @@ -173,30 +173,13 @@ private: }; }; -struct ObDASIRParam +struct ObDASObsoletedObj { OB_UNIS_VERSION(1); public: - ObDASIRParam(ObIAllocator &alloc) - : allocator_(alloc), - ctdef_(nullptr), - rtdef_(nullptr), - ls_id_(OB_INVALID_ID), - inv_idx_tablet_id_(), - fwd_idx_tablet_id_(), - doc_id_idx_tablet_id_() {} - virtual ~ObDASIRParam() {} - inline bool is_ir_scan() const { return false; } // always false on master - - TO_STRING_KV(K_(ls_id), K_(inv_idx_tablet_id), K_(fwd_idx_tablet_id), - K_(doc_id_idx_tablet_id), KP_(ctdef), KP_(rtdef)); - ObIAllocator &allocator_; - ObDASIRCtDef *ctdef_; - ObDASIRRtDef *rtdef_; - share::ObLSID ls_id_; - ObTabletID inv_idx_tablet_id_; - ObTabletID fwd_idx_tablet_id_; - ObTabletID doc_id_idx_tablet_id_; + ObDASObsoletedObj() : flag_(false) {} + TO_STRING_KV(K_(flag)); + bool flag_; }; class ObDASScanOp : public ObIDASTaskOp @@ -284,7 +267,7 @@ protected: union { common::ObArenaAllocator retry_alloc_buf_; }; - ObDASIRParam ir_param_; + ObDASObsoletedObj ir_param_; // obsoleted attribute, please gc me at next barrier version }; class ObDASScanResult : public ObIDASTaskResult, public common::ObNewRowIterator diff --git a/src/sql/das/ob_das_task.cpp b/src/sql/das/ob_das_task.cpp index 520158666..4d63091c0 100644 --- a/src/sql/das/ob_das_task.cpp +++ b/src/sql/das/ob_das_task.cpp @@ -83,6 +83,10 @@ OB_DEF_SERIALIZE(ObDASRemoteInfo) OB_UNIS_ENCODE(session_id_); OB_UNIS_ENCODE(plan_id_); OB_UNIS_ENCODE(plan_hash_); + + // placeholder for serialize the reference relationship between ctdefs and rtdefs. + // Full logic here requires some complicated data structure refactor on fts branch for ver 4.3.1. + // double check compatiblity before merge to master return ret; } @@ -177,6 +181,10 @@ OB_DEF_DESERIALIZE(ObDASRemoteInfo) OB_UNIS_DECODE(session_id_); OB_UNIS_DECODE(plan_id_); OB_UNIS_DECODE(plan_hash_); + + // placeholder for serialize the reference relationship between ctdefs and rtdefs. + // Full logic here requires some complicated data structure refactor on fts branch for ver 4.3.1. + // double check compatiblity before merge to master return ret; } @@ -216,6 +224,10 @@ OB_DEF_SERIALIZE_SIZE(ObDASRemoteInfo) OB_UNIS_ADD_LEN(session_id_); OB_UNIS_ADD_LEN(plan_id_); OB_UNIS_ADD_LEN(plan_hash_); + + // placeholder for serialize the reference relationship between ctdefs and rtdefs. + // Full logic here requires some complicated data structure refactor on fts branch for ver 4.3.1. + // double check compatiblity before merge to master return len; } @@ -277,7 +289,9 @@ OB_SERIALIZE_MEMBER(ObIDASTaskOp, ls_id_, related_ctdefs_, related_rtdefs_, - related_tablet_ids_); + related_tablet_ids_, + attach_ctdef_, + attach_rtdef_); ObDASTaskArg::ObDASTaskArg() : timeout_ts_(0), diff --git a/src/sql/das/ob_das_task.h b/src/sql/das/ob_das_task.h index 64ff46b94..de1bf4f03 100644 --- a/src/sql/das/ob_das_task.h +++ b/src/sql/das/ob_das_task.h @@ -122,7 +122,9 @@ public: das_task_node_(), agg_tasks_(nullptr), cur_agg_list_(nullptr), - op_result_(nullptr) + op_result_(nullptr), + attach_ctdef_(nullptr), + attach_rtdef_(nullptr) { das_task_node_.get_data() = this; } @@ -264,6 +266,11 @@ protected: ObDasAggregatedTasks *agg_tasks_; // task's agg task, do not serialize DasTaskLinkedList *cur_agg_list_; // task's agg_list, do not serialize ObIDASTaskResult *op_result_; + //The attach_ctdef describes the computations that are pushed down and executed as an attachment to the ObDASTaskOp, + //such as the back table operation for full-text indexes, + //rowkey merging for index merge operations, and so on. + const ObDASBaseCtDef *attach_ctdef_; + ObDASBaseRtDef *attach_rtdef_; }; typedef common::ObObjStore DasTaskList; typedef DasTaskList::Iterator DASTaskIter; @@ -412,17 +419,22 @@ struct DASCtEncoder static int encode(char *buf, const int64_t buf_len, int64_t &pos, const T *val) { int ret = common::OB_SUCCESS; - int64_t idx = 0; + int64_t idx = common::OB_INVALID_INDEX; const ObDASBaseCtDef *ctdef = val; ObDASRemoteInfo *remote_info = ObDASRemoteInfo::get_remote_info(); - if (OB_ISNULL(val) || OB_ISNULL(remote_info)) { + if (OB_ISNULL(remote_info)) { ret = common::OB_ERR_UNEXPECTED; - SQL_DAS_LOG(WARN, "val is nullptr", K(ret), K(val), K(remote_info)); + SQL_DAS_LOG(WARN, "val is nullptr", K(ret), K(remote_info)); + } else if (OB_ISNULL(val)) { + idx = common::OB_INVALID_INDEX; } else if (!common::has_exist_in_array(remote_info->ctdefs_, ctdef, &idx)) { ret = common::OB_ERR_UNEXPECTED; SQL_DAS_LOG(WARN, "val not found in ctdefs", K(ret), K(val), KPC(val)); - } else if (OB_FAIL(common::serialization::encode_i32(buf, buf_len, pos, static_cast(idx)))) { - SQL_DAS_LOG(WARN, "encode idx failed", K(ret), K(idx)); + } + if (OB_SUCC(ret)) { + if (OB_FAIL(common::serialization::encode_i32(buf, buf_len, pos, static_cast(idx)))) { + SQL_DAS_LOG(WARN, "encode idx failed", K(ret), K(idx)); + } } return ret; } @@ -430,13 +442,15 @@ struct DASCtEncoder static int decode(const char *buf, const int64_t data_len, int64_t &pos, const T *&val) { int ret = common::OB_SUCCESS; - int32_t idx = 0; + int32_t idx = common::OB_INVALID_INDEX; ObDASRemoteInfo *remote_info = ObDASRemoteInfo::get_remote_info(); if (OB_ISNULL(remote_info)) { ret = common::OB_ERR_UNEXPECTED; SQL_DAS_LOG(WARN, "remote_info is nullptr", K(ret), K(remote_info)); } else if (OB_FAIL(common::serialization::decode_i32(buf, data_len, pos, &idx))) { SQL_DAS_LOG(WARN, "decode idx failed", K(ret), K(idx)); + } else if (OB_UNLIKELY(common::OB_INVALID_INDEX == idx)) { + val = nullptr; } else if (OB_UNLIKELY(idx < 0) || OB_UNLIKELY(idx >= remote_info->ctdefs_.count())) { ret = common::OB_ERR_UNEXPECTED; SQL_DAS_LOG(WARN, "idx is invalid", K(ret), K(idx), K(remote_info->ctdefs_.count())); @@ -449,7 +463,7 @@ struct DASCtEncoder static int64_t encoded_length(const T *val) { UNUSED(val); - int32_t idx = 0; + int32_t idx = common::OB_INVALID_INDEX; return common::serialization::encoded_length_i32(idx); } }; @@ -460,17 +474,22 @@ struct DASRtEncoder static int encode(char *buf, const int64_t buf_len, int64_t &pos, const T *val) { int ret = common::OB_SUCCESS; - int64_t idx = 0; + int64_t idx = common::OB_INVALID_INDEX; ObDASBaseRtDef *rtdef = const_cast(val); ObDASRemoteInfo *remote_info = ObDASRemoteInfo::get_remote_info(); - if (OB_ISNULL(val) || OB_ISNULL(remote_info)) { + if (OB_ISNULL(remote_info)) { ret = common::OB_ERR_UNEXPECTED; SQL_DAS_LOG(WARN, "val is nullptr", K(ret), K(val), K(remote_info)); + } else if (OB_ISNULL(val)) { + idx = common::OB_INVALID_INDEX; } else if (!common::has_exist_in_array(remote_info->rtdefs_, rtdef, &idx)) { ret = common::OB_ERR_UNEXPECTED; SQL_DAS_LOG(WARN, "val not found in rtdefs", K(ret), K(val), KPC(val)); - } else if (OB_FAIL(common::serialization::encode_i32(buf, buf_len, pos, static_cast(idx)))) { - SQL_DAS_LOG(WARN, "encode idx failed", K(ret), K(idx)); + } + if (OB_SUCC(ret)) { + if (OB_FAIL(common::serialization::encode_i32(buf, buf_len, pos, static_cast(idx)))) { + SQL_DAS_LOG(WARN, "encode idx failed", K(ret), K(idx)); + } } return ret; } @@ -485,6 +504,8 @@ struct DASRtEncoder SQL_DAS_LOG(WARN, "remote_info is nullptr", K(ret), K(remote_info)); } else if (OB_FAIL(common::serialization::decode_i32(buf, data_len, pos, &idx))) { SQL_DAS_LOG(WARN, "decode idx failed", K(ret), K(idx)); + } else if (OB_UNLIKELY(common::OB_INVALID_INDEX == idx)) { + val = nullptr; } else if (OB_UNLIKELY(idx < 0) || OB_UNLIKELY(idx >= remote_info->rtdefs_.count())) { ret = common::OB_ERR_UNEXPECTED; SQL_DAS_LOG(WARN, "idx is invalid", K(ret), K(idx), K(remote_info->rtdefs_.count())); diff --git a/src/sql/engine/table/ob_table_scan_op.cpp b/src/sql/engine/table/ob_table_scan_op.cpp index 74552e784..135d8067b 100644 --- a/src/sql/engine/table/ob_table_scan_op.cpp +++ b/src/sql/engine/table/ob_table_scan_op.cpp @@ -130,18 +130,13 @@ OB_DEF_SERIALIZE(ObTableScanCtDef) OB_UNIS_ENCODE(calc_part_id_expr_); OB_UNIS_ENCODE(global_index_rowkey_exprs_); OB_UNIS_ENCODE(flashback_item_.fq_read_tx_uncommitted_); - bool has_aux_lookup = false; - OB_UNIS_ENCODE(has_aux_lookup); - if (OB_SUCC(ret) && has_aux_lookup) { - OB_UNIS_ENCODE(*aux_lookup_ctdef_); - OB_UNIS_ENCODE(*aux_lookup_loc_meta_); - } + // abandoned fields, please remove me at next barrier version + bool abandoned_always_false_aux_lookup = false; + bool abandoned_always_false_text_ir = false; + OB_UNIS_ENCODE(abandoned_always_false_aux_lookup); + OB_UNIS_ENCODE(abandoned_always_false_text_ir); - bool has_text_ir = false; - OB_UNIS_ENCODE(has_text_ir); - if (OB_SUCC(ret) && has_text_ir) { - OB_UNIS_ENCODE(*text_ir_ctdef_); - } + OB_UNIS_ENCODE(attach_spec_); return ret; } @@ -169,17 +164,13 @@ OB_DEF_SERIALIZE_SIZE(ObTableScanCtDef) OB_UNIS_ADD_LEN(calc_part_id_expr_); OB_UNIS_ADD_LEN(global_index_rowkey_exprs_); OB_UNIS_ADD_LEN(flashback_item_.fq_read_tx_uncommitted_); - bool has_aux_lookup = false; - OB_UNIS_ADD_LEN(has_aux_lookup); - if (has_aux_lookup) { - OB_UNIS_ADD_LEN(*aux_lookup_ctdef_); - OB_UNIS_ADD_LEN(*aux_lookup_loc_meta_); - } - bool has_text_ir = false; - OB_UNIS_ADD_LEN(has_text_ir); - if (has_text_ir) { - OB_UNIS_ADD_LEN(*text_ir_ctdef_); - } + // abandoned fields, please remove me at next barrier version + bool abandoned_always_false_aux_lookup = false; + bool abandoned_always_false_text_ir = false; + OB_UNIS_ADD_LEN(abandoned_always_false_aux_lookup); + OB_UNIS_ADD_LEN(abandoned_always_false_text_ir); + + OB_UNIS_ADD_LEN(attach_spec_); return len; } @@ -224,30 +215,13 @@ OB_DEF_DESERIALIZE(ObTableScanCtDef) OB_UNIS_DECODE(calc_part_id_expr_); OB_UNIS_DECODE(global_index_rowkey_exprs_); OB_UNIS_DECODE(flashback_item_.fq_read_tx_uncommitted_); - bool has_aux_lookup = false; - OB_UNIS_DECODE(has_aux_lookup); - if (OB_SUCC(ret) && has_aux_lookup) { - if (OB_ISNULL(aux_lookup_ctdef_ = OB_NEWx(ObDASScanCtDef, &allocator_, allocator_))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("failed to allocate memory for aux lookup ctdef", K(ret)); - } else if (OB_ISNULL(aux_lookup_loc_meta_ = OB_NEWx(ObDASTableLocMeta, &allocator_, allocator_))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("failed to allocate memory for aux lookup table location meta", K(ret)); - } else { - OB_UNIS_DECODE(*aux_lookup_ctdef_); - OB_UNIS_DECODE(*aux_lookup_loc_meta_); - } - } - bool has_text_ir = false; - OB_UNIS_DECODE(has_text_ir); - if (OB_SUCC(ret) && has_text_ir) { - if (OB_ISNULL(text_ir_ctdef_ = OB_NEWx(ObDASIRCtDef, &allocator_, allocator_))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("failed to allocate memory for text ir ctdef", K(ret)); - } else { - OB_UNIS_DECODE(*text_ir_ctdef_); - } - } + // abandoned fields, please remove me at next barrier version + bool abandoned_always_false_aux_lookup = false; + bool abandoned_always_false_text_ir = false; + OB_UNIS_DECODE(abandoned_always_false_aux_lookup); + OB_UNIS_DECODE(abandoned_always_false_text_ir); + + OB_UNIS_DECODE(attach_spec_); return ret; } diff --git a/src/sql/engine/table/ob_table_scan_op.h b/src/sql/engine/table/ob_table_scan_op.h index 6707fa034..f824583f2 100644 --- a/src/sql/engine/table/ob_table_scan_op.h +++ b/src/sql/engine/table/ob_table_scan_op.h @@ -24,7 +24,7 @@ #include "sql/das/ob_das_ref.h" #include "sql/das/ob_data_access_service.h" #include "sql/das/ob_das_scan_op.h" -#include "sql/das/ob_text_retrieval_op.h" +#include "sql/das/ob_das_attach_define.h" #include "sql/engine/basic/ob_pushdown_filter.h" #include "sql/engine/table/ob_index_lookup_op_impl.h" namespace oceanbase @@ -141,9 +141,7 @@ public: allocator_(allocator), calc_part_id_expr_(NULL), global_index_rowkey_exprs_(allocator), - aux_lookup_ctdef_(nullptr), - aux_lookup_loc_meta_(nullptr), - text_ir_ctdef_(nullptr) + attach_spec_(allocator_, &scan_ctdef_) { } const ExprFixedArray &get_das_output_exprs() const { @@ -165,10 +163,7 @@ public: KPC_(lookup_loc_meta), KPC_(das_dppr_tbl), KPC_(calc_part_id_expr), - K_(global_index_rowkey_exprs), - KPC_(aux_lookup_ctdef), - KPC_(aux_lookup_loc_meta), - KPC_(text_ir_ctdef)); + K_(global_index_rowkey_exprs)); //the query range of index scan/table scan ObQueryRange pre_query_range_; FlashBackItem flashback_item_; @@ -195,11 +190,7 @@ public: ObExpr *calc_part_id_expr_; ExprFixedArray global_index_rowkey_exprs_; // end for Global Index Lookup - // domain doc_id aux lookup - ObDASScanCtDef *aux_lookup_ctdef_; - ObDASTableLocMeta *aux_lookup_loc_meta_; - // text retrieval - ObDASIRCtDef *text_ir_ctdef_; + ObDASAttachSpec attach_spec_; }; struct ObTableScanRtDef