[TABLELOCK] refine struct of ObLockFuncContext, check ObSQLSessionInfo and ObExecContext is not null before use them
This commit is contained in:
parent
c0b2e36c0f
commit
b6b2364a32
@ -42,97 +42,106 @@ namespace transaction
|
||||
namespace tablelock
|
||||
{
|
||||
|
||||
int ObLockFuncContext::init(ObSQLSessionInfo &session_info,
|
||||
ObExecContext &ctx,
|
||||
int ObLockFuncContext::init(ObExecContext &ctx,
|
||||
const int64_t timeout_us)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t query_start_time = session_info.get_query_start_time();
|
||||
// use smaller timeout if we specified the lock timeout us.
|
||||
if (timeout_us > 0
|
||||
&& (ObTimeUtility::current_time() + timeout_us) < THIS_WORKER.get_timeout_ts()) {
|
||||
OX (old_worker_timeout_ts_ = THIS_WORKER.get_timeout_ts());
|
||||
OX (THIS_WORKER.set_timeout_ts(ObTimeUtility::current_time() + timeout_us));
|
||||
if (OB_SUCC(ret) && OB_NOT_NULL(ctx.get_physical_plan_ctx())) {
|
||||
old_phy_plan_timeout_ts_ = ctx.get_physical_plan_ctx()->get_timeout_timestamp();
|
||||
ctx.get_physical_plan_ctx()
|
||||
->set_timeout_timestamp(ObTimeUtility::current_time() + timeout_us);
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (session_info.get_local_autocommit()) {
|
||||
OX (reset_autocommit_ = true);
|
||||
OZ (session_info.set_autocommit(false));
|
||||
}
|
||||
has_inner_dml_write_ = session_info.has_exec_inner_dml();
|
||||
last_insert_id_ = session_info.get_local_last_insert_id();
|
||||
session_info.set_has_exec_inner_dml(false);
|
||||
ObSQLSessionInfo *session_info = nullptr;
|
||||
|
||||
ObTransID parent_tx_id;
|
||||
parent_tx_id = session_info.get_tx_id();
|
||||
OZ (session_info.begin_autonomous_session(saved_session_));
|
||||
OX (have_saved_session_ = true);
|
||||
OZ (ObSqlTransControl::explicit_start_trans(ctx, false));
|
||||
if (OB_ISNULL(session_info = ctx.get_my_session())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("session_info is null in ObExecContext", K(ret));
|
||||
} else {
|
||||
// use smaller timeout if we specified the lock timeout us.
|
||||
if (timeout_us > 0
|
||||
&& (ObTimeUtility::current_time() + timeout_us) < THIS_WORKER.get_timeout_ts()) {
|
||||
OX (old_worker_timeout_ts_ = THIS_WORKER.get_timeout_ts());
|
||||
OX (THIS_WORKER.set_timeout_ts(ObTimeUtility::current_time() + timeout_us));
|
||||
if (OB_SUCC(ret) && OB_NOT_NULL(ctx.get_physical_plan_ctx())) {
|
||||
old_phy_plan_timeout_ts_ = ctx.get_physical_plan_ctx()->get_timeout_timestamp();
|
||||
ctx.get_physical_plan_ctx()
|
||||
->set_timeout_timestamp(ObTimeUtility::current_time() + timeout_us);
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
has_autonomous_tx_ = true;
|
||||
}
|
||||
if (OB_SUCC(ret) && parent_tx_id.is_valid()) {
|
||||
(void) register_for_deadlock_(session_info, parent_tx_id);
|
||||
if (session_info->get_local_autocommit()) {
|
||||
OX (reset_autocommit_ = true);
|
||||
OZ (session_info->set_autocommit(false));
|
||||
}
|
||||
has_inner_dml_write_ = session_info->has_exec_inner_dml();
|
||||
last_insert_id_ = session_info->get_local_last_insert_id();
|
||||
session_info->set_has_exec_inner_dml(false);
|
||||
|
||||
ObTransID parent_tx_id;
|
||||
parent_tx_id = session_info->get_tx_id();
|
||||
OZ (session_info->begin_autonomous_session(saved_session_));
|
||||
OX (have_saved_session_ = true);
|
||||
OZ (ObSqlTransControl::explicit_start_trans(ctx, false));
|
||||
if (OB_SUCC(ret)) {
|
||||
has_autonomous_tx_ = true;
|
||||
}
|
||||
if (OB_SUCC(ret) && parent_tx_id.is_valid()) {
|
||||
(void) register_for_deadlock_(*session_info, parent_tx_id);
|
||||
}
|
||||
}
|
||||
OX (my_exec_ctx_ = &ctx);
|
||||
OZ (open_inner_conn_());
|
||||
}
|
||||
OX (session_info_ = &session_info);
|
||||
OX (my_exec_ctx_ = &ctx);
|
||||
OZ (open_inner_conn_());
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLockFuncContext::destroy(ObExecContext &ctx,
|
||||
ObSQLSessionInfo &session_info,
|
||||
bool is_rollback)
|
||||
{
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
int ret = OB_SUCCESS;
|
||||
ObSQLSessionInfo *session_info = nullptr;
|
||||
|
||||
if (has_autonomous_tx_) {
|
||||
if (OB_TMP_FAIL(implicit_end_trans_(session_info, ctx, is_rollback))) {
|
||||
LOG_WARN("failed to rollback trans", K(tmp_ret));
|
||||
if (OB_ISNULL(session_info = ctx.get_my_session())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("session_info is null in ObExecContext", K(ret));
|
||||
} else {
|
||||
if (has_autonomous_tx_) {
|
||||
if (OB_TMP_FAIL(implicit_end_trans_(*session_info, ctx, is_rollback))) {
|
||||
LOG_WARN("failed to rollback trans", K(tmp_ret));
|
||||
ret = COVER_SUCC(tmp_ret);
|
||||
}
|
||||
}
|
||||
if (OB_TMP_FAIL(close_inner_conn_())) {
|
||||
LOG_WARN("close inner connection failed", K(tmp_ret));
|
||||
ret = COVER_SUCC(tmp_ret);
|
||||
}
|
||||
}
|
||||
if (OB_TMP_FAIL(close_inner_conn_())) {
|
||||
LOG_WARN("close inner connection failed", K(tmp_ret));
|
||||
ret = COVER_SUCC(tmp_ret);
|
||||
}
|
||||
if (have_saved_session_) {
|
||||
if (OB_TMP_FAIL(session_info.end_autonomous_session(saved_session_))) {
|
||||
LOG_WARN("failed to switch trans", K(tmp_ret));
|
||||
ret = COVER_SUCC(tmp_ret);
|
||||
if (have_saved_session_) {
|
||||
if (OB_TMP_FAIL(session_info->end_autonomous_session(saved_session_))) {
|
||||
LOG_WARN("failed to switch trans", K(tmp_ret));
|
||||
ret = COVER_SUCC(tmp_ret);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// WHY WE NEED THIS
|
||||
uint64_t cur_last_insert_id = session_info.get_local_last_insert_id();
|
||||
if (cur_last_insert_id != last_insert_id_) {
|
||||
ObObj last_insert_id;
|
||||
last_insert_id.set_uint64(last_insert_id_);
|
||||
tmp_ret = session_info.update_sys_variable(SYS_VAR_LAST_INSERT_ID, last_insert_id);
|
||||
if (OB_SUCCESS == tmp_ret &&
|
||||
OB_TMP_FAIL(session_info.update_sys_variable(SYS_VAR_IDENTITY, last_insert_id))) {
|
||||
LOG_WARN("succ update last_insert_id, but fail to update identity", K(tmp_ret));
|
||||
}
|
||||
ret = COVER_SUCC(tmp_ret);
|
||||
}
|
||||
session_info.set_has_exec_inner_dml(has_inner_dml_write_);
|
||||
if (old_worker_timeout_ts_ != 0) {
|
||||
THIS_WORKER.set_timeout_ts(old_worker_timeout_ts_);
|
||||
if (OB_NOT_NULL(ctx.get_physical_plan_ctx())) {
|
||||
ctx.get_physical_plan_ctx()->set_timeout_timestamp(old_phy_plan_timeout_ts_);
|
||||
}
|
||||
}
|
||||
if (reset_autocommit_) {
|
||||
if (OB_TMP_FAIL(session_info.set_autocommit(true))) {
|
||||
// WHY WE NEED THIS
|
||||
uint64_t cur_last_insert_id = session_info->get_local_last_insert_id();
|
||||
if (cur_last_insert_id != last_insert_id_) {
|
||||
ObObj last_insert_id;
|
||||
last_insert_id.set_uint64(last_insert_id_);
|
||||
tmp_ret = session_info->update_sys_variable(SYS_VAR_LAST_INSERT_ID, last_insert_id);
|
||||
if (OB_SUCCESS == tmp_ret &&
|
||||
OB_TMP_FAIL(session_info->update_sys_variable(SYS_VAR_IDENTITY, last_insert_id))) {
|
||||
LOG_WARN("succ update last_insert_id, but fail to update identity", K(tmp_ret));
|
||||
}
|
||||
ret = COVER_SUCC(tmp_ret);
|
||||
LOG_ERROR("restore autocommit value failed", K(tmp_ret), K(ret));
|
||||
}
|
||||
session_info->set_has_exec_inner_dml(has_inner_dml_write_);
|
||||
if (old_worker_timeout_ts_ != 0) {
|
||||
THIS_WORKER.set_timeout_ts(old_worker_timeout_ts_);
|
||||
if (OB_NOT_NULL(ctx.get_physical_plan_ctx())) {
|
||||
ctx.get_physical_plan_ctx()->set_timeout_timestamp(old_phy_plan_timeout_ts_);
|
||||
}
|
||||
}
|
||||
if (reset_autocommit_) {
|
||||
if (OB_TMP_FAIL(session_info->set_autocommit(true))) {
|
||||
ret = COVER_SUCC(tmp_ret);
|
||||
LOG_ERROR("restore autocommit value failed", K(tmp_ret), K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
@ -208,18 +217,22 @@ void ObLockFuncContext::register_for_deadlock_(ObSQLSessionInfo &session_info,
|
||||
int ObLockFuncContext::open_inner_conn_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSQLSessionInfo *session = my_exec_ctx_->get_my_session();
|
||||
ObSQLSessionInfo *session = nullptr;
|
||||
common::ObMySQLProxy *sql_proxy = nullptr;
|
||||
observer::ObInnerSQLConnection *inner_conn = nullptr;
|
||||
|
||||
if (OB_ISNULL(session) || OB_ISNULL(sql_proxy_ = my_exec_ctx_->get_sql_proxy())) {
|
||||
if (OB_ISNULL(my_exec_ctx_)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("ObExecContext in ObLockFuncContext is null", K(ret));
|
||||
} else if (OB_ISNULL(session = my_exec_ctx_->get_my_session()) || OB_ISNULL(sql_proxy = my_exec_ctx_->get_sql_proxy())) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("session or sql_proxy is NULL", K(ret), KP(session), KP(sql_proxy_));
|
||||
LOG_WARN("session or sql_proxy in ObExecContext is NULL", K(ret), KP(session), KP(sql_proxy));
|
||||
} else if (OB_NOT_NULL(inner_conn_) || OB_NOT_NULL(store_inner_conn_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("inner_conn_ or store_inner_conn_ should be null", K(ret), KP(inner_conn_), KP(store_inner_conn_));
|
||||
} else if (FALSE_IT(store_inner_conn_ = static_cast<observer::ObInnerSQLConnection *>(session->get_inner_conn()))) {
|
||||
} else if (FALSE_IT(session->set_inner_conn(nullptr))) {
|
||||
} else if (OB_FAIL(ObInnerConnectionLockUtil::create_inner_conn(session, sql_proxy_, inner_conn))) {
|
||||
} else if (OB_FAIL(ObInnerConnectionLockUtil::create_inner_conn(session, sql_proxy, inner_conn))) {
|
||||
LOG_WARN("create inner connection failed", K(ret), KPC(session));
|
||||
} else if (OB_ISNULL(inner_conn)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
@ -242,26 +255,31 @@ int ObLockFuncContext::open_inner_conn_()
|
||||
int ObLockFuncContext::close_inner_conn_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSQLSessionInfo *session = my_exec_ctx_->get_my_session();
|
||||
ObSQLSessionInfo *session = nullptr;
|
||||
common::ObMySQLProxy *sql_proxy = nullptr;
|
||||
|
||||
if (OB_ISNULL(sql_proxy_) || OB_ISNULL(inner_conn_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("sql_proxy or inner_conn of session is NULL", K(ret), KP(sql_proxy_), KP(session), KP(inner_conn_));
|
||||
} else {
|
||||
OZ (sql_proxy_->close(inner_conn_, true));
|
||||
}
|
||||
if (OB_ISNULL(session)) {
|
||||
if (OB_ISNULL(my_exec_ctx_)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("session is NULL", K(ret), KP(session));
|
||||
} else if (OB_NOT_NULL(inner_conn_) || OB_NOT_NULL(store_inner_conn_)) {
|
||||
// 1. if inner_conn_ is not null, means that we have created inner_conn successfully before, so we must have already
|
||||
// set store_inner_conn_ successfully, just restore it to the session.
|
||||
// 2. if inner_conn_ is null, it's uncertain whether store_inner_conn_ has been set before. If store_inner_conn_
|
||||
// is not null, it must have been set. Otherwise, the inner_conn on the session may be null, or it may have existed
|
||||
// with an error code before store_inner_conn_ being set. At this case, we do not set inner_conn on the session.
|
||||
session->set_inner_conn(store_inner_conn_);
|
||||
LOG_WARN("ObExecContext in ObLockFuncContext is null", K(ret));
|
||||
} else {
|
||||
if (OB_ISNULL(sql_proxy = my_exec_ctx_->get_sql_proxy()) || OB_ISNULL(inner_conn_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("sql_proxy or inner_conn of session is NULL", K(ret), KP(sql_proxy), KP(session), KP(inner_conn_));
|
||||
} else {
|
||||
OZ (sql_proxy->close(inner_conn_, true));
|
||||
}
|
||||
if (OB_ISNULL(session = my_exec_ctx_->get_my_session())) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("session is NULL", K(ret), KP(session));
|
||||
} else if (OB_NOT_NULL(inner_conn_) || OB_NOT_NULL(store_inner_conn_)) {
|
||||
// 1. if inner_conn_ is not null, means that we have created inner_conn successfully before, so we must have already
|
||||
// set store_inner_conn_ successfully, just restore it to the session.
|
||||
// 2. if inner_conn_ is null, it's uncertain whether store_inner_conn_ has been set before. If store_inner_conn_
|
||||
// is not null, it must have been set. Otherwise, the inner_conn on the session may be null, or it may have existed
|
||||
// with an error code before store_inner_conn_ being set. At this case, we do not set inner_conn on the session.
|
||||
session->set_inner_conn(store_inner_conn_);
|
||||
}
|
||||
}
|
||||
sql_proxy_ = nullptr;
|
||||
inner_conn_ = nullptr;
|
||||
store_inner_conn_ = nullptr;
|
||||
return ret;
|
||||
@ -480,6 +498,7 @@ int ObLockFuncExecutor::remove_session_record_(ObLockFuncContext &ctx,
|
||||
ObTableLockOwnerID lock_owner;
|
||||
ObSqlString delete_sql;
|
||||
int64_t affected_rows = 0;
|
||||
ObSQLSessionInfo *session = nullptr;
|
||||
|
||||
OZ (check_owner_exist_(ctx, client_session_id, client_session_create_ts, owner_exist));
|
||||
if (OB_SUCC(ret) && !owner_exist) {
|
||||
@ -490,7 +509,9 @@ int ObLockFuncExecutor::remove_session_record_(ObLockFuncContext &ctx,
|
||||
table_name,
|
||||
client_session_id));
|
||||
OZ (ctx.execute_write(delete_sql, affected_rows));
|
||||
OX (mark_lock_session_(ctx.session_info_, false));
|
||||
OV (OB_NOT_NULL(ctx.my_exec_ctx_), OB_INVALID_ARGUMENT);
|
||||
OV (OB_NOT_NULL(session = ctx.my_exec_ctx_->get_my_session()), OB_INVALID_ARGUMENT);
|
||||
OX (mark_lock_session_(session, false));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -686,11 +707,11 @@ int ObGetLockExecutor::execute(ObExecContext &ctx,
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
SMART_VAR(ObLockFuncContext, stack_ctx1) {
|
||||
OZ (stack_ctx1.init(*sess, ctx, timeout_us));
|
||||
OZ (stack_ctx1.init(ctx, timeout_us));
|
||||
OZ (generate_lock_id_(stack_ctx1, lock_name, timeout_us, lock_id));
|
||||
|
||||
is_rollback = (OB_SUCCESS != ret);
|
||||
if (OB_TMP_FAIL(stack_ctx1.destroy(ctx, *(ctx.get_my_session()), is_rollback))) {
|
||||
if (OB_TMP_FAIL(stack_ctx1.destroy(ctx, is_rollback))) {
|
||||
LOG_WARN("stack ctx destroy failed", K(tmp_ret));
|
||||
COVER_SUCC(tmp_ret);
|
||||
}
|
||||
@ -698,7 +719,7 @@ int ObGetLockExecutor::execute(ObExecContext &ctx,
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
SMART_VAR(ObLockFuncContext, stack_ctx2) {
|
||||
OZ (stack_ctx2.init(*sess, ctx, timeout_us));
|
||||
OZ (stack_ctx2.init(ctx, timeout_us));
|
||||
// only when connect by proxy should check client_session
|
||||
if (sess->is_obproxy_mode()) {
|
||||
OZ (check_client_ssid_(stack_ctx2, client_session_id, client_session_create_ts));
|
||||
@ -706,7 +727,7 @@ int ObGetLockExecutor::execute(ObExecContext &ctx,
|
||||
ret = OB_SUCCESS; // there're no same client_session_id records, continue
|
||||
} else {
|
||||
// TODO(yangyifei.yyf): some SQL can not redo, reroute may cause errors, so skip this step temporarily
|
||||
// OZ (check_need_reroute_(stack_ctx1, sess, client_session_id, client_session_create_ts));
|
||||
// OZ (check_need_reroute_(stack_ctx1, client_session_id, client_session_create_ts));
|
||||
}
|
||||
}
|
||||
OZ (update_session_table_(stack_ctx2,
|
||||
@ -718,7 +739,7 @@ int ObGetLockExecutor::execute(ObExecContext &ctx,
|
||||
OX (mark_lock_session_(sess, true));
|
||||
|
||||
is_rollback = (OB_SUCCESS != ret);
|
||||
if (OB_TMP_FAIL(stack_ctx2.destroy(ctx, *(ctx.get_my_session()), is_rollback))) {
|
||||
if (OB_TMP_FAIL(stack_ctx2.destroy(ctx, is_rollback))) {
|
||||
LOG_WARN("stack ctx destroy failed", K(tmp_ret));
|
||||
COVER_SUCC(tmp_ret);
|
||||
}
|
||||
@ -869,7 +890,6 @@ int ObGetLockExecutor::update_session_table_(ObLockFuncContext &ctx,
|
||||
}
|
||||
|
||||
int ObGetLockExecutor::check_need_reroute_(ObLockFuncContext &ctx,
|
||||
sql::ObSQLSessionInfo *session,
|
||||
const uint32_t client_session_id,
|
||||
const uint64_t client_session_create_ts)
|
||||
{
|
||||
@ -878,36 +898,44 @@ int ObGetLockExecutor::check_need_reroute_(ObLockFuncContext &ctx,
|
||||
ObAddr lock_session_addr;
|
||||
uint32_t lock_session_id = 0;
|
||||
int32_t sql_port = 0;
|
||||
ObSqlCtx *sql_ctx = ctx.my_exec_ctx_->get_sql_ctx();
|
||||
ObSqlCtx *sql_ctx = nullptr;
|
||||
ObSQLSessionInfo *session = nullptr;
|
||||
|
||||
if (!session->is_lock_session()) {
|
||||
OZ (get_lock_session_(ctx, client_session_id, client_session_create_ts, lock_session_addr, lock_session_id));
|
||||
// no lock_session in this client, continue
|
||||
if (OB_EMPTY_RESULT == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
OV (lock_session_addr == GCTX.self_addr(), OB_ERR_PROXY_REROUTE, lock_session_addr, GCTX.self_addr());
|
||||
// can not reroute in one observer, so just return OB_SUCCESS
|
||||
OV (lock_session_id == session->get_sessid(), OB_SUCCESS, lock_session_id, session->get_sessid());
|
||||
// to avoid this session wasn't marked as lock_session before
|
||||
if (lock_session_addr == GCTX.self_addr() && lock_session_id == session->get_sessid()) {
|
||||
mark_lock_session_(session, true);
|
||||
OV (OB_NOT_NULL(ctx.my_exec_ctx_), OB_INVALID_ARGUMENT);
|
||||
OV (OB_NOT_NULL(sql_ctx = ctx.my_exec_ctx_->get_sql_ctx()) &&
|
||||
OB_NOT_NULL(session = ctx.my_exec_ctx_->get_my_session()),
|
||||
OB_NOT_INIT);
|
||||
OX (
|
||||
if (!session->is_lock_session()) {
|
||||
OZ (get_lock_session_(ctx, client_session_id, client_session_create_ts, lock_session_addr, lock_session_id));
|
||||
// no lock_session in this client, continue
|
||||
if (OB_EMPTY_RESULT == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
// get lock_session successfully, compare with current session
|
||||
} else if (OB_SUCCESS == ret) {
|
||||
OV (lock_session_addr == GCTX.self_addr(), OB_ERR_PROXY_REROUTE, lock_session_addr, GCTX.self_addr());
|
||||
// can not reroute in one observer, so just return OB_SUCCESS
|
||||
OV (lock_session_id == session->get_sessid(), OB_SUCCESS, lock_session_id, session->get_sessid());
|
||||
// to avoid this session wasn't marked as lock_session before
|
||||
if (lock_session_addr == GCTX.self_addr() && lock_session_id == session->get_sessid()) {
|
||||
mark_lock_session_(session, true);
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_ERR_PROXY_REROUTE == ret) {
|
||||
if (OB_TMP_FAIL(get_sql_port_(ctx, lock_session_addr, sql_port))) {
|
||||
LOG_WARN("can not get sql_port, reroute to this server this time", K(tmp_ret), K(lock_session_addr), K(GCTX.self_addr()));
|
||||
sql_ctx->get_or_create_reroute_info()->server_ = GCTX.self_addr();
|
||||
sql_ctx->get_reroute_info()->server_.set_port(GCONF.mysql_port);
|
||||
sql_ctx->get_reroute_info()->for_session_reroute_ = true;
|
||||
} else {
|
||||
sql_ctx->get_or_create_reroute_info()->server_ = lock_session_addr;
|
||||
sql_ctx->get_reroute_info()->server_.set_port(sql_port);
|
||||
sql_ctx->get_reroute_info()->for_session_reroute_ = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_ERR_PROXY_REROUTE == ret) {
|
||||
if (OB_TMP_FAIL(get_sql_port_(ctx, lock_session_addr, sql_port))) {
|
||||
LOG_WARN("can not get sql_port, reroute to this server this time", K(tmp_ret), K(lock_session_addr), K(GCTX.self_addr()));
|
||||
sql_ctx->get_or_create_reroute_info()->server_ = GCTX.self_addr();
|
||||
sql_ctx->get_reroute_info()->server_.set_port(GCONF.mysql_port);
|
||||
sql_ctx->get_reroute_info()->for_session_reroute_ = true;
|
||||
} else {
|
||||
sql_ctx->get_or_create_reroute_info()->server_ = lock_session_addr;
|
||||
sql_ctx->get_reroute_info()->server_.set_port(sql_port);
|
||||
sql_ctx->get_reroute_info()->for_session_reroute_ = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -1014,7 +1042,7 @@ int ObReleaseLockExecutor::execute(ObExecContext &ctx,
|
||||
SMART_VAR(ObLockFuncContext, stack_ctx) {
|
||||
client_session_id = ctx.get_my_session()->get_client_sessid();
|
||||
client_session_create_ts = ctx.get_my_session()->get_client_create_time();
|
||||
OZ (stack_ctx.init(*(ctx.get_my_session()), ctx));
|
||||
OZ (stack_ctx.init(ctx));
|
||||
if (OB_SUCC(ret)) {
|
||||
ObSQLSessionInfo *session = GET_MY_SESSION(ctx);
|
||||
ObTxDesc *tx_desc = session->get_tx_desc();
|
||||
@ -1069,7 +1097,7 @@ int ObReleaseLockExecutor::execute(ObExecContext &ctx,
|
||||
}
|
||||
}
|
||||
is_rollback = (OB_SUCCESS != ret);
|
||||
if (OB_TMP_FAIL(stack_ctx.destroy(ctx, *(ctx.get_my_session()), is_rollback))) {
|
||||
if (OB_TMP_FAIL(stack_ctx.destroy(ctx, is_rollback))) {
|
||||
LOG_WARN("stack ctx destroy failed", K(tmp_ret));
|
||||
COVER_SUCC(tmp_ret);
|
||||
}
|
||||
@ -1149,12 +1177,12 @@ int ObReleaseAllLockExecutor::execute_(ObExecContext &ctx,
|
||||
OZ (ObLockFuncContext::valid_execute_context(ctx));
|
||||
if (OB_SUCC(ret)) {
|
||||
SMART_VAR(ObLockFuncContext, stack_ctx) {
|
||||
OZ (stack_ctx.init(*(ctx.get_my_session()), ctx));
|
||||
OZ (stack_ctx.init(ctx));
|
||||
if (OB_SUCC(ret)) {
|
||||
ObSQLSessionInfo *session = GET_MY_SESSION(ctx);
|
||||
ObTxDesc *tx_desc = session->get_tx_desc();
|
||||
ObTxParam tx_param;
|
||||
if (ctx.get_my_session()->is_obproxy_mode()) {
|
||||
if (OB_NOT_NULL(session) && session->is_obproxy_mode()) {
|
||||
OZ (check_client_ssid_(stack_ctx, client_session_id, client_session_create_ts));
|
||||
if (OB_EMPTY_RESULT == ret) {
|
||||
release_cnt = LOCK_NOT_OWN_RELEASE_CNT;
|
||||
@ -1170,7 +1198,7 @@ int ObReleaseAllLockExecutor::execute_(ObExecContext &ctx,
|
||||
OZ (remove_session_record_(stack_ctx, client_session_id, client_session_create_ts));
|
||||
}
|
||||
is_rollback = (OB_SUCCESS != ret);
|
||||
if (OB_TMP_FAIL(stack_ctx.destroy(ctx, *(ctx.get_my_session()), is_rollback))) {
|
||||
if (OB_TMP_FAIL(stack_ctx.destroy(ctx, is_rollback))) {
|
||||
LOG_WARN("stack ctx destroy failed", K(tmp_ret));
|
||||
COVER_SUCC(tmp_ret);
|
||||
}
|
||||
@ -1197,7 +1225,7 @@ int ObReleaseAllLockExecutor::execute_(ObExecContext &ctx,
|
||||
OZ (ObLockFuncContext::valid_execute_context(ctx));
|
||||
if (OB_SUCC(ret)) {
|
||||
SMART_VAR(ObLockFuncContext, stack_ctx) {
|
||||
OZ (stack_ctx.init(*(ctx.get_my_session()), ctx));
|
||||
OZ (stack_ctx.init(ctx));
|
||||
if (OB_SUCC(ret)) {
|
||||
ObSQLSessionInfo *session = GET_MY_SESSION(ctx);
|
||||
ObTxDesc *tx_desc = session->get_tx_desc();
|
||||
@ -1207,7 +1235,7 @@ int ObReleaseAllLockExecutor::execute_(ObExecContext &ctx,
|
||||
OZ (remove_session_record_(stack_ctx, client_session_id, 0));
|
||||
}
|
||||
is_rollback = (OB_SUCCESS != ret);
|
||||
if (OB_TMP_FAIL(stack_ctx.destroy(ctx, *(ctx.get_my_session()), is_rollback))) {
|
||||
if (OB_TMP_FAIL(stack_ctx.destroy(ctx, is_rollback))) {
|
||||
LOG_WARN("stack ctx destroy failed", K(tmp_ret));
|
||||
COVER_SUCC(tmp_ret);
|
||||
}
|
||||
|
@ -64,21 +64,17 @@ public:
|
||||
tenant_id_ = 0;
|
||||
database_id_ = OB_INVALID_ID;
|
||||
database_name_.reset();
|
||||
sql_proxy_ = nullptr;
|
||||
inner_conn_ = nullptr;
|
||||
store_inner_conn_ = nullptr;
|
||||
session_info_ = nullptr;
|
||||
my_exec_ctx_ = nullptr;
|
||||
saved_session_.reset();
|
||||
}
|
||||
|
||||
int init(sql::ObSQLSessionInfo &session_info,
|
||||
sql::ObExecContext &ctx,
|
||||
int init(sql::ObExecContext &ctx,
|
||||
const int64_t timeout_us = 0);
|
||||
int destroy(sql::ObExecContext &ctx,
|
||||
sql::ObSQLSessionInfo &session_info,
|
||||
bool is_rollback);
|
||||
bool is_inited() { return session_info_ != NULL; }
|
||||
bool is_inited() { return my_exec_ctx_ != NULL; }
|
||||
|
||||
static int valid_execute_context(sql::ObExecContext &ctx);
|
||||
int execute_write(const ObSqlString &sql, int64_t &affected_rows);
|
||||
@ -105,10 +101,8 @@ private:
|
||||
uint64_t tenant_id_;
|
||||
uint64_t database_id_;
|
||||
ObSqlString database_name_;
|
||||
common::ObMySQLProxy *sql_proxy_;
|
||||
observer::ObInnerSQLConnection *inner_conn_;
|
||||
observer::ObInnerSQLConnection *store_inner_conn_;
|
||||
sql::ObSQLSessionInfo *session_info_;
|
||||
sql::ObExecContext *my_exec_ctx_; //my exec context
|
||||
sql::ObBasicSessionInfo::TransSavedValue saved_session_;
|
||||
};
|
||||
@ -190,7 +184,6 @@ private:
|
||||
const uint64_t client_session_create_ts,
|
||||
const uint32_t server_session_id);
|
||||
int check_need_reroute_(ObLockFuncContext &ctx,
|
||||
sql::ObSQLSessionInfo *session,
|
||||
const uint32_t client_session_id,
|
||||
const uint64_t client_session_create_ts);
|
||||
int get_lock_session_(ObLockFuncContext &ctx,
|
||||
|
Loading…
x
Reference in New Issue
Block a user