fix lock contention of get_trans
This commit is contained in:
@ -189,7 +189,7 @@ int ObTableLoadCoordinatorCtx::advance_status(ObTableLoadStatusType status)
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid args", KR(ret), K(status));
|
||||
} else {
|
||||
ObMutexGuard guard(mutex_);
|
||||
obsys::ObWLockGuard guard(rwlock_);
|
||||
if (OB_UNLIKELY(ObTableLoadStatusType::ERROR == status_)) {
|
||||
ret = error_code_;
|
||||
LOG_WARN("coordinator has error", KR(ret));
|
||||
@ -220,7 +220,7 @@ int ObTableLoadCoordinatorCtx::set_status_error(int error_code)
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid args", KR(ret), K(error_code));
|
||||
} else {
|
||||
ObMutexGuard guard(mutex_);
|
||||
obsys::ObWLockGuard guard(rwlock_);
|
||||
if (OB_UNLIKELY(status_ == ObTableLoadStatusType::ABORT)) {
|
||||
ret = OB_TRANS_KILLED;
|
||||
} else if (status_ != ObTableLoadStatusType::ERROR) {
|
||||
@ -240,7 +240,7 @@ int ObTableLoadCoordinatorCtx::set_status_abort()
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTableLoadCoordinatorCtx not init", KR(ret));
|
||||
} else {
|
||||
ObMutexGuard guard(mutex_);
|
||||
obsys::ObWLockGuard guard(rwlock_);
|
||||
if (OB_UNLIKELY(status_ != ObTableLoadStatusType::ABORT)) {
|
||||
status_ = ObTableLoadStatusType::ABORT;
|
||||
table_load_status_to_string(status_, ctx_->job_stat_->coordinator.status_);
|
||||
@ -272,7 +272,7 @@ int ObTableLoadCoordinatorCtx::check_status(ObTableLoadStatusType status) const
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTableLoadCoordinatorCtx not init", KR(ret));
|
||||
} else {
|
||||
ObMutexGuard guard(mutex_);
|
||||
obsys::ObRLockGuard guard(rwlock_);
|
||||
ret = check_status_unlock(status);
|
||||
}
|
||||
return ret;
|
||||
@ -342,7 +342,7 @@ int ObTableLoadCoordinatorCtx::start_trans(const ObTableLoadSegmentID &segment_i
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTableLoadCoordinatorCtx not init", KR(ret));
|
||||
} else {
|
||||
ObMutexGuard guard(mutex_);
|
||||
obsys::ObWLockGuard guard(rwlock_);
|
||||
if (OB_FAIL(check_status_unlock(ObTableLoadStatusType::LOADING))) {
|
||||
LOG_WARN("fail to check status", KR(ret), K_(status));
|
||||
} else {
|
||||
@ -388,7 +388,7 @@ int ObTableLoadCoordinatorCtx::commit_trans(ObTableLoadCoordinatorTrans *trans)
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid args", KR(ret), KP(trans));
|
||||
} else {
|
||||
ObMutexGuard guard(mutex_);
|
||||
obsys::ObWLockGuard guard(rwlock_);
|
||||
const ObTableLoadSegmentID &segment_id = trans->get_trans_id().segment_id_;
|
||||
SegmentCtx *segment_ctx = nullptr;
|
||||
if (OB_FAIL(segment_ctx_map_.get(segment_id, segment_ctx))) {
|
||||
@ -424,7 +424,7 @@ int ObTableLoadCoordinatorCtx::abort_trans(ObTableLoadCoordinatorTrans *trans)
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid args", KR(ret), KP(trans));
|
||||
} else {
|
||||
ObMutexGuard guard(mutex_);
|
||||
obsys::ObWLockGuard guard(rwlock_);
|
||||
const ObTableLoadSegmentID &segment_id = trans->get_trans_id().segment_id_;
|
||||
SegmentCtx *segment_ctx = nullptr;
|
||||
if (OB_FAIL(segment_ctx_map_.get(segment_id, segment_ctx))) {
|
||||
@ -462,7 +462,7 @@ void ObTableLoadCoordinatorCtx::put_trans(ObTableLoadCoordinatorTrans *trans)
|
||||
ObTableLoadTransStatusType trans_status = trans_ctx->get_trans_status();
|
||||
OB_ASSERT(ObTableLoadTransStatusType::COMMIT == trans_status ||
|
||||
ObTableLoadTransStatusType::ABORT == trans_status);
|
||||
ObMutexGuard guard(mutex_);
|
||||
obsys::ObWLockGuard guard(rwlock_);
|
||||
if (OB_FAIL(trans_map_.erase_refactored(trans->get_trans_id()))) {
|
||||
LOG_WARN("fail to erase_refactored", KR(ret));
|
||||
} else {
|
||||
@ -485,7 +485,7 @@ int ObTableLoadCoordinatorCtx::get_trans(const ObTableLoadTransId &trans_id,
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTableLoadCoordinatorCtx not init", KR(ret));
|
||||
} else {
|
||||
ObMutexGuard guard(mutex_);
|
||||
obsys::ObRLockGuard guard(rwlock_);
|
||||
if (OB_FAIL(trans_map_.get_refactored(trans_id, trans))) {
|
||||
if (OB_UNLIKELY(OB_HASH_NOT_EXIST != ret)) {
|
||||
LOG_WARN("fail to get_refactored", KR(ret), K(trans_id));
|
||||
@ -507,7 +507,7 @@ int ObTableLoadCoordinatorCtx::get_trans_ctx(const ObTableLoadTransId &trans_id,
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTableLoadCoordinatorCtx not init", KR(ret));
|
||||
} else {
|
||||
ObMutexGuard guard(mutex_);
|
||||
obsys::ObRLockGuard guard(rwlock_);
|
||||
if (OB_FAIL(trans_ctx_map_.get_refactored(trans_id, trans_ctx))) {
|
||||
if (OB_UNLIKELY(OB_HASH_NOT_EXIST != ret)) {
|
||||
LOG_WARN("fail to get trans ctx", KR(ret), K(trans_id));
|
||||
@ -527,7 +527,7 @@ int ObTableLoadCoordinatorCtx::get_segment_trans_ctx(const ObTableLoadSegmentID
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTableLoadCoordinatorCtx not init", KR(ret));
|
||||
} else {
|
||||
ObMutexGuard guard(mutex_);
|
||||
obsys::ObRLockGuard guard(rwlock_);
|
||||
SegmentCtx *segment_ctx = nullptr;
|
||||
if (OB_FAIL(segment_ctx_map_.get(segment_id, segment_ctx))) {
|
||||
if (OB_UNLIKELY(OB_ENTRY_NOT_EXIST != ret)) {
|
||||
@ -556,7 +556,7 @@ int ObTableLoadCoordinatorCtx::get_active_trans_ids(
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTableLoadCoordinatorCtx not init", KR(ret));
|
||||
} else {
|
||||
ObMutexGuard guard(mutex_);
|
||||
obsys::ObRLockGuard guard(rwlock_);
|
||||
for (TransMap::const_iterator trans_iter = trans_map_.begin();
|
||||
OB_SUCC(ret) && trans_iter != trans_map_.end(); ++trans_iter) {
|
||||
if (OB_FAIL(trans_id_array.push_back(trans_iter->first))) {
|
||||
@ -576,7 +576,7 @@ int ObTableLoadCoordinatorCtx::get_committed_trans_ids(
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTableLoadCoordinatorCtx not init", KR(ret));
|
||||
} else {
|
||||
ObMutexGuard guard(mutex_);
|
||||
obsys::ObRLockGuard guard(rwlock_);
|
||||
if (OB_FAIL(trans_id_array.create(commited_trans_ctx_array_.count(), allocator))) {
|
||||
LOG_WARN("fail to create trans id array", KR(ret));
|
||||
} else {
|
||||
@ -596,7 +596,7 @@ int ObTableLoadCoordinatorCtx::check_exist_trans(bool &is_exist) const
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTableLoadCoordinatorCtx not init", KR(ret));
|
||||
} else {
|
||||
ObMutexGuard guard(mutex_);
|
||||
obsys::ObRLockGuard guard(rwlock_);
|
||||
is_exist = !trans_map_.empty();
|
||||
}
|
||||
return ret;
|
||||
@ -609,7 +609,7 @@ int ObTableLoadCoordinatorCtx::check_exist_committed_trans(bool &is_exist) const
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTableLoadCoordinatorCtx not init", KR(ret));
|
||||
} else {
|
||||
ObMutexGuard guard(mutex_);
|
||||
obsys::ObRLockGuard guard(rwlock_);
|
||||
is_exist = !commited_trans_ctx_array_.empty();
|
||||
}
|
||||
return ret;
|
||||
|
@ -36,12 +36,12 @@ public:
|
||||
public:
|
||||
OB_INLINE table::ObTableLoadStatusType get_status() const
|
||||
{
|
||||
lib::ObMutexGuard guard(mutex_);
|
||||
obsys::ObRLockGuard guard(rwlock_);
|
||||
return status_;
|
||||
}
|
||||
OB_INLINE int get_error_code() const
|
||||
{
|
||||
lib::ObMutexGuard guard(mutex_);
|
||||
obsys::ObRLockGuard guard(rwlock_);
|
||||
return error_code_;
|
||||
}
|
||||
OB_INLINE int set_status_inited()
|
||||
@ -133,7 +133,7 @@ private:
|
||||
ObTableLoadObjectAllocator<ObTableLoadCoordinatorTrans> trans_allocator_; // 多线程安全
|
||||
uint64_t last_trans_gid_ CACHE_ALIGNED;
|
||||
uint64_t next_session_id_ CACHE_ALIGNED;
|
||||
mutable lib::ObMutex mutex_;
|
||||
mutable obsys::ObRWLock rwlock_;
|
||||
table::ObTableLoadStatusType status_;
|
||||
int error_code_;
|
||||
TransMap trans_map_;
|
||||
|
@ -300,7 +300,7 @@ int ObTableLoadStoreCtx::advance_status(ObTableLoadStatusType status)
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid args", KR(ret), K(status));
|
||||
} else {
|
||||
ObMutexGuard guard(mutex_);
|
||||
obsys::ObWLockGuard guard(rwlock_);
|
||||
if (OB_UNLIKELY(ObTableLoadStatusType::ERROR == status_)) {
|
||||
ret = error_code_;
|
||||
LOG_WARN("store has error", KR(ret));
|
||||
@ -331,7 +331,7 @@ int ObTableLoadStoreCtx::set_status_error(int error_code)
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid args", KR(ret), K(error_code));
|
||||
} else {
|
||||
ObMutexGuard guard(mutex_);
|
||||
obsys::ObWLockGuard guard(rwlock_);
|
||||
if (OB_UNLIKELY(status_ == ObTableLoadStatusType::ABORT)) {
|
||||
ret = OB_TRANS_KILLED;
|
||||
} else if (status_ != ObTableLoadStatusType::ERROR) {
|
||||
@ -351,7 +351,7 @@ int ObTableLoadStoreCtx::set_status_abort()
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTableLoadStoreCtx not init", KR(ret));
|
||||
} else {
|
||||
ObMutexGuard guard(mutex_);
|
||||
obsys::ObWLockGuard guard(rwlock_);
|
||||
if (OB_UNLIKELY(status_ != ObTableLoadStatusType::ABORT)) {
|
||||
status_ = ObTableLoadStatusType::ABORT;
|
||||
table_load_status_to_string(status_, ctx_->job_stat_->store.status_);
|
||||
@ -383,7 +383,7 @@ int ObTableLoadStoreCtx::check_status(ObTableLoadStatusType status) const
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTableLoadStoreCtx not init", KR(ret));
|
||||
} else {
|
||||
ObMutexGuard guard(mutex_);
|
||||
obsys::ObRLockGuard guard(rwlock_);
|
||||
ret = check_status_unlock(status);
|
||||
}
|
||||
return ret;
|
||||
@ -630,7 +630,7 @@ int ObTableLoadStoreCtx::start_trans(const ObTableLoadTransId &trans_id,
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTableLoadStoreCtx not init", KR(ret));
|
||||
} else {
|
||||
ObMutexGuard guard(mutex_);
|
||||
obsys::ObWLockGuard guard(rwlock_);
|
||||
if (OB_FAIL(check_status_unlock(ObTableLoadStatusType::LOADING))) {
|
||||
LOG_WARN("fail to check status", KR(ret), K_(status));
|
||||
} else {
|
||||
@ -676,7 +676,7 @@ int ObTableLoadStoreCtx::commit_trans(ObTableLoadStoreTrans *trans)
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid args", KR(ret), KP(trans));
|
||||
} else {
|
||||
ObMutexGuard guard(mutex_);
|
||||
obsys::ObWLockGuard guard(rwlock_);
|
||||
const ObTableLoadSegmentID &segment_id = trans->get_trans_id().segment_id_;
|
||||
SegmentCtx *segment_ctx = nullptr;
|
||||
ObTableLoadTransStore *trans_store = nullptr;
|
||||
@ -721,7 +721,7 @@ int ObTableLoadStoreCtx::abort_trans(ObTableLoadStoreTrans *trans)
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid args", KR(ret), KP(trans));
|
||||
} else {
|
||||
ObMutexGuard guard(mutex_);
|
||||
obsys::ObWLockGuard guard(rwlock_);
|
||||
const ObTableLoadSegmentID &segment_id = trans->get_trans_id().segment_id_;
|
||||
SegmentCtx *segment_ctx = nullptr;
|
||||
if (OB_FAIL(segment_ctx_map_.get(segment_id, segment_ctx))) {
|
||||
@ -759,7 +759,7 @@ void ObTableLoadStoreCtx::put_trans(ObTableLoadStoreTrans *trans)
|
||||
ObTableLoadTransStatusType trans_status = trans_ctx->get_trans_status();
|
||||
OB_ASSERT(ObTableLoadTransStatusType::COMMIT == trans_status ||
|
||||
ObTableLoadTransStatusType::ABORT == trans_status);
|
||||
ObMutexGuard guard(mutex_);
|
||||
obsys::ObWLockGuard guard(rwlock_);
|
||||
if (OB_FAIL(trans_map_.erase_refactored(trans->get_trans_id()))) {
|
||||
LOG_WARN("fail to erase_refactored", KR(ret));
|
||||
} else {
|
||||
@ -782,7 +782,7 @@ int ObTableLoadStoreCtx::get_trans(const ObTableLoadTransId &trans_id,
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTableLoadStoreCtx not init", KR(ret));
|
||||
} else {
|
||||
ObMutexGuard guard(mutex_);
|
||||
obsys::ObRLockGuard guard(rwlock_);
|
||||
if (OB_FAIL(trans_map_.get_refactored(trans_id, trans))) {
|
||||
if (OB_UNLIKELY(OB_HASH_NOT_EXIST != ret)) {
|
||||
LOG_WARN("fail to get_refactored", KR(ret), K(trans_id));
|
||||
@ -804,7 +804,7 @@ int ObTableLoadStoreCtx::get_trans_ctx(const ObTableLoadTransId &trans_id,
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTableLoadStoreCtx not init", KR(ret));
|
||||
} else {
|
||||
ObMutexGuard guard(mutex_);
|
||||
obsys::ObRLockGuard guard(rwlock_);
|
||||
if (OB_FAIL(trans_ctx_map_.get_refactored(trans_id, trans_ctx))) {
|
||||
if (OB_UNLIKELY(OB_HASH_NOT_EXIST != ret)) {
|
||||
LOG_WARN("fail to get trans ctx", KR(ret), K(trans_id));
|
||||
@ -825,7 +825,7 @@ int ObTableLoadStoreCtx::get_active_trans_ids(
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTableLoadStoreCtx not init", KR(ret));
|
||||
} else {
|
||||
ObMutexGuard guard(mutex_);
|
||||
obsys::ObRLockGuard guard(rwlock_);
|
||||
for (TransMap::const_iterator trans_iter = trans_map_.begin();
|
||||
OB_SUCC(ret) && trans_iter != trans_map_.end(); ++trans_iter) {
|
||||
if (OB_FAIL(trans_id_array.push_back(trans_iter->first))) {
|
||||
@ -844,7 +844,7 @@ int ObTableLoadStoreCtx::get_committed_trans_ids(
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTableLoadStoreCtx not init", KR(ret));
|
||||
} else {
|
||||
ObMutexGuard guard(mutex_);
|
||||
obsys::ObRLockGuard guard(rwlock_);
|
||||
if (OB_FAIL(trans_id_array.create(committed_trans_store_array_.count(), allocator))) {
|
||||
LOG_WARN("fail to create trans id array", KR(ret));
|
||||
} else {
|
||||
@ -865,7 +865,7 @@ int ObTableLoadStoreCtx::get_committed_trans_stores(
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTableLoadStoreCtx not init", KR(ret));
|
||||
} else {
|
||||
ObMutexGuard guard(mutex_);
|
||||
obsys::ObRLockGuard guard(rwlock_);
|
||||
if (OB_FAIL(trans_store_array.assign(committed_trans_store_array_))) {
|
||||
LOG_WARN("fail to assign trans store array", KR(ret));
|
||||
}
|
||||
@ -880,7 +880,7 @@ int ObTableLoadStoreCtx::check_exist_trans(bool &exist) const
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTableLoadStoreCtx not init", KR(ret));
|
||||
} else {
|
||||
ObMutexGuard guard(mutex_);
|
||||
obsys::ObRLockGuard guard(rwlock_);
|
||||
exist = !trans_map_.empty();
|
||||
}
|
||||
return ret;
|
||||
@ -888,7 +888,7 @@ int ObTableLoadStoreCtx::check_exist_trans(bool &exist) const
|
||||
|
||||
void ObTableLoadStoreCtx::clear_committed_trans_stores()
|
||||
{
|
||||
ObMutexGuard guard(mutex_);
|
||||
obsys::ObWLockGuard guard(rwlock_);
|
||||
for (int64_t i = 0; i < committed_trans_store_array_.count(); ++i) {
|
||||
ObTableLoadTransStore *trans_store = committed_trans_store_array_.at(i);
|
||||
ObTableLoadTransCtx *trans_ctx = trans_store->trans_ctx_;
|
||||
|
@ -49,12 +49,12 @@ public:
|
||||
public:
|
||||
OB_INLINE table::ObTableLoadStatusType get_status() const
|
||||
{
|
||||
lib::ObMutexGuard guard(mutex_);
|
||||
obsys::ObRLockGuard guard(rwlock_);
|
||||
return status_;
|
||||
}
|
||||
OB_INLINE int get_error_code() const
|
||||
{
|
||||
lib::ObMutexGuard guard(mutex_);
|
||||
obsys::ObRLockGuard guard(rwlock_);
|
||||
return error_code_;
|
||||
}
|
||||
OB_INLINE int set_status_inited()
|
||||
@ -153,7 +153,7 @@ private:
|
||||
typedef common::ObLinkHashMap<table::ObTableLoadSegmentID, SegmentCtx> SegmentCtxMap;
|
||||
private:
|
||||
ObTableLoadObjectAllocator<ObTableLoadStoreTrans> trans_allocator_; // 多线程安全
|
||||
mutable lib::ObMutex mutex_;
|
||||
mutable obsys::ObRWLock rwlock_;
|
||||
common::ObArenaAllocator allocator_;
|
||||
table::ObTableLoadStatusType status_;
|
||||
int error_code_;
|
||||
|
Reference in New Issue
Block a user