diff --git a/deps/oblib/src/lib/ob_define.h b/deps/oblib/src/lib/ob_define.h index 493c88bbaa..e6d840a20c 100644 --- a/deps/oblib/src/lib/ob_define.h +++ b/deps/oblib/src/lib/ob_define.h @@ -1416,6 +1416,7 @@ OB_INLINE bool is_dblink_type_id(uint64_t type_id) const char* const OB_PRIMARY_INDEX_NAME = "PRIMARY"; const int64_t OB_MAX_CONFIG_URL_LENGTH = 512; +const int64_t OB_MAX_ADMIN_COMMAND_LENGTH = 1000; const int64_t OB_MAX_ARBITRATION_SERVICE_NAME_LENGTH = 256; const int64_t OB_MAX_ARBITRATION_SERVICE_LENGTH = 512; diff --git a/mittest/simple_server/CMakeLists.txt b/mittest/simple_server/CMakeLists.txt index b475cc9920..527b210cc2 100644 --- a/mittest/simple_server/CMakeLists.txt +++ b/mittest/simple_server/CMakeLists.txt @@ -103,6 +103,7 @@ ob_unittest_observer(test_mds_recover test_mds_recover.cpp) ob_unittest_observer(test_keep_alive_min_start_scn test_keep_alive_min_start_scn.cpp) ob_unittest_observer(test_ls_replica test_ls_replica.cpp) ob_unittest_observer(test_create_clone_tenant_resource_pool test_create_clone_tenant_resource_pool.cpp) +ob_unittest_observer(test_ob_admin_arg test_ob_admin_arg.cpp) ob_unittest_observer(test_tablet_autoinc_mgr test_tablet_autoinc_mgr.cpp) ob_unittest_observer(test_tenant_snapshot_service test_tenant_snapshot_service.cpp) ob_unittest_observer(test_callbacks_with_reverse_order test_callbacks_with_reverse_order.cpp) diff --git a/mittest/simple_server/test_ob_admin_arg.cpp b/mittest/simple_server/test_ob_admin_arg.cpp new file mode 100644 index 0000000000..8a6fed7896 --- /dev/null +++ b/mittest/simple_server/test_ob_admin_arg.cpp @@ -0,0 +1,314 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#define USING_LOG_PREFIX SHARE + +#include +#include +#include "env/ob_simple_cluster_test_base.h" +#include "lib/ob_errno.h" +#include "rootserver/ob_admin_drtask_util.h" // ObAdminDRTaskUtil + +namespace oceanbase +{ +using namespace unittest; +using namespace rootserver; +namespace share +{ +using ::testing::_; +using ::testing::Invoke; +using ::testing::Return; + +using namespace schema; +using namespace common; + +class TestObAdminArg : public unittest::ObSimpleClusterTestBase +{ +public: + TestObAdminArg() : unittest::ObSimpleClusterTestBase("test_ob_admin_arg") {} +}; + +TEST_F(TestObAdminArg, test_argument) +{ + int ret = OB_SUCCESS; + ObAdminCommandArg arg; + const ObAdminDRTaskType task_type(ObAdminDRTaskType::REMOVE_REPLICA); + + uint64_t tenant_id = OB_INVALID_TENANT_ID; + share::ObLSID ls_id; + ObReplicaType replica_type = REPLICA_TYPE_MAX; + common::ObAddr data_source; + common::ObAddr server_addr; + int64_t orig_paxos_replica_number = 0; + int64_t new_paxos_replica_number = 0; + + common::ObAddr server_to_compare1(ObAddr::VER::IPV4, "100.88.107.212", 2001); + common::ObAddr server_to_compare2(ObAddr::VER::IPV4, "100.88.107.212", 2002); + common::ObAddr server_to_compare3; + ret = server_to_compare3.parse_from_string("[ABCD:EF01:2345:6789:ABCD:EF01:2345:6789]:203"); + ASSERT_EQ(0, ret); + common::ObAddr server_to_compare4; + ret = server_to_compare4.parse_from_string("[ABCD:EF01:2345:6789:ABCD:EF01:2345:6789]:204"); + ASSERT_EQ(0, ret); + + // test command is null + ret = arg.init("", task_type); + ASSERT_EQ(OB_INVALID_ARGUMENT, ret); + arg.reset(); + + // test command is not null but invalid + ret = arg.init("abc", task_type); + ASSERT_EQ(OB_SUCCESS, ret); + + ret = ObAdminDRTaskUtil::parse_params_from_obadmin_command_arg(arg, tenant_id, ls_id, replica_type, data_source, server_addr, orig_paxos_replica_number, new_paxos_replica_number); + ASSERT_EQ(OB_INVALID_ARGUMENT, ret); + arg.reset(); + + // test only tenant_id is provided + ret = arg.init("tenant_id=1002", task_type); + ASSERT_EQ(OB_SUCCESS, ret); + ret = ObAdminDRTaskUtil::parse_params_from_obadmin_command_arg(arg, tenant_id, ls_id, replica_type, data_source, server_addr, orig_paxos_replica_number, new_paxos_replica_number); + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(1002, tenant_id); + arg.reset(); + + ret = arg.init(" tenant_id=1002", task_type); + ASSERT_EQ(OB_SUCCESS, ret); + ret = ObAdminDRTaskUtil::parse_params_from_obadmin_command_arg(arg, tenant_id, ls_id, replica_type, data_source, server_addr, orig_paxos_replica_number, new_paxos_replica_number); + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(1002, tenant_id); + arg.reset(); + + ret = arg.init("tenant_id=1002 ", task_type); + ASSERT_EQ(OB_SUCCESS, ret); + ret = ObAdminDRTaskUtil::parse_params_from_obadmin_command_arg(arg, tenant_id, ls_id, replica_type, data_source, server_addr, orig_paxos_replica_number, new_paxos_replica_number); + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(1002, tenant_id); + arg.reset(); + + ret = arg.init(",tenant_id=1002,,", task_type); + ASSERT_EQ(OB_SUCCESS, ret); + ret = ObAdminDRTaskUtil::parse_params_from_obadmin_command_arg(arg, tenant_id, ls_id, replica_type, data_source, server_addr, orig_paxos_replica_number, new_paxos_replica_number); + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(1002, tenant_id); + arg.reset(); + + // test only ls_id is provided + ret = arg.init("ls_id=1002", task_type); + ASSERT_EQ(OB_SUCCESS, ret); + ret = ObAdminDRTaskUtil::parse_params_from_obadmin_command_arg(arg, tenant_id, ls_id, replica_type, data_source, server_addr, orig_paxos_replica_number, new_paxos_replica_number); + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(1002, ls_id.id()); + arg.reset(); + + ret = arg.init(" ls_id=1002", task_type); + ASSERT_EQ(OB_SUCCESS, ret); + ret = ObAdminDRTaskUtil::parse_params_from_obadmin_command_arg(arg, tenant_id, ls_id, replica_type, data_source, server_addr, orig_paxos_replica_number, new_paxos_replica_number); + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(1002, ls_id.id()); + arg.reset(); + + ret = arg.init("ls_id=1002 ", task_type); + ASSERT_EQ(OB_SUCCESS, ret); + ret = ObAdminDRTaskUtil::parse_params_from_obadmin_command_arg(arg, tenant_id, ls_id, replica_type, data_source, server_addr, orig_paxos_replica_number, new_paxos_replica_number); + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(1002, ls_id.id()); + arg.reset(); + + ret = arg.init(",,ls_id=1002,,", task_type); + ASSERT_EQ(OB_SUCCESS, ret); + ret = ObAdminDRTaskUtil::parse_params_from_obadmin_command_arg(arg, tenant_id, ls_id, replica_type, data_source, server_addr, orig_paxos_replica_number, new_paxos_replica_number); + ASSERT_EQ(OB_INVALID_ARGUMENT, ret); + arg.reset(); + + ret = arg.init("tenant_id=1002,ls_id=1001", task_type); + ASSERT_EQ(OB_SUCCESS, ret); + ret = ObAdminDRTaskUtil::parse_params_from_obadmin_command_arg(arg, tenant_id, ls_id, replica_type, data_source, server_addr, orig_paxos_replica_number, new_paxos_replica_number); + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(1002, tenant_id); + ASSERT_EQ(1001, ls_id.id()); + arg.reset(); + + ret = arg.init("ls_id=1001,tenant_id=1002", task_type); + ASSERT_EQ(OB_SUCCESS, ret); + ret = ObAdminDRTaskUtil::parse_params_from_obadmin_command_arg(arg, tenant_id, ls_id, replica_type, data_source, server_addr, orig_paxos_replica_number, new_paxos_replica_number); + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(1002, tenant_id); + ASSERT_EQ(1001, ls_id.id()); + arg.reset(); + + ret = arg.init(" ls_id=1001 ,tenant_id=1002 ", task_type); + ASSERT_EQ(OB_SUCCESS, ret); + ret = ObAdminDRTaskUtil::parse_params_from_obadmin_command_arg(arg, tenant_id, ls_id, replica_type, data_source, server_addr, orig_paxos_replica_number, new_paxos_replica_number); + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(1002, tenant_id); + ASSERT_EQ(1001, ls_id.id()); + arg.reset(); + + ret = arg.init(" ls_id=1001 , tenant_id=1002 ", task_type); + ASSERT_EQ(OB_SUCCESS, ret); + ret = ObAdminDRTaskUtil::parse_params_from_obadmin_command_arg(arg, tenant_id, ls_id, replica_type, data_source, server_addr, orig_paxos_replica_number, new_paxos_replica_number); + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(1002, tenant_id); + ASSERT_EQ(1001, ls_id.id()); + arg.reset(); + + ret = arg.init(" ls_id = 1001 , tenant_id = 1002 ", task_type); + ASSERT_EQ(OB_SUCCESS, ret); + ret = ObAdminDRTaskUtil::parse_params_from_obadmin_command_arg(arg, tenant_id, ls_id, replica_type, data_source, server_addr, orig_paxos_replica_number, new_paxos_replica_number); + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(1002, tenant_id); + ASSERT_EQ(1001, ls_id.id()); + arg.reset(); + + ret = arg.init(" LS_id = 1001 , tenaNT_id = 1002 ", task_type); + ASSERT_EQ(OB_SUCCESS, ret); + ret = ObAdminDRTaskUtil::parse_params_from_obadmin_command_arg(arg, tenant_id, ls_id, replica_type, data_source, server_addr, orig_paxos_replica_number, new_paxos_replica_number); + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(1002, tenant_id); + ASSERT_EQ(1001, ls_id.id()); + arg.reset(); + + // test server address IPV4 + // server address in format ip:port + ret = arg.init("ls_id=1001,server=100.88.107.212:2002,tenant_id=1002", task_type); + ASSERT_EQ(OB_SUCCESS, ret); + ret = ObAdminDRTaskUtil::parse_params_from_obadmin_command_arg(arg, tenant_id, ls_id, replica_type, data_source, server_addr, orig_paxos_replica_number, new_paxos_replica_number); + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(1002, tenant_id); + ASSERT_EQ(1001, ls_id.id()); + ASSERT_EQ(0, server_to_compare1 == server_addr); + ASSERT_EQ(1, server_to_compare2 == server_addr); + ASSERT_EQ(0, server_to_compare3 == server_addr); + ASSERT_EQ(0, server_to_compare4 == server_addr); + arg.reset(); + + ret = arg.init("ls_id=1001,tenant_id=1002,server=100.88.107.212:2002", task_type); + ASSERT_EQ(OB_SUCCESS, ret); + ret = ObAdminDRTaskUtil::parse_params_from_obadmin_command_arg(arg, tenant_id, ls_id, replica_type, data_source, server_addr, orig_paxos_replica_number, new_paxos_replica_number); + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(1002, tenant_id); + ASSERT_EQ(1001, ls_id.id()); + ASSERT_EQ(0, server_to_compare1 == server_addr); + ASSERT_EQ(1, server_to_compare2 == server_addr); + ASSERT_EQ(0, server_to_compare3 == server_addr); + ASSERT_EQ(0, server_to_compare4 == server_addr); + arg.reset(); + + ret = arg.init("ls_id=1001,tenant_id=1002,replica_type=r,server=100.88.107.212:2002", task_type); + ASSERT_EQ(OB_SUCCESS, ret); + ret = ObAdminDRTaskUtil::parse_params_from_obadmin_command_arg(arg, tenant_id, ls_id, replica_type, data_source, server_addr, orig_paxos_replica_number, new_paxos_replica_number); + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(1002, tenant_id); + ASSERT_EQ(1001, ls_id.id()); + ASSERT_EQ(0, server_to_compare1 == server_addr); + ASSERT_EQ(1, server_to_compare2 == server_addr); + ASSERT_EQ(0, server_to_compare3 == server_addr); + ASSERT_EQ(0, server_to_compare4 == server_addr); + arg.reset(); + + // test server address IPV6 + // server address in format ip:port + ObAddr server_addr_for_ipv6; + ret = arg.init("ls_id=1001,tenant_id=1002,server=[ABCD:EF01:2345:6789:ABCD:EF01:2345:6789]:203", task_type); + ASSERT_EQ(OB_SUCCESS, ret); + ret = ObAdminDRTaskUtil::parse_params_from_obadmin_command_arg(arg, tenant_id, ls_id, replica_type, data_source, server_addr_for_ipv6, orig_paxos_replica_number, new_paxos_replica_number); + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(1002, tenant_id); + ASSERT_EQ(1001, ls_id.id()); + ASSERT_EQ(0, server_to_compare1 == server_addr_for_ipv6); + ASSERT_EQ(0, server_to_compare2 == server_addr_for_ipv6); + ASSERT_EQ(1, server_to_compare3 == server_addr_for_ipv6); + ASSERT_EQ(0, server_to_compare4 == server_addr_for_ipv6); + arg.reset(); + + // test data_source IPV4 + // data_source address in format ip:port + ret = arg.init("ls_id=1001,tenant_id=1002,data_source=100.88.107.212:2001", task_type); + ASSERT_EQ(OB_SUCCESS, ret); + ret = ObAdminDRTaskUtil::parse_params_from_obadmin_command_arg(arg, tenant_id, ls_id, replica_type, data_source, server_addr, orig_paxos_replica_number, new_paxos_replica_number); + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(1002, tenant_id); + ASSERT_EQ(1001, ls_id.id()); + ASSERT_EQ(1, server_to_compare1 == data_source); + ASSERT_EQ(0, server_to_compare2 == data_source); + ASSERT_EQ(0, server_to_compare3 == data_source); + ASSERT_EQ(0, server_to_compare4 == data_source); + arg.reset(); + + // test data_source IPV4 + // data_source address in format ip:port:timstamp + ret = arg.init("ls_id=1001,tenant_id=1002,data_source=100.88.107.212:2001:100", task_type); + ASSERT_EQ(OB_SUCCESS, ret); + ret = ObAdminDRTaskUtil::parse_params_from_obadmin_command_arg(arg, tenant_id, ls_id, replica_type, data_source, server_addr, orig_paxos_replica_number, new_paxos_replica_number); + ASSERT_EQ(OB_INVALID_ARGUMENT, ret); + arg.reset(); + + // test data_source IPV4 + // data_source address in format ip:port:timstamp:flag + ret = arg.init("ls_id=1001,tenant_id=1002,data_source=100.88.107.212:2001:100:0", task_type); + ASSERT_EQ(OB_SUCCESS, ret); + ret = ObAdminDRTaskUtil::parse_params_from_obadmin_command_arg(arg, tenant_id, ls_id, replica_type, data_source, server_addr, orig_paxos_replica_number, new_paxos_replica_number); + ASSERT_EQ(OB_INVALID_ARGUMENT, ret); + arg.reset(); + + // test whole command + ret = arg.init("tenant_id=1002,ls_id=1001,replica_type=readonly,data_source=100.88.107.212:2001,server=100.88.107.212:2002,orig_paxos_replica_number=2,new_paxos_replica_number=3", task_type); + ASSERT_EQ(OB_SUCCESS, ret); + ret = ObAdminDRTaskUtil::parse_params_from_obadmin_command_arg(arg, tenant_id, ls_id, replica_type, data_source, server_addr, orig_paxos_replica_number, new_paxos_replica_number); + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(1002, tenant_id); + ASSERT_EQ(1001, ls_id.id()); + ASSERT_EQ(REPLICA_TYPE_READONLY, replica_type); + ASSERT_EQ(1, server_to_compare1 == data_source); + ASSERT_EQ(0, server_to_compare2 == data_source); + ASSERT_EQ(0, server_to_compare3 == data_source); + ASSERT_EQ(0, server_to_compare4 == data_source); + ASSERT_EQ(0, server_to_compare1 == server_addr); + ASSERT_EQ(1, server_to_compare2 == server_addr); + ASSERT_EQ(0, server_to_compare3 == server_addr); + ASSERT_EQ(0, server_to_compare4 == server_addr); + ASSERT_EQ(2, orig_paxos_replica_number); + ASSERT_EQ(3, new_paxos_replica_number); + arg.reset(); + + ret = arg.init(" tenant_id = 1002 , ls_id = 1001 , replica_type = readonly , data_source = 100.88.107.212:2001 , server = 100.88.107.212:2002 , orig_paxos_replica_number = 2 , new_paxos_replica_number = 3 ", task_type); + ASSERT_EQ(OB_SUCCESS, ret); + ret = ObAdminDRTaskUtil::parse_params_from_obadmin_command_arg(arg, tenant_id, ls_id, replica_type, data_source, server_addr, orig_paxos_replica_number, new_paxos_replica_number); + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(1002, tenant_id); + ASSERT_EQ(1001, ls_id.id()); + ASSERT_EQ(REPLICA_TYPE_READONLY, replica_type); + ASSERT_EQ(1, server_to_compare1 == data_source); + ASSERT_EQ(0, server_to_compare2 == data_source); + ASSERT_EQ(0, server_to_compare3 == data_source); + ASSERT_EQ(0, server_to_compare4 == data_source); + ASSERT_EQ(0, server_to_compare1 == server_addr); + ASSERT_EQ(1, server_to_compare2 == server_addr); + ASSERT_EQ(0, server_to_compare3 == server_addr); + ASSERT_EQ(0, server_to_compare4 == server_addr); + ASSERT_EQ(2, orig_paxos_replica_number); + ASSERT_EQ(3, new_paxos_replica_number); + arg.reset(); + +} + +} // namespace share +} // namespace oceanbase + +int main(int argc, char **argv) +{ + oceanbase::unittest::init_log_and_gtest(argc, argv); + OB_LOGGER.set_log_level("INFO"); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/observer/ob_rpc_processor_simple.cpp b/src/observer/ob_rpc_processor_simple.cpp index d3a78a56b8..5e25deb492 100644 --- a/src/observer/ob_rpc_processor_simple.cpp +++ b/src/observer/ob_rpc_processor_simple.cpp @@ -73,6 +73,7 @@ #endif #include "sql/plan_cache/ob_ps_cache.h" #include "pl/pl_cache/ob_pl_cache_mgr.h" +#include "rootserver/ob_admin_drtask_util.h" // ObAdminDRTaskUtil #include "rootserver/ob_primary_ls_service.h" // for ObPrimaryLSService #include "rootserver/ob_root_utils.h" #include "sql/session/ob_sql_session_info.h" @@ -188,52 +189,7 @@ int ObRpcLSMigrateReplicaP::process() int ObRpcLSAddReplicaP::process() { - int ret = OB_SUCCESS; - uint64_t tenant_id = arg_.tenant_id_; - ObLSService *ls_service = nullptr; - bool is_exist = false; - ObMigrationOpArg migration_op_arg; - - if (tenant_id != MTL_ID()) { - ret = OB_ERR_UNEXPECTED; - LOG_ERROR("ObRpcLSAddReplicaP::process tenant not match", K(tenant_id), K(ret)); - } - ObCurTraceId::set(arg_.task_id_); - if (OB_SUCC(ret)) { - SERVER_EVENT_ADD("storage_ha", "schedule_ls_add start", "tenant_id", arg_.tenant_id_, "ls_id", arg_.ls_id_.id(), - "data_src", arg_.data_source_.get_server(), "dest", arg_.dst_.get_server()); - - ls_service = MTL(ObLSService*); - if (OB_ISNULL(ls_service)) { - ret = OB_ERR_UNEXPECTED; - COMMON_LOG(ERROR, "mtl ObLSService should not be null", K(ret)); - } else if (OB_FAIL(ls_service->check_ls_exist(arg_.ls_id_, is_exist))) { - COMMON_LOG(WARN, "failed to check ls exist", K(ret), K(arg_)); - } else if (is_exist) { - ret = OB_LS_EXIST; - COMMON_LOG(WARN, "can not add ls which local ls is exist", K(ret), K(arg_), K(is_exist)); - } else { - migration_op_arg.cluster_id_ = GCONF.cluster_id; - migration_op_arg.data_src_ = arg_.data_source_; - migration_op_arg.dst_ = arg_.dst_; - migration_op_arg.ls_id_ = arg_.ls_id_; - //TODO(muwei.ym) need check priority in 4.2 RC3 - migration_op_arg.priority_ = ObMigrationOpPriority::PRIO_HIGH; - migration_op_arg.paxos_replica_number_ = arg_.new_paxos_replica_number_; - migration_op_arg.src_ = arg_.data_source_; - migration_op_arg.type_ = ObMigrationOpType::ADD_LS_OP; - - if (OB_FAIL(ls_service->create_ls_for_ha(arg_.task_id_, migration_op_arg))) { - COMMON_LOG(WARN, "failed to create ls for ha", K(ret), K(arg_), K(migration_op_arg)); - } - } - } - - if (OB_FAIL(ret)) { - SERVER_EVENT_ADD("storage_ha", "schedule_ls_add failed", "tenant_id", arg_.tenant_id_, - "ls_id", arg_.ls_id_, "result", ret); - } - return ret; + return observer::ObService::do_add_ls_replica(arg_); } int ObRpcLSTypeTransformP::process() @@ -279,80 +235,12 @@ int ObRpcLSTypeTransformP::process() int ObRpcLSRemovePaxosReplicaP::process() { - int ret = OB_SUCCESS; - uint64_t tenant_id = arg_.tenant_id_; - MAKE_TENANT_SWITCH_SCOPE_GUARD(guard); - ObLSService *ls_service = nullptr; - ObLSHandle ls_handle; - ObLS *ls = nullptr; - - if (tenant_id != MTL_ID()) { - ret = guard.switch_to(tenant_id); - } - ObCurTraceId::set(arg_.task_id_); - if (OB_SUCC(ret)) { - SERVER_EVENT_ADD("storage_ha", "remove_ls_paxos_member start", "tenant_id", arg_.tenant_id_, "ls_id", arg_.ls_id_.id(), - "dest", arg_.remove_member_.get_server()); - LOG_INFO("start do remove ls paxos member", K(arg_)); - - ls_service = MTL(ObLSService*); - if (OB_ISNULL(ls_service)) { - ret = OB_ERR_UNEXPECTED; - COMMON_LOG(ERROR, "mtl ObLSService should not be null", K(ret)); - } else if (OB_FAIL(ls_service->get_ls(arg_.ls_id_, ls_handle, ObLSGetMod::OBSERVER_MOD))) { - LOG_WARN("failed to get ls", K(ret), K(arg_)); - } else if (OB_ISNULL(ls = ls_handle.get_ls())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("ls should not be NULL", K(ret), K(arg_)); - } else if (OB_FAIL(ls->get_ls_remove_member_handler()->remove_paxos_member(arg_))) { - LOG_WARN("failed to remove paxos member", K(ret), K(arg_)); - } - } - - if (OB_FAIL(ret)) { - SERVER_EVENT_ADD("storage_ha", "remove_ls_paxos_member failed", "tenant_id", - arg_.tenant_id_, "ls_id", arg_.ls_id_.id(), "result", ret); - } - return ret; + return observer::ObService::do_remove_ls_paxos_replica(arg_); } int ObRpcLSRemoveNonPaxosReplicaP::process() { - int ret = OB_SUCCESS; - uint64_t tenant_id = arg_.tenant_id_; - MAKE_TENANT_SWITCH_SCOPE_GUARD(guard); - ObLSService *ls_service = nullptr; - ObLSHandle ls_handle; - ObLS *ls = nullptr; - - if (tenant_id != MTL_ID()) { - ret = guard.switch_to(tenant_id); - } - ObCurTraceId::set(arg_.task_id_); - if (OB_SUCC(ret)) { - SERVER_EVENT_ADD("storage_ha", "remove_ls_learner_member start", "tenant_id", arg_.tenant_id_, "ls_id", arg_.ls_id_.id(), - "dest", arg_.remove_member_.get_server()); - LOG_INFO("start do remove ls learner member", K(arg_)); - - ls_service = MTL(ObLSService*); - if (OB_ISNULL(ls_service)) { - ret = OB_ERR_UNEXPECTED; - COMMON_LOG(ERROR, "mtl ObLSService should not be null", K(ret)); - } else if (OB_FAIL(ls_service->get_ls(arg_.ls_id_, ls_handle, ObLSGetMod::OBSERVER_MOD))) { - LOG_WARN("failed to get ls", K(ret), K(arg_)); - } else if (OB_ISNULL(ls = ls_handle.get_ls())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("ls should not be NULL", K(ret), K(arg_)); - } else if (OB_FAIL(ls->get_ls_remove_member_handler()->remove_learner_member(arg_))) { - LOG_WARN("failed to remove paxos member", K(ret), K(arg_)); - } - } - - if (OB_FAIL(ret)) { - SERVER_EVENT_ADD("storage_ha", "remove_ls_learner_member failed", "tenant_id", - arg_.tenant_id_, "ls_id", arg_.ls_id_.id(), "result", ret); - } - return ret; + return observer::ObService::do_remove_ls_nonpaxos_replica(arg_); } int ObRpcLSModifyPaxosReplicaNumberP::process() @@ -443,6 +331,21 @@ int ObRpcLSCheckDRTaskExistP::process() return ret; } +int ObAdminDRTaskP::process() +{ + int ret = OB_SUCCESS; + ObCurTraceId::init(GCONF.self_addr_); + LOG_INFO("start to handle ls replica task triggered by ob_admin", K_(arg)); + if (OB_UNLIKELY(!arg_.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K_(arg)); + } else if (OB_FAIL(ObAdminDRTaskUtil::handle_obadmin_command(arg_))) { + LOG_WARN("fail to handle ob admin command", KR(ret), K_(arg)); + } + LOG_INFO("finish handle ls replica task triggered by ob_admin", K_(arg)); + return ret; +} + #ifdef OB_BUILD_ARBITRATION int ObRpcAddArbP::process() { diff --git a/src/observer/ob_rpc_processor_simple.h b/src/observer/ob_rpc_processor_simple.h index dd2bf848f5..269a08390b 100644 --- a/src/observer/ob_rpc_processor_simple.h +++ b/src/observer/ob_rpc_processor_simple.h @@ -167,6 +167,7 @@ OB_DEFINE_PROCESSOR_S(Srv, OB_LS_REMOVE_PAXOS_REPLICA, ObRpcLSRemovePaxosReplica OB_DEFINE_PROCESSOR_S(Srv, OB_LS_REMOVE_NONPAXOS_REPLICA, ObRpcLSRemoveNonPaxosReplicaP); OB_DEFINE_PROCESSOR_S(Srv, OB_LS_MODIFY_PAXOS_REPLICA_NUMBER, ObRpcLSModifyPaxosReplicaNumberP); OB_DEFINE_PROCESSOR_S(Srv, OB_LS_CHECK_DR_TASK_EXIST, ObRpcLSCheckDRTaskExistP); +OB_DEFINE_PROCESSOR_S(Srv, OB_EXEC_DRTASK_OBADMIN_COMMAND, ObAdminDRTaskP); #ifdef OB_BUILD_ARBITRATION OB_DEFINE_PROCESSOR_S(Srv, OB_ADD_ARB, ObRpcAddArbP); OB_DEFINE_PROCESSOR_S(Srv, OB_REMOVE_ARB, ObRpcRemoveArbP); diff --git a/src/observer/ob_service.cpp b/src/observer/ob_service.cpp index 2f9ee341e4..a62a7216a9 100644 --- a/src/observer/ob_service.cpp +++ b/src/observer/ob_service.cpp @@ -1664,6 +1664,130 @@ int ObService::get_partition_count(obrpc::ObGetPartitionCountResult &result) return ret; } +int ObService::do_add_ls_replica(const obrpc::ObLSAddReplicaArg &arg) +{ + int ret = OB_SUCCESS; + uint64_t tenant_id = arg.tenant_id_; + ObLSService *ls_service = nullptr; + bool is_exist = false; + ObMigrationOpArg migration_op_arg; + if (tenant_id != MTL_ID()) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("ObRpcLSAddReplicaP::process tenant not match", KR(ret), K(tenant_id)); + } + ObCurTraceId::set(arg.task_id_); + if (OB_SUCC(ret)) { + SERVER_EVENT_ADD("storage_ha", "schedule_ls_add start", "tenant_id", arg.tenant_id_, "ls_id", arg.ls_id_.id(), + "data_src", arg.data_source_.get_server(), "dest", arg.dst_.get_server()); + ls_service = MTL(ObLSService*); + if (OB_ISNULL(ls_service)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("mtl ObLSService should not be null", KR(ret)); + } else if (OB_FAIL(ls_service->check_ls_exist(arg.ls_id_, is_exist))) { + LOG_WARN("failed to check ls exist", KR(ret), K(arg)); + } else if (is_exist) { + ret = OB_LS_EXIST; + LOG_WARN("can not add ls which local ls is exist", KR(ret), K(arg), K(is_exist)); + } else { + migration_op_arg.cluster_id_ = GCONF.cluster_id; + migration_op_arg.data_src_ = arg.data_source_; + migration_op_arg.dst_ = arg.dst_; + migration_op_arg.ls_id_ = arg.ls_id_; + //TODO(muwei.ym) need check priority in 4.2 RC3 + migration_op_arg.priority_ = ObMigrationOpPriority::PRIO_HIGH; + migration_op_arg.paxos_replica_number_ = arg.new_paxos_replica_number_; + migration_op_arg.src_ = arg.data_source_; + migration_op_arg.type_ = ObMigrationOpType::ADD_LS_OP; + if (OB_FAIL(ls_service->create_ls_for_ha(arg.task_id_, migration_op_arg))) { + LOG_WARN("failed to create ls for ha", KR(ret), K(arg), K(migration_op_arg)); + } + } + } + if (OB_FAIL(ret)) { + SERVER_EVENT_ADD("storage_ha", "schedule_ls_add failed", "tenant_id", arg.tenant_id_, + "ls_id", arg.ls_id_, "result", ret); + } + return ret; +} + +int ObService::do_remove_ls_paxos_replica(const obrpc::ObLSDropPaxosReplicaArg &arg) +{ + int ret = OB_SUCCESS; + uint64_t tenant_id = arg.tenant_id_; + MAKE_TENANT_SWITCH_SCOPE_GUARD(guard); + ObLSService *ls_service = nullptr; + ObLSHandle ls_handle; + ObLS *ls = nullptr; + if (tenant_id != MTL_ID()) { + ret = guard.switch_to(tenant_id); + } + ObCurTraceId::set(arg.task_id_); + if (OB_SUCC(ret)) { + SERVER_EVENT_ADD("storage_ha", "remove_ls_paxos_member start", "tenant_id", arg.tenant_id_, "ls_id", arg.ls_id_.id(), + "dest", arg.remove_member_.get_server()); + LOG_INFO("start do remove ls paxos member", K(arg)); + ls_service = MTL(ObLSService*); + if (OB_ISNULL(ls_service)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("mtl ObLSService should not be null", KR(ret)); + } else if (OB_FAIL(ls_service->get_ls(arg.ls_id_, ls_handle, ObLSGetMod::OBSERVER_MOD))) { + LOG_WARN("failed to get ls", KR(ret), K(arg)); + } else if (OB_ISNULL(ls = ls_handle.get_ls())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("ls should not be NULL", KR(ret), K(arg)); + } else if (OB_ISNULL(ls->get_ls_remove_member_handler())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("ls remove handler should not be NULL", KR(ret), K(arg)); + } else if (OB_FAIL(ls->get_ls_remove_member_handler()->remove_paxos_member(arg))) { + LOG_WARN("failed to remove paxos member", KR(ret), K(arg)); + } + } + if (OB_FAIL(ret)) { + SERVER_EVENT_ADD("storage_ha", "remove_ls_paxos_member failed", "tenant_id", + arg.tenant_id_, "ls_id", arg.ls_id_.id(), "result", ret); + } + return ret; +} + +int ObService::do_remove_ls_nonpaxos_replica(const obrpc::ObLSDropNonPaxosReplicaArg &arg) +{ + int ret = OB_SUCCESS; + uint64_t tenant_id = arg.tenant_id_; + MAKE_TENANT_SWITCH_SCOPE_GUARD(guard); + ObLSService *ls_service = nullptr; + ObLSHandle ls_handle; + ObLS *ls = nullptr; + if (tenant_id != MTL_ID()) { + ret = guard.switch_to(tenant_id); + } + ObCurTraceId::set(arg.task_id_); + if (OB_SUCC(ret)) { + SERVER_EVENT_ADD("storage_ha", "remove_ls_learner_member start", "tenant_id", arg.tenant_id_, "ls_id", arg.ls_id_.id(), + "dest", arg.remove_member_.get_server()); + LOG_INFO("start do remove ls learner member", K(arg)); + ls_service = MTL(ObLSService*); + if (OB_ISNULL(ls_service)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("mtl ObLSService should not be null", KR(ret)); + } else if (OB_FAIL(ls_service->get_ls(arg.ls_id_, ls_handle, ObLSGetMod::OBSERVER_MOD))) { + LOG_WARN("failed to get ls", KR(ret), K(arg)); + } else if (OB_ISNULL(ls = ls_handle.get_ls())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("ls should not be NULL", KR(ret), K(arg)); + } else if (OB_ISNULL(ls->get_ls_remove_member_handler())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("ls remove handler should not be NULL", KR(ret), K(arg)); + } else if (OB_FAIL(ls->get_ls_remove_member_handler()->remove_learner_member(arg))) { + LOG_WARN("failed to remove paxos member", KR(ret), K(arg)); + } + } + if (OB_FAIL(ret)) { + SERVER_EVENT_ADD("storage_ha", "remove_ls_learner_member failed", "tenant_id", + arg.tenant_id_, "ls_id", arg.ls_id_.id(), "result", ret); + } + return ret; +} + #ifdef OB_BUILD_TDE_SECURITY int ObService::convert_tenant_max_key_version( const ObIArray > &max_key_version, diff --git a/src/observer/ob_service.h b/src/observer/ob_service.h index 83b0bc57d5..500db4c7d3 100644 --- a/src/observer/ob_service.h +++ b/src/observer/ob_service.h @@ -197,6 +197,10 @@ public: int get_server_resource_info(const obrpc::ObGetServerResourceInfoArg &arg, obrpc::ObGetServerResourceInfoResult &result); int get_server_resource_info(share::ObServerResourceInfo &resource_info); static int get_build_version(share::ObServerInfoInTable::ObBuildVersion &build_version); + // log stream replica task related + static int do_remove_ls_paxos_replica(const obrpc::ObLSDropPaxosReplicaArg &arg); + static int do_remove_ls_nonpaxos_replica(const obrpc::ObLSDropNonPaxosReplicaArg &arg); + static int do_add_ls_replica(const obrpc::ObLSAddReplicaArg &arg); // ObRpcIsEmptyServerP @RS bootstrap int is_empty_server(const obrpc::ObCheckServerEmptyArg &arg, obrpc::Bool &is_empty); // ObRpcCheckDeploymentModeP diff --git a/src/observer/ob_srv_xlator_storage.cpp b/src/observer/ob_srv_xlator_storage.cpp index 88dbfdc580..77a4362152 100644 --- a/src/observer/ob_srv_xlator_storage.cpp +++ b/src/observer/ob_srv_xlator_storage.cpp @@ -110,6 +110,7 @@ void oceanbase::observer::init_srv_xlator_for_storage(ObSrvRpcXlator *xlator) { RPC_PROCESSOR(ObRpcLSRemoveNonPaxosReplicaP, gctx_); RPC_PROCESSOR(ObRpcLSModifyPaxosReplicaNumberP, gctx_); RPC_PROCESSOR(ObRpcLSCheckDRTaskExistP, gctx_); + RPC_PROCESSOR(ObAdminDRTaskP, gctx_); RPC_PROCESSOR(ObRpcCreateTenantUserLSP, gctx_); RPC_PROCESSOR(ObRpcGenUniqueIDP, gctx_); RPC_PROCESSOR(ObRpcStartTransferTaskP, gctx_); diff --git a/src/rootserver/CMakeLists.txt b/src/rootserver/CMakeLists.txt index ac55dbe13d..851e7deab2 100644 --- a/src/rootserver/CMakeLists.txt +++ b/src/rootserver/CMakeLists.txt @@ -24,6 +24,7 @@ ob_set_subtarget(ob_rootserver backup ob_set_subtarget(ob_rootserver common ob_bootstrap.cpp + ob_admin_drtask_util.cpp ob_disaster_recovery_task_table_updater.cpp ob_balance_group_ls_stat_operator.cpp ob_disaster_recovery_info.cpp diff --git a/src/rootserver/ob_admin_drtask_util.cpp b/src/rootserver/ob_admin_drtask_util.cpp new file mode 100644 index 0000000000..16c1fd8082 --- /dev/null +++ b/src/rootserver/ob_admin_drtask_util.cpp @@ -0,0 +1,653 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#define USING_LOG_PREFIX RS +#include "ob_admin_drtask_util.h" +#include "logservice/ob_log_service.h" // for ObLogService +#include "share/ob_locality_parser.h" // for ObLocalityParser +#include "storage/tx_storage/ob_ls_service.h" // for ObLSService +#include "storage/ls/ob_ls.h" // for ObLS +#include "observer/ob_server_event_history_table_operator.h" // for SERVER_EVENT_ADD + +namespace oceanbase +{ +namespace rootserver +{ +static const char* obadmin_drtask_ret_comment_strs[] = { + "succeed to send ob_admin command", + "invalid tenant_id or ls_id in command", + "expect leader to execute this ob_admin command", + "fail to send rpc", + "fail to execute ob_admin command", + ""/*default max*/ +}; + +const char* ob_admin_drtask_ret_comment_strs(const rootserver::ObAdminDRTaskRetComment ret_comment) +{ + STATIC_ASSERT(ARRAYSIZEOF(obadmin_drtask_ret_comment_strs) == (int64_t)rootserver::ObAdminDRTaskRetComment::MAX_COMMENT + 1, + "ret_comment string array size mismatch enum ObAdminDRTaskRetComment count"); + const char *str = NULL; + if (ret_comment >= rootserver::ObAdminDRTaskRetComment::SUCCEED_TO_SEND_COMMAND && ret_comment <= rootserver::ObAdminDRTaskRetComment::MAX_COMMENT) { + str = obadmin_drtask_ret_comment_strs[static_cast(ret_comment)]; + } else { + str = obadmin_drtask_ret_comment_strs[static_cast(rootserver::ObAdminDRTaskRetComment::MAX_COMMENT)]; + LOG_WARN_RET(OB_INVALID_ARGUMENT, "invalid ObAdminDRTaskRetComment", K(ret_comment)); + } + return str; +} + +int ObAdminDRTaskUtil::handle_obadmin_command(const ObAdminCommandArg &command_arg) +{ + int ret = OB_SUCCESS; + int tmp_ret = OB_SUCCESS; + FLOG_INFO("begin to handle ob_admin command", K(command_arg)); + uint64_t tenant_id = OB_INVALID_TENANT_ID; + share::ObLSID ls_id; + ObSqlString result_comment("ResCmmt"); + ObAdminDRTaskRetComment ret_comment = FAIL_TO_EXECUTE_COMMAND; + int64_t check_begin_time = ObTimeUtility::current_time(); + + if (OB_UNLIKELY(!command_arg.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(command_arg)); + } else if (command_arg.is_remove_task()) { + if (OB_FAIL(handle_remove_command_(command_arg, tenant_id, ls_id, ret_comment))) { + LOG_WARN("fail to handle remove command", KR(ret), K(command_arg)); + } + } else if (command_arg.is_add_task()) { + if (OB_FAIL(handle_add_command_(command_arg, tenant_id, ls_id, ret_comment))) { + LOG_WARN("fail to handle add command", KR(ret), K(command_arg)); + } + } else { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid task type", KR(ret), K(command_arg)); + } + + if (OB_SUCCESS != (tmp_ret = try_construct_result_comment_(ret, ret_comment, result_comment))) { + LOG_WARN("fail to construct result comment", K(tmp_ret), KR(ret), K(ret_comment)); + } + SERVER_EVENT_ADD("ob_admin", command_arg.get_type_str(), + "tenant_id", tenant_id, + "ls_id", ls_id.id(), + "arg", command_arg, + "result", result_comment, + "trace_id", ObCurTraceId::get_trace_id_str(), + "comment", command_arg.get_comment()); + + int64_t cost = ObTimeUtility::current_time() - check_begin_time; + FLOG_INFO("finish handle ob_admin command", K(command_arg), K(tenant_id), K(ls_id), + K(result_comment), K(ret_comment), K(cost)); + return ret; +} + +int ObAdminDRTaskUtil::handle_add_command_( + const ObAdminCommandArg &command_arg, + uint64_t &tenant_id, + share::ObLSID &ls_id, + ObAdminDRTaskRetComment &ret_comment) +{ + int ret = OB_SUCCESS; + tenant_id = OB_INVALID_TENANT_ID; + ret_comment = FAIL_TO_EXECUTE_COMMAND; + ObLSAddReplicaArg arg; + + if (OB_UNLIKELY(!command_arg.is_valid()) + || OB_UNLIKELY(!command_arg.is_add_task())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(command_arg)); + } else if (OB_FAIL(construct_arg_for_add_command_(command_arg, arg, ret_comment))) { + LOG_WARN("fail to construct arg for add command", KR(ret), K(command_arg), + K(arg), K(ret_comment)); + } else if (OB_FAIL(execute_task_for_add_command_(command_arg, arg, ret_comment))) { + LOG_WARN("fail to execute task for add command", KR(ret), K(command_arg), K(arg), K(ret_comment)); + } else { + tenant_id = arg.tenant_id_; + ls_id = arg.ls_id_; + ret_comment = SUCCEED_TO_SEND_COMMAND; + } + return ret; +} + +int ObAdminDRTaskUtil::construct_arg_for_add_command_( + const ObAdminCommandArg &command_arg, + ObLSAddReplicaArg &arg, + ObAdminDRTaskRetComment &ret_comment) +{ + int ret = OB_SUCCESS; + ret_comment = FAIL_TO_EXECUTE_COMMAND; + uint64_t tenant_id = OB_INVALID_TENANT_ID; + share::ObLSID ls_id; + ObReplicaType replica_type = REPLICA_TYPE_FULL; + common::ObAddr data_source_server; + common::ObAddr target_server; + int64_t orig_paxos_replica_number = 0; + int64_t new_paxos_replica_number = 0; + + if (OB_UNLIKELY(!command_arg.is_valid()) + || OB_UNLIKELY(!command_arg.is_add_task())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(command_arg)); + // STEP 1: parse parameters from ob_admin command directly + } else if (OB_FAIL(parse_params_from_obadmin_command_arg( + command_arg, tenant_id, ls_id, replica_type, data_source_server, + target_server, orig_paxos_replica_number, new_paxos_replica_number))) { + LOG_WARN("fail to parse parameters provided in ob_admin command", KR(ret), K(command_arg)); + } else if (OB_UNLIKELY(!ls_id.is_valid_with_tenant(tenant_id))) { + ret = OB_INVALID_ARGUMENT; + ret_comment = ObAdminDRTaskRetComment::TENANT_ID_OR_LS_ID_NOT_VALID; + LOG_WARN("invalid tenant_id or ls_id", KR(ret), K(command_arg), K(tenant_id), K(ls_id)); + } else if (OB_UNLIKELY(!target_server.is_valid()) + || OB_UNLIKELY(REPLICA_TYPE_FULL != replica_type && REPLICA_TYPE_READONLY != replica_type)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(replica_type), K(target_server)); + // STEP 2: construct orig_paxos_replica_number and data_source_server if not specified by ob_admin command + } else if (0 == orig_paxos_replica_number || !data_source_server.is_valid()) { + if (OB_FAIL(construct_default_params_for_add_command_( + tenant_id, + ls_id, + orig_paxos_replica_number, + data_source_server))) { + LOG_WARN("fail to fetch ls info and construct related parameters", KR(ret), K(tenant_id), + K(ls_id), K(orig_paxos_replica_number), K(data_source_server)); + } + } + + if (OB_SUCC(ret)) { + new_paxos_replica_number = 0 == new_paxos_replica_number + ? orig_paxos_replica_number + : new_paxos_replica_number; + ObReplicaMember data_source_member(data_source_server, 0/*timstamp*/); + ObReplicaMember add_member(target_server, ObTimeUtility::current_time(), replica_type); + // STEP 3: construct arg + if (OB_ISNULL(ObCurTraceId::get_trace_id())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret)); + } else if (OB_FAIL(arg.init( + *ObCurTraceId::get_trace_id()/*task_id*/, + tenant_id, + ls_id, + add_member, + data_source_member, + orig_paxos_replica_number, + new_paxos_replica_number, + false/*is_skip_change_member_list-not used*/))) { + LOG_WARN("fail to init arg", KR(ret), K(tenant_id), K(ls_id), K(add_member), + K(data_source_member), K(orig_paxos_replica_number), K(new_paxos_replica_number)); + } + } + return ret; +} + +int ObAdminDRTaskUtil::construct_default_params_for_add_command_( + const uint64_t &tenant_id, + const share::ObLSID &ls_id, + int64_t &orig_paxos_replica_number, + common::ObAddr &data_source_server) +{ + int ret = OB_SUCCESS; + share::ObLSInfo ls_info; + const share::ObLSReplica *leader_replica = nullptr; + + if (OB_UNLIKELY(!ls_id.is_valid_with_tenant(tenant_id))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(ls_id)); + } else if (OB_ISNULL(GCTX.lst_operator_)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid ls table operator", KR(ret)); + } else if (OB_FAIL(GCTX.lst_operator_->get(GCONF.cluster_id, tenant_id, ls_id, + share::ObLSTable::COMPOSITE_MODE, ls_info))) { + LOG_WARN("fail to get ls info", KR(ret), K(tenant_id), K(ls_id), K(ls_info)); + } else if (OB_FAIL(ls_info.find_leader(leader_replica))) { + LOG_WARN("fail to get ls leader replica", KR(ret), K(ls_info)); + } else if (OB_ISNULL(leader_replica)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid leader replica", KR(ret), K(ls_info)); + } else { + // If [orig_paxos_replica_number] or [data_source_server] not specified in obadmin command, + // need to construct from leader_replica, use leader replica as default + if (0 == orig_paxos_replica_number) { + orig_paxos_replica_number = leader_replica->get_paxos_replica_number(); + } + if (!data_source_server.is_valid()) { + data_source_server = leader_replica->get_server(); + } + } + return ret; +} + +int ObAdminDRTaskUtil::execute_task_for_add_command_( + const ObAdminCommandArg &command_arg, + const ObLSAddReplicaArg &arg, + ObAdminDRTaskRetComment &ret_comment) +{ + int ret = OB_SUCCESS; + ret_comment = FAIL_TO_EXECUTE_COMMAND; + const int64_t add_timeout = GCONF.rpc_timeout * 5; + + if (OB_UNLIKELY(!arg.is_valid()) + || OB_UNLIKELY(!command_arg.is_valid()) + || OB_UNLIKELY(!command_arg.is_add_task())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(arg), K(command_arg)); + } else if (GCTX.self_addr() == arg.dst_.get_server()) { + // do not need to send rpc, execute locally + MTL_SWITCH(arg.tenant_id_) { + if (OB_FAIL(observer::ObService::do_add_ls_replica(arg))) { + LOG_WARN("fail to execute add replica rpc locally", KR(ret), K(arg)); + } + } + } else if (OB_ISNULL(GCTX.srv_rpc_proxy_)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("svr rpc proxy is nullptr", KR(ret)); + } else if (OB_FAIL(GCTX.srv_rpc_proxy_->to(arg.dst_.get_server()).by(arg.tenant_id_).timeout(add_timeout).ls_add_replica(arg))) { + ret_comment = ObAdminDRTaskRetComment::FAILED_TO_SEND_RPC; + LOG_WARN("fail to execute add replica rpc", KR(ret), K(arg), K(add_timeout)); + } + + if (OB_SUCC(ret)) { + // local execute or rpc is send, log task start, task finish will be recorded later + ROOTSERVICE_EVENT_ADD("disaster_recovery", drtasklog::START_ADD_LS_REPLICA_STR, + "tenant_id", arg.tenant_id_, + "ls_id", arg.ls_id_.id(), + "task_id", ObCurTraceId::get_trace_id_str(), + "destination", arg.dst_, + "comment", command_arg.get_comment()); + } + return ret; +} + +int ObAdminDRTaskUtil::handle_remove_command_( + const ObAdminCommandArg &command_arg, + uint64_t &tenant_id, + share::ObLSID &ls_id, + ObAdminDRTaskRetComment &ret_comment) +{ + int ret = OB_SUCCESS; + tenant_id = OB_INVALID_TENANT_ID; + ret_comment = FAIL_TO_EXECUTE_COMMAND; + ObReplicaType replica_type = REPLICA_TYPE_FULL; + common::ObAddr data_source_server; + common::ObAddr target_server; + int64_t orig_paxos_replica_number = 0; + int64_t new_paxos_replica_number = 0; + + if (OB_UNLIKELY(!command_arg.is_valid()) + || OB_UNLIKELY(!command_arg.is_remove_task())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(command_arg)); + // STEP 1: parse parameters from ob_admin command directly + } else if (OB_FAIL(parse_params_from_obadmin_command_arg( + command_arg, tenant_id, ls_id, replica_type, data_source_server, + target_server, orig_paxos_replica_number, new_paxos_replica_number))) { + LOG_WARN("fail to parse parameters provided in ob_admin command", KR(ret), K(command_arg)); + } else if (OB_UNLIKELY(!ls_id.is_valid_with_tenant(tenant_id))) { + ret = OB_INVALID_ARGUMENT; + ret_comment = ObAdminDRTaskRetComment::TENANT_ID_OR_LS_ID_NOT_VALID; + LOG_WARN("invalid tenant_id or ls_id", KR(ret), K(command_arg), K(tenant_id), K(ls_id)); + } else if (OB_UNLIKELY(!target_server.is_valid()) + || OB_UNLIKELY(REPLICA_TYPE_FULL != replica_type && REPLICA_TYPE_READONLY != replica_type)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(replica_type), K(target_server)); + } else { + // STEP 2: construct args and execute + if (REPLICA_TYPE_FULL == replica_type) { + ObLSDropPaxosReplicaArg remove_paxos_arg; + if (OB_FAIL(construct_remove_paxos_task_arg_( + tenant_id, ls_id, target_server, orig_paxos_replica_number, + new_paxos_replica_number, ret_comment, remove_paxos_arg))) { + LOG_WARN("fail to construct remove paxos task arg", KR(ret), K(tenant_id), K(ls_id), + K(target_server), K(orig_paxos_replica_number), K(new_paxos_replica_number), + K(ret_comment), K(remove_paxos_arg)); + } else if (OB_FAIL(execute_remove_paxos_task_(command_arg, remove_paxos_arg))) { + LOG_WARN("fail to execute remove paxos replica task", KR(ret), K(command_arg), K(remove_paxos_arg)); + } else { + ret_comment = SUCCEED_TO_SEND_COMMAND; + } + } else if (REPLICA_TYPE_READONLY == replica_type) { + ObLSDropNonPaxosReplicaArg remove_nonpaxos_arg; + if (OB_FAIL(construct_remove_nonpaxos_task_arg_( + tenant_id, ls_id, target_server, ret_comment, remove_nonpaxos_arg))) { + LOG_WARN("fail to construct remove non-paxos replica task arg", KR(ret), K(tenant_id), + K(ls_id), K(target_server), K(ret_comment), K(remove_nonpaxos_arg)); + } else if (OB_FAIL(execute_remove_nonpaxos_task_(command_arg, remove_nonpaxos_arg))) { + LOG_WARN("fail to execute remove nonpaxos replica task", KR(ret), K(command_arg), K(remove_nonpaxos_arg)); + } else { + ret_comment = SUCCEED_TO_SEND_COMMAND; + } + } else { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("unexpected replica type", KR(ret), K(replica_type), K(tenant_id), K(ls_id), K(command_arg)); + } + } + return ret; +} + +int ObAdminDRTaskUtil::construct_remove_paxos_task_arg_( + const uint64_t &tenant_id, + const share::ObLSID &ls_id, + const common::ObAddr &target_server, + int64_t &orig_paxos_replica_number, + int64_t &new_paxos_replica_number, + ObAdminDRTaskRetComment &ret_comment, + ObLSDropPaxosReplicaArg &remove_paxos_arg) +{ + int ret = OB_SUCCESS; + ret_comment = FAIL_TO_EXECUTE_COMMAND; + common::ObMember member; + ObReplicaMember member_to_remove; + palf::PalfStat palf_stat; + + if (OB_UNLIKELY(!ls_id.is_valid_with_tenant(tenant_id)) + || OB_UNLIKELY(!target_server.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid tenant_id or ls_id", KR(ret), K(tenant_id), K(ls_id), K(target_server)); + } else if (OB_FAIL(get_local_palf_stat_(tenant_id, ls_id, palf_stat, ret_comment))) { + LOG_WARN("fail to get local palf stat", KR(ret), K(tenant_id), K(ls_id)); + } else if (OB_UNLIKELY(!palf_stat.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(palf_stat)); + } else if (OB_UNLIKELY(!palf_stat.paxos_member_list_.contains(target_server))) { + ret = OB_ENTRY_NOT_EXIST; + LOG_WARN("replica not found in member_list", KR(ret), K(target_server), K(palf_stat)); + } else if (OB_FAIL(palf_stat.paxos_member_list_.get_member_by_addr(target_server, member))) { + LOG_WARN("fail to get member from paxos_member_list", KR(ret), K(palf_stat), K(target_server)); + } else { + member_to_remove = ObReplicaMember(member); + if (OB_FAIL(member_to_remove.set_replica_type(REPLICA_TYPE_FULL))) { + LOG_WARN("fail to set replica type for member to remove", KR(ret)); + } else { + // If [orig_paxos_replica_number] not specified in obadmin command, + // use leader replica's info as default + orig_paxos_replica_number = 0 == orig_paxos_replica_number + ? palf_stat.paxos_replica_num_ + : orig_paxos_replica_number; + new_paxos_replica_number = 0 == new_paxos_replica_number + ? orig_paxos_replica_number + : new_paxos_replica_number; + } + } + if (OB_FAIL(ret)) { + } else if (OB_ISNULL(ObCurTraceId::get_trace_id())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret)); + } else if (OB_FAIL(remove_paxos_arg.init( + *ObCurTraceId::get_trace_id()/*task_id*/, tenant_id, ls_id, + member_to_remove, orig_paxos_replica_number, new_paxos_replica_number))) { + LOG_WARN("fail to init arg", KR(ret), K(tenant_id), K(ls_id), K(member_to_remove), + K(orig_paxos_replica_number), K(new_paxos_replica_number)); + } + return ret; +} + +int ObAdminDRTaskUtil::construct_remove_nonpaxos_task_arg_( + const uint64_t &tenant_id, + const share::ObLSID &ls_id, + const common::ObAddr &target_server, + ObAdminDRTaskRetComment &ret_comment, + ObLSDropNonPaxosReplicaArg &remove_nonpaxos_arg) +{ + int ret = OB_SUCCESS; + ret_comment = FAIL_TO_EXECUTE_COMMAND; + common::ObMember member; + ObReplicaMember member_to_remove; + palf::PalfStat palf_stat; + + if (OB_UNLIKELY(!ls_id.is_valid_with_tenant(tenant_id)) + || OB_UNLIKELY(!target_server.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid tenant_id or ls_id", KR(ret), K(tenant_id), K(ls_id), K(target_server)); + } else if (OB_FAIL(get_local_palf_stat_(tenant_id, ls_id, palf_stat, ret_comment))) { + LOG_WARN("fail to get local palf stat", KR(ret), K(tenant_id), K(ls_id)); + } else if (OB_UNLIKELY(!palf_stat.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(palf_stat)); + } else if (OB_UNLIKELY(!palf_stat.learner_list_.contains(target_server))) { + ret = OB_ENTRY_NOT_EXIST; + LOG_WARN("replica not found in learner_list", KR(ret), K(target_server), K(palf_stat)); + } else if (OB_FAIL(palf_stat.learner_list_.get_learner_by_addr(target_server, member))) { + LOG_WARN("fail to get member from learner_list", KR(ret), K(palf_stat), K(target_server)); + } else { + member_to_remove = ObReplicaMember(member); + if (OB_FAIL(member_to_remove.set_replica_type(REPLICA_TYPE_READONLY))) { + LOG_WARN("fail to set replica type for member to remove", KR(ret)); + } else if (OB_ISNULL(ObCurTraceId::get_trace_id())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret)); + } else if (OB_FAIL(remove_nonpaxos_arg.init( + *ObCurTraceId::get_trace_id()/*task_id*/, tenant_id, + ls_id, member_to_remove))) { + LOG_WARN("fail to init arg", KR(ret), K(tenant_id), K(ls_id), K(member_to_remove)); + } + } + return ret; +} + +int ObAdminDRTaskUtil::get_local_palf_stat_( + const uint64_t &tenant_id, + const share::ObLSID &ls_id, + palf::PalfStat &palf_stat, + ObAdminDRTaskRetComment &ret_comment) +{ + int ret = OB_SUCCESS; + ret_comment = FAIL_TO_EXECUTE_COMMAND; + palf_stat.reset(); + + if (OB_UNLIKELY(!ls_id.is_valid_with_tenant(tenant_id))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid tenant_id or ls_id", KR(ret), K(tenant_id), K(ls_id)); + } else { + MTL_SWITCH(tenant_id) { + logservice::ObLogService *log_service = NULL; + palf::PalfHandleGuard palf_handle_guard; + if (OB_ISNULL(log_service = MTL(logservice::ObLogService*))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("MTL ObLogService is null", KR(ret), K(tenant_id)); + } else if (OB_FAIL(log_service->open_palf(ls_id, palf_handle_guard))) { + LOG_WARN("failed to open palf", KR(ret), K(tenant_id), K(ls_id)); + } else if (OB_FAIL(palf_handle_guard.stat(palf_stat))) { + LOG_WARN("get palf_stat failed", KR(ret), K(tenant_id), K(ls_id)); + } else if (LEADER != palf_stat.role_) { + ret = OB_STATE_NOT_MATCH; + ret_comment = ObAdminDRTaskRetComment::SERVER_TO_EXECUTE_COMMAND_NOT_LEADER; + LOG_WARN("invalid argument, expect self address is leader replica", KR(ret), + K(tenant_id), K(ls_id), K(palf_stat)); + } + } + } + return ret; +} + +int ObAdminDRTaskUtil::execute_remove_paxos_task_( + const ObAdminCommandArg &command_arg, + const ObLSDropPaxosReplicaArg &remove_paxos_arg) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!command_arg.is_valid()) + || OB_UNLIKELY(!command_arg.is_remove_task()) + || OB_UNLIKELY(!remove_paxos_arg.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(command_arg), K(remove_paxos_arg)); + } else { + // do not need to send rpc, just execute locally + LOG_INFO("start to remove member from member_list", K(remove_paxos_arg)); + MTL_SWITCH(remove_paxos_arg.tenant_id_) { + if (OB_FAIL(observer::ObService::do_remove_ls_paxos_replica(remove_paxos_arg))) { + LOG_WARN("fail to execute remove paxos replica rpc locally", KR(ret), K(remove_paxos_arg)); + } + } + } + if (OB_SUCC(ret)) { + // rpc is send, log task start, task finish will be recorded later + ROOTSERVICE_EVENT_ADD("disaster_recovery", drtasklog::START_REMOVE_LS_PAXOS_REPLICA_STR, + "tenant_id", remove_paxos_arg.tenant_id_, + "ls_id", remove_paxos_arg.ls_id_.id(), + "task_id", ObCurTraceId::get_trace_id_str(), + "remove_server", remove_paxos_arg.remove_member_, + "comment", command_arg.get_comment()); + } + return ret; +} + +int ObAdminDRTaskUtil::execute_remove_nonpaxos_task_( + const ObAdminCommandArg &command_arg, + const ObLSDropNonPaxosReplicaArg &remove_non_paxos_arg) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!command_arg.is_valid()) + || OB_UNLIKELY(!command_arg.is_remove_task()) + || OB_UNLIKELY(!remove_non_paxos_arg.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(command_arg), K(remove_non_paxos_arg)); + } else { + // do not need to send rpc, just execute locally + LOG_INFO("start to remove learner from learner_list", K(remove_non_paxos_arg)); + MTL_SWITCH(remove_non_paxos_arg.tenant_id_) { + if (OB_FAIL(observer::ObService::do_remove_ls_nonpaxos_replica(remove_non_paxos_arg))) { + LOG_WARN("fail to execute remove non-paxos replica rpc locally", KR(ret), K(remove_non_paxos_arg)); + } + } + } + if (OB_SUCC(ret)) { + // rpc is send, log task start, task finish will be recorded later + ROOTSERVICE_EVENT_ADD("disaster_recovery", drtasklog::START_REMOVE_LS_PAXOS_REPLICA_STR, + "tenant_id", remove_non_paxos_arg.tenant_id_, + "ls_id", remove_non_paxos_arg.ls_id_.id(), + "task_id", ObCurTraceId::get_trace_id_str(), + "remove_server", remove_non_paxos_arg.remove_member_, + "comment", command_arg.get_comment()); + } + return ret; +} + +int ObAdminDRTaskUtil::parse_params_from_obadmin_command_arg( + const ObAdminCommandArg &command_arg, + uint64_t &tenant_id, + share::ObLSID &ls_id, + ObReplicaType &replica_type, + common::ObAddr &data_source_server, + common::ObAddr &target_server, + int64_t &orig_paxos_replica_number, + int64_t &new_paxos_replica_number) +{ + int ret = OB_SUCCESS; + // reset output params + tenant_id = OB_INVALID_TENANT_ID; + ls_id.reset(); + replica_type = REPLICA_TYPE_FULL; + data_source_server.reset(); + target_server.reset(); + orig_paxos_replica_number = 0; + new_paxos_replica_number = 0; + // construct items to use + ObArenaAllocator allocator("ObAdminDRTask"); + ObString admin_command_before_trim; + ObString admin_command_after_trim; + ObArray command_params_array; + if (OB_UNLIKELY(!command_arg.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(command_arg)); + } else if (OB_FAIL(ob_write_string(allocator, command_arg.get_admin_command_str(), admin_command_before_trim))) { + LOG_WARN("fail to write string", KR(ret), K(command_arg)); + } else if (FALSE_IT(admin_command_after_trim = admin_command_before_trim.trim())) { + } else if (OB_FAIL(split_on(admin_command_after_trim, ',', command_params_array))) { + LOG_WARN("fail to split string", KR(ret), K(admin_command_after_trim), K(admin_command_before_trim)); + } else { + LOG_INFO("start to parse parameters from command", K(command_arg), K(command_params_array)); + ObSqlString data_source_string("DtStr"); + for (int64_t param_index = 0; + param_index < command_params_array.count() && OB_SUCC(ret); + param_index++) { + ObString param_name_with_value_str = command_params_array.at(param_index); + ObArray param_name_with_value; + ObSqlString param_name("ParamN"); + ObSqlString param_value("ParamV"); + int64_t pos = 0; + if (OB_FAIL(split_on(param_name_with_value_str, '=', param_name_with_value))) { + LOG_WARN("fail to split param name and value", KR(ret), K(param_name_with_value_str)); + } else if (OB_UNLIKELY(2 != param_name_with_value.count())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(param_name_with_value)); + } else if (OB_FAIL(param_name.assign(param_name_with_value.at(0).trim()))) { + LOG_WARN("fail to construct parameter name", KR(ret), K(param_name_with_value)); + } else if (OB_FAIL(param_value.assign(param_name_with_value.at(1).trim()))) { + LOG_WARN("fail to construct parameter value", KR(ret), K(param_name_with_value)); + } else if (0 == param_name.string().case_compare("tenant_id")) { + int64_t tenant_id_to_set = OB_INVALID_TENANT_ID; + if (OB_FAIL(extract_int(param_value.string(), 0, pos, tenant_id_to_set))) { + LOG_WARN("fail to extract int from string", KR(ret), K(param_name_with_value), K(tenant_id_to_set)); + } else { + tenant_id = tenant_id_to_set; + } + } else if (0 == param_name.string().case_compare("ls_id")) { + int64_t ls_id_to_set; + if (OB_FAIL(extract_int(param_value.string(), 0, pos, ls_id_to_set))) { + LOG_WARN("fail to extract int from string", KR(ret), K(param_name_with_value), K(ls_id_to_set)); + } else { + ls_id = share::ObLSID(ls_id_to_set); + } + } else if (0 == param_name.string().case_compare("replica_type")) { + if (OB_FAIL(share::ObLocalityParser::parse_type( + param_value.ptr(), + param_value.length(), + replica_type))) { + LOG_WARN("fail to parse replica type", KR(ret), K(param_name_with_value), K(replica_type)); + } + } else if (0 == param_name.string().case_compare("orig_paxos_replica_number")) { + if (OB_FAIL(extract_int(param_value.string(), 0, pos, orig_paxos_replica_number))) { + LOG_WARN("fail to extract int from string", KR(ret), K(param_name_with_value), K(orig_paxos_replica_number)); + } + } else if (0 == param_name.string().case_compare("new_paxos_replica_number")) { + if (OB_FAIL(extract_int(param_value.string(), 0, pos, new_paxos_replica_number))) { + LOG_WARN("fail to extract int from string", KR(ret), K(param_name_with_value), K(new_paxos_replica_number)); + } + } else if (0 == param_name.string().case_compare("server")) { + if (OB_FAIL(target_server.parse_from_string(param_value.string()))) { + LOG_WARN("fail to construct target server from string", KR(ret), K(param_value)); + } + } else if (0 == param_name.string().case_compare("data_source")) { + if (OB_FAIL(data_source_server.parse_from_string(param_value.string()))) { + LOG_WARN("fail to construct data source server from string", KR(ret), K(param_value)); + } + } else { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(param_name_with_value_str), K(param_name_with_value)); + } + } + + if (OB_SUCC(ret)) { + // if [server] not specified, use local as default + target_server = target_server.is_valid() ? target_server : GCTX.self_addr(); + } + + LOG_INFO("finish parse parameters from command", KR(ret), K(command_arg), K(command_params_array), K(tenant_id), + K(ls_id), K(replica_type), K(data_source_server), K(target_server), K(orig_paxos_replica_number), + K(new_paxos_replica_number)); + } + return ret; +} + +int ObAdminDRTaskUtil::try_construct_result_comment_( + const int &ret_code, + const ObAdminDRTaskRetComment &ret_comment, + ObSqlString &result_comment) +{ + int ret = OB_SUCCESS; + result_comment.reset(); + if (OB_FAIL(result_comment.assign_fmt("ret:%d, %s; ret_comment:%s;", + ret_code, common::ob_error_name(ret_code), + ob_admin_drtask_ret_comment_strs(ret_comment)))) { + LOG_WARN("fail to construct result comment", KR(ret), K(ret_code), K(ret_comment)); + } + return ret; +} +} // end namespace rootserver +} // end namespace oceanbase diff --git a/src/rootserver/ob_admin_drtask_util.h b/src/rootserver/ob_admin_drtask_util.h new file mode 100644 index 0000000000..7df1e5df4f --- /dev/null +++ b/src/rootserver/ob_admin_drtask_util.h @@ -0,0 +1,177 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifndef OCEANBASE_ROOTSERVICE_OBADMIN_DRTASK_UTIL_H_ +#define OCEANBASE_ROOTSERVICE_OBADMIN_DRTASK_UTIL_H_ + +#include "logservice/palf/palf_handle_impl.h" // for PalfStat +namespace oceanbase +{ +namespace rootserver +{ + +enum ObAdminDRTaskRetComment +{ + SUCCEED_TO_SEND_COMMAND = 0, + TENANT_ID_OR_LS_ID_NOT_VALID = 1, + SERVER_TO_EXECUTE_COMMAND_NOT_LEADER = 2, + FAILED_TO_SEND_RPC = 3, + FAIL_TO_EXECUTE_COMMAND = 4, + MAX_COMMENT +}; +const char* ob_admin_drtask_ret_comment_strs(const rootserver::ObAdminDRTaskRetComment ret_comment); + +class ObAdminDRTaskUtil +{ +public: + // handle ob_admin request + // params[in] command_arg, arg which contains admin_command + static int handle_obadmin_command(const ObAdminCommandArg &command_arg); + + // parse parameters from ob_admin command + // params[in] command_arg, arg which contains admin_command + // params[out] tenant_id, specified tenant_id + // params[out] ls_id, specified ls_id + // params[out] replica_type, specified replica_type + // params[out] data_source_server, specified data_source + // params[out] target_member, specified target_member + // params[out] leader_addr, specified leader_addr + // params[out] orig_paxos_replica_number, specified original quorum + // params[out] new_paxos_replica_number, specified new quorum + static int parse_params_from_obadmin_command_arg( + const ObAdminCommandArg &command_arg, + uint64_t &tenant_id, + share::ObLSID &ls_id, + ObReplicaType &replica_type, + common::ObAddr &data_source_server, + common::ObAddr &target_server, + int64_t &orig_paxos_replica_number, + int64_t &new_paxos_replica_number); +private: + // handle ob_admin remove replica task request + // params[in] command_arg, arg which contains admin_command + // params[in] tenant_id, specified tenant_id + // params[in] ls_id, specified ls_id + // params[out] ret_comment, failed reason + static int handle_remove_command_( + const ObAdminCommandArg &command_arg, + uint64_t &tenant_id, + share::ObLSID &ls_id, + ObAdminDRTaskRetComment &ret_comment); + + // construct remove paxos replica task arg + // params[in] tenant_id, specified tenant_id + // params[in] ls_id, specified ls_id + // params[in] target_server, the replica to remove on which server + // params[out] orig_paxos_replica_number, orig paxos_replica_number + // params[out] new_paxos_replica_number, new paxos_replica_number + // params[out] ret_comment, failed reason + // params[out] remove_paxos_arg, arg for remove-F task + static int construct_remove_paxos_task_arg_( + const uint64_t &tenant_id, + const share::ObLSID &ls_id, + const common::ObAddr &target_server, + int64_t &orig_paxos_replica_number, + int64_t &new_paxos_replica_number, + ObAdminDRTaskRetComment &ret_comment, + ObLSDropPaxosReplicaArg &remove_paxos_arg); + + // construct remove non-paxos replica task arg + // params[in] tenant_id, specified tenant_id + // params[in] ls_id, specified ls_id + // params[in] target_server, the replica to remove on which server + // params[out] ret_comment, failed reason + // params[out] remove_non_paxos_arg, arg for remove-R task + static int construct_remove_nonpaxos_task_arg_( + const uint64_t &tenant_id, + const share::ObLSID &ls_id, + const common::ObAddr &target_server, + ObAdminDRTaskRetComment &ret_comment, + ObLSDropNonPaxosReplicaArg &remove_nonpaxos_arg); + + // get palf stat locally + // params[in] tenant_id, specified tenant_id + // params[in] ls_id, specified ls_id + // params[out] palf_stat, palf informations + // params[out] ret_comment, failed reason + static int get_local_palf_stat_( + const uint64_t &tenant_id, + const share::ObLSID &ls_id, + palf::PalfStat &palf_stat, + ObAdminDRTaskRetComment &ret_comment); + + // execute remove paxos replica task + // params[in] command_arg, arg which contains admin_command + // params[in] remove_paxos_arg, arg for remove-F task + static int execute_remove_paxos_task_( + const ObAdminCommandArg &command_arg, + const ObLSDropPaxosReplicaArg &remove_paxos_arg); + + // execute remove non-paxos replica task + // params[in] command_arg, arg which contains admin_command + // params[in] remove_non_paxos_arg, arg for remove-R task + static int execute_remove_nonpaxos_task_( + const ObAdminCommandArg &command_arg, + const ObLSDropNonPaxosReplicaArg &remove_non_paxos_arg); + + // handle ob_admin add replica task request + // params[in] command_arg, arg which contains admin_command + // params[in] tenant_id, specified tenant_id + // params[in] ls_id, specified ls_id + // params[out] ret_comment, failed reason + static int handle_add_command_( + const ObAdminCommandArg &command_arg, + uint64_t &tenant_id, + share::ObLSID &ls_id, + ObAdminDRTaskRetComment &ret_comment); + + // construct arg for add task + // params[in] command_arg, arg which contains admin_command + // params[in] arg, arg for add replica task + // params[out] ret_comment, failed reason + static int construct_arg_for_add_command_( + const ObAdminCommandArg &command_arg, + ObLSAddReplicaArg &arg, + ObAdminDRTaskRetComment &ret_comment); + + // construct default value for some params + // params[in] tenant_id, specified tenant_id + // params[in] ls_id, specified ls_id + // params[out] orig_paxos_replica_number, orig paxos_replica_number + // params[out] data_source_server, data source + static int construct_default_params_for_add_command_( + const uint64_t &tenant_id, + const share::ObLSID &ls_id, + int64_t &orig_paxos_replica_number, + common::ObAddr &data_source_server); + + // execute remove task + // params[in] command_arg, arg which contains admin_command + // params[in] arg, arg for add replica task + // params[out] ret_comment, failed reason + static int execute_task_for_add_command_( + const ObAdminCommandArg &command_arg, + const ObLSAddReplicaArg &arg, + ObAdminDRTaskRetComment &ret_comment); + + // try construct ret comment to show + // params[in] ret_code, retured ret_code + // params[in] ret_comment, failed reason + // params[out] result_comment, the output message + static int try_construct_result_comment_( + const int &ret_code, + const ObAdminDRTaskRetComment &ret_comment, + ObSqlString &result_comment); +}; +} // end namespace rootserver +} // end namespace oceanbase +#endif /* OCEANBASE_ROOTSERVICE_OBADMIN_DRTASK_UTIL_H_ */ diff --git a/src/rootserver/ob_disaster_recovery_task.h b/src/rootserver/ob_disaster_recovery_task.h index 53ab6d9718..5f688f1911 100644 --- a/src/rootserver/ob_disaster_recovery_task.h +++ b/src/rootserver/ob_disaster_recovery_task.h @@ -54,6 +54,22 @@ namespace drtask const static char * const MIGRATE_REPLICA_DUE_TO_UNIT_NOT_MATCH = "migrate replica due to unit not match"; }; +namespace drtasklog +{ + const static char * const START_MIGRATE_LS_REPLICA_STR = "start_migrate_ls_replica"; + const static char * const FINISH_MIGRATE_LS_REPLICA_STR = "finish_migrate_ls_replica"; + const static char * const START_ADD_LS_REPLICA_STR = "start_add_ls_replica"; + const static char * const FINISH_ADD_LS_REPLICA_STR = "finish_add_ls_replica"; + const static char * const START_TYPE_TRANSFORM_LS_REPLICA_STR = "start_type_transform_ls_replica"; + const static char * const FINISH_TYPE_TRANSFORM_LS_REPLICA_STR = "finish_type_transform_ls_replica"; + const static char * const START_REMOVE_LS_PAXOS_REPLICA_STR = "start_remove_ls_paxos_replica"; + const static char * const FINISH_REMOVE_LS_PAXOS_REPLICA_STR = "finish_remove_ls_paxos_replica"; + const static char * const START_REMOVE_LS_NON_PAXOS_REPLICA_STR = "start_remove_ls_non_paxos_replica"; + const static char * const FINISH_REMOVE_LS_NON_PAXOS_REPLICA_STR = "finish_remove_ls_non_paxos_replica"; + const static char * const START_MODIFY_PAXOS_REPLICA_NUMBER_STR = "start_modify_paxos_replica_number"; + const static char * const FINISH_MODIFY_PAXOS_REPLICA_NUMBER_STR = "finish_modify_paxos_replica_number"; +} + enum class ObDRTaskType : int64_t; enum class ObDRTaskPriority : int64_t; @@ -453,8 +469,8 @@ public: virtual int fill_dml_splicer( share::ObDMLSqlSplicer &dml_splicer) const override; - virtual const char* get_log_start_str() const override { return "start_migrate_ls_replica"; } - virtual const char* get_log_finish_str() const override { return "finish_migrate_ls_replica"; } + virtual const char* get_log_start_str() const override { return drtasklog::START_MIGRATE_LS_REPLICA_STR; } + virtual const char* get_log_finish_str() const override { return drtasklog::FINISH_MIGRATE_LS_REPLICA_STR; } virtual int64_t get_clone_size() const override; virtual int clone( void *input_ptr, @@ -563,8 +579,8 @@ public: virtual int fill_dml_splicer( share::ObDMLSqlSplicer &dml_splicer) const override; - virtual const char* get_log_start_str() const override { return "start_add_ls_replica"; } - virtual const char* get_log_finish_str() const override { return "finish_add_ls_replica"; } + virtual const char* get_log_start_str() const override { return drtasklog::START_ADD_LS_REPLICA_STR; } + virtual const char* get_log_finish_str() const override { return drtasklog::FINISH_ADD_LS_REPLICA_STR; } virtual int64_t get_clone_size() const override; virtual int clone( void *input_ptr, @@ -676,8 +692,8 @@ public: virtual int fill_dml_splicer( share::ObDMLSqlSplicer &dml_splicer) const override; - virtual const char* get_log_start_str() const override { return "start_type_transform_ls_replica"; } - virtual const char* get_log_finish_str() const override { return "finish_type_transform_ls_replica"; } + virtual const char* get_log_start_str() const override { return drtasklog::START_TYPE_TRANSFORM_LS_REPLICA_STR; } + virtual const char* get_log_finish_str() const override { return drtasklog::FINISH_TYPE_TRANSFORM_LS_REPLICA_STR; } virtual int64_t get_clone_size() const override; virtual int clone( void *input_ptr, @@ -799,14 +815,14 @@ public: virtual const char* get_log_start_str() const override { return ObDRTaskType::LS_REMOVE_PAXOS_REPLICA == get_disaster_recovery_task_type() - ? "start_remove_ls_paxos_replica" - : "start_remove_ls_non_paxos_replica"; + ? drtasklog::START_REMOVE_LS_PAXOS_REPLICA_STR + : drtasklog::START_REMOVE_LS_NON_PAXOS_REPLICA_STR; } virtual const char* get_log_finish_str() const override { return ObDRTaskType::LS_REMOVE_PAXOS_REPLICA == get_disaster_recovery_task_type() - ? "finish_remove_ls_paxos_replica" - : "finish_remove_ls_non_paxos_replica"; + ? drtasklog::FINISH_REMOVE_LS_PAXOS_REPLICA_STR + : drtasklog::FINISH_REMOVE_LS_NON_PAXOS_REPLICA_STR; } virtual int64_t get_clone_size() const override; @@ -907,8 +923,8 @@ public: virtual int fill_dml_splicer( share::ObDMLSqlSplicer &dml_splicer) const override; - virtual const char* get_log_start_str() const override { return "start_modify_paxos_replica_number"; } - virtual const char* get_log_finish_str() const override { return "finish_modify_paxos_replica_number"; } + virtual const char* get_log_start_str() const override { return drtasklog::START_MODIFY_PAXOS_REPLICA_NUMBER_STR; } + virtual const char* get_log_finish_str() const override { return drtasklog::FINISH_MODIFY_PAXOS_REPLICA_NUMBER_STR; } virtual int64_t get_clone_size() const override; virtual int clone( void *input_ptr, diff --git a/src/rootserver/ob_tenant_role_transition_service.cpp b/src/rootserver/ob_tenant_role_transition_service.cpp index f926fc6f47..55ef51ca95 100644 --- a/src/rootserver/ob_tenant_role_transition_service.cpp +++ b/src/rootserver/ob_tenant_role_transition_service.cpp @@ -1583,7 +1583,7 @@ int ObTenantRoleTransitionService::check_tenant_server_online_() LOG_WARN("fail to append sql", KR(ret)); } else { HEAP_VAR(ObMySQLProxy::ReadResult, res) { - ObMySQLResult *result = NULL; + common::sqlclient::ObMySQLResult *result = NULL; if (OB_FAIL(sql_proxy_->read(res, OB_SYS_TENANT_ID, sql.ptr()))) { LOG_WARN("fail to read the tenant's online servers", KR(ret), K(sql), K(tenant_id_)); } else if (NULL == (result = res.get_result())) { diff --git a/src/share/ob_rpc_struct.cpp b/src/share/ob_rpc_struct.cpp index 332f47dda6..3856b053c3 100755 --- a/src/share/ob_rpc_struct.cpp +++ b/src/share/ob_rpc_struct.cpp @@ -4033,6 +4033,84 @@ int ObDRTaskExistArg::init( return ret; } +static const char* ob_admin_drtask_type_strs[] = { + "ADD REPLICA", + "REMOVE REPLICA" +}; + +static const char* ob_admin_drtask_comment_strs[] = { + "add ls replica trigger by ob_admin", + "remove ls replica trigger by ob_admin" +}; + +OB_SERIALIZE_MEMBER(ObAdminDRTaskType, type_); +const char* ObAdminDRTaskType::get_type_str() const +{ + STATIC_ASSERT(ARRAYSIZEOF(ob_admin_drtask_type_strs) == (int64_t)MAX_TYPE, + "ob_admin_drtask_type_strs string array size mismatch enum AdminDRTaskType count"); + const char *str = NULL; + if (type_ > INVALID_TYPE && type_ < MAX_TYPE) { + str = ob_admin_drtask_type_strs[static_cast(type_)]; + } else { + LOG_WARN_RET(OB_ERR_UNEXPECTED, "invalid AdminDRTaskType", K_(type)); + } + return str; +} + +const char* ObAdminDRTaskType::get_comment() const +{ + STATIC_ASSERT(ARRAYSIZEOF(ob_admin_drtask_comment_strs) == (int64_t)MAX_TYPE, + "ob_admin_drtask_comment_strs string array size mismatch enum AdminDRTaskType count"); + const char *str = NULL; + if (type_ > INVALID_TYPE && type_ < MAX_TYPE) { + str = ob_admin_drtask_comment_strs[static_cast(type_)]; + } else { + LOG_WARN_RET(OB_ERR_UNEXPECTED, "invalid AdminDRTaskType", K_(type)); + } + return str; +} + +int64_t ObAdminDRTaskType::to_string(char *buf, const int64_t buf_len) const +{ + int64_t pos = 0; + J_OBJ_START(); + J_KV(K_(type), "type", get_type_str()); + J_OBJ_END(); + return pos; +} + +OB_SERIALIZE_MEMBER(ObAdminCommandArg, admin_command_, task_type_); +int ObAdminCommandArg::assign(const ObAdminCommandArg &other) +{ + int ret = OB_SUCCESS; + if (this == &other) { + //pass + } else if (OB_FAIL(admin_command_.assign(other.get_admin_command_str()))) { + LOG_WARN("fail to assign obadmin command string", KR(ret), K(other)); + } else { + task_type_ = other.get_task_type(); + } + return ret; +} + +int ObAdminCommandArg::init(const ObString &admin_command, const ObAdminDRTaskType &task_type) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(admin_command.length() > OB_MAX_ADMIN_COMMAND_LENGTH)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(admin_command)); + LOG_USER_ERROR(OB_INVALID_ARGUMENT, "admin command, length oversize"); + } else if (OB_UNLIKELY(admin_command.empty()) || OB_UNLIKELY(!task_type.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(admin_command), K(task_type)); + } else if (OB_FAIL(admin_command_.assign(admin_command))) { + LOG_WARN("fali to assign admin command", KR(ret), K(admin_command)); + } else { + task_type_ = task_type; + } + return ret; +} + #ifdef OB_BUILD_ARBITRATION OB_SERIALIZE_MEMBER(ObAddArbArg, tenant_id_, diff --git a/src/share/ob_rpc_struct.h b/src/share/ob_rpc_struct.h index 87c7271144..8f40e8b35b 100755 --- a/src/share/ob_rpc_struct.h +++ b/src/share/ob_rpc_struct.h @@ -4459,6 +4459,63 @@ public: share::ObLSID ls_id_; }; +class ObAdminDRTaskType +{ + OB_UNIS_VERSION(1); +public: + enum AdminDRTaskType + { + INVALID_TYPE = -1, + ADD_REPLICA = 0, + REMOVE_REPLICA = 1, + MAX_TYPE + }; +public: + ObAdminDRTaskType() : type_(INVALID_TYPE) {} + explicit ObAdminDRTaskType(AdminDRTaskType type) : type_(type) {} + ObAdminDRTaskType &operator=(const AdminDRTaskType type) { type_ = type; return *this; } + ObAdminDRTaskType &operator=(const ObAdminDRTaskType &other) { type_ = other.type_; return *this; } + bool operator==(const ObAdminDRTaskType &other) const { return other.type_ == type_; } + bool operator!=(const ObAdminDRTaskType &other) const { return other.type_ != type_; } + void reset() { type_ = INVALID_TYPE; } + int64_t to_string(char *buf, const int64_t buf_len) const; + void assign(const ObAdminDRTaskType &other) { type_ = other.type_; } + bool is_valid() const { return INVALID_TYPE < type_ && MAX_TYPE > type_; } + bool is_add_task() const { return ADD_REPLICA == type_; } + bool is_remove_task() const { return REMOVE_REPLICA == type_; } + const AdminDRTaskType &get_type() const { return type_; } + const char* get_type_str() const; + const char* get_comment() const; +private: + AdminDRTaskType type_; +}; + +struct ObAdminCommandArg +{ +public: + OB_UNIS_VERSION(1); +public: + ObAdminCommandArg() + : admin_command_(), + task_type_(ObAdminDRTaskType::INVALID_TYPE) {} + ~ObAdminCommandArg() {} +public: + int assign(const ObAdminCommandArg &other); + int init(const ObString &admin_command, const ObAdminDRTaskType &task_type); + bool is_valid() const { return !admin_command_.is_empty() && task_type_.is_valid(); } + void reset() { admin_command_.reset(); task_type_.reset(); } + const ObString get_admin_command_str() const { return admin_command_.str(); } + const ObAdminDRTaskType &get_task_type() const { return task_type_; } + const char* get_type_str() const { return task_type_.get_type_str(); } + const char* get_comment() const { return task_type_.get_comment(); } + bool is_remove_task() const { return task_type_.is_remove_task(); } + bool is_add_task() const { return task_type_.is_add_task(); } + TO_STRING_KV(K(admin_command_), K(task_type_)); +private: + common::ObFixedLengthString admin_command_; + ObAdminDRTaskType task_type_; +}; + #ifdef OB_BUILD_ARBITRATION // send to leader to add A-replica for log stream struct ObAddArbArg diff --git a/src/share/ob_srv_rpc_proxy.h b/src/share/ob_srv_rpc_proxy.h index bdce3d5b05..54d680af53 100755 --- a/src/share/ob_srv_rpc_proxy.h +++ b/src/share/ob_srv_rpc_proxy.h @@ -80,6 +80,7 @@ public: RPC_S(PR5 ls_remove_nonpaxos_replica, OB_LS_REMOVE_NONPAXOS_REPLICA, (ObLSDropNonPaxosReplicaArg)); RPC_S(PR5 ls_modify_paxos_replica_number, OB_LS_MODIFY_PAXOS_REPLICA_NUMBER, (ObLSModifyPaxosReplicaNumberArg)); RPC_S(PR5 ls_check_dr_task_exist, OB_LS_CHECK_DR_TASK_EXIST, (ObDRTaskExistArg), obrpc::Bool); + RPC_S(PR5 ob_exec_drtask_obadmin_command, OB_EXEC_DRTASK_OBADMIN_COMMAND, (ObAdminCommandArg)); #ifdef OB_BUILD_ARBITRATION RPC_S(PR5 add_arb, OB_ADD_ARB, (ObAddArbArg), obrpc::ObAddArbResult); RPC_S(PR5 remove_arb, OB_REMOVE_ARB, (ObRemoveArbArg), obrpc::ObRemoveArbResult); diff --git a/tools/ob_admin/server_tool/ob_admin_routine.cpp b/tools/ob_admin/server_tool/ob_admin_routine.cpp index 520325dc9d..d0fe6f201e 100644 --- a/tools/ob_admin/server_tool/ob_admin_routine.cpp +++ b/tools/ob_admin/server_tool/ob_admin_routine.cpp @@ -552,75 +552,6 @@ DEF_COMMAND(TRANS, kill_part_trans_ctx, 1, return OB_NOT_SUPPORTED; } -// ls_remove_member -// @params [in] tenant_id, which tenant to modify -// @params [in] ls_id, which log stream to modify -// @params [in] svr_ip, the server ip want to delete -// @params [in] svr_port, the server port want to delete -// @params [in] orig_paxos_number, paxos replica number before this deletion -// @params [in] new_paxos_number, paxos replica number after this deletion -// ATTENTION: -// Please make sure let log stream's leader to execute this command -// For permanant offline, orig_paxos_number should equals to new_paxos_number -DEF_COMMAND(TRANS, ls_remove_member, 1, "tenant_id ls_id svr_ip svr_port orig_paxos_number new_paxos_number # ls_remove_member") -{ - int ret = OB_SUCCESS; - string arg_str; - ObLSDropPaxosReplicaArg arg; - int64_t tenant_id_to_set = OB_INVALID_TENANT_ID; - int64_t ls_id_to_set = 0; - int64_t orig_paxos_replica_number = 0; - int64_t new_paxos_replica_number = 0; - int32_t port = 0; - char ip[30]; - - if (cmd_ == action_name_) { - ret = OB_INVALID_ARGUMENT; - ADMIN_WARN("should provide tenant_id, ls_id ,member to remove, previous and new paxos replica number"); - } else { - arg_str = cmd_.substr(action_name_.length() + 1); - } - - if (OB_FAIL(ret)) { - } else if (6 != sscanf(arg_str.c_str(), "%ld %ld %s %d %ld %ld", &tenant_id_to_set, &ls_id_to_set, - ip, &port, &orig_paxos_replica_number, &new_paxos_replica_number)) { - ret = OB_INVALID_ARGUMENT; - COMMON_LOG(WARN, "invalid arg", K(ret), K(arg_str.c_str()), K(cmd_.c_str()), - K(tenant_id_to_set), K(ls_id_to_set), - K(port), K(orig_paxos_replica_number), K(new_paxos_replica_number)); - } else { - common::ObAddr server_to_remove(common::ObAddr::VER::IPV4, ip, port); - common::ObReplicaMember remove_member(server_to_remove, 1); - share::ObTaskId task_id; - share::ObLSID ls_id(ls_id_to_set); - task_id.init(server_to_remove); - if (OB_ISNULL(client_) - || OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id_to_set - || !ls_id.is_valid_with_tenant(tenant_id_to_set) - || !server_to_remove.is_valid() - || 1 < orig_paxos_replica_number - new_paxos_replica_number - || 0 > orig_paxos_replica_number - new_paxos_replica_number)) { - ret = OB_INVALID_ARGUMENT; - COMMON_LOG(WARN, "invalid argument", K(ret), K(tenant_id_to_set), K(ls_id), - K(remove_member), K(task_id), K(orig_paxos_replica_number), - K(new_paxos_replica_number), K(port), K(ip), KP(client_)); - } else if (OB_FAIL(arg.init( - task_id, - tenant_id_to_set, - ls_id, - remove_member, - orig_paxos_replica_number, - new_paxos_replica_number))) { - COMMON_LOG(WARN, "init arg failed", K(ret), K(task_id), K(tenant_id_to_set), K(ls_id), - K(remove_member), K(orig_paxos_replica_number), K(new_paxos_replica_number)); - } else if (OB_FAIL(client_->ls_remove_paxos_replica(arg))) { - COMMON_LOG(ERROR, "send req fail", K(ret)); - } - } - COMMON_LOG(INFO, "ls_remove_member", K(arg)); - return ret; -} - // remove_lock // @params [in] tenant_id, which tenant to modify // @params [in] ls_id, which log stream to modify @@ -788,6 +719,81 @@ DEF_COMMAND(TRANS, update_lock, 1, "tenant_id ls_id obj_type obj_id lock_mode ow return ret; } + +// remove_ls_replica +// @params [in] tenant_id, which tenant to modify +// @params [in] ls_id, which log stream to modify +// @params [in] server, the server address of the replica to remove +// @params [in] replica_type, what type of replica to remove +// @params [in] orig_paxos_number, paxos replica number before this deletion +// @params [in] new_paxos_number, paxos replica number after this deletion +// @params [in] leader, leader replica's address +// ATTENTION: +// Please make sure tenant_id and ls_id are specified. +// Other parameters are optional, if not specified, it will be automatically caculated +DEF_COMMAND(TRANS, remove_ls_replica, 1, "tenant_id=xxx,ls_id=xxx[server=xxx,replica_type=xxx,orig_paxos_replica_number=xxx,new_paxos_replica_number=xxx,leader=xxx] # remove_ls_replica") +{ + int ret = OB_SUCCESS; + string arg_str; + ObAdminCommandArg arg; + const ObAdminDRTaskType task_type(ObAdminDRTaskType::REMOVE_REPLICA); + if (cmd_ == action_name_) { + ret = OB_INVALID_ARGUMENT; + ADMIN_WARN("should provide tenant_id, ls_id at least"); + } else { + arg_str = cmd_.substr(action_name_.length() + 1); + } + + if (OB_FAIL(ret)) { + } else if (OB_ISNULL(client_)) { + ret = OB_INVALID_ARGUMENT; + COMMON_LOG(WARN, "invalid client", K(ret)); + } else if (OB_FAIL(arg.init(arg_str.c_str(), task_type))) { + COMMON_LOG(WARN, "fail to construct admin command arg", K(ret), K(arg_str.c_str()), K(task_type)); + } else if (OB_FAIL(client_->ob_exec_drtask_obadmin_command(arg))) { + COMMON_LOG(ERROR, "send req fail", K(ret), K(arg)); + } + COMMON_LOG(INFO, "remove_ls_replica", K(arg)); + return ret; +} + +// add_ls_replica +// @params [in] tenant_id, which tenant to modify +// @params [in] ls_id, which log stream to modify +// @params [in] server, the server address of the replica to add +// @params [in] replica_type, what type of replica to add +// @params [in] data_source, data source replica server +// @params [in] orig_paxos_number, paxos replica number before this deletion +// @params [in] new_paxos_number, paxos replica number after this deletion +// ATTENTION: +// Please make sure tenant_id, ls_id are specified. +// Other parameters are optional, if not specified, it will be automatically caculated +DEF_COMMAND(TRANS, add_ls_replica, 1, "tenant_id=xxx,ls_id=xxx[,replica_type=xxx,server=xxx,data_source=xxx,orig_paxos_replica_number=xxx,new_paxos_replica_number=xxx] # add_ls_replica") +{ + int ret = OB_SUCCESS; + string arg_str; + ObAdminCommandArg arg; + const ObAdminDRTaskType task_type(ObAdminDRTaskType::ADD_REPLICA); + if (cmd_ == action_name_) { + ret = OB_INVALID_ARGUMENT; + ADMIN_WARN("should provide tenant_id, ls_id at least"); + } else { + arg_str = cmd_.substr(action_name_.length() + 1); + } + + if (OB_FAIL(ret)) { + } else if (OB_ISNULL(client_)) { + ret = OB_INVALID_ARGUMENT; + COMMON_LOG(WARN, "invalid client", K(ret)); + } else if (OB_FAIL(arg.init(arg_str.c_str(), task_type))) { + COMMON_LOG(WARN, "fail to construct admin command arg", K(ret), K(arg_str.c_str()), K(task_type)); + } else if (OB_FAIL(client_->ob_exec_drtask_obadmin_command(arg))) { + COMMON_LOG(ERROR, "send req fail", K(ret), K(arg)); + } + COMMON_LOG(INFO, "add_ls_replica", K(arg)); + return ret; +} + #ifdef OB_BUILD_ARBITRATION // force_clear_arb_cluster_info // @params [in] cluster_id, which cluster to modify