directly modify session variable instead of using inner sql

This commit is contained in:
wxhwang
2023-10-31 22:13:07 +00:00
committed by ob-robot
parent 9c0500e048
commit f129f6e950

View File

@ -2429,43 +2429,30 @@ int ObBackupSetEncryptionExecutor::execute(ObExecContext &ctx, ObBackupSetEncryp
{
int ret = OB_SUCCESS;
common::ObCurTraceId::mark_user_request();
common::ObMySQLProxy *sql_proxy = nullptr;
ObSqlString set_mode_sql;
ObSqlString set_passwd_sql;
sqlclient::ObISQLConnection *conn = nullptr;
observer::ObInnerSQLConnectionPool *pool = nullptr;
ObSQLSessionInfo *session_info = ctx.get_my_session();
int64_t affected_rows = 0;
ObSessionVariable encryption_mode;
ObSessionVariable encryption_passwd;
if (OB_ISNULL(session_info)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", K(ret), KP(session_info));
} else if (OB_ISNULL(sql_proxy = GCTX.sql_proxy_) || OB_ISNULL(sql_proxy->get_pool())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("sql proxy must not null", K(ret), KP(GCTX.sql_proxy_));
} else if (sqlclient::INNER_POOL != sql_proxy->get_pool()->get_type()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("pool type must be inner", K(ret), "type", sql_proxy->get_pool()->get_type());
} else if (OB_ISNULL(pool = static_cast<observer::ObInnerSQLConnectionPool*>(sql_proxy->get_pool()))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("pool must not null", K(ret));
} else if (OB_FAIL(set_mode_sql.assign_fmt("set @%s = '%s'",
OB_BACKUP_ENCRYPTION_MODE_SESSION_STR, ObBackupEncryptionMode::to_str(stmt.get_mode())))) {
LOG_WARN("failed to set mode", K(ret));
} else if (OB_FAIL(set_passwd_sql.assign_fmt("set @%s = '%.*s'",
OB_BACKUP_ENCRYPTION_PASSWD_SESSION_STR, stmt.get_passwd().length(), stmt.get_passwd().ptr()))) {
LOG_WARN("failed to set passwd", K(ret));
} else if (OB_FAIL(pool->acquire(session_info, conn))) {
LOG_WARN("failed to get conn", K(ret));
} else if (OB_FAIL(conn->execute_write(session_info->get_effective_tenant_id(), set_mode_sql.ptr(),
affected_rows))) {
LOG_WARN("failed to set mode", K(ret), K(set_mode_sql));
} else if (OB_FAIL(conn->execute_write(session_info->get_effective_tenant_id(), set_passwd_sql.ptr(),
affected_rows))) {
LOG_WARN("failed to set passwd", K(ret), K(set_passwd_sql));
} else {
LOG_INFO("ObBackupSetEncryptionExecutor::execute", K(stmt), K(ctx),
K(set_mode_sql), K(set_passwd_sql));
encryption_mode.value_.set_collation_type(CS_TYPE_UTF8MB4_GENERAL_CI);
encryption_mode.value_.set_varchar(ObBackupEncryptionMode::to_str(stmt.get_mode()));
encryption_mode.meta_.set_meta(encryption_mode.value_.meta_);
encryption_passwd.value_.set_collation_type(CS_TYPE_UTF8MB4_GENERAL_CI);
encryption_passwd.value_.set_varchar(stmt.get_passwd().ptr(), stmt.get_passwd().length());
encryption_passwd.meta_.set_meta(encryption_passwd.value_.meta_);
if (OB_FAIL(session_info->replace_user_variable(OB_BACKUP_ENCRYPTION_MODE_SESSION_STR, encryption_mode))) {
LOG_WARN("failed to set encryption mode", K(ret), K(encryption_mode));
} else if (OB_FAIL(session_info->replace_user_variable(OB_BACKUP_ENCRYPTION_PASSWD_SESSION_STR, encryption_passwd))) {
LOG_WARN("failed to set encryption passwd", K(ret), K(encryption_passwd));
} else {
LOG_INFO("ObBackupSetEncryptionExecutor::execute", K(encryption_mode), K(encryption_passwd));
}
}
return ret;
}
@ -2473,35 +2460,23 @@ int ObBackupSetDecryptionExecutor::execute(ObExecContext &ctx, ObBackupSetDecryp
{
int ret = OB_SUCCESS;
common::ObCurTraceId::mark_user_request();
common::ObMySQLProxy *sql_proxy = nullptr;
ObSqlString set_passwd_sql;
sqlclient::ObISQLConnection *conn = nullptr;
observer::ObInnerSQLConnectionPool *pool = nullptr;
ObSQLSessionInfo *session_info = ctx.get_my_session();
int64_t affected_rows = 0;
ObSessionVariable decryption_passwd;
if (OB_ISNULL(session_info)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", K(ret), KP(session_info));
} else if (OB_ISNULL(sql_proxy = GCTX.sql_proxy_) || OB_ISNULL(sql_proxy->get_pool())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("sql proxy must not null", K(ret), KP(GCTX.sql_proxy_));
} else if (sqlclient::INNER_POOL != sql_proxy->get_pool()->get_type()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("pool type must be inner", K(ret), "type", sql_proxy->get_pool()->get_type());
} else if (OB_ISNULL(pool = static_cast<observer::ObInnerSQLConnectionPool*>(sql_proxy->get_pool()))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("pool must not null", K(ret));
} else if (OB_FAIL(set_passwd_sql.assign_fmt("set @%s = '%.*s'", OB_BACKUP_DECRYPTION_PASSWD_ARRAY_SESSION_STR,
stmt.get_passwd_array().length(), stmt.get_passwd_array().ptr()))) {
LOG_WARN("failed to set passwd", K(ret));
} else if (OB_FAIL(pool->acquire(session_info, conn))) {
LOG_WARN("failed to get conn", K(ret));
} else if (OB_FAIL(conn->execute_write(session_info->get_effective_tenant_id(), set_passwd_sql.ptr(),
affected_rows))) {
LOG_WARN("failed to set passwd", K(ret), K(set_passwd_sql));
} else {
LOG_INFO("ObBackupSetEncryptionExecutor::execute", K(stmt), K(ctx), K(set_passwd_sql));
decryption_passwd.value_.set_collation_type(CS_TYPE_UTF8MB4_GENERAL_CI);
decryption_passwd.value_.set_varchar(stmt.get_passwd_array().ptr(), stmt.get_passwd_array().length());
decryption_passwd.meta_.set_meta(decryption_passwd.value_.meta_);
if (OB_FAIL(session_info->replace_user_variable(OB_BACKUP_DECRYPTION_PASSWD_ARRAY_SESSION_STR, decryption_passwd))) {
LOG_WARN("failed to set decryption passwd", K(ret), K(decryption_passwd));
} else {
LOG_INFO("ObBackupSetDecryptionExecutor::execute", K(decryption_passwd));
}
}
return ret;
}
@ -2538,26 +2513,13 @@ int ObAddRestoreSourceExecutor::execute(ObExecContext &ctx, ObAddRestoreSourceSt
{
int ret = OB_SUCCESS;
common::ObCurTraceId::mark_user_request();
common::ObMySQLProxy *sql_proxy = nullptr;
ObSqlString add_restore_source_sql;
sqlclient::ObISQLConnection *conn = nullptr;
observer::ObInnerSQLConnectionPool *pool = nullptr;
ObSQLSessionInfo *session_info = ctx.get_my_session();
int64_t affected_rows = 0;
ObObj value;
ObSessionVariable new_value;
if (OB_ISNULL(session_info)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", K(ret), KP(session_info));
} else if (OB_ISNULL(sql_proxy = GCTX.sql_proxy_) || OB_ISNULL(sql_proxy->get_pool())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("sql proxy must not null", K(ret), KP(GCTX.sql_proxy_));
} else if (sqlclient::INNER_POOL != sql_proxy->get_pool()->get_type()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("pool type must be inner", K(ret), "type", sql_proxy->get_pool()->get_type());
} else if (OB_ISNULL(pool = static_cast<observer::ObInnerSQLConnectionPool*>(sql_proxy->get_pool()))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("pool must not null", K(ret));
} else if (!session_info->user_variable_exists(OB_RESTORE_SOURCE_NAME_SESSION_STR)) {
LOG_INFO("no restore source specified before");
} else {
@ -2569,20 +2531,18 @@ int ObAddRestoreSourceExecutor::execute(ObExecContext &ctx, ObAddRestoreSourceSt
}
if (OB_FAIL(ret)) {
// do nothing
} else if (OB_FAIL(add_restore_source_sql.assign_fmt(
"set @%s = '%.*s'", OB_RESTORE_SOURCE_NAME_SESSION_STR,
stmt.get_restore_source_array().length(),
stmt.get_restore_source_array().ptr()))) {
LOG_WARN("failed to add restore source", K(ret), K(stmt));
} else if (OB_FAIL(pool->acquire(session_info, conn))) {
LOG_WARN("failed to get conn", K(ret));
} else if (OB_FAIL(conn->execute_write(session_info->get_effective_tenant_id(),
add_restore_source_sql.ptr(), affected_rows))) {
LOG_WARN("failed to add restore source", K(ret), K(add_restore_source_sql));
} else {
LOG_INFO("ObAddRestoreSourceExecutor::execute", K(stmt), K(ctx), K(add_restore_source_sql));
new_value.value_.set_collation_type(CS_TYPE_UTF8MB4_GENERAL_CI);
new_value.value_.set_varchar(stmt.get_restore_source_array().ptr(), stmt.get_restore_source_array().length());
new_value.meta_.set_meta(new_value.value_.meta_);
if (OB_FAIL(session_info->replace_user_variable(OB_RESTORE_SOURCE_NAME_SESSION_STR, new_value))) {
LOG_WARN("failed to set user variable", K(ret), K(new_value));
} else {
LOG_INFO("ObAddRestoreSourceExecutor::execute", K(stmt), K(new_value));
}
}
return ret;
}