BUGFIX: break the deadlock between the memtable ctx (ObMemtableCtx) and the part ctx (ObPartTransCtx)
This commit is contained in:
parent
58875bd2b7
commit
3648a21c43
@ -1280,7 +1280,8 @@ int ObMemtableCtx::register_multi_source_data_if_need_(
|
||||
// TODO: yanyuan.cxf need seqno to do rollback.
|
||||
} else if (OB_FAIL(part_ctx->register_multi_data_source(type,
|
||||
buf,
|
||||
serialize_size))) {
|
||||
serialize_size,
|
||||
true /* try lock */))) {
|
||||
TRANS_LOG(WARN, "register to multi source data failed", K(ret));
|
||||
} else {
|
||||
// do nothing
|
||||
|
@ -5680,7 +5680,8 @@ int ObPartTransCtx::notify_data_source_(const NotifyType notify_type,
|
||||
|
||||
int ObPartTransCtx::register_multi_data_source(const ObTxDataSourceType data_source_type,
|
||||
const char *buf,
|
||||
const int64_t len)
|
||||
const int64_t len,
|
||||
const bool try_lock)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
@ -5688,53 +5689,64 @@ int ObPartTransCtx::register_multi_data_source(const ObTxDataSourceType data_sou
|
||||
ObString data;
|
||||
void *ptr = nullptr;
|
||||
ObTxBufferNodeArray tmp_array;
|
||||
bool need_lock = true;
|
||||
|
||||
CtxLockGuard guard(lock_);
|
||||
|
||||
if (OB_UNLIKELY(nullptr == buf || len <= 0 || data_source_type <= ObTxDataSourceType::UNKNOWN
|
||||
|| data_source_type >= ObTxDataSourceType::MAX_TYPE)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
TRANS_LOG(WARN, "invalid argument", KR(ret), K(data_source_type), KP(buf), K(len));
|
||||
} else if (is_follower_()) {
|
||||
ret = OB_NOT_MASTER;
|
||||
TRANS_LOG(WARN, "can not register mds on a follower", K(ret), K(data_source_type), K(len), KPC(this));
|
||||
} else if (is_committing_()) {
|
||||
ret = OB_TRANS_HAS_DECIDED;
|
||||
TRANS_LOG(WARN, "can not register mds in committing part_ctx", K(ret), KPC(this));
|
||||
} else if (OB_ISNULL(ptr = mtl_malloc(len, "MultiTxData"))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
TRANS_LOG(WARN, "allocate memory failed", KR(ret), K(data_source_type), K(len));
|
||||
} else if (FALSE_IT(MEMCPY(ptr, buf, len))) {
|
||||
} else if (FALSE_IT(data.assign_ptr(reinterpret_cast<char *>(ptr), len))) {
|
||||
if (try_lock) {
|
||||
// Avoid deadlock: take the lock with a bounded timeout so that a short-lived lock conflict can still be waited out.
|
||||
ret = lock_.lock(100000 /* 100ms */);
|
||||
// On lock timeout, return OB_EAGAIN so that the caller retries from outside.
|
||||
ret = OB_TIMEOUT == ret ? OB_EAGAIN : ret;
|
||||
need_lock = false;
|
||||
} else {
|
||||
if (OB_FAIL(node.init(data_source_type, data))) {
|
||||
TRANS_LOG(WARN, "init tx buffer node failed", KR(ret), K(data_source_type), K(*this));
|
||||
} else if (OB_FAIL(tmp_array.push_back(node))) {
|
||||
TRANS_LOG(WARN, "push back notify node failed", KR(ret));
|
||||
} else if (tmp_array.get_serialize_size() > ObTxMultiDataSourceLog::MAX_MDS_LOG_SIZE) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
TRANS_LOG(WARN, "too large mds buf node", K(ret), K(tmp_array.get_serialize_size()));
|
||||
} else if (OB_FAIL(mds_cache_.insert_mds_node(node))) {
|
||||
TRANS_LOG(WARN, "register multi source data failed", KR(ret), K(data_source_type), K(*this));
|
||||
}
|
||||
// do nothing
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
CtxLockGuard guard(lock_, need_lock);
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
mtl_free(ptr);
|
||||
} else if (OB_FAIL(notify_data_source_(NotifyType::REGISTER_SUCC, SCN(), false,
|
||||
tmp_array))) {
|
||||
if (OB_SUCCESS != (tmp_ret = mds_cache_.rollback_last_mds_node())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "rollback last mds node failed", K(tmp_ret), K(ret));
|
||||
if (OB_UNLIKELY(nullptr == buf || len <= 0 || data_source_type <= ObTxDataSourceType::UNKNOWN
|
||||
|| data_source_type >= ObTxDataSourceType::MAX_TYPE)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
TRANS_LOG(WARN, "invalid argument", KR(ret), K(data_source_type), KP(buf), K(len));
|
||||
} else if (is_follower_()) {
|
||||
ret = OB_NOT_MASTER;
|
||||
TRANS_LOG(WARN, "can not register mds on a follower", K(ret), K(data_source_type), K(len), KPC(this));
|
||||
} else if (is_committing_()) {
|
||||
ret = OB_TRANS_HAS_DECIDED;
|
||||
TRANS_LOG(WARN, "can not register mds in committing part_ctx", K(ret), KPC(this));
|
||||
} else if (OB_ISNULL(ptr = mtl_malloc(len, "MultiTxData"))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
TRANS_LOG(WARN, "allocate memory failed", KR(ret), K(data_source_type), K(len));
|
||||
} else if (FALSE_IT(MEMCPY(ptr, buf, len))) {
|
||||
} else if (FALSE_IT(data.assign_ptr(reinterpret_cast<char *>(ptr), len))) {
|
||||
} else {
|
||||
if (OB_FAIL(node.init(data_source_type, data))) {
|
||||
TRANS_LOG(WARN, "init tx buffer node failed", KR(ret), K(data_source_type), K(*this));
|
||||
} else if (OB_FAIL(tmp_array.push_back(node))) {
|
||||
TRANS_LOG(WARN, "push back notify node failed", KR(ret));
|
||||
} else if (tmp_array.get_serialize_size() > ObTxMultiDataSourceLog::MAX_MDS_LOG_SIZE) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
TRANS_LOG(WARN, "too large mds buf node", K(ret), K(tmp_array.get_serialize_size()));
|
||||
} else if (OB_FAIL(mds_cache_.insert_mds_node(node))) {
|
||||
TRANS_LOG(WARN, "register multi source data failed", KR(ret), K(data_source_type), K(*this));
|
||||
}
|
||||
|
||||
TRANS_LOG(WARN, "notify data source for register_succ failed", K(tmp_ret));
|
||||
} else if (mds_cache_.get_unsubmitted_size() < ObTxMultiDataSourceLog::MAX_PENDING_BUF_SIZE) {
|
||||
// do nothing
|
||||
} else if (OB_SUCCESS != (tmp_ret = submit_log_impl_(ObTxLogType::TX_MULTI_DATA_SOURCE_LOG))) {
|
||||
TRANS_LOG(WARN, "submit mds log failed", K(tmp_ret));
|
||||
if (OB_FAIL(ret)) {
|
||||
mtl_free(ptr);
|
||||
} else if (OB_FAIL(notify_data_source_(NotifyType::REGISTER_SUCC, SCN(), false,
|
||||
tmp_array))) {
|
||||
if (OB_SUCCESS != (tmp_ret = mds_cache_.rollback_last_mds_node())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "rollback last mds node failed", K(tmp_ret), K(ret));
|
||||
}
|
||||
|
||||
TRANS_LOG(WARN, "notify data source for register_succ failed", K(tmp_ret));
|
||||
} else if (mds_cache_.get_unsubmitted_size() < ObTxMultiDataSourceLog::MAX_PENDING_BUF_SIZE) {
|
||||
// do nothing
|
||||
} else if (OB_SUCCESS != (tmp_ret = submit_log_impl_(ObTxLogType::TX_MULTI_DATA_SOURCE_LOG))) {
|
||||
TRANS_LOG(WARN, "submit mds log failed", K(tmp_ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
TRANS_LOG(WARN,
|
||||
"register MDS redo in part_ctx failed",
|
||||
|
@ -378,7 +378,10 @@ public:
|
||||
role_state_ = for_replay ? TxCtxRoleState::FOLLOWER : TxCtxRoleState::LEADER;
|
||||
}
|
||||
|
||||
int register_multi_data_source(const ObTxDataSourceType type, const char *buf, const int64_t len);
|
||||
int register_multi_data_source(const ObTxDataSourceType type,
|
||||
const char *buf,
|
||||
const int64_t len,
|
||||
const bool try_lock = false);
|
||||
|
||||
const share::SCN get_start_log_ts()
|
||||
{
|
||||
|
Loading…
x
Reference in New Issue
Block a user