rollback transction when reroute for autocommit 6005 retry

This commit is contained in:
chinaxing 2024-06-19 06:23:26 +00:00 committed by ob-robot
parent 9c3b4bbfdd
commit 0fe3ce7c9e
2 changed files with 36 additions and 13 deletions

View File

@ -534,6 +534,8 @@ GLOBAL_ERRSIM_POINT_DEF(2208, EN_DISABLE_SORTKEY_SEPARATELY, "Used to control wh
GLOBAL_ERRSIM_POINT_DEF(2209, EN_ENABLE_VECTOR_IN, "Used to control whether the capability for in-expr vectorization 2.0 is enabled.");
GLOBAL_ERRSIM_POINT_DEF(2210, EN_SQL_MEMORY_MRG_OPTION, "Control automatic memory management global bound size");
GLOBAL_ERRSIM_POINT_DEF(2211, EN_ENABLE_RANDOM_TSC, "wether to randomize batch_size & skips of table scan's output ");
GLOBAL_ERRSIM_POINT_DEF(2212, EN_LOCK_CONFLICT_RETRY_THEN_REROUTE, "force reroute sql when lock conflict and retry a few times");
// WR && ASH
GLOBAL_ERRSIM_POINT_DEF(2301, EN_CLOSE_ASH, "");
GLOBAL_ERRSIM_POINT_DEF(2302, EN_DISABLE_HASH_BASE_DISTINCT, "");

View File

@ -193,7 +193,6 @@ int ObSql::stmt_prepare(const common::ObString &stmt,
bool is_inner_sql/*true*/)
{
int ret = OB_SUCCESS;
const bool trans_valid = result.get_session().get_tx_id().is_valid();
LinkExecCtxGuard link_guard(result.get_session(), result.get_exec_context());
if (OB_FAIL(sanity_check(context))) {
LOG_WARN("Failed to do sanity check", K(ret));
@ -204,7 +203,7 @@ int ObSql::stmt_prepare(const common::ObString &stmt,
if (OB_FAIL(ret) && OB_SUCCESS == result.get_errcode()) {
result.set_errcode(ret);
}
if (OB_FAIL(ret) && !trans_valid) {
if (OB_FAIL(ret)) {
rollback_implicit_trans_when_fail(result, ret);
}
return ret;
@ -224,7 +223,6 @@ int ObSql::stmt_query(const common::ObString &stmt, ObSqlCtx &context, ObResultS
"execution_id", result.get_session().get_current_execution_id());
#endif
NG_TRACE(parse_begin);
const bool trans_valid = result.get_session().get_tx_id().is_valid();
//1 check inited
if (OB_FAIL(sanity_check(context))) {
LOG_WARN("Failed to do sanity check", K(ret));
@ -253,7 +251,7 @@ int ObSql::stmt_query(const common::ObString &stmt, ObSqlCtx &context, ObResultS
context.is_sensitive_ ? ObString(OB_MASKED_STR) : trunc_stmt.string(),
OB_ID(stmt_len), stmt.length());
if (OB_FAIL(ret) && !trans_valid) {
if (OB_FAIL(ret)) {
rollback_implicit_trans_when_fail(result, ret);
}
return ret;
@ -268,7 +266,6 @@ int ObSql::stmt_execute(const ObPsStmtId stmt_id,
{
int ret = OB_SUCCESS;
LinkExecCtxGuard link_guard(result.get_session(), result.get_exec_context());
const bool trans_valid = result.get_session().get_tx_id().is_valid();
if (OB_FAIL(sanity_check(context))) {
LOG_WARN("failed to do sanity check", K(ret));
} else if (OB_FAIL(init_result_set(context, result))) {
@ -293,7 +290,7 @@ int ObSql::stmt_execute(const ObPsStmtId stmt_id,
result.set_errcode(ret);
}
FLT_SET_TAG(sql_id, context.sql_id_);
if (OB_FAIL(ret) && !trans_valid) {
if (OB_FAIL(ret)) {
rollback_implicit_trans_when_fail(result, ret);
}
return ret;
@ -5721,13 +5718,31 @@ int ObSql::check_need_reroute(ObPlanCacheCtx &pc_ctx, ObSQLSessionInfo &session,
int ret = OB_SUCCESS;
need_reroute = false;
ObDASCtx &das_ctx = pc_ctx.exec_ctx_.get_das_ctx();
if (OB_NOT_NULL(plan)
&& pc_ctx.sql_ctx_.can_reroute_sql_
bool should_reroute = false;
if (OB_NOT_NULL(plan)) {
should_reroute = pc_ctx.sql_ctx_.can_reroute_sql_
&& (OB_PHY_PLAN_REMOTE == plan->get_plan_type()
|| (!das_ctx.is_partition_hit() && !das_ctx.get_table_loc_list().empty()))) {
|| (!das_ctx.is_partition_hit() && !das_ctx.get_table_loc_list().empty()));
// check inject reroute for test
if (!should_reroute) {
const uint32_t sessid = session.get_sessid();
const int reroute_retry_cnt = OB_E(EventTable::EN_LOCK_CONFLICT_RETRY_THEN_REROUTE, sessid) OB_SUCCESS;
if (OB_UNLIKELY(reroute_retry_cnt)) {
int last_query_retry_err = session.get_retry_info().get_last_query_retry_err();
int64_t retry_cnt = session.get_retry_info().get_retry_cnt();
should_reroute = last_query_retry_err == OB_TRY_LOCK_ROW_CONFLICT
&& retry_cnt >= -reroute_retry_cnt;
LOG_INFO("inject force reroute sql",
K(last_query_retry_err),
K(retry_cnt),
K(reroute_retry_cnt),
K(should_reroute));
}
}
}
if (should_reroute) {
// reroute request,
// physical table location is already calculated and stored in task_exec_ctx.table_locations_
bool should_reroute = true;
const DependenyTableStore &dep_tables = plan->get_dependency_table();
for (int64_t i = 0;
should_reroute && i < dep_tables.count();
@ -5966,14 +5981,20 @@ void ObSql::rollback_implicit_trans_when_fail(ObResultSet &result, int &ret)
{
bool ac = false;
result.get_session().get_autocommit(ac);
const transaction::ObTxDesc *tx = result.get_session().get_tx_desc();
transaction::ObTxDesc *tx = result.get_session().get_tx_desc();
if (ac && tx && tx->get_tx_id().is_valid() && !tx->is_explicit()) {
const transaction::ObTransID txid = tx->get_tx_id();
int tmp_ret = OB_SUCCESS;
bool need_disconnect = false;
if (OB_TMP_FAIL(ObSqlTransControl::rollback_trans(&result.get_session(), need_disconnect))) {
if (OB_UNLIKELY(result.get_session().is_txn_free_route_temp())) {
tmp_ret = OB_ERR_UNEXPECTED;
LOG_WARN("implicit trans found on trans free route temp node", K(tmp_ret), K(result.get_session()), K(txid));
tx->dump_and_print_trace();
result.get_exec_context().set_need_disconnect(true);
ret = tmp_ret;
} else if (OB_TMP_FAIL(ObSqlTransControl::rollback_trans(&result.get_session(), need_disconnect))) {
LOG_WARN("rollback transaction fail, will disconnect", K(tmp_ret), K(result.get_session()), K(txid));
result.get_exec_context().get_need_disconnect_for_update() = true;
result.get_exec_context().set_need_disconnect(true);
ret = tmp_ret;
} else {
LOG_INFO("rollback transaction started during get-plan success", K(ret), K(txid));