diff --git a/src/observer/mysql/ob_query_retry_ctrl.cpp b/src/observer/mysql/ob_query_retry_ctrl.cpp index 3c04615e13..11b7ebd8db 100644 --- a/src/observer/mysql/ob_query_retry_ctrl.cpp +++ b/src/observer/mysql/ob_query_retry_ctrl.cpp @@ -1119,21 +1119,28 @@ void ObQueryRetryCtrl::test_and_save_retry_state(const ObGlobalContext &gctx, retry_err_code_ = client_ret; } if (RETRY_TYPE_NONE != retry_type_) { - result.set_close_fail_callback([this](const int err)-> void { this->on_close_resultset_fail_(err); }); + result.set_close_fail_callback([this](const int err, int &client_ret)-> void { this->on_close_resultset_fail_(err, client_ret); }); } } -void ObQueryRetryCtrl::on_close_resultset_fail_(const int err) +void ObQueryRetryCtrl::on_close_resultset_fail_(const int err, int &client_ret) { // some unretryable error happened in close result set phase if (OB_SUCCESS != err && RETRY_TYPE_NONE != retry_type_) { // the txn relative error in close stmt + // thses error will cause the txn must to be rollbacked + // and can not accept new request any more, so if retry + // current stmt, it must be failed, hence we cancel retry if (OB_TRANS_NEED_ROLLBACK == err || OB_TRANS_INVALID_STATE == err || OB_TRANS_HAS_DECIDED == err) { retry_type_ = RETRY_TYPE_NONE; // also clear the packet retry THIS_WORKER.unset_need_retry(); + // rewrite the client error code + // when decide to cancel the retry, return an unretryable error + // is better, because it won't leak the internal error to user + client_ret = err; } } } diff --git a/src/observer/mysql/ob_query_retry_ctrl.h b/src/observer/mysql/ob_query_retry_ctrl.h index 71157aef23..ff5ce7ca77 100644 --- a/src/observer/mysql/ob_query_retry_ctrl.h +++ b/src/observer/mysql/ob_query_retry_ctrl.h @@ -313,7 +313,7 @@ private: static void inner_location_error_proc(ObRetryParam &v); static void inner_location_error_nothing_readable_proc(ObRetryParam &v); static void inner_peer_server_status_uncertain_proc(ObRetryParam &v); - void on_close_resultset_fail_(const int err); + void on_close_resultset_fail_(const int err, int &client_ret); /* variables */ // map_ is used to fast lookup the error code retry processor diff --git a/src/observer/mysql/ob_sync_plan_driver.cpp b/src/observer/mysql/ob_sync_plan_driver.cpp index 576380bdc1..0fbbf3d5ae 100644 --- a/src/observer/mysql/ob_sync_plan_driver.cpp +++ b/src/observer/mysql/ob_sync_plan_driver.cpp @@ -82,7 +82,7 @@ int ObSyncPlanDriver::response_result(ObMySQLResultSet &result) K(ret), K(cli_ret), K(retry_ctrl_.need_retry())); } } - cret = result.close(); + cret = result.close(cli_ret); if (cret != OB_SUCCESS && cret != OB_TRANSACTION_SET_VIOLATION && OB_TRY_LOCK_ROW_CONFLICT != cret) { @@ -116,7 +116,7 @@ int ObSyncPlanDriver::response_result(ObMySQLResultSet &result) } else { result.refresh_location_cache(true, ret); } - int cret = result.close(); + int cret = result.close(ret); if (cret != OB_SUCCESS) { LOG_WARN("close result set fail", K(cret)); } diff --git a/src/sql/executor/ob_executor_rpc_impl.cpp b/src/sql/executor/ob_executor_rpc_impl.cpp index f7942035b8..b8299edabd 100644 --- a/src/sql/executor/ob_executor_rpc_impl.cpp +++ b/src/sql/executor/ob_executor_rpc_impl.cpp @@ -87,6 +87,10 @@ int ObExecutorRpcImpl::task_execute(ObExecutorRpcCtx &rpc_ctx, const obrpc::ObRpcResultCode &rcode = to_proxy.get_result_code(); if (OB_LIKELY(OB_SUCCESS != rcode.rcode_)) { FORWARD_USER_ERROR(rcode.rcode_, rcode.msg_); + } else if (OB_RPC_SEND_ERROR == ret || OB_RPC_POST_ERROR == ret) { + // these two error means the request hasn't been sent out to network + // either because the server is in blacklist or network link breaks + has_sent_task = false; } else { has_transfer_err = true; } diff --git a/src/sql/ob_result_set.cpp b/src/sql/ob_result_set.cpp index 31047e1a29..6f292b69da 100644 --- a/src/sql/ob_result_set.cpp +++ b/src/sql/ob_result_set.cpp @@ -780,7 +780,7 @@ OB_INLINE int ObResultSet::do_close_plan(int errcode, ObExecContext &ctx) return ret; } -int ObResultSet::close() +int ObResultSet::close(int &client_ret) { int ret = OB_SUCCESS; LinkExecCtxGuard link_guard(my_session_, get_exec_context()); @@ -896,7 +896,7 @@ int ObResultSet::close() // notify close fail to listener int err = OB_SUCCESS != do_close_plan_ret ? do_close_plan_ret : ret; if (OB_SUCCESS != err && err != errcode_ && close_fail_cb_.is_valid()) { - close_fail_cb_(err); + close_fail_cb_(err, client_ret); } //NG_TRACE_EXT(result_set_close, OB_ID(ret), ret, OB_ID(arg1), prev_ret, //OB_ID(arg2), ins_ret, OB_ID(arg3), errcode_, OB_ID(async), async); diff --git a/src/sql/ob_result_set.h b/src/sql/ob_result_set.h index 18aa68533a..ad3e0ee5c1 100644 --- a/src/sql/ob_result_set.h +++ b/src/sql/ob_result_set.h @@ -120,7 +120,9 @@ public: /// @return OB_ITER_END when no more data available int get_next_row(const common::ObNewRow *&row); /// close the result set after get all the rows - int close(); + int close() { int unused = 0; return close(unused); } + // close result set and rewrite the client ret + int close(int &client_ret); /// get number of rows affected by INSERT/UPDATE/DELETE int64_t get_affected_rows() const; int64_t get_return_rows() const { return return_rows_; } @@ -321,7 +323,7 @@ public: static void replace_lob_type(const ObSQLSessionInfo &session, const ObField &field, obmysql::ObMySQLField &mfield); - void set_close_fail_callback(ObFunction func) { close_fail_cb_ = func; } + void set_close_fail_callback(ObFunction func) { close_fail_cb_ = func; } private: // types and constants static const int64_t TRANSACTION_SET_VIOLATION_MAX_RETRY = 3; @@ -427,7 +429,7 @@ private: common::ObString ps_sql_; // for sql in pl bool is_init_; common::ParamStore ps_params_; // 文本 ps params 记录,用于填入 sql_audit - common::ObFunction close_fail_cb_; + common::ObFunction close_fail_cb_; };