diff --git a/src/sql/das/ob_das_ref.cpp b/src/sql/das/ob_das_ref.cpp index aab8f84a21..07c39f6b01 100644 --- a/src/sql/das/ob_das_ref.cpp +++ b/src/sql/das/ob_das_ref.cpp @@ -242,7 +242,7 @@ int ObDASRef::execute_all_task() ObDasAggregatedTasks* aggregated_task = curr->get_obj(); if (aggregated_task->has_unstart_tasks() && OB_FAIL(MTL(ObDataAccessService *) ->execute_das_task(*this, *aggregated_task, async))) { - LOG_WARN("failed to execute aggregated das task", KR(ret)); + LOG_WARN("failed to execute aggregated das task", KR(ret), KPC(aggregated_task), K(async)); } else { LOG_DEBUG("successfully executing aggregated task", "server", aggregated_task->server_); } @@ -389,25 +389,20 @@ int ObDASRef::process_remote_task_resp() DLIST_FOREACH_X(curr, async_cb_list_.get_obj_list(), OB_SUCC(ret)) { const sql::ObDASTaskResp &task_resp = curr->get_obj()->get_task_resp(); const common::ObSEArray &task_ops = curr->get_obj()->get_task_ops(); - if (OB_UNLIKELY(OB_SUCCESS != task_resp.get_rpc_rcode())) { - LOG_WARN("das async rpc error", K(task_resp.get_rpc_rcode())); + if (OB_UNLIKELY(OB_SUCCESS != task_resp.get_err_code())) { + LOG_WARN("das async execution failed", K(task_resp)); for (int i = 0; i < task_ops.count(); i++) { get_exec_ctx().get_my_session()->get_trans_result().add_touched_ls(task_ops.at(i)->get_ls_id()); - task_ops.at(i)->set_task_status(ObDasTaskStatus::FAILED); - task_ops.at(i)->errcode_ = task_resp.get_rpc_rcode(); - if (OB_FAIL(task_ops.at(i)->state_advance())) { - LOG_WARN("failed to advance das task state", K(ret)); - } } - ret = COVER_SUCC(task_resp.get_rpc_rcode()); - if (OB_FAIL(ret)) { - save_ret = ret; - ret = OB_SUCCESS; - } - } else if (OB_FAIL(MTL(ObDataAccessService *)->process_task_resp(*this, task_resp, task_ops))) { + save_ret = task_resp.get_err_code(); + } + if (OB_FAIL(MTL(ObDataAccessService *)->process_task_resp(*this, task_resp, task_ops))) { LOG_WARN("failed to process das async task resp", K(ret), K(task_resp)); save_ret = ret; ret = 
OB_SUCCESS; + } else { + // if the task response was processed successfully, its error code must be OB_SUCCESS. + OB_ASSERT(OB_SUCCESS == task_resp.get_err_code()); } } async_cb_list_.clear(); // no need to hold async cb anymore. destructor would be called in das factory. @@ -426,7 +421,7 @@ int ObDASRef::move_local_tasks_to_last() if (aggregated_task->server_ == ctrl_addr) { if (!aggregated_tasks_.get_obj_list().move_to_last(curr)) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("failed to move local task to last", KR(ret), K(*aggregated_task)); + LOG_WARN("failed to move local task to last", KR(ret), KPC(aggregated_task)); } found_local_tasks = true; } diff --git a/src/sql/das/ob_das_rpc_processor.cpp b/src/sql/das/ob_das_rpc_processor.cpp index 831fa3f8ce..a9b2aee2b0 100644 --- a/src/sql/das/ob_das_rpc_processor.cpp +++ b/src/sql/das/ob_das_rpc_processor.cpp @@ -211,7 +211,8 @@ void ObRpcDasAsyncAccessCallBack::on_timeout() { int ret = OB_SUCCESS; LOG_WARN("das async task timeout", K(get_task_ops())); - result_.set_rpc_rcode(OB_TIMEOUT); + result_.set_err_code(OB_TIMEOUT); + result_.get_op_results().reuse(); context_->get_das_ref().inc_concurrency_limit_with_signal(); } @@ -220,7 +221,8 @@ void ObRpcDasAsyncAccessCallBack::on_invalid() int ret = OB_SUCCESS; // a valid packet on protocol level, but can't decode it. LOG_WARN("das async task invalid", K(get_task_ops())); - result_.set_rpc_rcode(OB_INVALID_ERROR); + result_.set_err_code(OB_INVALID_ERROR); + result_.get_op_results().reuse(); context_->get_das_ref().inc_concurrency_limit_with_signal(); } @@ -233,6 +235,12 @@ int ObRpcDasAsyncAccessCallBack::process() { int ret = OB_SUCCESS; LOG_DEBUG("DAS async access callback process", K_(result)); + if (OB_FAIL(get_rcode())) { + result_.set_err_code(get_rcode()); + // we need to clear op results because they are not decoded from das async rpc due to rpc error.
+ result_.get_op_results().reuse(); + LOG_WARN("das async rpc execution failed", K(get_rcode()), K_(result)); + } context_->get_das_ref().inc_concurrency_limit_with_signal(); return ret; } diff --git a/src/sql/das/ob_das_task.cpp b/src/sql/das/ob_das_task.cpp index ec5d4f9f64..79148b5700 100644 --- a/src/sql/das/ob_das_task.cpp +++ b/src/sql/das/ob_das_task.cpp @@ -354,8 +354,7 @@ ObDASTaskResp::ObDASTaskResp() op_results_(), rcode_(), trans_result_(), - das_factory_(nullptr), - rpc_rcode_(OB_SUCCESS) + das_factory_(nullptr) { } diff --git a/src/sql/das/ob_das_task.h b/src/sql/das/ob_das_task.h index afe87440ab..e042b45b56 100644 --- a/src/sql/das/ob_das_task.h +++ b/src/sql/das/ob_das_task.h @@ -365,8 +365,6 @@ public: transaction::ObTxExecResult &get_trans_result() { return trans_result_; } const transaction::ObTxExecResult &get_trans_result() const { return trans_result_; } void set_das_factory(ObDASTaskFactory *das_factory) { das_factory_ = das_factory; }; - void set_rpc_rcode(int rcode) { rpc_rcode_ = rcode; }; - int get_rpc_rcode() const { return rpc_rcode_; }; TO_STRING_KV(K_(has_more), K_(ctrl_svr), K_(runner_svr), @@ -381,7 +379,6 @@ private: obrpc::ObRpcResultCode rcode_; //返回的错误信息 transaction::ObTxExecResult trans_result_; ObDASTaskFactory *das_factory_; // no need to serialize - int rpc_rcode_; // store async remote rpc error code. no need to serialize }; template diff --git a/src/sql/das/ob_data_access_service.cpp b/src/sql/das/ob_data_access_service.cpp index 63f03a6dcd..fa17c65849 100644 --- a/src/sql/das/ob_data_access_service.cpp +++ b/src/sql/das/ob_data_access_service.cpp @@ -604,11 +604,13 @@ int ObDataAccessService::process_task_resp(ObDASRef &das_ref, const ObDASTaskRes } if (OB_FAIL(ret)) { // if error happened, it must be the last. + // Special case is rpc error, because we cannot do a partition retry on rpc error. + // So it's safe to only mark the first das task op failed. 
task_op->set_task_status(ObDasTaskStatus::FAILED); task_op->errcode_ = ret; int tmp_ret = OB_SUCCESS; if (OB_TMP_FAIL(task_op->state_advance())) { - LOG_WARN("failed to advance das task state.",K(ret)); + LOG_WARN("failed to advance das task state.",K(tmp_ret)); } } else { // if no error happened, all tasks were executed successfully.