[BUGFIX] fix repeat update lob in one stmt wrong
This commit is contained in:
47
deps/oblib/src/common/object/ob_object.cpp
vendored
47
deps/oblib/src/common/object/ob_object.cpp
vendored
@ -299,6 +299,13 @@ DEF_TO_STRING(ObLobLocatorV2)
|
||||
J_KV(K(*location_info));
|
||||
J_COMMA();
|
||||
}
|
||||
if (buf_len > pos && extern_header->flags_.has_retry_info_
|
||||
&& size_ >= offset + MEM_LOB_EXTERN_RETRYINFO_LEN) {
|
||||
ObMemLobRetryInfo *retry_info = reinterpret_cast<ObMemLobRetryInfo *>(ptr_ + offset);
|
||||
offset += MEM_LOB_EXTERN_RETRYINFO_LEN;
|
||||
J_KV(K(*retry_info));
|
||||
J_COMMA();
|
||||
}
|
||||
if (buf_len > pos) {
|
||||
ObString rowkey_str(MIN(extern_header->rowkey_size_, buf_len - pos), ptr_ + offset);
|
||||
offset += extern_header->rowkey_size_;
|
||||
@ -346,6 +353,9 @@ uint32_t ObLobLocatorV2::calc_locator_full_len(const ObMemLobExternFlags &flags,
|
||||
if (flags.has_location_info_) {
|
||||
loc_len += MEM_LOB_EXTERN_LOCATIONINFO_LEN;
|
||||
}
|
||||
if (flags.has_retry_info_) {
|
||||
loc_len += MEM_LOB_EXTERN_RETRYINFO_LEN;
|
||||
}
|
||||
loc_len += MEM_LOB_ADDR_LEN; //ToDo:@gehao server address.
|
||||
loc_len += rowkey_size;
|
||||
}
|
||||
@ -426,6 +436,10 @@ int ObLobLocatorV2::fill(ObMemLobType type,
|
||||
offset += MEM_LOB_EXTERN_LOCATIONINFO_LEN;
|
||||
*extern_len += MEM_LOB_EXTERN_LOCATIONINFO_LEN;
|
||||
}
|
||||
if (flags.has_retry_info_) {
|
||||
offset += MEM_LOB_EXTERN_RETRYINFO_LEN;
|
||||
*extern_len += MEM_LOB_EXTERN_RETRYINFO_LEN;
|
||||
}
|
||||
|
||||
if ((offset + rowkey_str.length()) && OB_UNLIKELY(offset > size_)) {
|
||||
ret = OB_BUF_NOT_ENOUGH;
|
||||
@ -749,6 +763,28 @@ int ObLobLocatorV2::get_location_info(ObMemLobLocationInfo *&location_info) cons
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLobLocatorV2::get_retry_info(ObMemLobRetryInfo *&retry_info) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObMemLobExternHeader *extern_header = NULL;
|
||||
if (OB_SUCC(get_extern_header(extern_header))) {
|
||||
char *cur_pos = extern_header->data_ + MEM_LOB_EXTERN_SIZE_LEN;
|
||||
if (extern_header->flags_.has_tx_info_) {
|
||||
cur_pos += MEM_LOB_EXTERN_TXINFO_LEN;
|
||||
}
|
||||
if (extern_header->flags_.has_location_info_) {
|
||||
cur_pos += MEM_LOB_EXTERN_LOCATIONINFO_LEN;
|
||||
}
|
||||
if (extern_header->flags_.has_retry_info_) {
|
||||
retry_info = reinterpret_cast<ObMemLobRetryInfo *>(cur_pos);
|
||||
} else {
|
||||
ret = OB_ERR_NULL_VALUE;
|
||||
COMMON_LOG(WARN, "Lob: does not have retry info", K(this), K(ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLobLocatorV2::get_real_locator_len(int64_t &real_len) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -883,6 +919,17 @@ int ObLobLocatorV2::set_location_info(const ObMemLobLocationInfo &location_info)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLobLocatorV2::set_retry_info(const ObMemLobRetryInfo &retry_info)
|
||||
{
|
||||
validate_has_lob_header(has_lob_header_);
|
||||
int ret = OB_SUCCESS;
|
||||
ObMemLobRetryInfo *retry_info_ptr = NULL;
|
||||
if (OB_SUCC(get_retry_info(retry_info_ptr))) {
|
||||
*retry_info_ptr = retry_info;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
OB_DEF_SERIALIZE(ObLobLocatorV2)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
22
deps/oblib/src/common/object/ob_object.h
vendored
22
deps/oblib/src/common/object/ob_object.h
vendored
@ -781,11 +781,11 @@ struct ObMemLobCommon
|
||||
struct ObMemLobExternFlags
|
||||
{
|
||||
ObMemLobExternFlags() :
|
||||
has_tx_info_(1), has_location_info_(1), reserved_(0)
|
||||
has_tx_info_(1), has_location_info_(1), has_retry_info_(1), reserved_(0)
|
||||
{}
|
||||
|
||||
ObMemLobExternFlags(bool enable) :
|
||||
has_tx_info_(enable), has_location_info_(enable), reserved_(0)
|
||||
has_tx_info_(enable), has_location_info_(enable), has_retry_info_(enable), reserved_(0)
|
||||
{}
|
||||
|
||||
ObMemLobExternFlags(const ObMemLobExternFlags &flags) { *this = flags; }
|
||||
@ -800,11 +800,12 @@ struct ObMemLobExternFlags
|
||||
return (*(reinterpret_cast<uint16_t *>(this)) = 0);
|
||||
}
|
||||
|
||||
TO_STRING_KV(K_(has_tx_info), K_(has_location_info), K_(reserved));
|
||||
TO_STRING_KV(K_(has_tx_info), K_(has_location_info), K_(has_retry_info), K_(reserved));
|
||||
|
||||
uint16_t has_tx_info_ : 1; // Indicate whether tx info exists
|
||||
uint16_t has_location_info_ : 1; // Indicate whether has cid exists (reserved)
|
||||
uint16_t reserved_ : 14;
|
||||
uint16_t has_retry_info_ : 1; // Indicate whether has retry info exists
|
||||
uint16_t reserved_ : 13;
|
||||
};
|
||||
|
||||
// Memory Locator V2, Extern Header:
|
||||
@ -873,6 +874,16 @@ struct ObMemLobLocationInfo
|
||||
char data_[0];
|
||||
};
|
||||
|
||||
struct ObMemLobRetryInfo
|
||||
{
|
||||
ObMemLobRetryInfo() : is_select_leader_(true), read_latest_(false), addr_(), timeout_(0) {}
|
||||
TO_STRING_KV(K_(is_select_leader), K_(read_latest), K_(addr), K_(timeout));
|
||||
bool is_select_leader_;
|
||||
bool read_latest_;
|
||||
ObAddr addr_;
|
||||
uint64_t timeout_;
|
||||
};
|
||||
|
||||
OB_INLINE void validate_has_lob_header(const bool &has_header)
|
||||
{
|
||||
#ifdef VALIDATE_LOB_HEADER
|
||||
@ -897,6 +908,7 @@ public:
|
||||
static const uint32_t MEM_LOB_EXTERN_HEADER_LEN = sizeof(ObMemLobExternHeader);
|
||||
static const uint32_t MEM_LOB_EXTERN_TXINFO_LEN = sizeof(ObMemLobTxInfo);
|
||||
static const uint32_t MEM_LOB_EXTERN_LOCATIONINFO_LEN = sizeof(ObMemLobLocationInfo);
|
||||
static const uint32_t MEM_LOB_EXTERN_RETRYINFO_LEN = sizeof(ObMemLobRetryInfo);
|
||||
static const uint16_t MEM_LOB_EXTERN_SIZE_LEN = sizeof(uint16_t);
|
||||
static const uint32_t MEM_LOB_ADDR_LEN = 0; // reserved for temp lob address
|
||||
|
||||
@ -1008,6 +1020,7 @@ public:
|
||||
int set_payload_data(const ObLobCommon *lob_comm, const ObString& payload);
|
||||
int set_tx_info(const ObMemLobTxInfo &tx_info);
|
||||
int set_location_info(const ObMemLobLocationInfo &location_info);
|
||||
int set_retry_info(const ObMemLobRetryInfo &retry_info);
|
||||
|
||||
// interfaces for read
|
||||
// Notice: all the following functions should be called after is_valid() or fill()
|
||||
@ -1022,6 +1035,7 @@ public:
|
||||
int get_table_info(uint64_t &table_id, uint32_t &column_idex);
|
||||
int get_tx_info(ObMemLobTxInfo *&tx_info) const;
|
||||
int get_location_info(ObMemLobLocationInfo *&location_info) const;
|
||||
int get_retry_info(ObMemLobRetryInfo *&retry_info) const;
|
||||
int get_real_locator_len(int64_t &real_len) const;
|
||||
|
||||
bool is_empty_lob() const;
|
||||
|
@ -19,6 +19,7 @@
|
||||
#include "storage/tx/ob_trans_define_v4.h"
|
||||
#include "storage/tx/ob_trans_service.h"
|
||||
#include "share/ob_lob_access_utils.h"
|
||||
#include "observer/ob_server.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -38,7 +39,8 @@ ObLobLocatorHelper::ObLobLocatorHelper()
|
||||
locator_allocator_(ObModIds::OB_LOB_READER, OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()),
|
||||
rowkey_str_(),
|
||||
enable_locator_v2_(),
|
||||
is_inited_(false)
|
||||
is_inited_(false),
|
||||
scan_flag_()
|
||||
{
|
||||
}
|
||||
|
||||
@ -96,6 +98,7 @@ int ObLobLocatorHelper::init(const ObTableScanParam &scan_param,
|
||||
ls_id_ = ls_id.id();
|
||||
read_snapshot_ = ctx.mvcc_acc_ctx_.snapshot_;
|
||||
enable_locator_v2_ = table_param.enable_lob_locator_v2();
|
||||
scan_flag_ = scan_param.scan_flag_;
|
||||
if (snapshot_version != read_snapshot_.version_.get_val_for_tx()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(WARN, "snapshot version mismatch",
|
||||
@ -310,6 +313,7 @@ int ObLobLocatorHelper::fuse_mem_lob_header(ObObj &def_obj, uint64_t col_id, boo
|
||||
// mysql inrow lobs & systable lobs do not have extern fields
|
||||
bool has_extern = (lib::is_oracle_mode() && !is_systable);
|
||||
ObMemLobExternFlags extern_flags(has_extern);
|
||||
extern_flags.has_retry_info_ = 0; // default obj should only be inrow, no need retry info
|
||||
ObLobCommon lob_common;
|
||||
int64_t full_loc_size = ObLobLocatorV2::calc_locator_full_len(extern_flags,
|
||||
rowkey_str_.length(),
|
||||
@ -544,6 +548,11 @@ int ObLobLocatorHelper::build_lob_locatorv2(ObLobLocatorV2 &locator,
|
||||
ObMemLobTxInfo tx_info(read_snapshot_.version_.get_val_for_tx(),
|
||||
read_snapshot_.tx_id_.get_id(),
|
||||
read_snapshot_.scn_.cast_to_int());
|
||||
ObMemLobRetryInfo retry_info;
|
||||
retry_info.addr_ = MYADDR;
|
||||
retry_info.is_select_leader_ = true;
|
||||
retry_info.read_latest_ = scan_flag_.read_latest_;
|
||||
retry_info.timeout_ = access_ctx.timeout_;
|
||||
ObMemLobLocationInfo location_info(tablet_id_, ls_id_, cs_type);
|
||||
if (has_extern && OB_FAIL(locator.set_table_info(table_id_, column_id))) { // should be column idx
|
||||
STORAGE_LOG(WARN, "Lob: set table info failed", K(ret), K(table_id_), K(column_id));
|
||||
@ -551,6 +560,8 @@ int ObLobLocatorHelper::build_lob_locatorv2(ObLobLocatorV2 &locator,
|
||||
STORAGE_LOG(WARN, "Lob: set transaction info failed", K(ret), K(tx_info));
|
||||
} else if (extern_flags.has_location_info_ && OB_FAIL(locator.set_location_info(location_info))) {
|
||||
STORAGE_LOG(WARN, "Lob: set location info failed", K(ret), K(location_info));
|
||||
} else if (extern_flags.has_retry_info_ && OB_FAIL(locator.set_retry_info(retry_info))) {
|
||||
STORAGE_LOG(WARN, "Lob: set location info failed", K(ret), K(retry_info));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -93,6 +93,7 @@ private:
|
||||
ObString rowkey_str_; // for default values
|
||||
bool enable_locator_v2_;
|
||||
bool is_inited_;
|
||||
ObQueryFlag scan_flag_;
|
||||
};
|
||||
|
||||
} // namespace storage
|
||||
|
@ -3554,10 +3554,16 @@ int ObLobManager::build_lob_param(ObLobAccessParam& param,
|
||||
if (OB_SUCC(ret) && lob.is_persist_lob() && !lob.has_inrow_data()) {
|
||||
ObMemLobTxInfo *tx_info = nullptr;
|
||||
ObMemLobLocationInfo *location_info = nullptr;
|
||||
ObMemLobRetryInfo *retry_info = nullptr;
|
||||
ObMemLobExternHeader *extern_header = NULL;
|
||||
if (OB_FAIL(lob.get_tx_info(tx_info))) {
|
||||
LOG_WARN("failed to get tx info", K(ret), K(lob));
|
||||
} else if (OB_FAIL(lob.get_location_info(location_info))) {
|
||||
LOG_WARN("failed to get location info", K(ret), K(lob));
|
||||
} else if (OB_FAIL(lob.get_extern_header(extern_header))) {
|
||||
LOG_WARN("failed to get extern header", K(ret), K(lob));
|
||||
} else if (extern_header->flags_.has_retry_info_ && OB_FAIL(lob.get_retry_info(retry_info))) {
|
||||
LOG_WARN("failed to get retry info", K(ret), K(lob));
|
||||
} else {
|
||||
auto snapshot_tx_seq = transaction::ObTxSEQ::cast_from_int(tx_info->snapshot_seq_);
|
||||
if (OB_ISNULL(param.tx_desc_) ||
|
||||
@ -3569,6 +3575,7 @@ int ObLobManager::build_lob_param(ObLobAccessParam& param,
|
||||
param.snapshot_.valid_ = true;
|
||||
param.snapshot_.source_ = transaction::ObTxReadSnapshot::SRC::LS;
|
||||
param.snapshot_.snapshot_lsid_ = share::ObLSID(location_info->ls_id_);
|
||||
param.read_latest_ = retry_info->read_latest_;
|
||||
} else {
|
||||
// When param for write, param.tx_desc_ should not be null
|
||||
// If tx indfo from lob locator is old, produce new read snapshot directly
|
||||
|
@ -936,7 +936,7 @@ int ObPersistentLobApator::build_common_scan_param(
|
||||
false, // index_back
|
||||
false, // query_stat
|
||||
ObQueryFlag::MysqlMode, // sql_mode
|
||||
false // read_latest
|
||||
param.read_latest_ // read_latest
|
||||
);
|
||||
query_flag.disable_cache();
|
||||
query_flag.scan_order_ = param.scan_backward_ ? ObQueryFlag::Reverse : ObQueryFlag::Forward;
|
||||
@ -961,6 +961,9 @@ int ObPersistentLobApator::build_common_scan_param(
|
||||
scan_param.limit_param_.offset_ = 0;
|
||||
// sessions
|
||||
scan_param.snapshot_ = param.snapshot_;
|
||||
if(param.read_latest_) {
|
||||
scan_param.tx_id_ = param.snapshot_.core_.tx_id_;
|
||||
}
|
||||
scan_param.sql_mode_ = param.sql_mode_;
|
||||
// common set
|
||||
scan_param.allocator_ = param.allocator_;
|
||||
|
@ -44,7 +44,8 @@ struct ObLobStorageParam
|
||||
|
||||
struct ObLobAccessParam {
|
||||
ObLobAccessParam()
|
||||
: tx_desc_(nullptr), snapshot_(), tx_id_(), sql_mode_(SMO_DEFAULT), allocator_(nullptr),
|
||||
: tx_desc_(nullptr), snapshot_(), tx_id_(), read_latest_(0),
|
||||
sql_mode_(SMO_DEFAULT), allocator_(nullptr),
|
||||
dml_base_param_(nullptr), column_ids_(),
|
||||
meta_table_schema_(nullptr), piece_table_schema_(nullptr),
|
||||
main_tablet_param_(nullptr), meta_tablet_param_(nullptr), piece_tablet_param_(nullptr),
|
||||
@ -68,12 +69,13 @@ public:
|
||||
TO_STRING_KV(K_(tenant_id), K_(src_tenant_id), K_(ls_id), K_(tablet_id), KPC_(lob_locator), KPC_(lob_common),
|
||||
KPC_(lob_data), K_(byte_size), K_(handle_size), K_(coll_type), K_(scan_backward), K_(offset), K_(len),
|
||||
K_(parent_seq_no), K_(seq_no_st), K_(used_seq_cnt), K_(total_seq_cnt), K_(checksum),
|
||||
K_(update_len), K_(op_type), K_(is_fill_zero), K_(from_rpc), K_(snapshot), K_(tx_id), K_(inrow_read_nocopy),
|
||||
K_(inrow_threshold), K_(spec_lob_id));
|
||||
K_(update_len), K_(op_type), K_(is_fill_zero), K_(from_rpc), K_(snapshot), K_(tx_id), K_(read_latest),
|
||||
K_(inrow_read_nocopy), K_(inrow_threshold), K_(spec_lob_id));
|
||||
public:
|
||||
transaction::ObTxDesc *tx_desc_; // for write/update/delete
|
||||
transaction::ObTxReadSnapshot snapshot_; // for read
|
||||
transaction::ObTransID tx_id_; // used when read-latest
|
||||
transaction::ObTransID tx_id_; // used when read-latest
|
||||
bool read_latest_;
|
||||
ObSQLMode sql_mode_;
|
||||
bool is_total_quantity_log_;
|
||||
ObIAllocator *allocator_;
|
||||
|
Reference in New Issue
Block a user