BUGFIX: lockop use local time
This commit is contained in:
@ -1044,19 +1044,23 @@ int ObMemtableCtx::recover_from_table_lock_durable_info(const ObTableLockInfo &t
|
||||
const int64_t op_cnt = table_lock_info.table_lock_ops_.count();
|
||||
ObLockMemtable* lock_memtable = nullptr;
|
||||
ObMemCtxLockOpLinkNode *lock_op_node = nullptr;
|
||||
const int64_t curr_timestamp = ObTimeUtility::current_time();
|
||||
for (int64_t i = 0; i < op_cnt && OB_SUCC(ret); ++i) {
|
||||
tablelock::ObTableLockOp table_lock_op = table_lock_info.table_lock_ops_.at(i);
|
||||
if (!table_lock_op.is_valid()) {
|
||||
tablelock::ObTableLockOp lock_op = table_lock_info.table_lock_ops_.at(i);
|
||||
if (!lock_op.is_valid()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "the table_lock_op is not valid", K(table_lock_op));
|
||||
TRANS_LOG(ERROR, "the lock_op is not valid", K(lock_op));
|
||||
// NOTE: we only need recover the lock op at lock list and lock map.
|
||||
// the buffer at multi source data is recovered by multi source data.
|
||||
} else if (OB_FAIL(lock_mem_ctx_.add_lock_record(table_lock_op, lock_op_node, true))) {
|
||||
TRANS_LOG(ERROR, "add_lock_record failed", K(ret), K(table_lock_op));
|
||||
// the tx ctx table may be copied from other ls replica we need fix the lockop's create timestamp.
|
||||
} else if (FALSE_IT(lock_op.create_timestamp_ = OB_MIN(curr_timestamp,
|
||||
lock_op.create_timestamp_))) {
|
||||
} else if (OB_FAIL(lock_mem_ctx_.add_lock_record(lock_op, lock_op_node, true))) {
|
||||
TRANS_LOG(ERROR, "add_lock_record failed", K(ret), K(lock_op));
|
||||
} else if (OB_FAIL(lock_mem_ctx_.get_lock_memtable(lock_memtable))) {
|
||||
TRANS_LOG(ERROR, "get_lock_memtable failed", K(ret));
|
||||
} else if (OB_NOT_NULL(lock_memtable)
|
||||
&& OB_FAIL(lock_memtable->recover_obj_lock(table_lock_op))) {
|
||||
&& OB_FAIL(lock_memtable->recover_obj_lock(lock_op))) {
|
||||
TRANS_LOG(ERROR, "recover_obj_lock failed", K(ret), K(*lock_memtable));
|
||||
} else {
|
||||
lock_mem_ctx_.set_log_synced(lock_op_node, table_lock_info.max_durable_scn_);
|
||||
|
||||
@ -849,6 +849,7 @@ int ObLockMemtable::replay_row(
|
||||
int64_t create_timestamp = 0;
|
||||
int64_t create_schema_vesion = -1;
|
||||
ObMemtableCtx *mem_ctx = nullptr;
|
||||
const int64_t curr_timestamp = ObTimeUtility::current_time();
|
||||
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
@ -875,7 +876,7 @@ int ObLockMemtable::replay_row(
|
||||
lock_op_type,
|
||||
LOCK_OP_DOING,
|
||||
seq_no,
|
||||
create_timestamp,
|
||||
OB_MIN(create_timestamp, curr_timestamp),
|
||||
create_schema_vesion);
|
||||
if (OB_UNLIKELY(!lock_op.is_valid())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
|
||||
@ -138,6 +138,7 @@ int ObLockTable::recover_(const blocksstable::ObDatumRow &row)
|
||||
int64_t idx = row.storage_datums_[TABLE_LOCK_KEY_COLUMN].get_int();
|
||||
ObString obj_str = row.storage_datums_[TABLE_LOCK_KEY_COLUMN + 1].get_string();
|
||||
ObTableLockOp store_info;
|
||||
const int64_t curr_timestamp = ObTimeUtility::current_time();
|
||||
|
||||
ObTableHandleV2 handle;
|
||||
ObLockMemtable *memtable = nullptr;
|
||||
@ -147,6 +148,10 @@ int ObLockTable::recover_(const blocksstable::ObDatumRow &row)
|
||||
LOG_WARN("ObLockTable not inited", K(ret));
|
||||
} else if (OB_FAIL(store_info.deserialize(obj_str.ptr(), obj_str.length(), pos))) {
|
||||
LOG_WARN("failed to deserialize ObTableLockOp", K(ret));
|
||||
// we may recover from a sstable that copy from other ls replica,
|
||||
// the create timestamp need to be fixed.
|
||||
} else if (FALSE_IT(store_info.create_timestamp_ = OB_MIN(store_info.create_timestamp_,
|
||||
curr_timestamp))) {
|
||||
} else if (OB_FAIL(get_lock_memtable(handle))) {
|
||||
LOG_WARN("get lock memtable failed", K(ret));
|
||||
} else if (OB_FAIL(handle.get_lock_memtable(memtable))) {
|
||||
|
||||
@ -229,16 +229,19 @@ int ObMulSourceTxDataNotifier::notify_table_lock(
|
||||
const bool is_replay_multi_source = true;
|
||||
const bool is_committed = true;
|
||||
int64_t pos = 0;
|
||||
const int64_t curr_timestamp = ObTimeUtility::current_time();
|
||||
if (OB_ISNULL(buf) || OB_UNLIKELY(len <= 0) || OB_ISNULL(mt_ctx)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
TRANS_LOG(WARN, "invalid argument", K(ret), K(buf), K(len), K(mt_ctx));
|
||||
} else if (OB_FAIL(lock_op.deserialize(buf, len, pos))) {
|
||||
TRANS_LOG(WARN, "deserialize lock op failed", K(ret), K(len), KP(buf));
|
||||
} else if (NotifyType::TX_END != type) {
|
||||
// TABLELOCK only deal with tx_end type, but not redo/prepare/commit/abort.
|
||||
} else if (!arg.for_replay_) {
|
||||
// TABLELOCK only need deal with replay process, but not apply.
|
||||
// the replay process will produce a lock op and will be dealt at trans end.
|
||||
} else if (OB_FAIL(lock_op.deserialize(buf, len, pos))) {
|
||||
TRANS_LOG(WARN, "deserialize lock op failed", K(ret), K(len), KP(buf));
|
||||
} else if (FALSE_IT(lock_op.create_timestamp_ = OB_MIN(curr_timestamp,
|
||||
lock_op.create_timestamp_))) {
|
||||
} else if (OB_FAIL(mt_ctx->replay_lock(lock_op,
|
||||
arg.scn_))) {
|
||||
TRANS_LOG(WARN, "replay lock failed", K(ret));
|
||||
|
||||
Reference in New Issue
Block a user