[CP] support parallel dml modify same row concurrently

Co-authored-by: Handora <qcdsr970209@gmail.com>
This commit is contained in:
chinaxing
2024-08-23 17:24:42 +00:00
committed by ob-robot
parent a39c058fbc
commit 26c0e1f161
18 changed files with 252 additions and 37 deletions

View File

@ -70,9 +70,10 @@ void ObMemtableCtx::free_mvcc_row_callback(ObITransCallback *cb)
}
}
int ObMvccRow::check_double_insert_(const share::SCN ,
ObMvccTransNode &,
ObMvccTransNode *)
int ObMvccRow::mvcc_sanity_check_(const share::SCN ,
const concurrent_control::ObWriteFlag ,
ObMvccTransNode &,
ObMvccTransNode *)
{
return OB_SUCCESS;
}

View File

@ -351,6 +351,91 @@ int ObDmlCgService::generate_update_ctdef(ObLogDelUpd &op,
return ret;
}
int ObDmlCgService::check_is_update_uk(ObLogDelUpd &op,
const IndexDMLInfo &index_dml_info,
ObIArray<uint64_t> &update_cids,
ObDASUpdCtDef &das_upd_ctdef)
{
int ret = OB_SUCCESS;
ObSchemaGetterGuard *schema_guard = NULL;
const ObTableSchema *table_schema = NULL;
ObSEArray<uint64_t, 8> rowkey_cids;
bool is_update_uk = false;
ObLogPlan *log_plan = op.get_plan();
if (OB_ISNULL(log_plan) ||
OB_ISNULL(schema_guard = log_plan->get_optimizer_context().get_schema_guard())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected status", K(ret));
} else if (OB_FAIL(schema_guard->get_table_schema(MTL_ID(), index_dml_info.ref_table_id_, table_schema))) {
LOG_WARN("fail to get unique index schema", K(ret), K(index_dml_info));
} else if (OB_ISNULL(table_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null", K(ret));
} else if (index_dml_info.is_primary_index_) {
// 主表
if (OB_FAIL(table_schema->get_rowkey_column_ids(rowkey_cids))) {
LOG_WARN("fail to get rowkey cids", K(ret));
}
} else if (!table_schema->is_global_unique_index_table()) {
// 非global unique index,不需要检查
} else if (OB_FAIL(table_schema->get_rowkey_column_ids(rowkey_cids))) {
LOG_WARN("fail to get rowkey cids", K(ret));
}
if (OB_SUCC(ret)) {
for (int64_t i = 0; OB_SUCC(ret) && !is_update_uk && i < update_cids.count(); i++) {
if (has_exist_in_array(rowkey_cids, update_cids.at(i))) {
is_update_uk = true;
}
}
}
if (OB_SUCC(ret)) {
das_upd_ctdef.is_update_uk_ = is_update_uk;
}
return ret;
}
int ObDmlCgService::check_is_update_local_unique_index(ObLogDelUpd &op,
uint64_t index_tid,
ObIArray<uint64_t> &update_cids,
ObDASUpdCtDef &das_upd_ctdef)
{
int ret = OB_SUCCESS;
ObSchemaGetterGuard *schema_guard = NULL;
const ObTableSchema *unique_index_schema = NULL;
ObSEArray<uint64_t, 8> rowkey_cids;
ObLogPlan *log_plan = op.get_plan();
bool is_update_uk = false;
if (OB_ISNULL(log_plan) ||
OB_ISNULL(schema_guard = log_plan->get_optimizer_context().get_schema_guard())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected status", K(ret));
} else if (OB_FAIL(schema_guard->get_table_schema(MTL_ID(), index_tid, unique_index_schema))) {
LOG_WARN("fail to get unique index schema", K(ret), K(index_tid));
} else if (OB_ISNULL(unique_index_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null", K(ret));
} else if (!unique_index_schema->is_local_unique_index_table()) {
// not need check it
} else if (OB_FAIL(unique_index_schema->get_rowkey_column_ids(rowkey_cids))) {
LOG_WARN("fail to get rowkey column_ids", K(ret));
} else {
for (int64_t i = 0; OB_SUCC(ret) && !is_update_uk && i < update_cids.count(); i++) {
if (has_exist_in_array(rowkey_cids, update_cids.at(i))) {
is_update_uk = true;
}
}
}
if (OB_SUCC(ret)) {
das_upd_ctdef.is_update_uk_ = is_update_uk;
}
return ret;
}
int ObDmlCgService::generate_update_ctdef(ObLogDelUpd &op,
const IndexDMLInfo &index_dml_info,
ObUpdCtDef &upd_ctdef)
@ -359,6 +444,7 @@ int ObDmlCgService::generate_update_ctdef(ObLogDelUpd &op,
ObSEArray<ObRawExpr*, 64> old_row;
ObSEArray<ObRawExpr*, 64> new_row;
ObSEArray<ObRawExpr*, 64> full_row;
bool is_update_uk = false;
const ObAssignments &assigns = index_dml_info.assignments_;
bool gen_expand_ctdef = false;
LOG_TRACE("begin to generate update ctdef", K(index_dml_info));
@ -402,6 +488,11 @@ int ObDmlCgService::generate_update_ctdef(ObLogDelUpd &op,
new_row,
full_row))) {
LOG_WARN("generate das update ctdef failed", K(ret));
} else if (OB_FAIL(check_is_update_uk(op,
index_dml_info,
upd_ctdef.dupd_ctdef_.updated_column_ids_,
upd_ctdef.dupd_ctdef_))) {
LOG_WARN("fail to check is update uk", K(ret), K(upd_ctdef.dupd_ctdef_));
} else if (OB_FAIL(generate_related_upd_ctdef(op,
index_dml_info.related_index_ids_,
index_dml_info,
@ -2189,6 +2280,11 @@ int ObDmlCgService::generate_related_upd_ctdef(ObLogDelUpd &op,
new_row,
full_row))) {
LOG_WARN("generate das ins ctdef failed", K(ret));
} else if (OB_FAIL(check_is_update_local_unique_index(op,
related_tid,
related_ctdef->updated_column_ids_,
*related_ctdef))) {
LOG_WARN("fail to check is update uk", K(ret), K(related_tid));
} else if (related_ctdef->updated_column_ids_.empty()) {
//ignore invalid update ctdef
} else if (OB_FAIL(upd_ctdefs.push_back(related_ctdef))) {

View File

@ -51,6 +51,16 @@ public:
const IndexDMLInfo &index_dml_info,
ObUpdCtDef &upd_ctdef);
int check_is_update_local_unique_index(ObLogDelUpd &op,
uint64_t index_tid,
ObIArray<uint64_t> &update_cids,
ObDASUpdCtDef &das_upd_ctdef);
int check_is_update_uk(ObLogDelUpd &op,
const IndexDMLInfo &index_dml_info,
ObIArray<uint64_t> &update_cids,
ObDASUpdCtDef &das_upd_ctdef);
int generate_lock_ctdef(ObLogForUpdate &op,
const IndexDMLInfo &index_dml_info,
ObLockCtDef *&lock_ctdef);

View File

@ -83,7 +83,9 @@ public:
uint64_t is_insert_up_ : 1;
uint64_t is_table_api_ : 1;
uint64_t is_access_mlog_as_master_table_ : 1;
uint64_t reserved_ : 58;
uint64_t is_update_partition_key_ : 1;
uint64_t is_update_uk_ : 1;
uint64_t reserved_ : 56;
};
};
protected:

