[master] return end-stmt failure when retry is canceled
This commit is contained in:
@ -1119,21 +1119,28 @@ void ObQueryRetryCtrl::test_and_save_retry_state(const ObGlobalContext &gctx,
|
|||||||
retry_err_code_ = client_ret;
|
retry_err_code_ = client_ret;
|
||||||
}
|
}
|
||||||
if (RETRY_TYPE_NONE != retry_type_) {
|
if (RETRY_TYPE_NONE != retry_type_) {
|
||||||
result.set_close_fail_callback([this](const int err)-> void { this->on_close_resultset_fail_(err); });
|
result.set_close_fail_callback([this](const int err, int &client_ret)-> void { this->on_close_resultset_fail_(err, client_ret); });
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void ObQueryRetryCtrl::on_close_resultset_fail_(const int err)
|
void ObQueryRetryCtrl::on_close_resultset_fail_(const int err, int &client_ret)
|
||||||
{
|
{
|
||||||
// some unretryable error happened in close result set phase
|
// some unretryable error happened in close result set phase
|
||||||
if (OB_SUCCESS != err && RETRY_TYPE_NONE != retry_type_) {
|
if (OB_SUCCESS != err && RETRY_TYPE_NONE != retry_type_) {
|
||||||
// the txn relative error in close stmt
|
// the txn relative error in close stmt
|
||||||
|
// thses error will cause the txn must to be rollbacked
|
||||||
|
// and can not accept new request any more, so if retry
|
||||||
|
// current stmt, it must be failed, hence we cancel retry
|
||||||
if (OB_TRANS_NEED_ROLLBACK == err ||
|
if (OB_TRANS_NEED_ROLLBACK == err ||
|
||||||
OB_TRANS_INVALID_STATE == err ||
|
OB_TRANS_INVALID_STATE == err ||
|
||||||
OB_TRANS_HAS_DECIDED == err) {
|
OB_TRANS_HAS_DECIDED == err) {
|
||||||
retry_type_ = RETRY_TYPE_NONE;
|
retry_type_ = RETRY_TYPE_NONE;
|
||||||
// also clear the packet retry
|
// also clear the packet retry
|
||||||
THIS_WORKER.unset_need_retry();
|
THIS_WORKER.unset_need_retry();
|
||||||
|
// rewrite the client error code
|
||||||
|
// when decide to cancel the retry, return an unretryable error
|
||||||
|
// is better, because it won't leak the internal error to user
|
||||||
|
client_ret = err;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -313,7 +313,7 @@ private:
|
|||||||
static void inner_location_error_proc(ObRetryParam &v);
|
static void inner_location_error_proc(ObRetryParam &v);
|
||||||
static void inner_location_error_nothing_readable_proc(ObRetryParam &v);
|
static void inner_location_error_nothing_readable_proc(ObRetryParam &v);
|
||||||
static void inner_peer_server_status_uncertain_proc(ObRetryParam &v);
|
static void inner_peer_server_status_uncertain_proc(ObRetryParam &v);
|
||||||
void on_close_resultset_fail_(const int err);
|
void on_close_resultset_fail_(const int err, int &client_ret);
|
||||||
|
|
||||||
/* variables */
|
/* variables */
|
||||||
// map_ is used to fast lookup the error code retry processor
|
// map_ is used to fast lookup the error code retry processor
|
||||||
|
|||||||
@ -82,7 +82,7 @@ int ObSyncPlanDriver::response_result(ObMySQLResultSet &result)
|
|||||||
K(ret), K(cli_ret), K(retry_ctrl_.need_retry()));
|
K(ret), K(cli_ret), K(retry_ctrl_.need_retry()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
cret = result.close();
|
cret = result.close(cli_ret);
|
||||||
if (cret != OB_SUCCESS &&
|
if (cret != OB_SUCCESS &&
|
||||||
cret != OB_TRANSACTION_SET_VIOLATION &&
|
cret != OB_TRANSACTION_SET_VIOLATION &&
|
||||||
OB_TRY_LOCK_ROW_CONFLICT != cret) {
|
OB_TRY_LOCK_ROW_CONFLICT != cret) {
|
||||||
@ -116,7 +116,7 @@ int ObSyncPlanDriver::response_result(ObMySQLResultSet &result)
|
|||||||
} else {
|
} else {
|
||||||
result.refresh_location_cache(true, ret);
|
result.refresh_location_cache(true, ret);
|
||||||
}
|
}
|
||||||
int cret = result.close();
|
int cret = result.close(ret);
|
||||||
if (cret != OB_SUCCESS) {
|
if (cret != OB_SUCCESS) {
|
||||||
LOG_WARN("close result set fail", K(cret));
|
LOG_WARN("close result set fail", K(cret));
|
||||||
}
|
}
|
||||||
|
|||||||
@ -87,6 +87,10 @@ int ObExecutorRpcImpl::task_execute(ObExecutorRpcCtx &rpc_ctx,
|
|||||||
const obrpc::ObRpcResultCode &rcode = to_proxy.get_result_code();
|
const obrpc::ObRpcResultCode &rcode = to_proxy.get_result_code();
|
||||||
if (OB_LIKELY(OB_SUCCESS != rcode.rcode_)) {
|
if (OB_LIKELY(OB_SUCCESS != rcode.rcode_)) {
|
||||||
FORWARD_USER_ERROR(rcode.rcode_, rcode.msg_);
|
FORWARD_USER_ERROR(rcode.rcode_, rcode.msg_);
|
||||||
|
} else if (OB_RPC_SEND_ERROR == ret || OB_RPC_POST_ERROR == ret) {
|
||||||
|
// these two error means the request hasn't been sent out to network
|
||||||
|
// either because the server is in blacklist or network link breaks
|
||||||
|
has_sent_task = false;
|
||||||
} else {
|
} else {
|
||||||
has_transfer_err = true;
|
has_transfer_err = true;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -780,7 +780,7 @@ OB_INLINE int ObResultSet::do_close_plan(int errcode, ObExecContext &ctx)
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObResultSet::close()
|
int ObResultSet::close(int &client_ret)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
LinkExecCtxGuard link_guard(my_session_, get_exec_context());
|
LinkExecCtxGuard link_guard(my_session_, get_exec_context());
|
||||||
@ -896,7 +896,7 @@ int ObResultSet::close()
|
|||||||
// notify close fail to listener
|
// notify close fail to listener
|
||||||
int err = OB_SUCCESS != do_close_plan_ret ? do_close_plan_ret : ret;
|
int err = OB_SUCCESS != do_close_plan_ret ? do_close_plan_ret : ret;
|
||||||
if (OB_SUCCESS != err && err != errcode_ && close_fail_cb_.is_valid()) {
|
if (OB_SUCCESS != err && err != errcode_ && close_fail_cb_.is_valid()) {
|
||||||
close_fail_cb_(err);
|
close_fail_cb_(err, client_ret);
|
||||||
}
|
}
|
||||||
//NG_TRACE_EXT(result_set_close, OB_ID(ret), ret, OB_ID(arg1), prev_ret,
|
//NG_TRACE_EXT(result_set_close, OB_ID(ret), ret, OB_ID(arg1), prev_ret,
|
||||||
//OB_ID(arg2), ins_ret, OB_ID(arg3), errcode_, OB_ID(async), async);
|
//OB_ID(arg2), ins_ret, OB_ID(arg3), errcode_, OB_ID(async), async);
|
||||||
|
|||||||
@ -120,7 +120,9 @@ public:
|
|||||||
/// @return OB_ITER_END when no more data available
|
/// @return OB_ITER_END when no more data available
|
||||||
int get_next_row(const common::ObNewRow *&row);
|
int get_next_row(const common::ObNewRow *&row);
|
||||||
/// close the result set after get all the rows
|
/// close the result set after get all the rows
|
||||||
int close();
|
int close() { int unused = 0; return close(unused); }
|
||||||
|
// close result set and rewrite the client ret
|
||||||
|
int close(int &client_ret);
|
||||||
/// get number of rows affected by INSERT/UPDATE/DELETE
|
/// get number of rows affected by INSERT/UPDATE/DELETE
|
||||||
int64_t get_affected_rows() const;
|
int64_t get_affected_rows() const;
|
||||||
int64_t get_return_rows() const { return return_rows_; }
|
int64_t get_return_rows() const { return return_rows_; }
|
||||||
@ -321,7 +323,7 @@ public:
|
|||||||
static void replace_lob_type(const ObSQLSessionInfo &session,
|
static void replace_lob_type(const ObSQLSessionInfo &session,
|
||||||
const ObField &field,
|
const ObField &field,
|
||||||
obmysql::ObMySQLField &mfield);
|
obmysql::ObMySQLField &mfield);
|
||||||
void set_close_fail_callback(ObFunction<void(const int)> func) { close_fail_cb_ = func; }
|
void set_close_fail_callback(ObFunction<void(const int, int&)> func) { close_fail_cb_ = func; }
|
||||||
private:
|
private:
|
||||||
// types and constants
|
// types and constants
|
||||||
static const int64_t TRANSACTION_SET_VIOLATION_MAX_RETRY = 3;
|
static const int64_t TRANSACTION_SET_VIOLATION_MAX_RETRY = 3;
|
||||||
@ -427,7 +429,7 @@ private:
|
|||||||
common::ObString ps_sql_; // for sql in pl
|
common::ObString ps_sql_; // for sql in pl
|
||||||
bool is_init_;
|
bool is_init_;
|
||||||
common::ParamStore ps_params_; // 文本 ps params 记录,用于填入 sql_audit
|
common::ParamStore ps_params_; // 文本 ps params 记录,用于填入 sql_audit
|
||||||
common::ObFunction<void(const int)> close_fail_cb_;
|
common::ObFunction<void(const int, int&)> close_fail_cb_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user