bug fix: das aysnc rpc didn't process all resp when error happened.
This commit is contained in:
@ -385,27 +385,37 @@ void ObDASRef::remove_async_das_cb(ObRpcDasAsyncAccessCallBack *das_async_cb)
|
|||||||
int ObDASRef::process_remote_task_resp()
|
int ObDASRef::process_remote_task_resp()
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
int save_ret = OB_SUCCESS;
|
||||||
DLIST_FOREACH_X(curr, async_cb_list_.get_obj_list(), OB_SUCC(ret)) {
|
DLIST_FOREACH_X(curr, async_cb_list_.get_obj_list(), OB_SUCC(ret)) {
|
||||||
const sql::ObDASTaskResp &task_resp = curr->get_obj()->get_task_resp();
|
const sql::ObDASTaskResp &task_resp = curr->get_obj()->get_task_resp();
|
||||||
const common::ObSEArray<ObIDASTaskOp*, 2> &task_ops = curr->get_obj()->get_task_ops();
|
const common::ObSEArray<ObIDASTaskOp*, 2> &task_ops = curr->get_obj()->get_task_ops();
|
||||||
if (OB_UNLIKELY(OB_SUCCESS != task_resp.get_rpc_rcode())) {
|
if (OB_UNLIKELY(OB_SUCCESS != task_resp.get_rpc_rcode())) {
|
||||||
LOG_WARN("das async rpc error", K(task_resp.get_rpc_rcode()));
|
LOG_WARN("das async rpc error", K(task_resp.get_rpc_rcode()));
|
||||||
for (int i = 0; i < task_ops.count(); i++) {
|
for (int i = 0; i < task_ops.count(); i++) {
|
||||||
|
get_exec_ctx().get_my_session()->get_trans_result().add_touched_ls(task_ops.at(i)->get_ls_id());
|
||||||
task_ops.at(i)->set_task_status(ObDasTaskStatus::FAILED);
|
task_ops.at(i)->set_task_status(ObDasTaskStatus::FAILED);
|
||||||
task_ops.at(i)->errcode_ = task_resp.get_rpc_rcode();
|
task_ops.at(i)->errcode_ = task_resp.get_rpc_rcode();
|
||||||
if (OB_FAIL(task_ops.at(i)->state_advance())) {
|
if (OB_FAIL(task_ops.at(i)->state_advance())) {
|
||||||
LOG_WARN("failed to advance das task state", K(ret));
|
LOG_WARN("failed to advance das task state", K(ret));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ret = task_resp.get_rpc_rcode();
|
ret = COVER_SUCC(task_resp.get_rpc_rcode());
|
||||||
|
if (OB_FAIL(ret)) {
|
||||||
|
save_ret = ret;
|
||||||
|
ret = OB_SUCCESS;
|
||||||
|
}
|
||||||
} else if (OB_FAIL(MTL(ObDataAccessService *)->process_task_resp(*this, task_resp, task_ops))) {
|
} else if (OB_FAIL(MTL(ObDataAccessService *)->process_task_resp(*this, task_resp, task_ops))) {
|
||||||
LOG_WARN("failed to process das async task resp", K(ret), K(task_resp));
|
LOG_WARN("failed to process das async task resp", K(ret), K(task_resp));
|
||||||
|
save_ret = ret;
|
||||||
|
ret = OB_SUCCESS;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
async_cb_list_.clear(); // no need to hold async cb anymore. destructor would be called in das factory.
|
async_cb_list_.clear(); // no need to hold async cb anymore. destructor would be called in das factory.
|
||||||
|
ret = COVER_SUCC(save_ret);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int ObDASRef::move_local_tasks_to_last()
|
int ObDASRef::move_local_tasks_to_last()
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
|||||||
Reference in New Issue
Block a user