From f2648703a37170b1c1301b9c12b93fc9af8b094e Mon Sep 17 00:00:00 2001 From: rolandqi Date: Sat, 28 Jan 2023 16:24:27 +0800 Subject: [PATCH] bug fix: das aysnc rpc didn't process all resp when error happened. --- src/sql/das/ob_das_ref.cpp | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/sql/das/ob_das_ref.cpp b/src/sql/das/ob_das_ref.cpp index 6562856792..aab8f84a21 100644 --- a/src/sql/das/ob_das_ref.cpp +++ b/src/sql/das/ob_das_ref.cpp @@ -385,27 +385,37 @@ void ObDASRef::remove_async_das_cb(ObRpcDasAsyncAccessCallBack *das_async_cb) int ObDASRef::process_remote_task_resp() { int ret = OB_SUCCESS; + int save_ret = OB_SUCCESS; DLIST_FOREACH_X(curr, async_cb_list_.get_obj_list(), OB_SUCC(ret)) { const sql::ObDASTaskResp &task_resp = curr->get_obj()->get_task_resp(); const common::ObSEArray &task_ops = curr->get_obj()->get_task_ops(); if (OB_UNLIKELY(OB_SUCCESS != task_resp.get_rpc_rcode())) { LOG_WARN("das async rpc error", K(task_resp.get_rpc_rcode())); for (int i = 0; i < task_ops.count(); i++) { + get_exec_ctx().get_my_session()->get_trans_result().add_touched_ls(task_ops.at(i)->get_ls_id()); task_ops.at(i)->set_task_status(ObDasTaskStatus::FAILED); task_ops.at(i)->errcode_ = task_resp.get_rpc_rcode(); if (OB_FAIL(task_ops.at(i)->state_advance())) { LOG_WARN("failed to advance das task state", K(ret)); } } - ret = task_resp.get_rpc_rcode(); + ret = COVER_SUCC(task_resp.get_rpc_rcode()); + if (OB_FAIL(ret)) { + save_ret = ret; + ret = OB_SUCCESS; + } } else if (OB_FAIL(MTL(ObDataAccessService *)->process_task_resp(*this, task_resp, task_ops))) { LOG_WARN("failed to process das async task resp", K(ret), K(task_resp)); + save_ret = ret; + ret = OB_SUCCESS; } } async_cb_list_.clear(); // no need to hold async cb anymore. destructor would be called in das factory. + ret = COVER_SUCC(save_ret); return ret; } + int ObDASRef::move_local_tasks_to_last() { int ret = OB_SUCCESS;