View File

@ -1223,6 +1223,9 @@ int ObDMLService::init_dml_param(const ObDASDMLBaseCtDef &base_ctdef,
if (base_rtdef.is_for_foreign_key_check_) {
dml_param.write_flag_.set_check_row_locked();
}
if (base_ctdef.is_update_uk_) {
dml_param.write_flag_.set_update_uk();
}
return ret;
}

View File

@ -711,6 +711,7 @@ ob_set_subtarget(ob_storage memtable
ob_set_subtarget(ob_storage memtable_mvcc
memtable/mvcc/ob_multi_version_iterator.cpp
memtable/mvcc/ob_mvcc_acc_ctx.cpp
memtable/mvcc/ob_mvcc_ctx.cpp
memtable/mvcc/ob_mvcc_engine.cpp
memtable/mvcc/ob_mvcc_iterator.cpp

View File

@ -0,0 +1,39 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include "storage/memtable/mvcc/ob_mvcc_acc_ctx.h"
#include "storage/memtable/ob_memtable_context.h"
namespace oceanbase
{
namespace memtable
{
int ObMvccAccessCtx::get_write_seq(transaction::ObTxSEQ &seq) const
{
int ret = OB_SUCCESS;
// for update uk or pk, set branch part to 0, in orer to let tx-callback fall into single list
if (tx_scn_.support_branch() && write_flag_.is_update_uk()) {
const int branch = tx_scn_.get_branch();
if (branch == 0) {
seq = tx_scn_;
} else if (OB_UNLIKELY(ObTxDesc::is_alloced_branch_id(branch))) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "external branch not support concurrent update uk / pk", K(ret), KPC(this));
} else {
seq = ObTxSEQ(tx_scn_.get_seq(), 0);
}
} else {
seq = tx_scn_;
}
return ret;
}
} // memtable
} // oceanbase

