BUGFIX: wake up lock wait mgr if OUT_TRANS_UNLOCK finished
This commit is contained in:
@ -424,6 +424,7 @@ class ObString;
|
||||
ACT(AFTER_BACKUP_FETCH_MACRO_BLOCK_FAILED,)\
|
||||
ACT(BEFORE_TABLE_REDEFINITION_TASK_EFFECT,)\
|
||||
ACT(ALTER_LS_CHOOSE_SRC,)\
|
||||
ACT(BEFORE_LOCK_SERVICE_UNLOCK,)\
|
||||
ACT(MAX_DEBUG_SYNC_POINT,)
|
||||
|
||||
DECLARE_ENUM(ObDebugSyncPoint, debug_sync_point, OB_DEBUG_SYNC_POINT_DEF);
|
||||
|
||||
@ -171,8 +171,7 @@ int ObLockMemtable::lock_(
|
||||
if (ret == OB_OBJ_LOCK_EXIST) {
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
if (ENABLE_USE_LOCK_WAIT_MGR &&
|
||||
OB_TRY_LOCK_ROW_CONFLICT == ret &&
|
||||
if (OB_TRY_LOCK_ROW_CONFLICT == ret &&
|
||||
lock_op.is_dml_lock_op() && // only in trans dml lock will wait at lock wait mgr.
|
||||
conflict_tx_set.count() != 0) {
|
||||
// TODO: yanyuan.cxf only wait at the first conflict trans now, but we need
|
||||
@ -351,19 +350,6 @@ int ObLockMemtable::post_obj_lock_conflict_(ObMvccAccessCtx &acc_ctx,
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObLockMemtable::wakeup_waiters_(const ObTableLockOp &lock_op)
|
||||
{
|
||||
// dml in trans lock does not need do this.
|
||||
if (OB_LIKELY(!lock_op.need_wakeup_waiter())) {
|
||||
// do nothing
|
||||
} else if (OB_ISNULL(MTL(ObLockWaitMgr*))) {
|
||||
LOG_WARN("MTL(ObLockWaitMgr*) is null");
|
||||
} else {
|
||||
MTL(ObLockWaitMgr*)->wakeup(lock_op.lock_id_);
|
||||
LOG_DEBUG("ObLockMemtable::wakeup_waiters_ ", K(lock_op));
|
||||
}
|
||||
}
|
||||
|
||||
int ObLockMemtable::check_lock_conflict(
|
||||
const ObMemtableCtx *mem_ctx,
|
||||
const ObTableLockOp &lock_op,
|
||||
@ -489,9 +475,6 @@ void ObLockMemtable::remove_lock_record(const ObTableLockOp &lock_op)
|
||||
LOG_WARN("invalid argument", K(ret), K(lock_op));
|
||||
} else {
|
||||
obj_lock_map_.remove_lock_record(lock_op);
|
||||
if (ENABLE_USE_LOCK_WAIT_MGR) {
|
||||
wakeup_waiters_(lock_op);
|
||||
}
|
||||
}
|
||||
LOG_DEBUG("ObLockMemtable::remove_lock_record", K(lock_op));
|
||||
}
|
||||
|
||||
@ -207,7 +207,6 @@ private:
|
||||
const ObLockID &lock_id,
|
||||
const ObTransID &conflict_tx_id,
|
||||
ObFunction<int(bool &need_wait)> &recheck_f);
|
||||
void wakeup_waiters_(const ObTableLockOp &lock_op);
|
||||
private:
|
||||
typedef common::SpinRWLock RWLock;
|
||||
typedef common::SpinRLockGuard RLockGuard;
|
||||
|
||||
@ -15,6 +15,7 @@
|
||||
#include "share/ob_define.h"
|
||||
#include "share/ob_errno.h"
|
||||
#include "lib/oblog/ob_log_module.h"
|
||||
#include "storage/memtable/ob_lock_wait_mgr.h"
|
||||
#include "storage/tx/ob_trans_deadlock_adapter.h"
|
||||
#include "storage/tx/ob_trans_define.h"
|
||||
#include "storage/tx/ob_trans_part_ctx.h"
|
||||
@ -401,6 +402,11 @@ int ObOBJLock::update_lock_status(
|
||||
LOG_WARN("compact tablelock failed", K(tmp_ret), K(lock_op));
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret) &&
|
||||
lock_op.op_type_ == OUT_TRANS_UNLOCK &&
|
||||
status == LOCK_OP_COMPLETE) {
|
||||
wakeup_waiters_(lock_op);
|
||||
}
|
||||
}
|
||||
LOG_DEBUG("update lock status", K(ret), K(lock_op), K(commit_version), K(status));
|
||||
return ret;
|
||||
@ -470,8 +476,7 @@ int ObOBJLock::lock(
|
||||
// something else.
|
||||
need_retry = false;
|
||||
ret = OB_TRANS_KILLED;
|
||||
} else if (ENABLE_USE_LOCK_WAIT_MGR &&
|
||||
lock_op.is_dml_lock_op() /* only dml lock will wait at lock wait mgr */) {
|
||||
} else if (lock_op.is_dml_lock_op() /* only dml lock will wait at lock wait mgr */) {
|
||||
// wait at lock wait mgr but not retry at here.
|
||||
need_retry = false;
|
||||
} else {
|
||||
@ -590,10 +595,24 @@ void ObOBJLock::remove_lock_op(
|
||||
drop_op_list_if_empty_(lock_op.lock_mode_,
|
||||
op_list,
|
||||
allocator);
|
||||
wakeup_waiters_(lock_op);
|
||||
}
|
||||
LOG_DEBUG("ObOBJLock::remove_lock_op finish.");
|
||||
}
|
||||
|
||||
void ObOBJLock::wakeup_waiters_(const ObTableLockOp &lock_op)
|
||||
{
|
||||
// dml in trans lock does not need do this.
|
||||
if (OB_LIKELY(!lock_op.need_wakeup_waiter())) {
|
||||
// do nothing
|
||||
} else if (OB_ISNULL(MTL(ObLockWaitMgr*))) {
|
||||
LOG_WARN("MTL(ObLockWaitMgr*) is null");
|
||||
} else {
|
||||
MTL(ObLockWaitMgr*)->wakeup(lock_op.lock_id_);
|
||||
LOG_DEBUG("ObOBJLock::wakeup_waiters_ ", K(lock_op));
|
||||
}
|
||||
}
|
||||
|
||||
int64_t ObOBJLock::get_min_ddl_lock_committed_log_ts(const int64_t flushed_log_ts) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
@ -155,6 +155,7 @@ private:
|
||||
const int64_t commit_log_ts,
|
||||
const ObTableLockOpStatus status,
|
||||
ObTableLockOpList *op_list);
|
||||
void wakeup_waiters_(const ObTableLockOp &lock_op);
|
||||
int recover_(
|
||||
const ObTableLockOp &lock_op,
|
||||
ObMalloc &allocator);
|
||||
|
||||
@ -427,8 +427,6 @@ typedef common::ObSimpleIterator<ObLockID, ObSimpleIteratorModIds::OB_OBJ_LOCK_M
|
||||
// Is used to store and traverse all lock op
|
||||
typedef common::ObSimpleIterator<ObTableLockOp, ObSimpleIteratorModIds::OB_OBJ_LOCK, 16> ObLockOpIterator;
|
||||
|
||||
// whether use lock wait mgr or not.
|
||||
static const bool ENABLE_USE_LOCK_WAIT_MGR = true;
|
||||
// the threshold of timeout interval which will enable the deadlock avoid.
|
||||
static const int64_t MIN_DEADLOCK_AVOID_TIMEOUT_US = 60 * 1000 * 1000; // 1 min
|
||||
bool is_deadlock_avoid_enabled(const int64_t expire_time);
|
||||
|
||||
@ -290,6 +290,7 @@ int ObTableLockService::unlock_table(const uint64_t table_id,
|
||||
LOG_INFO("ObTableLockService::unlock_table",
|
||||
K(table_id), K(lock_mode), K(lock_owner), K(timeout_us));
|
||||
int ret = OB_SUCCESS;
|
||||
DEBUG_SYNC(BEFORE_LOCK_SERVICE_UNLOCK);
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("lock service is not inited", K(ret), K(table_id), K(lock_mode),
|
||||
@ -353,6 +354,7 @@ int ObTableLockService::unlock_tablet(const uint64_t table_id,
|
||||
const int64_t timeout_us)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
DEBUG_SYNC(BEFORE_LOCK_SERVICE_UNLOCK);
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("lock service is not inited", K(ret), K(table_id), K(tablet_id),
|
||||
|
||||
Reference in New Issue
Block a user