fix some bugs about tenant=all

This commit is contained in:
obdev
2023-09-18 03:40:23 +00:00
committed by ob-robot
parent e8890cfb04
commit 0534888042
13 changed files with 204 additions and 121 deletions

View File

@ -73,7 +73,9 @@ int ObFreezeExecutor::execute(ObExecContext &ctx, ObFreezeStmt &stmt)
} else {
if (!stmt.is_major_freeze()) {
const uint64_t local_tenant_id = MTL_ID();
bool freeze_all = (stmt.is_freeze_all_user() || stmt.is_freeze_all_meta());
bool freeze_all = (stmt.is_freeze_all() ||
stmt.is_freeze_all_user() ||
stmt.is_freeze_all_meta());
ObRootMinorFreezeArg arg;
if (OB_FAIL(arg.tenant_ids_.assign(stmt.get_tenant_ids()))) {
LOG_WARN("failed to assign tenant_ids", K(ret));
@ -90,29 +92,41 @@ int ObFreezeExecutor::execute(ObExecContext &ctx, ObFreezeStmt &stmt)
if (OB_ISNULL(GCTX.schema_service_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid GCTX", KR(ret));
} else if (stmt.is_freeze_all_user() == stmt.is_freeze_all_meta()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("freeze_all_user and freeze_all_meta cannot be true together",
KR(ret), "freeze_all_user", stmt.is_freeze_all_user(),
"freeze_all_meta", stmt.is_freeze_all_meta());
} else {
common::ObSArray<uint64_t> tmp_tenant_ids;
if (OB_FAIL(GCTX.schema_service_->get_tenant_ids(tmp_tenant_ids))) {
LOG_WARN("fail to get all tenant ids", KR(ret));
} else {
using FUNC_TYPE = bool (*) (const uint64_t);
FUNC_TYPE func = nullptr;
if (stmt.is_freeze_all_user()) {
func = is_user_tenant;
} else {
func = is_meta_tenant;
// if min_cluster_version < 4.2.1.0,disable all_user/all_meta,
// and make tenant=all effective for all tenants.
if (GET_MIN_CLUSTER_VERSION() < CLUSTER_VERSION_4_2_1_0) {
if (stmt.is_freeze_all_user() || stmt.is_freeze_all_meta()) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("all_user/all_meta are not supported when min_cluster_version is less than 4.2.1.0",
KR(ret), "freeze_all_user", stmt.is_freeze_all_user(),
"freeze_all_meta", stmt.is_freeze_all_meta());
} else if (stmt.is_freeze_all()) {
if (OB_FAIL(GCTX.schema_service_->get_tenant_ids(arg.tenant_ids_))) {
LOG_WARN("fail to get all tenant ids", KR(ret));
}
}
arg.tenant_ids_.reset();
for (int64_t i = 0; OB_SUCC(ret) && (i < tmp_tenant_ids.count()); ++i) {
uint64_t tmp_tenant_id = tmp_tenant_ids.at(i);
if (func(tmp_tenant_id)) {
if (OB_FAIL(arg.tenant_ids_.push_back(tmp_tenant_id))) {
LOG_WARN("failed to push back tenant_id", KR(ret));
} else {
common::ObSArray<uint64_t> tmp_tenant_ids;
if (OB_FAIL(GCTX.schema_service_->get_tenant_ids(tmp_tenant_ids))) {
LOG_WARN("fail to get all tenant ids", KR(ret));
} else {
using FUNC_TYPE = bool (*) (const uint64_t);
FUNC_TYPE func = nullptr;
// caller guarantees that at most one of
// freeze_all/freeze_all_user/freeze_all_meta is true.
if (stmt.is_freeze_all() || stmt.is_freeze_all_user()) {
func = is_user_tenant;
} else {
func = is_meta_tenant;
}
arg.tenant_ids_.reset();
for (int64_t i = 0; OB_SUCC(ret) && (i < tmp_tenant_ids.count()); ++i) {
uint64_t tmp_tenant_id = tmp_tenant_ids.at(i);
if (func(tmp_tenant_id)) {
if (OB_FAIL(arg.tenant_ids_.push_back(tmp_tenant_id))) {
LOG_WARN("failed to push back tenant_id", KR(ret));
}
}
}
}
@ -156,6 +170,7 @@ int ObFreezeExecutor::execute(ObExecContext &ctx, ObFreezeStmt &stmt)
}
} else {
rootserver::ObMajorFreezeParam param;
param.freeze_all_ = stmt.is_freeze_all();
param.freeze_all_user_ = stmt.is_freeze_all_user();
param.freeze_all_meta_ = stmt.is_freeze_all_meta();
param.transport_ = GCTX.net_frame_->get_req_transport();
@ -1023,6 +1038,7 @@ int ObAdminMergeExecutor::execute(ObExecContext &ctx, ObAdminMergeStmt &stmt)
{
int ret = OB_SUCCESS;
ObTaskExecutorCtx *task_exec_ctx = GET_TASK_EXECUTOR_CTX(ctx);
const obrpc::ObAdminMergeArg &arg = stmt.get_rpc_arg();
obrpc::ObCommonRpcProxy *common_rpc = NULL;
if (OB_ISNULL(task_exec_ctx)) {
ret = OB_NOT_INIT;
@ -1030,9 +1046,14 @@ int ObAdminMergeExecutor::execute(ObExecContext &ctx, ObAdminMergeStmt &stmt)
} else if (OB_ISNULL(common_rpc = task_exec_ctx->get_common_rpc())) {
ret = OB_NOT_INIT;
LOG_WARN("get common rpc proxy failed", K(task_exec_ctx));
} else if (OB_FAIL(common_rpc->admin_merge(
stmt.get_rpc_arg()))) {
LOG_WARN("admin merge rpc failed", K(ret), "rpc_arg", stmt.get_rpc_arg());
} else if ((GET_MIN_CLUSTER_VERSION() < CLUSTER_VERSION_4_2_1_0) &&
(arg.affect_all_user_ || arg.affect_all_meta_)) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("all_user/all_meta are not supported when min_cluster_version is less than 4.2.1.0",
KR(ret), "affect_all_user", arg.affect_all_user_,
"affect_all_meta", arg.affect_all_meta_);
} else if (OB_FAIL(common_rpc->admin_merge(arg))) {
LOG_WARN("admin merge rpc failed", K(ret), "rpc_arg", arg);
}
return ret;
}
@ -1149,7 +1170,21 @@ int ObSetConfigExecutor::execute(ObExecContext &ctx, ObSetConfigStmt &stmt)
int ret = OB_SUCCESS;
ObTaskExecutorCtx *task_exec_ctx = GET_TASK_EXECUTOR_CTX(ctx);
obrpc::ObCommonRpcProxy *common_rpc = NULL;
if (OB_ISNULL(task_exec_ctx)) {
if (GET_MIN_CLUSTER_VERSION() < CLUSTER_VERSION_4_2_1_0) {
const ObFixedLengthString<common::OB_MAX_TENANT_NAME_LENGTH + 1> all_user("all_user");
const ObFixedLengthString<common::OB_MAX_TENANT_NAME_LENGTH + 1> all_meta("all_meta");
FOREACH_X(item, stmt.get_rpc_arg().items_, OB_SUCCESS == ret) {
if (item->tenant_name_ == all_user || item->tenant_name_ == all_meta) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("all_user/all_meta are not supported when min_cluster_version is less than 4.2.1.0",
KR(ret), "tenant_name", item->tenant_name_);
}
}
}
if (OB_FAIL(ret)) {
} else if (OB_ISNULL(task_exec_ctx)) {
ret = OB_NOT_INIT;
LOG_WARN("get task executor context failed");
} else if (OB_ISNULL(common_rpc = task_exec_ctx->get_common_rpc())) {
@ -1390,6 +1425,7 @@ int ObClearMergeErrorExecutor::execute(ObExecContext &ctx, ObClearMergeErrorStmt
int ret = OB_SUCCESS;
UNUSED(stmt);
ObTaskExecutorCtx *task_exec_ctx = GET_TASK_EXECUTOR_CTX(ctx);
const obrpc::ObAdminMergeArg &arg = stmt.get_rpc_arg();
obrpc::ObCommonRpcProxy *common_rpc = NULL;
if (OB_ISNULL(task_exec_ctx)) {
ret = OB_NOT_INIT;
@ -1397,8 +1433,13 @@ int ObClearMergeErrorExecutor::execute(ObExecContext &ctx, ObClearMergeErrorStmt
} else if (OB_ISNULL(common_rpc = task_exec_ctx->get_common_rpc())) {
ret = OB_NOT_INIT;
LOG_WARN("get common rpc proxy failed", K(task_exec_ctx));
} else if (OB_FAIL(common_rpc->admin_clear_merge_error(stmt.get_rpc_arg()))) {
LOG_WARN("clear merge error rpc failed", K(ret), "rpc_arg", stmt.get_rpc_arg());
} else if ((GET_MIN_CLUSTER_VERSION() < CLUSTER_VERSION_4_2_1_0) && (arg.affect_all_user_ || arg.affect_all_meta_)) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("all_user/all_meta are not supported when min_cluster_version is less than 4.2.1.0",
KR(ret), "affect_all_user", arg.affect_all_user_,
"affect_all_meta", arg.affect_all_meta_);
} else if (OB_FAIL(common_rpc->admin_clear_merge_error(arg))) {
LOG_WARN("clear merge error rpc failed", K(ret), "rpc_arg", arg);
}
return ret;
}