View File

@ -233,6 +233,7 @@ public:
}
return expire_ts;
}
int get_write_seq(transaction::ObTxSEQ &seq) const;
TO_STRING_KV(K_(type),
K_(abs_lock_timeout_ts),
K_(tx_lock_timeout_us),
@ -272,7 +273,7 @@ public: // NOTE: those field should only be accessed by txn relative routine
transaction::ObTxDesc *tx_desc_; // the txn descriptor
transaction::ObPartTransCtx *tx_ctx_; // the txn context
ObMemtableCtx *mem_ctx_; // memtable-ctx
transaction::ObTxSEQ tx_scn_; // the change's number of this modify
transaction::ObTxSEQ tx_scn_; // the change's number of this modify
concurrent_control::ObWriteFlag write_flag_; // the write flag of the write process
// this was used for runtime metric

View File

@ -44,6 +44,8 @@ struct ObTxNodeArg
int64_t memstore_version_;
// seq_no_ is the sequence no of the executing sql
transaction::ObTxSEQ seq_no_;
// parallel write epoch no
int64_t write_epoch_;
// scn_ is thee log ts of the redo log
share::SCN scn_;
int64_t column_cnt_;
@ -55,6 +57,7 @@ struct ObTxNodeArg
K_(acc_checksum),
K_(memstore_version),
K_(seq_no),
K_(write_epoch),
K_(scn),
K_(column_cnt));
@ -64,6 +67,7 @@ struct ObTxNodeArg
const ObRowData *old_row,
const int64_t memstore_version,
const transaction::ObTxSEQ seq_no,
const int64_t write_epoch,
const int64_t column_cnt)
: tx_id_(tx_id),
data_(data),
@ -72,6 +76,7 @@ struct ObTxNodeArg
acc_checksum_(0),
memstore_version_(memstore_version),
seq_no_(seq_no),
write_epoch_(write_epoch),
scn_(share::SCN::max_scn()),
column_cnt_(column_cnt) {}
@ -92,6 +97,7 @@ struct ObTxNodeArg
acc_checksum_(acc_checksum),
memstore_version_(memstore_version),
seq_no_(seq_no),
write_epoch_(0),
scn_(scn),
column_cnt_(column_cnt) {}
@ -103,6 +109,7 @@ struct ObTxNodeArg
acc_checksum_ = 0;
memstore_version_ = 0;
seq_no_.reset();
write_epoch_ = 0;
scn_ = share::SCN::min_scn();
column_cnt_ = 0;
}

View File

