diff --git a/deps/oblib/src/common/object/ob_object.cpp b/deps/oblib/src/common/object/ob_object.cpp index b875052b7f..a57dcecebd 100644 --- a/deps/oblib/src/common/object/ob_object.cpp +++ b/deps/oblib/src/common/object/ob_object.cpp @@ -299,6 +299,13 @@ DEF_TO_STRING(ObLobLocatorV2) J_KV(K(*location_info)); J_COMMA(); } + if (buf_len > pos && extern_header->flags_.has_retry_info_ + && size_ >= offset + MEM_LOB_EXTERN_RETRYINFO_LEN) { + ObMemLobRetryInfo *retry_info = reinterpret_cast(ptr_ + offset); + offset += MEM_LOB_EXTERN_RETRYINFO_LEN; + J_KV(K(*retry_info)); + J_COMMA(); + } if (buf_len > pos) { ObString rowkey_str(MIN(extern_header->rowkey_size_, buf_len - pos), ptr_ + offset); offset += extern_header->rowkey_size_; @@ -346,6 +353,9 @@ uint32_t ObLobLocatorV2::calc_locator_full_len(const ObMemLobExternFlags &flags, if (flags.has_location_info_) { loc_len += MEM_LOB_EXTERN_LOCATIONINFO_LEN; } + if (flags.has_retry_info_) { + loc_len += MEM_LOB_EXTERN_RETRYINFO_LEN; + } loc_len += MEM_LOB_ADDR_LEN; //ToDo:@gehao server address. loc_len += rowkey_size; } @@ -426,6 +436,10 @@ int ObLobLocatorV2::fill(ObMemLobType type, offset += MEM_LOB_EXTERN_LOCATIONINFO_LEN; *extern_len += MEM_LOB_EXTERN_LOCATIONINFO_LEN; } + if (flags.has_retry_info_) { + offset += MEM_LOB_EXTERN_RETRYINFO_LEN; + *extern_len += MEM_LOB_EXTERN_RETRYINFO_LEN; + } if ((offset + rowkey_str.length()) && OB_UNLIKELY(offset > size_)) { ret = OB_BUF_NOT_ENOUGH; @@ -749,6 +763,28 @@ int ObLobLocatorV2::get_location_info(ObMemLobLocationInfo *&location_info) cons return ret; } +int ObLobLocatorV2::get_retry_info(ObMemLobRetryInfo *&retry_info) const +{ + int ret = OB_SUCCESS; + ObMemLobExternHeader *extern_header = NULL; + if (OB_SUCC(get_extern_header(extern_header))) { + char *cur_pos = extern_header->data_ + MEM_LOB_EXTERN_SIZE_LEN; + if (extern_header->flags_.has_tx_info_) { + cur_pos += MEM_LOB_EXTERN_TXINFO_LEN; + } + if (extern_header->flags_.has_location_info_) { + cur_pos += MEM_LOB_EXTERN_LOCATIONINFO_LEN; + } + if (extern_header->flags_.has_retry_info_) { + retry_info = reinterpret_cast(cur_pos); + } else { + ret = OB_ERR_NULL_VALUE; + COMMON_LOG(WARN, "Lob: does not have retry info", K(this), K(ret)); + } + } + return ret; +} + int ObLobLocatorV2::get_real_locator_len(int64_t &real_len) const { int ret = OB_SUCCESS; @@ -883,6 +919,17 @@ int ObLobLocatorV2::set_location_info(const ObMemLobLocationInfo &location_info) return ret; } +int ObLobLocatorV2::set_retry_info(const ObMemLobRetryInfo &retry_info) +{ + validate_has_lob_header(has_lob_header_); + int ret = OB_SUCCESS; + ObMemLobRetryInfo *retry_info_ptr = NULL; + if (OB_SUCC(get_retry_info(retry_info_ptr))) { + *retry_info_ptr = retry_info; + } + return ret; +} + OB_DEF_SERIALIZE(ObLobLocatorV2) { int ret = OB_SUCCESS; diff --git a/deps/oblib/src/common/object/ob_object.h b/deps/oblib/src/common/object/ob_object.h index 203fd535d3..2377d04b65 100644 --- a/deps/oblib/src/common/object/ob_object.h +++ b/deps/oblib/src/common/object/ob_object.h @@ -781,11 +781,11 @@ struct ObMemLobCommon struct ObMemLobExternFlags { ObMemLobExternFlags() : - has_tx_info_(1), has_location_info_(1), reserved_(0) + has_tx_info_(1), has_location_info_(1), has_retry_info_(1), reserved_(0) {} ObMemLobExternFlags(bool enable) : - has_tx_info_(enable), has_location_info_(enable), reserved_(0) + has_tx_info_(enable), has_location_info_(enable), has_retry_info_(enable), reserved_(0) {} ObMemLobExternFlags(const ObMemLobExternFlags &flags) { *this = flags; } @@ -800,11 +800,12 @@ struct ObMemLobExternFlags return (*(reinterpret_cast(this)) = 0); } - TO_STRING_KV(K_(has_tx_info), K_(has_location_info), K_(reserved)); + TO_STRING_KV(K_(has_tx_info), K_(has_location_info), K_(has_retry_info), K_(reserved)); uint16_t has_tx_info_ : 1; // Indicate whether tx info exists uint16_t has_location_info_ : 1; // Indicate whether has cid exists (reserved) - uint16_t reserved_ : 14; + uint16_t has_retry_info_ : 1; // Indicate whether has retry info exists + uint16_t reserved_ : 13; }; // Memory Locator V2, Extern Header: @@ -873,6 +874,16 @@ struct ObMemLobLocationInfo char data_[0]; }; +struct ObMemLobRetryInfo +{ + ObMemLobRetryInfo() : is_select_leader_(true), read_latest_(false), addr_(), timeout_(0) {} + TO_STRING_KV(K_(is_select_leader), K_(read_latest), K_(addr), K_(timeout)); + bool is_select_leader_; + bool read_latest_; + ObAddr addr_; + uint64_t timeout_; +}; + OB_INLINE void validate_has_lob_header(const bool &has_header) { #ifdef VALIDATE_LOB_HEADER @@ -897,6 +908,7 @@ public: static const uint32_t MEM_LOB_EXTERN_HEADER_LEN = sizeof(ObMemLobExternHeader); static const uint32_t MEM_LOB_EXTERN_TXINFO_LEN = sizeof(ObMemLobTxInfo); static const uint32_t MEM_LOB_EXTERN_LOCATIONINFO_LEN = sizeof(ObMemLobLocationInfo); + static const uint32_t MEM_LOB_EXTERN_RETRYINFO_LEN = sizeof(ObMemLobRetryInfo); static const uint16_t MEM_LOB_EXTERN_SIZE_LEN = sizeof(uint16_t); static const uint32_t MEM_LOB_ADDR_LEN = 0; // reserved for temp lob address @@ -1008,6 +1020,7 @@ public: int set_payload_data(const ObLobCommon *lob_comm, const ObString& payload); int set_tx_info(const ObMemLobTxInfo &tx_info); int set_location_info(const ObMemLobLocationInfo &location_info); + int set_retry_info(const ObMemLobRetryInfo &retry_info); // interfaces for read // Notice: all the following functions should be called after is_valid() or fill() @@ -1022,6 +1035,7 @@ public: int get_table_info(uint64_t &table_id, uint32_t &column_idex); int get_tx_info(ObMemLobTxInfo *&tx_info) const; int get_location_info(ObMemLobLocationInfo *&location_info) const; + int get_retry_info(ObMemLobRetryInfo *&retry_info) const; int get_real_locator_len(int64_t &real_len) const; bool is_empty_lob() const; diff --git a/src/storage/lob/ob_lob_locator.cpp b/src/storage/lob/ob_lob_locator.cpp index 47b430ae97..946a964b65 100644 --- a/src/storage/lob/ob_lob_locator.cpp +++ b/src/storage/lob/ob_lob_locator.cpp @@ -19,6 +19,7 @@ #include "storage/tx/ob_trans_define_v4.h" #include "storage/tx/ob_trans_service.h" #include "share/ob_lob_access_utils.h" +#include "observer/ob_server.h" namespace oceanbase { @@ -38,7 +39,8 @@ ObLobLocatorHelper::ObLobLocatorHelper() locator_allocator_(ObModIds::OB_LOB_READER, OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()), rowkey_str_(), enable_locator_v2_(), - is_inited_(false) + is_inited_(false), + scan_flag_() { } @@ -96,6 +98,7 @@ int ObLobLocatorHelper::init(const ObTableScanParam &scan_param, ls_id_ = ls_id.id(); read_snapshot_ = ctx.mvcc_acc_ctx_.snapshot_; enable_locator_v2_ = table_param.enable_lob_locator_v2(); + scan_flag_ = scan_param.scan_flag_; if (snapshot_version != read_snapshot_.version_.get_val_for_tx()) { ret = OB_ERR_UNEXPECTED; STORAGE_LOG(WARN, "snapshot version mismatch", @@ -310,6 +313,7 @@ int ObLobLocatorHelper::fuse_mem_lob_header(ObObj &def_obj, uint64_t col_id, boo // mysql inrow lobs & systable lobs do not have extern fields bool has_extern = (lib::is_oracle_mode() && !is_systable); ObMemLobExternFlags extern_flags(has_extern); + extern_flags.has_retry_info_ = 0; // default obj should only be inrow, no need retry info ObLobCommon lob_common; int64_t full_loc_size = ObLobLocatorV2::calc_locator_full_len(extern_flags, rowkey_str_.length(), @@ -544,6 +548,11 @@ int ObLobLocatorHelper::build_lob_locatorv2(ObLobLocatorV2 &locator, ObMemLobTxInfo tx_info(read_snapshot_.version_.get_val_for_tx(), read_snapshot_.tx_id_.get_id(), read_snapshot_.scn_.cast_to_int()); + ObMemLobRetryInfo retry_info; + retry_info.addr_ = MYADDR; + retry_info.is_select_leader_ = true; + retry_info.read_latest_ = scan_flag_.read_latest_; + retry_info.timeout_ = access_ctx.timeout_; ObMemLobLocationInfo location_info(tablet_id_, ls_id_, cs_type); if (has_extern && OB_FAIL(locator.set_table_info(table_id_, column_id))) { // should be column idx STORAGE_LOG(WARN, "Lob: set table info failed", K(ret), K(table_id_), K(column_id)); @@ -551,6 +560,8 @@ int ObLobLocatorHelper::build_lob_locatorv2(ObLobLocatorV2 &locator, STORAGE_LOG(WARN, "Lob: set transaction info failed", K(ret), K(tx_info)); } else if (extern_flags.has_location_info_ && OB_FAIL(locator.set_location_info(location_info))) { STORAGE_LOG(WARN, "Lob: set location info failed", K(ret), K(location_info)); + } else if (extern_flags.has_retry_info_ && OB_FAIL(locator.set_retry_info(retry_info))) { + STORAGE_LOG(WARN, "Lob: set location info failed", K(ret), K(retry_info)); } } diff --git a/src/storage/lob/ob_lob_locator.h b/src/storage/lob/ob_lob_locator.h index 0092f8513a..e802a8f4d8 100644 --- a/src/storage/lob/ob_lob_locator.h +++ b/src/storage/lob/ob_lob_locator.h @@ -93,6 +93,7 @@ private: ObString rowkey_str_; // for default values bool enable_locator_v2_; bool is_inited_; + ObQueryFlag scan_flag_; }; } // namespace storage diff --git a/src/storage/lob/ob_lob_manager.cpp b/src/storage/lob/ob_lob_manager.cpp index 2f69ee21e7..1599f2e1ec 100644 --- a/src/storage/lob/ob_lob_manager.cpp +++ b/src/storage/lob/ob_lob_manager.cpp @@ -3554,10 +3554,16 @@ int ObLobManager::build_lob_param(ObLobAccessParam& param, if (OB_SUCC(ret) && lob.is_persist_lob() && !lob.has_inrow_data()) { ObMemLobTxInfo *tx_info = nullptr; ObMemLobLocationInfo *location_info = nullptr; + ObMemLobRetryInfo *retry_info = nullptr; + ObMemLobExternHeader *extern_header = NULL; if (OB_FAIL(lob.get_tx_info(tx_info))) { LOG_WARN("failed to get tx info", K(ret), K(lob)); } else if (OB_FAIL(lob.get_location_info(location_info))) { LOG_WARN("failed to get location info", K(ret), K(lob)); + } else if (OB_FAIL(lob.get_extern_header(extern_header))) { + LOG_WARN("failed to get extern header", K(ret), K(lob)); + } else if (extern_header->flags_.has_retry_info_ && OB_FAIL(lob.get_retry_info(retry_info))) { + LOG_WARN("failed to get retry info", K(ret), K(lob)); } else { auto snapshot_tx_seq = transaction::ObTxSEQ::cast_from_int(tx_info->snapshot_seq_); if (OB_ISNULL(param.tx_desc_) || @@ -3569,6 +3575,7 @@ int ObLobManager::build_lob_param(ObLobAccessParam& param, param.snapshot_.valid_ = true; param.snapshot_.source_ = transaction::ObTxReadSnapshot::SRC::LS; param.snapshot_.snapshot_lsid_ = share::ObLSID(location_info->ls_id_); + param.read_latest_ = retry_info->read_latest_; } else { // When param for write, param.tx_desc_ should not be null // If tx indfo from lob locator is old, produce new read snapshot directly diff --git a/src/storage/lob/ob_lob_persistent_adaptor.cpp b/src/storage/lob/ob_lob_persistent_adaptor.cpp index e24bafaf98..757ae1cf23 100644 --- a/src/storage/lob/ob_lob_persistent_adaptor.cpp +++ b/src/storage/lob/ob_lob_persistent_adaptor.cpp @@ -936,7 +936,7 @@ int ObPersistentLobApator::build_common_scan_param( false, // index_back false, // query_stat ObQueryFlag::MysqlMode, // sql_mode - false // read_latest + param.read_latest_ // read_latest ); query_flag.disable_cache(); query_flag.scan_order_ = param.scan_backward_ ? ObQueryFlag::Reverse : ObQueryFlag::Forward; @@ -961,6 +961,9 @@ int ObPersistentLobApator::build_common_scan_param( scan_param.limit_param_.offset_ = 0; // sessions scan_param.snapshot_ = param.snapshot_; + if(param.read_latest_) { + scan_param.tx_id_ = param.snapshot_.core_.tx_id_; + } scan_param.sql_mode_ = param.sql_mode_; // common set scan_param.allocator_ = param.allocator_; diff --git a/src/storage/lob/ob_lob_util.h b/src/storage/lob/ob_lob_util.h index 783c91bb09..053e6cc3cb 100644 --- a/src/storage/lob/ob_lob_util.h +++ b/src/storage/lob/ob_lob_util.h @@ -44,7 +44,8 @@ struct ObLobStorageParam struct ObLobAccessParam { ObLobAccessParam() - : tx_desc_(nullptr), snapshot_(), tx_id_(), sql_mode_(SMO_DEFAULT), allocator_(nullptr), + : tx_desc_(nullptr), snapshot_(), tx_id_(), read_latest_(0), + sql_mode_(SMO_DEFAULT), allocator_(nullptr), dml_base_param_(nullptr), column_ids_(), meta_table_schema_(nullptr), piece_table_schema_(nullptr), main_tablet_param_(nullptr), meta_tablet_param_(nullptr), piece_tablet_param_(nullptr), @@ -68,12 +69,13 @@ public: TO_STRING_KV(K_(tenant_id), K_(src_tenant_id), K_(ls_id), K_(tablet_id), KPC_(lob_locator), KPC_(lob_common), KPC_(lob_data), K_(byte_size), K_(handle_size), K_(coll_type), K_(scan_backward), K_(offset), K_(len), K_(parent_seq_no), K_(seq_no_st), K_(used_seq_cnt), K_(total_seq_cnt), K_(checksum), - K_(update_len), K_(op_type), K_(is_fill_zero), K_(from_rpc), K_(snapshot), K_(tx_id), K_(inrow_read_nocopy), - K_(inrow_threshold), K_(spec_lob_id)); + K_(update_len), K_(op_type), K_(is_fill_zero), K_(from_rpc), K_(snapshot), K_(tx_id), K_(read_latest), + K_(inrow_read_nocopy), K_(inrow_threshold), K_(spec_lob_id)); public: transaction::ObTxDesc *tx_desc_; // for write/update/delete transaction::ObTxReadSnapshot snapshot_; // for read - transaction::ObTransID tx_id_; // used when read-latest + transaction::ObTransID tx_id_; // used when read-latest + bool read_latest_; ObSQLMode sql_mode_; bool is_total_quantity_log_; ObIAllocator *allocator_;