[to #54988775]alloc result for each prepare in prexecute

This commit is contained in:
LiuYoung00 2024-02-19 08:37:36 +00:00 committed by ob-robot
parent f88d8ee69a
commit 25406e87e1

View File

@ -156,51 +156,51 @@ int ObMPStmtPrexecute::before_process()
int64_t packet_len = (reinterpret_cast<const ObMySQLRawPacket &>
(req_->get_packet())).get_clen();
ObReqTimeGuard req_timeinfo_guard;
SMART_VAR(ObMySQLResultSet, result, *session, THIS_WORKER.get_allocator()) {
result.set_has_more_result(false);
if (sql_len_ > 0 && 0 == stmt_id_) {
int64_t query_timeout = 0;
int64_t tenant_version = 0;
int64_t sys_version = 0;
bool force_sync_resp = false;
bool need_response_error = false;
if (OB_UNLIKELY(!session->is_valid())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("invalid session", K_(sql), K(ret));
} else if (OB_FAIL(process_kill_client_session(*session))) {
LOG_WARN("client session has been killed", K(ret));
} else if (OB_UNLIKELY(session->is_zombie())) {
ret = OB_ERR_SESSION_INTERRUPTED;
LOG_WARN("session has been killed", K(session->get_session_state()), K_(sql),
K(session->get_sessid()), "proxy_sessid", session->get_proxy_sessid(), K(ret));
} else if (OB_UNLIKELY(packet_len > session->get_max_packet_size())) {
ret = OB_ERR_NET_PACKET_TOO_LARGE;
LOG_WARN("packet too large than allowd for the session", K_(sql), K(ret));
} else if (OB_FAIL(session->get_query_timeout(query_timeout))) {
LOG_WARN("fail to get query timeout", K_(sql), K(ret));
} else if (OB_FAIL(gctx_.schema_service_->get_tenant_received_broadcast_version(
session->get_effective_tenant_id(), tenant_version))) {
LOG_WARN("fail get tenant broadcast version", K(ret));
} else if (OB_FAIL(gctx_.schema_service_->get_tenant_received_broadcast_version(
OB_SYS_TENANT_ID, sys_version))) {
LOG_WARN("fail get tenant broadcast version", K(ret));
} else if (OB_FAIL(ObMPStmtPrepare::multiple_query_check(*session,
sql_,
force_sync_resp,
need_response_error))) {
LOG_WARN("not support multiple_query", K(ret));
if (sql_len_ > 0 && 0 == stmt_id_) {
int64_t query_timeout = 0;
int64_t tenant_version = 0;
int64_t sys_version = 0;
bool force_sync_resp = false;
bool need_response_error = false;
if (OB_UNLIKELY(!session->is_valid())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("invalid session", K_(sql), K(ret));
} else if (OB_FAIL(process_kill_client_session(*session))) {
LOG_WARN("client session has been killed", K(ret));
} else if (OB_UNLIKELY(session->is_zombie())) {
ret = OB_ERR_SESSION_INTERRUPTED;
LOG_WARN("session has been killed", K(session->get_session_state()), K_(sql),
K(session->get_sessid()), "proxy_sessid", session->get_proxy_sessid(), K(ret));
} else if (OB_UNLIKELY(packet_len > session->get_max_packet_size())) {
ret = OB_ERR_NET_PACKET_TOO_LARGE;
LOG_WARN("packet too large than allowd for the session", K_(sql), K(ret));
} else if (OB_FAIL(session->get_query_timeout(query_timeout))) {
LOG_WARN("fail to get query timeout", K_(sql), K(ret));
} else if (OB_FAIL(gctx_.schema_service_->get_tenant_received_broadcast_version(
session->get_effective_tenant_id(), tenant_version))) {
LOG_WARN("fail get tenant broadcast version", K(ret));
} else if (OB_FAIL(gctx_.schema_service_->get_tenant_received_broadcast_version(
OB_SYS_TENANT_ID, sys_version))) {
LOG_WARN("fail get tenant broadcast version", K(ret));
} else if (OB_FAIL(ObMPStmtPrepare::multiple_query_check(*session,
sql_,
force_sync_resp,
need_response_error))) {
LOG_WARN("not support multiple_query", K(ret));
} else {
THIS_WORKER.set_timeout_ts(get_receive_timestamp() + query_timeout);
retry_ctrl_.set_tenant_global_schema_version(tenant_version);
retry_ctrl_.set_sys_global_schema_version(sys_version);
if (OB_FAIL(init_process_var(get_ctx(), ObMultiStmtItem(false, 0, ObString()), *session))) {
LOG_WARN("init process var faield.", K(ret));
} else if (OB_FAIL(check_and_refresh_schema(session->get_login_tenant_id(),
session->get_effective_tenant_id()))) {
LOG_WARN("failed to check_and_refresh_schema", K(ret));
} else {
THIS_WORKER.set_timeout_ts(get_receive_timestamp() + query_timeout);
retry_ctrl_.set_tenant_global_schema_version(tenant_version);
retry_ctrl_.set_sys_global_schema_version(sys_version);
if (OB_FAIL(init_process_var(get_ctx(), ObMultiStmtItem(false, 0, ObString()), *session))) {
LOG_WARN("init process var faield.", K(ret));
} else if (OB_FAIL(check_and_refresh_schema(session->get_login_tenant_id(),
session->get_effective_tenant_id()))) {
LOG_WARN("failed to check_and_refresh_schema", K(ret));
} else {
oceanbase::lib::Thread::WaitGuard guard(oceanbase::lib::Thread::WAIT_FOR_LOCAL_RETRY);
do {
oceanbase::lib::Thread::WaitGuard guard(oceanbase::lib::Thread::WAIT_FOR_LOCAL_RETRY);
do {
SMART_VAR(ObMySQLResultSet, result, *session, THIS_WORKER.get_allocator()) {
result.set_has_more_result(false);
share::schema::ObSchemaGetterGuard schema_guard;
const uint64_t tenant_id = session->get_effective_tenant_id();
ObVirtualTableIteratorFactory vt_iter_factory(*gctx_.vt_iter_creator_);
@ -246,12 +246,12 @@ int ObMPStmtPrexecute::before_process()
set_exec_start_timestamp(ObTimeUtility::current_time());
int cli_ret = OB_SUCCESS;
get_retry_ctrl().test_and_save_retry_state(gctx_,
get_ctx(),
result,
ret,
cli_ret);
get_ctx(),
result,
ret,
cli_ret);
LOG_WARN("run stmt_query failed, check if need retry",
K(ret), K(cli_ret), K(get_retry_ctrl().need_retry()), K(sql_));
K(ret), K(cli_ret), K(get_retry_ctrl().need_retry()), K(sql_));
ret = cli_ret;
}
if (OB_FAIL(ret)) {
@ -259,107 +259,107 @@ int ObMPStmtPrexecute::before_process()
}
}
}
} while (RETRY_TYPE_LOCAL == retry_ctrl_.get_retry_type());
if (OB_SUCC(ret) && retry_ctrl_.get_retry_times() > 0) {
LOG_TRACE("sql retry succeed", K(ret),
"retry_times", retry_ctrl_.get_retry_times());
if (OB_SUCC(ret)) {
stmt_id_ = result.get_statement_id();
first_time_ = true; // need prepare at first time
LOG_DEBUG("execute prexecute success.", K(stmt_id_), K(iteration_count_),
K(get_param_num()), K(result.get_stmt_type()));
}
}
} while (RETRY_TYPE_LOCAL == retry_ctrl_.get_retry_type());
if (OB_SUCC(ret) && retry_ctrl_.get_retry_times() > 0) {
LOG_TRACE("sql retry succeed", K(ret),
"retry_times", retry_ctrl_.get_retry_times());
}
}
get_ctx().is_prepare_protocol_ = false; //set to prepare protocol
get_ctx().is_prepare_stage_ = false;
}
} else {
get_ctx().is_pre_execute_ = true;
}
if (OB_SUCC(ret)) {
ObPsSessionInfo *ps_session_info = NULL;
if (OB_FAIL(session->get_ps_session_info(stmt_id_, ps_session_info))) {
LOG_WARN("get_ps_session_info failed", K(ret), K_(stmt_id));
} else {
stmt_type_ = ps_session_info->get_stmt_type();
if (is_arraybinding_has_result_type(stmt_type_) && iteration_count_ > 1) {
set_arraybounding(true);
if (get_ctx().can_reroute_sql_) {
get_ctx().can_reroute_sql_ = false;
LOG_INFO("arraybinding not support reroute sql.");
}
// only init param_store
// array_binding_row_ and array_binding_columns_ will init later
OZ (init_arraybinding_paramstore(*allocator_));
} else {
set_arraybounding(false);
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(request_params(session, pos, ps_stmt_checksum, *allocator_, params_num_))) {
LOG_WARN("prepare-execute protocol get params request failed", K(ret));
} else {
const bool enable_sql_audit =
GCONF.enable_sql_audit && session->get_local_ob_enable_sql_audit();
if (!is_pl_stmt(stmt_type_) && enable_sql_audit) {
OZ (store_params_value_to_str(*allocator_, *session));
}
PS_DEFENSE_CHECK(4) // exec_mode
{
ObMySQLUtil::get_uint4(pos, exec_mode_);
//
// is_commit_on_success_ is not use yet
// other exec_mode set use ==
is_commit_on_success_ = exec_mode_ & OB_OCI_COMMIT_ON_SUCCESS;
exec_mode_ = exec_mode_ & (0xffffffff - OB_OCI_COMMIT_ON_SUCCESS);
if (OB_OCI_BATCH_ERRORS == exec_mode_ && !is_pl_stmt(stmt_type_)) {
set_save_exception(true);
}
}
if (OB_SUCC(ret)) {
stmt_id_ = result.get_statement_id();
first_time_ = true; // need prepare at first time
LOG_DEBUG("execute prexecute success.", K(stmt_id_), K(iteration_count_),
K(get_param_num()), K(result.get_stmt_type()));
}
get_ctx().is_prepare_protocol_ = false; //set to prepare protocol
get_ctx().is_prepare_stage_ = false;
}
} else {
get_ctx().is_pre_execute_ = true;
}
if (OB_SUCC(ret)) {
ObPsSessionInfo *ps_session_info = NULL;
if (OB_FAIL(session->get_ps_session_info(stmt_id_, ps_session_info))) {
LOG_WARN("get_ps_session_info failed", K(ret), K_(stmt_id));
} else {
stmt_type_ = ps_session_info->get_stmt_type();
if (is_arraybinding_has_result_type(stmt_type_) && iteration_count_ > 1) {
set_arraybounding(true);
if (get_ctx().can_reroute_sql_) {
get_ctx().can_reroute_sql_ = false;
LOG_INFO("arraybinding not support reroute sql.");
}
// only init param_store
// array_binding_row_ and array_binding_columns_ will init later
OZ (init_arraybinding_paramstore(*allocator_));
} else {
set_arraybounding(false);
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(request_params(session, pos, ps_stmt_checksum, *allocator_, params_num_))) {
LOG_WARN("prepare-execute protocol get params request failed", K(ret));
} else {
const bool enable_sql_audit =
GCONF.enable_sql_audit && session->get_local_ob_enable_sql_audit();
if (!is_pl_stmt(stmt_type_) && enable_sql_audit) {
OZ (store_params_value_to_str(*allocator_, *session));
}
PS_DEFENSE_CHECK(4) // exec_mode
PS_DEFENSE_CHECK(4) // close stmt count
{
ObMySQLUtil::get_uint4(pos, exec_mode_);
//
// is_commit_on_success_ is not use yet
// other exec_mode set use ==
is_commit_on_success_ = exec_mode_ & OB_OCI_COMMIT_ON_SUCCESS;
exec_mode_ = exec_mode_ & (0xffffffff - OB_OCI_COMMIT_ON_SUCCESS);
if (OB_OCI_BATCH_ERRORS == exec_mode_ && !is_pl_stmt(stmt_type_)) {
set_save_exception(true);
ObMySQLUtil::get_uint4(pos, close_stmt_count_);
int tmp_ret = OB_SUCCESS;
if (0 != close_stmt_count_) {
LOG_INFO("close stmt count:", K(close_stmt_count_), K(stmt_id_));
// OCI not support close_stmt_count_ is not 0 yet.
// for (int64_t i = 0; i < close_stmt_count_; i++) {
// int32_t close_stmt_id = -1;
// ObMySQLUtil::get_int4(pos, close_stmt_id);
// if (OB_NOT_NULL(session->get_cursor(close_stmt_id))) {
// if (OB_FAIL(session->close_cursor(close_stmt_id))) {
// tmp_ret = ret;
// LOG_WARN("fail to close cursor", K(ret), K(stmt_id_), K(close_stmt_id), K(session->get_sessid()));
// }
// }
// if (OB_FAIL(session->close_ps_stmt(close_stmt_id))) {
// LOG_WARN("close ps stmt fail in prepare-execute.", K(stmt_id_), K(close_stmt_id));
// }
// if (OB_SUCCESS != tmp_ret) {
// ret = tmp_ret;
// }
// }
}
}
if (OB_SUCC(ret)) {
PS_DEFENSE_CHECK(4) // close stmt count
{
ObMySQLUtil::get_uint4(pos, close_stmt_count_);
int tmp_ret = OB_SUCCESS;
if (0 != close_stmt_count_) {
LOG_INFO("close stmt count:", K(close_stmt_count_), K(stmt_id_));
// OCI not support close_stmt_count_ is not 0 yet.
// for (int64_t i = 0; i < close_stmt_count_; i++) {
// int32_t close_stmt_id = -1;
// ObMySQLUtil::get_int4(pos, close_stmt_id);
// if (OB_NOT_NULL(session->get_cursor(close_stmt_id))) {
// if (OB_FAIL(session->close_cursor(close_stmt_id))) {
// tmp_ret = ret;
// LOG_WARN("fail to close cursor", K(ret), K(stmt_id_), K(close_stmt_id), K(session->get_sessid()));
// }
// }
// if (OB_FAIL(session->close_ps_stmt(close_stmt_id))) {
// LOG_WARN("close ps stmt fail in prepare-execute.", K(stmt_id_), K(close_stmt_id));
// }
// if (OB_SUCCESS != tmp_ret) {
// ret = tmp_ret;
// }
// }
}
}
}
if (OB_SUCC(ret)) {
PS_DEFENSE_CHECK(4) // checksum
{
ObMySQLUtil::get_uint4(pos, ps_stmt_checksum);
if (DEFAULT_ITERATION_COUNT == ps_stmt_checksum
|| (OB_NOT_NULL(ps_session_info)
&& ps_stmt_checksum != ps_session_info->get_ps_stmt_checksum())) {
ret = OB_ERR_PREPARE_STMT_CHECKSUM;
LOG_ERROR("ps stmt checksum fail", K(ret), "session_id", session->get_sessid(),
K(ps_stmt_checksum), K(*ps_session_info));
} else {
PS_DEFENSE_CHECK(4) // extend_flag
{
ObMySQLUtil::get_uint4(pos, extend_flag_);
}
}
if (OB_SUCC(ret)) {
PS_DEFENSE_CHECK(4) // checksum
{
ObMySQLUtil::get_uint4(pos, ps_stmt_checksum);
if (DEFAULT_ITERATION_COUNT == ps_stmt_checksum
|| (OB_NOT_NULL(ps_session_info)
&& ps_stmt_checksum != ps_session_info->get_ps_stmt_checksum())) {
ret = OB_ERR_PREPARE_STMT_CHECKSUM;
LOG_ERROR("ps stmt checksum fail", K(ret), "session_id", session->get_sessid(),
K(ps_stmt_checksum), K(*ps_session_info));
} else {
PS_DEFENSE_CHECK(4) // extend_flag
{
ObMySQLUtil::get_uint4(pos, extend_flag_);
}
}
}
@ -367,15 +367,15 @@ int ObMPStmtPrexecute::before_process()
}
}
}
session->set_last_trace_id(ObCurTraceId::get_trace_id());
//对于tracelog的处理,不影响正常逻辑,错误码无须赋值给ret
if (session != NULL && OB_FAIL(ret)) {
int tmp_ret = OB_SUCCESS;
//清空WARNING BUFFER
ObSqlCtx sql_ctx; // sql_ctx do nothing in do_after_process
tmp_ret = do_after_process(*session, sql_ctx, false/*no asyn response*/);
UNUSED(tmp_ret);
}
}
session->set_last_trace_id(ObCurTraceId::get_trace_id());
//对于tracelog的处理,不影响正常逻辑,错误码无须赋值给ret
if (session != NULL && OB_FAIL(ret)) {
int tmp_ret = OB_SUCCESS;
//清空WARNING BUFFER
ObSqlCtx sql_ctx; // sql_ctx do nothing in do_after_process
tmp_ret = do_after_process(*session, sql_ctx, false/*no asyn response*/);
UNUSED(tmp_ret);
}
}