fix lock contetion of direct load
This commit is contained in:
@ -106,7 +106,7 @@ int ObTableLoadCoordinatorTrans::get_bucket_writer_for_write(
|
|||||||
} else if (OB_FAIL(check_trans_status(ObTableLoadTransStatusType::RUNNING))) {
|
} else if (OB_FAIL(check_trans_status(ObTableLoadTransStatusType::RUNNING))) {
|
||||||
LOG_WARN("fail to check trans status", KR(ret));
|
LOG_WARN("fail to check trans status", KR(ret));
|
||||||
} else {
|
} else {
|
||||||
ObMutexGuard guard(trans_ctx_->mutex_);
|
obsys::ObRLockGuard guard(trans_ctx_->rwlock_);
|
||||||
if (OB_ISNULL(trans_bucket_writer_)) {
|
if (OB_ISNULL(trans_bucket_writer_)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("unexpected null bucket writer", KR(ret));
|
LOG_WARN("unexpected null bucket writer", KR(ret));
|
||||||
@ -132,7 +132,7 @@ int ObTableLoadCoordinatorTrans::get_bucket_writer_for_flush(
|
|||||||
} else if (OB_FAIL(check_trans_status(ObTableLoadTransStatusType::FROZEN))) {
|
} else if (OB_FAIL(check_trans_status(ObTableLoadTransStatusType::FROZEN))) {
|
||||||
LOG_WARN("fail to check trans status", KR(ret));
|
LOG_WARN("fail to check trans status", KR(ret));
|
||||||
} else {
|
} else {
|
||||||
ObMutexGuard guard(trans_ctx_->mutex_);
|
obsys::ObRLockGuard guard(trans_ctx_->rwlock_);
|
||||||
if (OB_ISNULL(trans_bucket_writer_)) {
|
if (OB_ISNULL(trans_bucket_writer_)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("unexpected null bucket writer", KR(ret));
|
LOG_WARN("unexpected null bucket writer", KR(ret));
|
||||||
@ -158,7 +158,7 @@ void ObTableLoadCoordinatorTrans::put_bucket_writer(ObTableLoadTransBucketWriter
|
|||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("invalid null bucket writer", KR(ret));
|
LOG_WARN("invalid null bucket writer", KR(ret));
|
||||||
} else {
|
} else {
|
||||||
ObMutexGuard guard(trans_ctx_->mutex_);
|
obsys::ObRLockGuard guard(trans_ctx_->rwlock_);
|
||||||
OB_ASSERT(trans_bucket_writer_ == bucket_writer);
|
OB_ASSERT(trans_bucket_writer_ == bucket_writer);
|
||||||
}
|
}
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
|
|||||||
@ -117,7 +117,7 @@ int ObTableLoadStoreTrans::get_store_writer_for_write(
|
|||||||
} else if (OB_FAIL(check_trans_status(ObTableLoadTransStatusType::RUNNING))) {
|
} else if (OB_FAIL(check_trans_status(ObTableLoadTransStatusType::RUNNING))) {
|
||||||
LOG_WARN("fail to check trans status", KR(ret));
|
LOG_WARN("fail to check trans status", KR(ret));
|
||||||
} else {
|
} else {
|
||||||
ObMutexGuard guard(trans_ctx_->mutex_);
|
obsys::ObRLockGuard guard(trans_ctx_->rwlock_);
|
||||||
if (OB_ISNULL(trans_store_writer_)) {
|
if (OB_ISNULL(trans_store_writer_)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("unexpected null store writer", KR(ret));
|
LOG_WARN("unexpected null store writer", KR(ret));
|
||||||
@ -143,7 +143,7 @@ int ObTableLoadStoreTrans::get_store_writer_for_flush(
|
|||||||
} else if (OB_FAIL(check_trans_status(ObTableLoadTransStatusType::FROZEN))) {
|
} else if (OB_FAIL(check_trans_status(ObTableLoadTransStatusType::FROZEN))) {
|
||||||
LOG_WARN("fail to check trans status", KR(ret));
|
LOG_WARN("fail to check trans status", KR(ret));
|
||||||
} else {
|
} else {
|
||||||
ObMutexGuard guard(trans_ctx_->mutex_);
|
obsys::ObRLockGuard guard(trans_ctx_->rwlock_);
|
||||||
if (OB_ISNULL(trans_store_writer_)) {
|
if (OB_ISNULL(trans_store_writer_)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("unexpected null store writer", KR(ret));
|
LOG_WARN("unexpected null store writer", KR(ret));
|
||||||
@ -170,7 +170,7 @@ int ObTableLoadStoreTrans::get_store_writer_for_clean_up(
|
|||||||
} else if (OB_FAIL(check_trans_status(ObTableLoadTransStatusType::ABORT))) {
|
} else if (OB_FAIL(check_trans_status(ObTableLoadTransStatusType::ABORT))) {
|
||||||
LOG_WARN("fail to check trans status", KR(ret));
|
LOG_WARN("fail to check trans status", KR(ret));
|
||||||
} else {
|
} else {
|
||||||
ObMutexGuard guard(trans_ctx_->mutex_);
|
obsys::ObRLockGuard guard(trans_ctx_->rwlock_);
|
||||||
if (OB_ISNULL(trans_store_writer_)) {
|
if (OB_ISNULL(trans_store_writer_)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("unexpected null store writer", KR(ret));
|
LOG_WARN("unexpected null store writer", KR(ret));
|
||||||
@ -192,7 +192,7 @@ void ObTableLoadStoreTrans::put_store_writer(ObTableLoadTransStoreWriter *store_
|
|||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("invalid null store", KR(ret));
|
LOG_WARN("invalid null store", KR(ret));
|
||||||
} else {
|
} else {
|
||||||
ObMutexGuard guard(trans_ctx_->mutex_);
|
obsys::ObRLockGuard guard(trans_ctx_->rwlock_);
|
||||||
OB_ASSERT(trans_store_writer_ == store_writer);
|
OB_ASSERT(trans_store_writer_ == store_writer);
|
||||||
}
|
}
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
@ -224,7 +224,7 @@ int ObTableLoadStoreTrans::output_store(ObTableLoadTransStore *&trans_store)
|
|||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
LOG_WARN("ObTableLoadStoreTrans not init", KR(ret), KP(this));
|
LOG_WARN("ObTableLoadStoreTrans not init", KR(ret), KP(this));
|
||||||
} else {
|
} else {
|
||||||
ObMutexGuard guard(trans_ctx_->mutex_);
|
obsys::ObWLockGuard guard(trans_ctx_->rwlock_);
|
||||||
if (OB_ISNULL(trans_store_) || OB_ISNULL(trans_store_writer_)) {
|
if (OB_ISNULL(trans_store_) || OB_ISNULL(trans_store_writer_)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("unexpected null store", KR(ret), KP_(trans_store), KP_(trans_store_writer));
|
LOG_WARN("unexpected null store", KR(ret), KP_(trans_store), KP_(trans_store_writer));
|
||||||
|
|||||||
@ -33,7 +33,7 @@ int ObTableLoadTransCtx::advance_trans_status(ObTableLoadTransStatusType trans_s
|
|||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("invalid args", KR(ret), K(trans_status));
|
LOG_WARN("invalid args", KR(ret), K(trans_status));
|
||||||
} else {
|
} else {
|
||||||
ObMutexGuard guard(mutex_);
|
obsys::ObWLockGuard guard(rwlock_);
|
||||||
if (OB_UNLIKELY(ObTableLoadTransStatusType::ERROR == trans_status_)) {
|
if (OB_UNLIKELY(ObTableLoadTransStatusType::ERROR == trans_status_)) {
|
||||||
ret = error_code_;
|
ret = error_code_;
|
||||||
LOG_WARN("trans has error", KR(ret));
|
LOG_WARN("trans has error", KR(ret));
|
||||||
@ -60,7 +60,7 @@ int ObTableLoadTransCtx::set_trans_status_error(int error_code)
|
|||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("invalid args", KR(ret), K(error_code));
|
LOG_WARN("invalid args", KR(ret), K(error_code));
|
||||||
} else {
|
} else {
|
||||||
ObMutexGuard guard(mutex_);
|
obsys::ObWLockGuard guard(rwlock_);
|
||||||
if (OB_UNLIKELY(trans_status_ == ObTableLoadTransStatusType::ABORT)) {
|
if (OB_UNLIKELY(trans_status_ == ObTableLoadTransStatusType::ABORT)) {
|
||||||
ret = OB_TRANS_KILLED;
|
ret = OB_TRANS_KILLED;
|
||||||
} else if (trans_status_ != ObTableLoadTransStatusType::ERROR) {
|
} else if (trans_status_ != ObTableLoadTransStatusType::ERROR) {
|
||||||
@ -74,7 +74,7 @@ int ObTableLoadTransCtx::set_trans_status_error(int error_code)
|
|||||||
int ObTableLoadTransCtx::set_trans_status_abort()
|
int ObTableLoadTransCtx::set_trans_status_abort()
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObMutexGuard guard(mutex_);
|
obsys::ObWLockGuard guard(rwlock_);
|
||||||
trans_status_ = ObTableLoadTransStatusType::ABORT;
|
trans_status_ = ObTableLoadTransStatusType::ABORT;
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -82,7 +82,7 @@ int ObTableLoadTransCtx::set_trans_status_abort()
|
|||||||
int ObTableLoadTransCtx::check_trans_status(ObTableLoadTransStatusType trans_status) const
|
int ObTableLoadTransCtx::check_trans_status(ObTableLoadTransStatusType trans_status) const
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObMutexGuard guard(mutex_);
|
obsys::ObRLockGuard guard(rwlock_);
|
||||||
if (OB_UNLIKELY(trans_status != trans_status_)) {
|
if (OB_UNLIKELY(trans_status != trans_status_)) {
|
||||||
if (ObTableLoadTransStatusType::ERROR == trans_status_) {
|
if (ObTableLoadTransStatusType::ERROR == trans_status_) {
|
||||||
ret = error_code_;
|
ret = error_code_;
|
||||||
|
|||||||
@ -20,12 +20,12 @@ public:
|
|||||||
ObTableLoadTransCtx(ObTableLoadTableCtx *ctx, const table::ObTableLoadTransId &trans_id);
|
ObTableLoadTransCtx(ObTableLoadTableCtx *ctx, const table::ObTableLoadTransId &trans_id);
|
||||||
OB_INLINE table::ObTableLoadTransStatusType get_trans_status() const
|
OB_INLINE table::ObTableLoadTransStatusType get_trans_status() const
|
||||||
{
|
{
|
||||||
lib::ObMutexGuard guard(mutex_);
|
obsys::ObRLockGuard guard(rwlock_);
|
||||||
return trans_status_;
|
return trans_status_;
|
||||||
}
|
}
|
||||||
OB_INLINE int get_error_code() const
|
OB_INLINE int get_error_code() const
|
||||||
{
|
{
|
||||||
lib::ObMutexGuard guard(mutex_);
|
obsys::ObRLockGuard guard(rwlock_);
|
||||||
return error_code_;
|
return error_code_;
|
||||||
}
|
}
|
||||||
int advance_trans_status(table::ObTableLoadTransStatusType trans_status);
|
int advance_trans_status(table::ObTableLoadTransStatusType trans_status);
|
||||||
@ -36,7 +36,7 @@ public:
|
|||||||
public:
|
public:
|
||||||
ObTableLoadTableCtx * const ctx_;
|
ObTableLoadTableCtx * const ctx_;
|
||||||
const table::ObTableLoadTransId trans_id_;
|
const table::ObTableLoadTransId trans_id_;
|
||||||
mutable lib::ObMutex mutex_;
|
mutable obsys::ObRWLock rwlock_;
|
||||||
common::ObArenaAllocator allocator_;
|
common::ObArenaAllocator allocator_;
|
||||||
table::ObTableLoadTransStatusType trans_status_;
|
table::ObTableLoadTransStatusType trans_status_;
|
||||||
int error_code_;
|
int error_code_;
|
||||||
|
|||||||
Reference in New Issue
Block a user