@ -326,8 +326,6 @@ int ObMvccEngine::mvcc_write(storage::ObStoreCtx &ctx,
{
int ret = OB_SUCCESS;
ObMvccTransNode *node = NULL;
ObMemtableCtx *mem_ctx = ctx.mvcc_acc_ctx_.get_mem_ctx();
if (OB_FAIL(build_tx_node_(arg, node))) {
TRANS_LOG(WARN, "build tx node failed", K(ret), K(ctx), K(arg));
} else if (OB_FAIL(value.mvcc_write(ctx,
@ -336,10 +334,10 @@ int ObMvccEngine::mvcc_write(storage::ObStoreCtx &ctx,
res))) {
if (OB_TRY_LOCK_ROW_CONFLICT != ret &&
OB_TRANSACTION_SET_VIOLATION != ret) {
TRANS_LOG(WARN, "mvcc write failed", K(ret), KPC(mem_ctx), K(arg));
TRANS_LOG(WARN, "mvcc write failed", K(ret), K(arg));
}
} else {
TRANS_LOG(DEBUG, "mvcc write succeed", K(ret), KPC(mem_ctx), K(arg), K(*node));
TRANS_LOG(DEBUG, "mvcc write succeed", K(ret), K(arg), K(*node));
}
return ret;
@ -350,7 +348,6 @@ int ObMvccEngine::mvcc_replay(const ObTxNodeArg &arg,
{
int ret = OB_SUCCESS;
ObMvccTransNode *node = NULL;
if (OB_FAIL(build_tx_node_(arg, node))) {
TRANS_LOG(WARN, "build tx node failed", K(ret), K(arg));
} else {
@ -374,6 +371,7 @@ int ObMvccEngine::build_tx_node_(const ObTxNodeArg &arg,
node->version_ = arg.memstore_version_;
node->scn_ = arg.scn_;
node->seq_no_ = arg.seq_no_;
node->write_epoch_ = arg.write_epoch_;
node->prev_ = NULL;
node->next_ = NULL;

View File

@ -223,7 +223,8 @@ int64_t ObMvccTransNode::to_string(char *buf, const int64_t buf_len) const
"snapshot_barrier=%ld "
"snapshot_barrier_flag=%ld "
"mtd=%s "
"seq_no=%s",
"seq_no=%s "
"write_epoch=%ld",
this,
to_cstring(trans_version_),
to_cstring(scn_),
@ -239,7 +240,8 @@ int64_t ObMvccTransNode::to_string(char *buf, const int64_t buf_len) const
& (~SNAPSHOT_VERSION_BARRIER_BIT),
snapshot_version_barrier_ >> 62,
to_cstring(*mtd),
to_cstring(seq_no_));
to_cstring(seq_no_),
write_epoch_);
return pos;
}
@ -514,7 +516,10 @@ int ObMvccRow::insert_trans_node(ObIMvccCtx &ctx,
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "meet unexpected index_node", KR(ret), K(*prev), K(node), K(*index_node), K(*this));
abort_unless(0);
} else if (prev->tx_id_ == node.tx_id_ && OB_UNLIKELY(prev->seq_no_ > node.seq_no_)) {
} else if (prev->tx_id_ == node.tx_id_
&& OB_UNLIKELY(prev->seq_no_ > node.seq_no_)
// exclude the concurrently update uk case, which always in branch 0
&& !(prev->seq_no_.get_branch() == 0 && node.seq_no_.get_branch() == 0)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "prev node seq_no > this node", KR(ret), KPC(prev), K(node), KPC(this));
usleep(1000);
@ -825,9 +830,9 @@ int ObMvccRow::mvcc_write_(ObStoreCtx &ctx,
iter = iter->prev_;
need_retry = true;
} else if (data_tx_id == writer_tx_id) {
bool is_lock_node = false;
// Case 4: the newest node is not decided and locked by itself, so we
// can insert into it
bool is_lock_node = false;
if (OB_FAIL(writer_node.is_lock_node(is_lock_node))) {
TRANS_LOG(ERROR, "get is lock node failed", K(ret), K(writer_node));
} else if (is_lock_node) {
@ -881,9 +886,10 @@ int ObMvccRow::mvcc_write_(ObStoreCtx &ctx,
list_head_->get_seq_no()))) {
TRANS_LOG(WARN, "check sequence set violation failed", K(ret), KPC(this));
} else if (nullptr != list_head_ && FALSE_IT(res.is_checked_ = true)) {
} else if (OB_SUCC(check_double_insert_(snapshot_version,
writer_node,
list_head_))) {
} else if (OB_SUCC(mvcc_sanity_check_(snapshot_version,
ctx.mvcc_acc_ctx_.write_flag_,
writer_node,
list_head_))) {
ATOMIC_STORE(&(writer_node.prev_), list_head_);
ATOMIC_STORE(&(writer_node.next_), NULL);
if (NULL != list_head_) {
@ -914,19 +920,43 @@ int ObMvccRow::mvcc_write_(ObStoreCtx &ctx,
}
__attribute__((noinline))
int ObMvccRow::check_double_insert_(const SCN snapshot_version,
ObMvccTransNode &node,
ObMvccTransNode *prev)
int ObMvccRow::mvcc_sanity_check_(const SCN snapshot_version,
const concurrent_control::ObWriteFlag write_flag,
ObMvccTransNode &node,
ObMvccTransNode *prev)
{
int ret = OB_SUCCESS;
const bool compliant_with_sql_semantic = !write_flag.is_table_api();
if (NULL != prev) {
if (blocksstable::ObDmlFlag::DF_INSERT == node.get_dml_flag()
&& blocksstable::ObDmlFlag::DF_DELETE != prev->get_dml_flag()
&& prev->is_committed()
&& snapshot_version >= prev->trans_version_) {
// Case 1: Check double insert case
ret = OB_ERR_PRIMARY_KEY_DUPLICATE;
TRANS_LOG(WARN, "find double insert node", K(ret), K(node), KPC(prev), K(snapshot_version), K(*this));
} else if (prev->get_tx_id() == node.get_tx_id()
&& prev->is_incomplete()
&& compliant_with_sql_semantic) {
// TODO(handora.qc): remove after kaizhan.dkz finish the concureent
// insert/delete feature
//
// Case 2: The current implementation allows the same rowkey to perform
// insert and delete within the same statement concurrently. So to prevent
// disorder between insert and callback registeration, we return an error
// in this scenario, hoping that the sql layer will retry later.
// Otherwise, it may lead to an out-of-order actions from logs between
// leader and follower.
ret = OB_SEQ_NO_REORDER_UNDER_PDML;
TRANS_LOG(INFO, "mvcc_write meet current write by self", K(ret), KPC(prev), K(node));
} else if (prev->get_tx_id() == node.get_tx_id()
&& prev->get_write_epoch() == node.get_write_epoch()
&& prev->get_seq_no().get_branch() != node.get_seq_no().get_branch()) {
// Case 3: Check concurrent modify to the same row
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "concurrent modify to the same row", K(ret), KPC(prev), K(node));
}
}

View File

@ -69,6 +69,7 @@ public:
trans_version_(share::SCN::min_scn()),
scn_(share::SCN::max_scn()),
seq_no_(),
write_epoch_(0),
tx_end_scn_(share::SCN::max_scn()),
prev_(NULL),
next_(NULL),
@ -85,6 +86,7 @@ public:
share::SCN trans_version_;
share::SCN scn_;
transaction::ObTxSEQ seq_no_;
int64_t write_epoch_;
share::SCN tx_end_scn_;
ObMvccTransNode *prev_;
ObMvccTransNode *next_;
@ -201,7 +203,7 @@ public:
share::SCN get_tx_end_scn() const { return tx_end_scn_.atomic_load(); }
share::SCN get_tx_version() const { return trans_version_.atomic_load(); }
share::SCN get_scn() const { return scn_.atomic_load(); }
int64_t get_write_epoch() const { return write_epoch_; }
private:
// the row flag of the mvcc tx node
static const uint8_t F_INIT;
@ -420,10 +422,11 @@ struct ObMvccRow
ObMvccWriteResult &res);
// ===================== ObMvccRow Protection Code =====================
// check double insert
int check_double_insert_(const share::SCN snapshot_version,
ObMvccTransNode &node,
ObMvccTransNode *prev);
// sanity check during mvcc_write
int mvcc_sanity_check_(const share::SCN snapshot_version,
const concurrent_control::ObWriteFlag write_flag,
ObMvccTransNode &node,
ObMvccTransNode *prev);
};
}

View File

@ -253,6 +253,7 @@ public:
int acquire_callback_list(const bool new_epoch);
void revert_callback_list();
int get_tx_seq_replay_idx(const transaction::ObTxSEQ seq) const;
int64_t get_write_epoch() const { return write_epoch_; }
common::SpinRWLock& get_rwlock() { return rwlock_; }
private:
void wakeup_waiting_txns_();

View File

@ -33,7 +33,8 @@ struct ObWriteFlag
#define OBWF_BIT_CHECK_ROW_LOCKED 1
#define OBWF_BIT_LOB_AUX 1
#define OBWF_BIT_SKIP_FLUSH_REDO 1
#define OBWF_BIT_RESERVED 55
#define OBWF_BIT_UPDATE_UK 1
#define OBWF_BIT_RESERVED 54
static const uint64_t OBWF_MASK_TABLE_API = (0x1UL << OBWF_BIT_TABLE_API) - 1;
static const uint64_t OBWF_MASK_TABLE_LOCK = (0x1UL << OBWF_BIT_TABLE_LOCK) - 1;
@ -57,6 +58,7 @@ struct ObWriteFlag
uint64_t is_check_row_locked_ : OBWF_BIT_CHECK_ROW_LOCKED; // 0: false(default), 1: true
uint64_t is_lob_aux_ : OBWF_BIT_LOB_AUX; // 0: false(default), 1: true
uint64_t is_skip_flush_redo_ : OBWF_BIT_SKIP_FLUSH_REDO; // 0: false(default), 1: true
uint64_t is_update_uk_ : OBWF_BIT_UPDATE_UK; // 0: false(default), 1: true
uint64_t reserved_ : OBWF_BIT_RESERVED;
};
};
@ -82,6 +84,8 @@ struct ObWriteFlag
inline bool is_skip_flush_redo() const { return is_skip_flush_redo_; }
inline void set_skip_flush_redo() { is_skip_flush_redo_ = true; }
inline void unset_skip_flush_redo() { is_skip_flush_redo_ = false; }
inline void set_update_uk() { is_update_uk_ = true; }
inline bool is_update_uk() const { return is_update_uk_; }
TO_STRING_KV("is_table_api", is_table_api_,
"is_table_lock", is_table_lock_,
@ -91,7 +95,8 @@ struct ObWriteFlag
"is_write_only_index", is_write_only_index_,
"is_check_row_locked", is_check_row_locked_,
"is_lob_aux", is_lob_aux_,
"is_skip_flush_redo", is_skip_flush_redo_);
"is_skip_flush_redo", is_skip_flush_redo_,
"is_update_uk", is_update_uk_);
OB_UNIS_VERSION(1);
};

