[OBCDC] fix handle_unserved_trans_ core
This commit is contained in:
@ -3603,6 +3603,7 @@ int PartTransTask::try_to_set_data_ready_status()
|
||||
int PartTransTask::handle_log_callback()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool can_be_reverted = false;
|
||||
|
||||
if (OB_UNLIKELY(is_sys_ls_part_trans())) {
|
||||
LOG_ERROR("Not a dml part is unexcepted", KPC(this));
|
||||
@ -3621,12 +3622,24 @@ int PartTransTask::handle_log_callback()
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if (OB_FAIL(handle_unserved_trans_())) {
|
||||
if (OB_FAIL(handle_unserved_trans_(can_be_reverted))) {
|
||||
LOG_ERROR("handle_unserved_trans_ fail", KR(ret), KPC(this));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret) && can_be_reverted) {
|
||||
IObLogResourceCollector *resource_collector = TCTX.resource_collector_;
|
||||
if (OB_ISNULL(resource_collector)) {
|
||||
LOG_ERROR("resource_collector is NULL");
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
} else if (OB_FAIL(resource_collector->revert(this))) {
|
||||
if (OB_IN_STOP_STATE != ret) {
|
||||
LOG_ERROR("revert PartTransTask fail", KR(ret));
|
||||
}
|
||||
} else {}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -3655,30 +3668,42 @@ int PartTransTask::check_dml_redo_node_ready_and_handle_()
|
||||
int PartTransTask::handle_unserved_trans()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool can_be_reverted = false;
|
||||
|
||||
// Ensure the correctness of concurrent processing of Storager and PartTransDispatcher
|
||||
ObByteLockGuard guard(data_ready_lock_);
|
||||
{
|
||||
ObByteLockGuard guard(data_ready_lock_);
|
||||
|
||||
// set unserved statue
|
||||
set_unserved_();
|
||||
// set unserved statue
|
||||
set_unserved_();
|
||||
|
||||
if (OB_FAIL(handle_unserved_trans_())) {
|
||||
LOG_ERROR("handle_unserved_trans_ fail", KR(ret), KPC(this));
|
||||
if (OB_FAIL(handle_unserved_trans_(can_be_reverted))) {
|
||||
LOG_ERROR("handle_unserved_trans_ fail", KR(ret), KPC(this));
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret) && can_be_reverted) {
|
||||
IObLogResourceCollector *resource_collector = TCTX.resource_collector_;
|
||||
if (OB_ISNULL(resource_collector)) {
|
||||
LOG_ERROR("resource_collector is NULL");
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
} else if (OB_FAIL(resource_collector->revert(this))) {
|
||||
if (OB_IN_STOP_STATE != ret) {
|
||||
LOG_ERROR("revert PartTransTask fail", KR(ret));
|
||||
}
|
||||
} else {}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int PartTransTask::handle_unserved_trans_()
|
||||
int PartTransTask::handle_unserved_trans_(bool &can_be_reverted)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
IObLogResourceCollector *resource_collector = TCTX.resource_collector_;
|
||||
|
||||
if (OB_UNLIKELY(is_sys_ls_part_trans())) {
|
||||
LOG_ERROR("Not a dml part is unexcepted", KPC(this));
|
||||
ret = OB_STATE_NOT_MATCH;
|
||||
} else if (OB_ISNULL(resource_collector)) {
|
||||
LOG_ERROR("resource_collector is NULL");
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
} else if (is_data_ready()) {
|
||||
LOG_ERROR("data is already ready, not expected", KPC(this));
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
@ -3686,11 +3711,10 @@ int PartTransTask::handle_unserved_trans_()
|
||||
if (OB_FAIL(check_dml_redo_node_ready_and_handle_())) {
|
||||
LOG_ERROR("check_dml_redo_node_ready_and_handle_ fail", KR(ret), KPC(this));
|
||||
} else if (is_data_ready()) {
|
||||
if (OB_FAIL(resource_collector->revert(this))) {
|
||||
if (OB_IN_STOP_STATE != ret) {
|
||||
LOG_ERROR("revert PartTransTask fail", KR(ret));
|
||||
}
|
||||
}
|
||||
// In handle_unserved_trans_, the part_trans_task can be reverted by ResourceCollector
|
||||
// which is asynchronous process. If the asynchronous is fast enouth, the part_trans_task
|
||||
// may be destructed which leads to the unlock core of ObByteLockGuard.
|
||||
can_be_reverted = true;
|
||||
} else {}
|
||||
}
|
||||
|
||||
|
||||
@ -1343,7 +1343,7 @@ private:
|
||||
const int64_t data_len);
|
||||
|
||||
int check_dml_redo_node_ready_and_handle_();
|
||||
int handle_unserved_trans_();
|
||||
int handle_unserved_trans_(bool &can_be_reverted);
|
||||
void set_unserved_() { serve_state_ = UNSERVED; }
|
||||
bool is_data_ready() const { return ATOMIC_LOAD(&is_data_ready_); }
|
||||
|
||||
|
||||
Reference in New Issue
Block a user