[CP] fix the incorrect logic to mark session id unused

This commit is contained in:
496148326@qq.com
2023-06-25 09:42:36 +00:00
committed by ob-robot
parent e297a501e4
commit 44b6f772e1
2 changed files with 87 additions and 97 deletions

View File

@ -119,12 +119,6 @@ int ObSMConnectionCallback::init(ObSqlSockSession& sess, ObSMConnection& conn)
} else {
LOG_INFO("sm conn init succ", K(conn.sessid_), K(sess.client_addr_));
}
if (OB_SUCCESS != ret && OB_SUCCESS == conn.ret_) {
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = GCTX.session_mgr_->mark_sessid_unused(conn.sessid_))) {
LOG_ERROR("fail to mark sessid unused", K(tmp_ret), K(conn.sessid_));
}
}
//如果当前function发生错误,应该在当前function中进行mark_sessid_unused
if (OB_SUCCESS == ret && OB_SUCCESS == conn.ret_) {
@ -149,54 +143,55 @@ void ObSMConnectionCallback::destroy(ObSMConnection& conn)
{
int ret = OB_SUCCESS;
bool is_need_clear = false;
sql::ObDisconnectState disconnect_state;
sql::ObDisconnectState disconnect_state = sql::ObDisconnectState::DIS_INIT;
ObCurTraceId::TraceId trace_id;
if (conn.is_sess_alloc_ && !conn.is_sess_free_) {
{
int tmp_ret = OB_SUCCESS;
sql::ObSQLSessionInfo *sess_info = NULL;
sql::ObSessionGetterGuard guard(*GCTX.session_mgr_, conn.sessid_);
if (OB_UNLIKELY(OB_SUCCESS != (tmp_ret = guard.get_session(sess_info)))) {
LOG_WARN_RET(tmp_ret, "fail to get session", K(tmp_ret), K(conn.sessid_),
"proxy_sessid", conn.proxy_sessid_);
} else if (OB_ISNULL(sess_info)) {
tmp_ret = OB_ERR_UNEXPECTED;
LOG_WARN_RET(tmp_ret, "session info is NULL", K(tmp_ret), K(conn.sessid_),
"proxy_sessid", conn.proxy_sessid_);
} else {
disconnect_state = sess_info->get_disconnect_state();
trace_id = sess_info->get_current_trace_id();
if (conn.is_sess_alloc_) {
if (!conn.is_sess_free_) {
{
int tmp_ret = OB_SUCCESS;
sql::ObSQLSessionInfo *sess_info = NULL;
sql::ObSessionGetterGuard guard(*GCTX.session_mgr_, conn.sessid_);
if (OB_UNLIKELY(OB_SUCCESS != (tmp_ret = guard.get_session(sess_info)))) {
LOG_WARN_RET(tmp_ret, "fail to get session", K(tmp_ret), K(conn.sessid_),
"proxy_sessid", conn.proxy_sessid_);
} else if (OB_ISNULL(sess_info)) {
tmp_ret = OB_ERR_UNEXPECTED;
LOG_WARN_RET(tmp_ret, "session info is NULL", K(tmp_ret), K(conn.sessid_),
"proxy_sessid", conn.proxy_sessid_);
} else {
disconnect_state = sess_info->get_disconnect_state();
trace_id = sess_info->get_current_trace_id();
}
}
}
sql::ObFreeSessionCtx ctx;
ctx.tenant_id_ = conn.tenant_id_;
ctx.sessid_ = conn.sessid_;
ctx.proxy_sessid_ = conn.proxy_sessid_;
ctx.has_inc_active_num_ = conn.has_inc_active_num_;
sql::ObFreeSessionCtx ctx;
ctx.tenant_id_ = conn.tenant_id_;
ctx.sessid_ = conn.sessid_;
ctx.proxy_sessid_ = conn.proxy_sessid_;
ctx.has_inc_active_num_ = conn.has_inc_active_num_;
//free session in task
ObSrvTask *task = OB_NEW(ObDisconnectTask,
ObModIds::OB_RPC,
ctx);
if (OB_UNLIKELY(NULL == task)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
} else if (OB_UNLIKELY(NULL == conn.tenant_)) {
ret = OB_TENANT_NOT_EXIST;
} else if (OB_FAIL(conn.tenant_->recv_request(*task))) {
LOG_WARN("push disconnect task fail", K(conn.sessid_),
"proxy_sessid", conn.proxy_sessid_, K(ret));
ob_delete(task);
}
// free session locally
if (OB_FAIL(ret)) {
ObMPDisconnect disconnect_processor(ctx);
rpc::frame::ObReqProcessor *processor = static_cast<rpc::frame::ObReqProcessor *>(&disconnect_processor);
if (OB_FAIL(processor->run())) {
LOG_WARN("free session fail and related session id can not be reused", K(ret), K(ctx));
//free session in task
ObSrvTask *task = OB_NEW(ObDisconnectTask,
ObModIds::OB_RPC,
ctx);
if (OB_UNLIKELY(NULL == task)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
} else if (OB_UNLIKELY(NULL == conn.tenant_)) {
ret = OB_TENANT_NOT_EXIST;
} else if (OB_FAIL(conn.tenant_->recv_request(*task))) {
LOG_WARN("push disconnect task fail", K(conn.sessid_),
"proxy_sessid", conn.proxy_sessid_, K(ret));
ob_delete(task);
}
}
// free session locally
if (OB_FAIL(ret)) {
ObMPDisconnect disconnect_processor(ctx);
rpc::frame::ObReqProcessor *processor = static_cast<rpc::frame::ObReqProcessor *>(&disconnect_processor);
if (OB_FAIL(processor->run())) {
LOG_WARN("free session fail and related session id can not be reused", K(ret), K(ctx));
}
}
}
} else {
//if session is not alloc, session id should be mark unused here
if (OB_UNLIKELY(OB_FAIL(sql::ObSQLSessionMgr::is_need_clear_sessid(&conn, is_need_clear)))) {
LOG_ERROR("fail to jugde need clear", K(ret));
} else if (is_need_clear) {

View File

@ -116,12 +116,6 @@ int ObSMHandler::on_connect(easy_connection_t *c)
LOG_INFO("new mysql sessid created", K(easy_connection_str(c)), K(sessid), K(crt_id_ret));
}
}
if (EASY_ERROR == eret && OB_SUCCESS == crt_id_ret) {
if (OB_SUCCESS != (tmp_ret = gctx_.session_mgr_->mark_sessid_unused(sessid))) {
LOG_ERROR_RET(tmp_ret, "fail to mark sessid unused", K(tmp_ret), K(sessid));
}
}
}
}
@ -179,7 +173,7 @@ int ObSMHandler::on_close(easy_connection_t *c)
int ret = OB_SUCCESS;
bool is_need_clear = false;
ObSMConnection *conn = NULL;
sql::ObDisconnectState disconnect_state;
sql::ObDisconnectState disconnect_state = sql::ObDisconnectState::DIS_INIT;
ObCurTraceId::TraceId trace_id;
if (OB_ISNULL(c) || OB_ISNULL(gctx_.session_mgr_)) {
ret = OB_ERR_UNEXPECTED;
@ -192,52 +186,53 @@ int ObSMHandler::on_close(easy_connection_t *c)
LOG_ERROR("conn is NULL", K(ret));
} else {
//free session
if (conn->is_sess_alloc_ && !conn->is_sess_free_) {
{
int tmp_ret = OB_SUCCESS;
sql::ObSQLSessionInfo *sess_info = NULL;
sql::ObSessionGetterGuard guard(*gctx_.session_mgr_, conn->sessid_);
if (OB_UNLIKELY(OB_SUCCESS != (tmp_ret = guard.get_session(sess_info)))) {
LOG_WARN_RET(tmp_ret, "fail to get session", K(tmp_ret), K(conn->sessid_),
"proxy_sessid", conn->proxy_sessid_);
} else if (OB_ISNULL(sess_info)) {
tmp_ret = OB_ERR_UNEXPECTED;
LOG_WARN_RET(tmp_ret, "session info is NULL", K(tmp_ret), K(conn->sessid_),
"proxy_sessid", conn->proxy_sessid_);
} else {
disconnect_state = sess_info->get_disconnect_state();
trace_id = sess_info->get_current_trace_id();
if (conn->is_sess_alloc_) {
if (!conn->is_sess_free_) {
{
int tmp_ret = OB_SUCCESS;
sql::ObSQLSessionInfo *sess_info = NULL;
sql::ObSessionGetterGuard guard(*gctx_.session_mgr_, conn->sessid_);
if (OB_UNLIKELY(OB_SUCCESS != (tmp_ret = guard.get_session(sess_info)))) {
LOG_WARN_RET(tmp_ret, "fail to get session", K(tmp_ret), K(conn->sessid_),
"proxy_sessid", conn->proxy_sessid_);
} else if (OB_ISNULL(sess_info)) {
tmp_ret = OB_ERR_UNEXPECTED;
LOG_WARN_RET(tmp_ret, "session info is NULL", K(tmp_ret), K(conn->sessid_),
"proxy_sessid", conn->proxy_sessid_);
} else {
disconnect_state = sess_info->get_disconnect_state();
trace_id = sess_info->get_current_trace_id();
}
}
}
sql::ObFreeSessionCtx ctx;
ctx.tenant_id_ = conn->tenant_id_;
ctx.sessid_ = conn->sessid_;
ctx.proxy_sessid_ = conn->proxy_sessid_;
ctx.has_inc_active_num_ = conn->has_inc_active_num_;
sql::ObFreeSessionCtx ctx;
ctx.tenant_id_ = conn->tenant_id_;
ctx.sessid_ = conn->sessid_;
ctx.proxy_sessid_ = conn->proxy_sessid_;
ctx.has_inc_active_num_ = conn->has_inc_active_num_;
//free session in task
ObSrvTask *task = OB_NEW(ObDisconnectTask,
ObModIds::OB_RPC,
ctx);
if (OB_UNLIKELY(NULL == task)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
} else if (OB_UNLIKELY(NULL == conn->tenant_)) {
ret = OB_TENANT_NOT_EXIST;
} else if (OB_FAIL(conn->tenant_->recv_request(*task))) {
LOG_WARN("push disconnect task fail", K(conn->sessid_),
"proxy_sessid", conn->proxy_sessid_, K(ret));
ob_delete(task);
}
// free session locally
if (OB_FAIL(ret)) {
ObMPDisconnect disconnect_processor(ctx);
rpc::frame::ObReqProcessor *processor = static_cast<rpc::frame::ObReqProcessor *>(&disconnect_processor);
if (OB_FAIL(processor->run())) {
LOG_WARN("free session fail and related session id can not be reused", K(ret), K(ctx), "sessid", conn->sessid_);
//free session in task
ObSrvTask *task = OB_NEW(ObDisconnectTask,
ObModIds::OB_RPC,
ctx);
if (OB_UNLIKELY(NULL == task)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
} else if (OB_UNLIKELY(NULL == conn->tenant_)) {
ret = OB_TENANT_NOT_EXIST;
} else if (OB_FAIL(conn->tenant_->recv_request(*task))) {
LOG_WARN("push disconnect task fail", K(conn->sessid_),
"proxy_sessid", conn->proxy_sessid_, K(ret));
ob_delete(task);
}
// free session locally
if (OB_FAIL(ret)) {
ObMPDisconnect disconnect_processor(ctx);
rpc::frame::ObReqProcessor *processor = static_cast<rpc::frame::ObReqProcessor *>(&disconnect_processor);
if (OB_FAIL(processor->run())) {
LOG_WARN("free session fail and related session id can not be reused", K(ret), K(ctx), "sessid", conn->sessid_);
}
}
}
} else {
//if session is not alloc, session id should be mark unused here
if (OB_UNLIKELY(OB_FAIL(sql::ObSQLSessionMgr::is_need_clear_sessid(conn, is_need_clear)))) {
LOG_ERROR("fail to jugde need clear", K(ret));
} else if (is_need_clear) {