View File

@ -2635,9 +2635,14 @@ int ObMemtable::set_(
}
if (OB_SUCC(ret)) {
bool is_new_locked = false;
ObTxSEQ write_seq;
int64_t write_epoch = 0;
row_writer.reset();
if (OB_FAIL(ret)) {
//do nothing
} else if (FALSE_IT(write_epoch = mem_ctx->get_write_epoch())) {
} else if (OB_FAIL(ctx.mvcc_acc_ctx_.get_write_seq(write_seq))) {
TRANS_LOG(WARN, "get write seq failed", K(ret));
} else if (OB_FAIL(row_writer.write(param.get_schema_rowkey_count(), new_row, update_idx, buf, len))) {
TRANS_LOG(WARN, "Failed to write new row", K(ret), K(new_row));
} else if (OB_UNLIKELY(new_row.flag_.is_not_exist())) {
@ -2650,7 +2655,8 @@ int ObMemtable::set_(
&mtd, /*memtable_data*/
NULL == old_row ? NULL : &old_row_data,
init_timestamp_, /*memstore_version*/
ctx.mvcc_acc_ctx_.tx_scn_, /*seq_no*/
write_seq, /*seq_no*/
write_epoch, /*write_epoch*/
new_row.row_val_.count_ /*column_cnt*/);
if (OB_FAIL(mvcc_write_(param,
context,
@ -2722,8 +2728,17 @@ int ObMemtable::lock_(
blocksstable::ObRowWriter row_writer;
char *buf = NULL;
int64_t len = 0;
if (OB_FAIL(row_writer.write_rowkey(rowkey, buf, len))) {
ObTxSEQ lock_seq;
ObMvccAccessCtx &acc_ctx = context.store_ctx_->mvcc_acc_ctx_;
ObMemtableCtx *mem_ctx = acc_ctx.get_mem_ctx();
int64_t write_epoch = 0;
if (OB_ISNULL(mem_ctx)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "mem_ctx is null", K(ret), KPC(context.store_ctx_));
} else if (FALSE_IT(write_epoch = mem_ctx->get_write_epoch())) {
} else if (OB_FAIL(acc_ctx.get_write_seq(lock_seq))) {
TRANS_LOG(WARN, "get write seq failed", K(ret));
} else if (OB_FAIL(row_writer.write_rowkey(rowkey, buf, len))) {
TRANS_LOG(WARN, "Failed to writer rowkey", K(ret), K(rowkey));
} else {
// for elr optimization
@ -2733,8 +2748,9 @@ int ObMemtable::lock_(
ObTxNodeArg arg(acc_ctx.tx_id_, /*trans id*/
&mtd, /*memtable_data*/
NULL, /*old_data*/
init_timestamp_, /*memstore_version*/
acc_ctx.tx_scn_, /*seq_no*/
init_timestamp_, /*memstore_version*/
lock_seq, /*seq_no*/
write_epoch, /*write_epoch*/
rowkey.get_obj_cnt()); /*column_cnt*/
if (OB_FAIL(mvcc_write_(param,
context,

View File

@ -475,7 +475,7 @@ public: // callback
void set_for_replay(const bool for_replay) { trans_mgr_.set_for_replay(for_replay); }
void inc_pending_log_size(const int64_t size) { trans_mgr_.inc_pending_log_size(size); }
void inc_flushed_log_size(const int64_t size) { trans_mgr_.inc_flushed_log_size(size); }
int64_t get_write_epoch() const { return trans_mgr_.get_write_epoch(); }
public:
// tx_status
enum ObTxStatus {

View File

@ -693,7 +693,8 @@ public:
bool support_branch() const { return seq_base_ > 0; }
// used by SQL alloc branch_id refer the min branch_id allowed
// because branch_id bellow this is reserved for internal use
int branch_id_offset() const { return MAX_CALLBACK_LIST_COUNT; }
static int branch_id_offset() { return MAX_CALLBACK_LIST_COUNT; }
static bool is_alloced_branch_id(int branch_id) { return branch_id >= branch_id_offset(); }
int alloc_branch_id(const int64_t count, int16_t &branch_id);
int fetch_conflict_txs(ObIArray<ObTransIDAndAddr> &array);
void reset_conflict_txs()

View File

@ -113,9 +113,10 @@ int ObReadInfoStruct::init_compat_version()
namespace memtable
{
int ObMvccRow::check_double_insert_(const share::SCN ,
ObMvccTransNode &,
ObMvccTransNode *)
int ObMvccRow::mvcc_sanity_check_(const share::SCN ,
const concurrent_control::ObWriteFlag ,
ObMvccTransNode &,
ObMvccTransNode *)
{
return OB_SUCCESS;
}