bugfix: check rpc error code when process das rpc callback.
This commit is contained in:
@ -242,7 +242,7 @@ int ObDASRef::execute_all_task()
|
|||||||
ObDasAggregatedTasks* aggregated_task = curr->get_obj();
|
ObDasAggregatedTasks* aggregated_task = curr->get_obj();
|
||||||
if (aggregated_task->has_unstart_tasks() && OB_FAIL(MTL(ObDataAccessService *)
|
if (aggregated_task->has_unstart_tasks() && OB_FAIL(MTL(ObDataAccessService *)
|
||||||
->execute_das_task(*this, *aggregated_task, async))) {
|
->execute_das_task(*this, *aggregated_task, async))) {
|
||||||
LOG_WARN("failed to execute aggregated das task", KR(ret));
|
LOG_WARN("failed to execute aggregated das task", KR(ret), KPC(aggregated_task), K(async));
|
||||||
} else {
|
} else {
|
||||||
LOG_DEBUG("successfully executing aggregated task", "server", aggregated_task->server_);
|
LOG_DEBUG("successfully executing aggregated task", "server", aggregated_task->server_);
|
||||||
}
|
}
|
||||||
@ -389,25 +389,20 @@ int ObDASRef::process_remote_task_resp()
|
|||||||
DLIST_FOREACH_X(curr, async_cb_list_.get_obj_list(), OB_SUCC(ret)) {
|
DLIST_FOREACH_X(curr, async_cb_list_.get_obj_list(), OB_SUCC(ret)) {
|
||||||
const sql::ObDASTaskResp &task_resp = curr->get_obj()->get_task_resp();
|
const sql::ObDASTaskResp &task_resp = curr->get_obj()->get_task_resp();
|
||||||
const common::ObSEArray<ObIDASTaskOp*, 2> &task_ops = curr->get_obj()->get_task_ops();
|
const common::ObSEArray<ObIDASTaskOp*, 2> &task_ops = curr->get_obj()->get_task_ops();
|
||||||
if (OB_UNLIKELY(OB_SUCCESS != task_resp.get_rpc_rcode())) {
|
if (OB_UNLIKELY(OB_SUCCESS != task_resp.get_err_code())) {
|
||||||
LOG_WARN("das async rpc error", K(task_resp.get_rpc_rcode()));
|
LOG_WARN("das async execution failed", K(task_resp));
|
||||||
for (int i = 0; i < task_ops.count(); i++) {
|
for (int i = 0; i < task_ops.count(); i++) {
|
||||||
get_exec_ctx().get_my_session()->get_trans_result().add_touched_ls(task_ops.at(i)->get_ls_id());
|
get_exec_ctx().get_my_session()->get_trans_result().add_touched_ls(task_ops.at(i)->get_ls_id());
|
||||||
task_ops.at(i)->set_task_status(ObDasTaskStatus::FAILED);
|
|
||||||
task_ops.at(i)->errcode_ = task_resp.get_rpc_rcode();
|
|
||||||
if (OB_FAIL(task_ops.at(i)->state_advance())) {
|
|
||||||
LOG_WARN("failed to advance das task state", K(ret));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
ret = COVER_SUCC(task_resp.get_rpc_rcode());
|
save_ret = task_resp.get_err_code();
|
||||||
if (OB_FAIL(ret)) {
|
}
|
||||||
save_ret = ret;
|
if (OB_FAIL(MTL(ObDataAccessService *)->process_task_resp(*this, task_resp, task_ops))) {
|
||||||
ret = OB_SUCCESS;
|
|
||||||
}
|
|
||||||
} else if (OB_FAIL(MTL(ObDataAccessService *)->process_task_resp(*this, task_resp, task_ops))) {
|
|
||||||
LOG_WARN("failed to process das async task resp", K(ret), K(task_resp));
|
LOG_WARN("failed to process das async task resp", K(ret), K(task_resp));
|
||||||
save_ret = ret;
|
save_ret = ret;
|
||||||
ret = OB_SUCCESS;
|
ret = OB_SUCCESS;
|
||||||
|
} else {
|
||||||
|
// if task execute success, error must be success.
|
||||||
|
OB_ASSERT(OB_SUCCESS == task_resp.get_err_code());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
async_cb_list_.clear(); // no need to hold async cb anymore. destructor would be called in das factory.
|
async_cb_list_.clear(); // no need to hold async cb anymore. destructor would be called in das factory.
|
||||||
@ -426,7 +421,7 @@ int ObDASRef::move_local_tasks_to_last()
|
|||||||
if (aggregated_task->server_ == ctrl_addr) {
|
if (aggregated_task->server_ == ctrl_addr) {
|
||||||
if (!aggregated_tasks_.get_obj_list().move_to_last(curr)) {
|
if (!aggregated_tasks_.get_obj_list().move_to_last(curr)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("failed to move local task to last", KR(ret), K(*aggregated_task));
|
LOG_WARN("failed to move local task to last", KR(ret), KPC(aggregated_task));
|
||||||
}
|
}
|
||||||
found_local_tasks = true;
|
found_local_tasks = true;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -211,7 +211,8 @@ void ObRpcDasAsyncAccessCallBack::on_timeout()
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
LOG_WARN("das async task timeout", K(get_task_ops()));
|
LOG_WARN("das async task timeout", K(get_task_ops()));
|
||||||
result_.set_rpc_rcode(OB_TIMEOUT);
|
result_.set_err_code(OB_TIMEOUT);
|
||||||
|
result_.get_op_results().reuse();
|
||||||
context_->get_das_ref().inc_concurrency_limit_with_signal();
|
context_->get_das_ref().inc_concurrency_limit_with_signal();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -220,7 +221,8 @@ void ObRpcDasAsyncAccessCallBack::on_invalid()
|
|||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
// a valid packet on protocol level, but can't decode it.
|
// a valid packet on protocol level, but can't decode it.
|
||||||
LOG_WARN("das async task invalid", K(get_task_ops()));
|
LOG_WARN("das async task invalid", K(get_task_ops()));
|
||||||
result_.set_rpc_rcode(OB_INVALID_ERROR);
|
result_.set_err_code(OB_INVALID_ERROR);
|
||||||
|
result_.get_op_results().reuse();
|
||||||
context_->get_das_ref().inc_concurrency_limit_with_signal();
|
context_->get_das_ref().inc_concurrency_limit_with_signal();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -233,6 +235,12 @@ int ObRpcDasAsyncAccessCallBack::process()
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
LOG_DEBUG("DAS async access callback process", K_(result));
|
LOG_DEBUG("DAS async access callback process", K_(result));
|
||||||
|
if (OB_FAIL(get_rcode())) {
|
||||||
|
result_.set_err_code(get_rcode());
|
||||||
|
// we need to clear op results because they are not decoded from das async rpc due to rpc error.
|
||||||
|
result_.get_op_results().reuse();
|
||||||
|
LOG_WARN("das async rpc execution failed", K(get_rcode()), K_(result));
|
||||||
|
}
|
||||||
context_->get_das_ref().inc_concurrency_limit_with_signal();
|
context_->get_das_ref().inc_concurrency_limit_with_signal();
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -354,8 +354,7 @@ ObDASTaskResp::ObDASTaskResp()
|
|||||||
op_results_(),
|
op_results_(),
|
||||||
rcode_(),
|
rcode_(),
|
||||||
trans_result_(),
|
trans_result_(),
|
||||||
das_factory_(nullptr),
|
das_factory_(nullptr)
|
||||||
rpc_rcode_(OB_SUCCESS)
|
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -365,8 +365,6 @@ public:
|
|||||||
transaction::ObTxExecResult &get_trans_result() { return trans_result_; }
|
transaction::ObTxExecResult &get_trans_result() { return trans_result_; }
|
||||||
const transaction::ObTxExecResult &get_trans_result() const { return trans_result_; }
|
const transaction::ObTxExecResult &get_trans_result() const { return trans_result_; }
|
||||||
void set_das_factory(ObDASTaskFactory *das_factory) { das_factory_ = das_factory; };
|
void set_das_factory(ObDASTaskFactory *das_factory) { das_factory_ = das_factory; };
|
||||||
void set_rpc_rcode(int rcode) { rpc_rcode_ = rcode; };
|
|
||||||
int get_rpc_rcode() const { return rpc_rcode_; };
|
|
||||||
TO_STRING_KV(K_(has_more),
|
TO_STRING_KV(K_(has_more),
|
||||||
K_(ctrl_svr),
|
K_(ctrl_svr),
|
||||||
K_(runner_svr),
|
K_(runner_svr),
|
||||||
@ -381,7 +379,6 @@ private:
|
|||||||
obrpc::ObRpcResultCode rcode_; //返回的错误信息
|
obrpc::ObRpcResultCode rcode_; //返回的错误信息
|
||||||
transaction::ObTxExecResult trans_result_;
|
transaction::ObTxExecResult trans_result_;
|
||||||
ObDASTaskFactory *das_factory_; // no need to serialize
|
ObDASTaskFactory *das_factory_; // no need to serialize
|
||||||
int rpc_rcode_; // store async remote rpc error code. no need to serialize
|
|
||||||
};
|
};
|
||||||
|
|
||||||
template <typename T>
|
template <typename T>
|
||||||
|
|||||||
@ -604,11 +604,13 @@ int ObDataAccessService::process_task_resp(ObDASRef &das_ref, const ObDASTaskRes
|
|||||||
}
|
}
|
||||||
if (OB_FAIL(ret)) {
|
if (OB_FAIL(ret)) {
|
||||||
// if error happened, it must be the last.
|
// if error happened, it must be the last.
|
||||||
|
// Special case is rpc error, because we cannot do a partition retry on rpc error.
|
||||||
|
// So it's safe to only mark the first das task op failed.
|
||||||
task_op->set_task_status(ObDasTaskStatus::FAILED);
|
task_op->set_task_status(ObDasTaskStatus::FAILED);
|
||||||
task_op->errcode_ = ret;
|
task_op->errcode_ = ret;
|
||||||
int tmp_ret = OB_SUCCESS;
|
int tmp_ret = OB_SUCCESS;
|
||||||
if (OB_TMP_FAIL(task_op->state_advance())) {
|
if (OB_TMP_FAIL(task_op->state_advance())) {
|
||||||
LOG_WARN("failed to advance das task state.",K(ret));
|
LOG_WARN("failed to advance das task state.",K(tmp_ret));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// if no error happened, all tasks were executed successfully.
|
// if no error happened, all tasks were executed successfully.
|
||||||
|
|||||||
Reference in New Issue
Block a user