[Fix] check data completement task before execute inner sql and kill session when task is cancel

This commit is contained in:
obdev
2022-11-28 06:05:54 +00:00
committed by ob-robot
parent d874825952
commit 038db1917d
39 changed files with 1472 additions and 200 deletions

View File

@ -21,9 +21,11 @@
#include "lib/oblog/ob_log_module.h"
#include "lib/container/ob_iarray.h"
#include "storage/tx/ob_multi_data_source.h"
#include "sql/plan_cache/ob_plan_cache_util.h"
using namespace oceanbase::common;
using namespace oceanbase::share::schema;
using namespace oceanbase::sql;
namespace oceanbase
{
@ -247,12 +249,70 @@ int ObInnerSqlRpcP::process_lock_tablet(sqlclient::ObISQLConnection *conn,
return ret;
}
int ObInnerSqlRpcP::create_tmp_session(
uint64_t tenant_id,
sql::ObSQLSessionInfo *&tmp_session,
sql::ObFreeSessionCtx &free_session_ctx,
const bool is_oracle_mode)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(NULL != tmp_session)) {
ret = OB_INIT_TWICE;
LOG_WARN("tmp_session is not null.", K(ret));
} else if (NULL == GCTX.session_mgr_) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("session manager is NULL", K(ret));
} else {
uint32_t sid = sql::ObSQLSessionInfo::INVALID_SESSID;
uint64_t proxy_sid = 0;
if (OB_FAIL(GCTX.session_mgr_->create_sessid(sid))) {
LOG_WARN("alloc session id failed", K(ret));
} else if (OB_FAIL(GCTX.session_mgr_->create_session(tenant_id, sid, proxy_sid,
ObTimeUtility::current_time(),
tmp_session))) {
GCTX.session_mgr_->mark_sessid_unused(sid);
tmp_session = NULL;
LOG_WARN("create session failed", K(ret), K(sid));
} else {
const bool is_extern_session = true;
if (OB_NOT_NULL(tmp_session)
&& OB_FAIL(observer::ObInnerSQLConnection::init_session_info(
tmp_session,
is_extern_session,
is_oracle_mode,
false))) {
LOG_WARN("fail to init session info", K(ret), KPC(tmp_session));
}
free_session_ctx.sessid_ = sid;
free_session_ctx.proxy_sessid_ = proxy_sid;
}
}
return ret;
}
void ObInnerSqlRpcP::cleanup_tmp_session(
sql::ObSQLSessionInfo *tmp_session,
sql::ObFreeSessionCtx &free_session_ctx)
{
if (NULL != GCTX.session_mgr_ && NULL != tmp_session) {
tmp_session->set_session_sleep();
GCTX.session_mgr_->revert_session(tmp_session);
GCTX.session_mgr_->free_session(free_session_ctx);
tmp_session = NULL;
GCTX.session_mgr_->mark_sessid_unused(free_session_ctx.sessid_);
}
ObActiveSessionGuard::setup_default_ash(); // enforce cleanup for future RPC cases
}
int ObInnerSqlRpcP::process()
{
int ret = OB_SUCCESS;
const ObInnerSQLTransmitArg &transmit_arg = arg_;
ObInnerSQLTransmitResult &transmit_result = result_;
sql::ObFreeSessionCtx free_session_ctx;
sql::ObSQLSessionInfo *tmp_session = NULL; // session got from session_mgr_
if (OB_ISNULL(gctx_.res_inner_conn_pool_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("null ptr", K(ret), K(gctx_.sql_proxy_));
@ -286,17 +346,39 @@ int ObInnerSqlRpcP::process()
//only read can across cluster
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("only read can execute across cluster", KR(ret), K(owner_cluster_id), K(transmit_arg));
} else if (OB_FAIL(sql_str.assign_fmt("%.*s", transmit_arg.get_inner_sql().length(),
transmit_arg.get_inner_sql().ptr()))) {
LOG_WARN("assign sql to write_sql failed", K(ret), K(transmit_arg.get_inner_sql()));
} else if (transmit_arg.get_use_external_session()
&& ((ObInnerSQLTransmitArg::OPERATION_TYPE_EXECUTE_READ != transmit_arg.get_operation_type() && ObInnerSQLTransmitArg::OPERATION_TYPE_EXECUTE_WRITE != transmit_arg.get_operation_type())
|| OB_INVALID_ID != transmit_arg.get_conn_id())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("only init remote conn with sess in read/write and not in trans", KR(ret), K(transmit_arg));
} else if (OB_ISNULL(pool)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ptr is null", K(ret));
} else if (transmit_arg.get_use_external_session()
&& OB_FAIL(create_tmp_session(transmit_arg.get_tenant_id(), tmp_session, free_session_ctx, transmit_arg.get_is_oracle_mode()))) {
LOG_WARN("fail to create_tmp_session", K(ret), K(transmit_arg));
} else if (OB_FAIL(sql_str.assign_fmt("%.*s", transmit_arg.get_inner_sql().length(),
transmit_arg.get_inner_sql().ptr()))) {
LOG_WARN("assign sql to write_sql failed", K(ret), K(transmit_arg.get_inner_sql()));
} else if (OB_FAIL(pool->acquire(transmit_arg.get_conn_id(), transmit_arg.get_is_oracle_mode(),
ObInnerSQLTransmitArg::OPERATION_TYPE_ROLLBACK == transmit_arg.get_operation_type(),
conn))) {
conn, tmp_session))) {
cleanup_tmp_session(tmp_session, free_session_ctx);
LOG_WARN("failed to acquire inner connection", K(ret), K(transmit_arg));
}
/* init session info */
if (OB_NOT_NULL(tmp_session)) {
tmp_session->set_current_trace_id(ObCurTraceId::get_trace_id());
tmp_session->switch_tenant(transmit_arg.get_tenant_id());
ObString sql_stmt(sql_str.ptr());
if (OB_FAIL(tmp_session->set_session_active(
sql_stmt,
0, /* ignore this parameter */
ObTimeUtility::current_time(),
obmysql::COM_QUERY))) {
LOG_WARN("failed to set tmp session active", K(ret));
}
}
if (OB_FAIL(ret)) {
} else if (OB_ISNULL(inner_conn = static_cast<observer::ObInnerSQLConnection *>(conn))) {
// do nothing, because conn was set null if need kill_using_conn in aquire
@ -386,6 +468,7 @@ int ObInnerSqlRpcP::process()
transmit_result.set_err_code(ret);
}
}
cleanup_tmp_session(tmp_session, free_session_ctx);
}
return ret;