diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h b/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h index 2434240c80..5ee7e7e12c 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h @@ -1211,7 +1211,10 @@ PCODE_DEF(OB_GET_SS_MICRO_CACHE_INFO, 0x1630) PCODE_DEF(OB_CLEAR_SS_MICRO_CACHE, 0x1632) PCODE_DEF(OB_COLLECT_MV_MERGE_INFO, 0x1633) PCODE_DEF(OB_FETCH_STABLE_MEMBER_LIST, 0x1634) - +PCODE_DEF(OB_DEL_SS_LOCAL_TMPFILE, 0x1635) +PCODE_DEF(OB_DEL_SS_LOCAL_MAJOR, 0x1636) +PCODE_DEF(OB_CALIBRATE_SS_DISK_SPACE, 0x1637) +PCODE_DEF(OB_DEL_SS_TABLET_MICRO, 0x1638) //**** 注意:在此行之前增加新的RPC ID ****** // //占位须知: diff --git a/src/observer/ob_rpc_processor_simple.cpp b/src/observer/ob_rpc_processor_simple.cpp index 4b0afe8d19..60a8e1e41a 100644 --- a/src/observer/ob_rpc_processor_simple.cpp +++ b/src/observer/ob_rpc_processor_simple.cpp @@ -3849,6 +3849,92 @@ int ObDelSSTabletMetaP::process() return ret; } +int ObDelSSLocalTmpFileP::process() +{ + int ret = OB_SUCCESS; + LOG_INFO("start delete ss_local_tmpfile process", K_(arg)); + if (!arg_.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K_(arg)); + } else { + MTL_SWITCH(arg_.tenant_id_) { + ObTenantFileManager *file_mgr = nullptr; + if (OB_ISNULL(file_mgr = MTL(ObTenantFileManager *))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("MTL ObTenantFileManager is null", KR(ret), K_(arg_.tenant_id)); + } else if (OB_FAIL(file_mgr->delete_local_tmp_file(arg_.macro_id_, true/* is_only_delete_read_cache */))) { + LOG_WARN("fail to delete ss_local_tmpfile", KR(ret), K_(arg_.macro_id)); + } + } + } + return ret; +} + +int ObDelSSLocalMajorP::process() +{ + int ret = OB_SUCCESS; + LOG_INFO("start delete ss_local_major process", K_(arg)); + if (!arg_.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K_(arg)); + } else { + MTL_SWITCH(arg_.tenant_id_) { + ObTenantFileManager *file_mgr = nullptr; + const int64_t cur_time_s = ObTimeUtility::current_time_s(); + if (OB_ISNULL(file_mgr = MTL(ObTenantFileManager *))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("MTL ObTenantFileManager is null", KR(ret), K_(arg_.tenant_id)); + } else if (OB_FAIL(file_mgr->delete_local_major_data_dir(cur_time_s))) { + LOG_WARN("fail to delete ss_local_major", KR(ret), K(cur_time_s)); + } + } + } + return ret; +} + +int ObCalibrateSSDiskSpaceP::process() +{ + int ret = OB_SUCCESS; + LOG_INFO("start calibrate_ss_disk_space process", K_(arg)); + if (!arg_.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K_(arg)); + } else { + MTL_SWITCH(arg_.tenant_id_) { + ObTenantFileManager *file_mgr = nullptr; + if (OB_ISNULL(file_mgr = MTL(ObTenantFileManager *))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("MTL ObTenantFileManager is null", KR(ret), K_(arg_.tenant_id)); + } else if (OB_FAIL(file_mgr->calibrate_disk_space())) { + LOG_WARN("fail to calibrate_ss_disk_space", KR(ret)); + } + } + } + return ret; +} + +int ObDelSSTabletMicroP::process() +{ + int ret = OB_SUCCESS; + LOG_INFO("start del_ss_tablet_micro process", K_(arg)); + if (!arg_.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K_(arg)); + } else { + MTL_SWITCH(arg_.tenant_id_) { + ObSSMicroCache *micro_cache = nullptr; + if (OB_ISNULL(micro_cache = MTL(ObSSMicroCache *))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("ObSSMicroCache is null", KR(ret), K_(arg_.tenant_id)); + } else if (OB_FAIL(micro_cache->clear_micro_meta_by_tablet_id(arg_.tablet_id_))) { + LOG_WARN("fail to del_ss_tablet_micro", KR(ret)); + } + } + } + return ret; +} + + int ObEnableSSMicroCacheP::process() { int ret = OB_SUCCESS; diff --git a/src/observer/ob_rpc_processor_simple.h b/src/observer/ob_rpc_processor_simple.h index 0a38cca63f..631b11b9db 100644 --- a/src/observer/ob_rpc_processor_simple.h +++ b/src/observer/ob_rpc_processor_simple.h @@ -251,6 +251,10 @@ OB_DEFINE_PROCESSOR_S(Srv, OB_DEL_SS_TABLET_META, ObDelSSTabletMetaP); OB_DEFINE_PROCESSOR_S(Srv, OB_ENABLE_SS_MICRO_CACHE, ObEnableSSMicroCacheP); OB_DEFINE_PROCESSOR_S(Srv, OB_GET_SS_MICRO_CACHE_INFO, ObGetSSMicroCacheInfoP); OB_DEFINE_PROCESSOR_S(Srv, OB_CLEAR_SS_MICRO_CACHE, ObRpcClearSSMicroCacheP); +OB_DEFINE_PROCESSOR_S(Srv, OB_DEL_SS_LOCAL_TMPFILE, ObDelSSLocalTmpFileP); +OB_DEFINE_PROCESSOR_S(Srv, OB_DEL_SS_LOCAL_MAJOR, ObDelSSLocalMajorP); +OB_DEFINE_PROCESSOR_S(Srv, OB_CALIBRATE_SS_DISK_SPACE, ObCalibrateSSDiskSpaceP); +OB_DEFINE_PROCESSOR_S(Srv, OB_DEL_SS_TABLET_MICRO, ObDelSSTabletMicroP); #endif OB_DEFINE_PROCESSOR_S(Srv, OB_REMOTE_WRITE_DDL_INC_COMMIT_LOG, ObRpcRemoteWriteDDLIncCommitLogP); OB_DEFINE_PROCESSOR_S(Srv, OB_CHECK_LS_CAN_OFFLINE, ObRpcCheckLSCanOfflineP); diff --git a/src/observer/ob_srv_xlator_storage.cpp b/src/observer/ob_srv_xlator_storage.cpp index 45fdcb3784..c448a15e92 100644 --- a/src/observer/ob_srv_xlator_storage.cpp +++ b/src/observer/ob_srv_xlator_storage.cpp @@ -161,6 +161,10 @@ void oceanbase::observer::init_srv_xlator_for_storage(ObSrvRpcXlator *xlator) { RPC_PROCESSOR(ObEnableSSMicroCacheP, gctx_); RPC_PROCESSOR(ObGetSSMicroCacheInfoP, gctx_); RPC_PROCESSOR(ObRpcClearSSMicroCacheP, gctx_); + RPC_PROCESSOR(ObDelSSLocalTmpFileP, gctx_); + RPC_PROCESSOR(ObDelSSLocalMajorP, gctx_); + RPC_PROCESSOR(ObCalibrateSSDiskSpaceP, gctx_); + RPC_PROCESSOR(ObDelSSTabletMicroP, gctx_); #endif RPC_PROCESSOR(ObNotifySharedStorageInfoP, gctx_); } diff --git a/src/share/ob_rpc_struct.cpp b/src/share/ob_rpc_struct.cpp index d62cf36b7f..fb77849e47 100644 --- a/src/share/ob_rpc_struct.cpp +++ b/src/share/ob_rpc_struct.cpp @@ -11370,6 +11370,10 @@ OB_SERIALIZE_MEMBER(ObEnableSSMicroCacheArg, tenant_id_, is_enabled_); OB_SERIALIZE_MEMBER(ObGetSSMicroCacheInfoArg, tenant_id_); OB_SERIALIZE_MEMBER(ObGetSSMicroCacheInfoResult, micro_cache_stat_, super_block_, arc_info_); OB_SERIALIZE_MEMBER(ObClearSSMicroCacheArg, tenant_id_); +OB_SERIALIZE_MEMBER(ObDelSSLocalTmpFileArg, tenant_id_, macro_id_); +OB_SERIALIZE_MEMBER(ObDelSSLocalMajorArg, tenant_id_); +OB_SERIALIZE_MEMBER(ObCalibrateSSDiskSpaceArg, tenant_id_); +OB_SERIALIZE_MEMBER(ObDelSSTabletMicroArg, tenant_id_, tablet_id_); #endif ObRpcRemoteWriteDDLIncCommitLogArg::ObRpcRemoteWriteDDLIncCommitLogArg() diff --git a/src/share/ob_rpc_struct.h b/src/share/ob_rpc_struct.h index 7f5860c116..84f1a1d0c6 100644 --- a/src/share/ob_rpc_struct.h +++ b/src/share/ob_rpc_struct.h @@ -11450,7 +11450,8 @@ public: ObSSARCInfo arc_info_; }; -struct ObClearSSMicroCacheArg { +struct ObClearSSMicroCacheArg final +{ OB_UNIS_VERSION(1); public: @@ -11465,8 +11466,69 @@ public: public: uint64_t tenant_id_; }; -#endif +struct ObDelSSLocalTmpFileArg final +{ + OB_UNIS_VERSION(1); +public: + ObDelSSLocalTmpFileArg() : tenant_id_(OB_INVALID_TENANT_ID), macro_id_() {} + ~ObDelSSLocalTmpFileArg() {} + bool is_valid() const + { + return OB_INVALID_TENANT_ID != tenant_id_ && macro_id_.is_valid(); + } + TO_STRING_KV(K_(tenant_id), K_(macro_id)); +public: + int64_t tenant_id_; + blocksstable::MacroBlockId macro_id_; +}; + +struct ObDelSSLocalMajorArg final +{ + OB_UNIS_VERSION(1); +public: + ObDelSSLocalMajorArg() : tenant_id_(OB_INVALID_TENANT_ID) {} + ~ObDelSSLocalMajorArg() {} + bool is_valid() const + { + return OB_INVALID_TENANT_ID != tenant_id_; + } + TO_STRING_KV(K_(tenant_id)); +public: + int64_t tenant_id_; +}; + +struct ObDelSSTabletMicroArg final +{ + OB_UNIS_VERSION(1); +public: + ObDelSSTabletMicroArg() : tenant_id_(OB_INVALID_TENANT_ID), tablet_id_() {} + ~ObDelSSTabletMicroArg() {} + bool is_valid() const + { + return OB_INVALID_TENANT_ID != tenant_id_ && tablet_id_.is_valid(); + } + TO_STRING_KV(K_(tenant_id), K_(tablet_id)); +public: + int64_t tenant_id_; + ObTabletID tablet_id_; +}; + +struct ObCalibrateSSDiskSpaceArg final +{ + OB_UNIS_VERSION(1); +public: + ObCalibrateSSDiskSpaceArg() : tenant_id_(OB_INVALID_TENANT_ID){} + ~ObCalibrateSSDiskSpaceArg() {} + bool is_valid() const + { + return OB_INVALID_TENANT_ID != tenant_id_; + } + TO_STRING_KV(K_(tenant_id)); +public: + int64_t tenant_id_; +}; +#endif struct ObRpcRemoteWriteDDLIncCommitLogArg final { OB_UNIS_VERSION(1); diff --git a/src/share/ob_srv_rpc_proxy.h b/src/share/ob_srv_rpc_proxy.h index 48970cb6cd..8b8595a80f 100644 --- a/src/share/ob_srv_rpc_proxy.h +++ b/src/share/ob_srv_rpc_proxy.h @@ -235,6 +235,10 @@ public: RPC_S(PR5 enable_ss_micro_cache, OB_ENABLE_SS_MICRO_CACHE, (obrpc::ObEnableSSMicroCacheArg)); RPC_S(PR5 get_ss_micro_cache_info, OB_GET_SS_MICRO_CACHE_INFO, (obrpc::ObGetSSMicroCacheInfoArg), obrpc::ObGetSSMicroCacheInfoResult); RPC_S(PR5 clear_ss_micro_cache, OB_CLEAR_SS_MICRO_CACHE, (obrpc::ObClearSSMicroCacheArg)); + RPC_S(PR5 del_ss_local_tmpfile, OB_DEL_SS_LOCAL_TMPFILE, (obrpc::ObDelSSLocalTmpFileArg)); + RPC_S(PR5 del_ss_local_major, OB_DEL_SS_LOCAL_MAJOR, (obrpc::ObDelSSLocalMajorArg)); + RPC_S(PR5 calibrate_ss_disk_space, OB_CALIBRATE_SS_DISK_SPACE, (obrpc::ObCalibrateSSDiskSpaceArg)); + RPC_S(PR5 del_ss_tablet_micro, OB_DEL_SS_TABLET_MICRO, (obrpc::ObDelSSTabletMicroArg)); #endif RPC_S(PR5 remote_write_ddl_inc_commit_log, OB_REMOTE_WRITE_DDL_INC_COMMIT_LOG, (obrpc::ObRpcRemoteWriteDDLIncCommitLogArg), ObRpcRemoteWriteDDLIncCommitLogRes); RPC_S(PR5 check_ls_can_offline, OB_CHECK_LS_CAN_OFFLINE, (obrpc::ObCheckLSCanOfflineArg)); diff --git a/tools/ob_admin/server_tool/ob_admin_routine.cpp b/tools/ob_admin/server_tool/ob_admin_routine.cpp index a8e5ebb565..ad50d0bd55 100644 --- a/tools/ob_admin/server_tool/ob_admin_routine.cpp +++ b/tools/ob_admin/server_tool/ob_admin_routine.cpp @@ -1271,6 +1271,155 @@ DEF_COMMAND(SERVER, del_ss_tablet_meta, 1, "tenant_id:tablet_id:compaction_scn") return ret; } +DEF_COMMAND(SERVER, del_ss_local_tmpfile, 1, "tenant_id:tmpfile_id") +{ + int ret = OB_SUCCESS; + string arg_str; + ObDelSSLocalTmpFileArg arg; + if (cmd_ == action_name_) { + ret = OB_INVALID_ARGUMENT; + ADMIN_WARN("should provide tenant_id, tmpfile_id"); + } else { + arg_str = cmd_.substr(action_name_.length() + 1); + } + + int64_t tmpfile_id = 0; + if (OB_FAIL(ret)) { + } else if (2 != sscanf(arg_str.c_str(), "%ld:%ld", &arg.tenant_id_, &tmpfile_id)) { + ret = OB_INVALID_ARGUMENT; + COMMON_LOG(WARN, "invalid arg", K(ret), K(arg_str.c_str())); + } else { + MacroBlockId macro_id; + macro_id.set_ss_version(MacroBlockId::MACRO_BLOCK_ID_VERSION_V2); + macro_id.set_ss_id_mode(static_cast(ObMacroBlockIdMode::ID_MODE_SHARE)); + macro_id.set_storage_object_type(static_cast(ObStorageObjectType::TMP_FILE)); + macro_id.set_second_id(tmpfile_id); + arg.macro_id_ = macro_id; + const int64_t rpc_timeout = 60000000; + if (!arg.is_valid()) { + ret = OB_INVALID_ARGUMENT; + COMMON_LOG(WARN, "argument is invalid", K(ret), K(arg)); + } else if (OB_FALSE_IT(client_->set_timeout(rpc_timeout))) { + } else if (OB_FAIL(client_->del_ss_local_tmpfile(arg))) { + COMMON_LOG(ERROR, "send req fail", K(ret)); + } else { + fprintf( + stdout, "Successfully del_ss_local_tmpfile [tenant_id:%ld, tmpfile_id:%ld]", arg.tenant_id_, tmpfile_id); + } + } + if (OB_FAIL(ret)) { + fprintf(stderr, "fail to del_ss_local_tmpfile, ret=%s\n", ob_error_name(ret)); + } + COMMON_LOG(INFO, "del_ss_local_tmpfile", K(arg)); + return ret; +} + +DEF_COMMAND(SERVER, del_ss_local_major, 1, "tenant_id") +{ + int ret = OB_SUCCESS; + string arg_str; + ObDelSSLocalMajorArg arg; + if (cmd_ == action_name_) { + ret = OB_INVALID_ARGUMENT; + ADMIN_WARN("should provide tenant_id"); + } else { + arg_str = cmd_.substr(action_name_.length() + 1); + } + + const int64_t rpc_timeout = 60000000; + if (OB_FAIL(ret)) { + } else if (1 != sscanf(arg_str.c_str(), "%ld", &arg.tenant_id_)) { + ret = OB_INVALID_ARGUMENT; + COMMON_LOG(WARN, "invalid arg", K(ret), K(arg_str.c_str())); + } else if (!arg.is_valid()) { + ret = OB_INVALID_ARGUMENT; + COMMON_LOG(WARN, "argument is invalid", K(ret), K(arg)); + } else if (OB_FALSE_IT(client_->set_timeout(rpc_timeout))) { + } else if (OB_FAIL(client_->del_ss_local_major(arg))) { + COMMON_LOG(ERROR, "send req fail", K(ret)); + } else { + fprintf(stdout, "Successfully del_ss_local_major [tenant_id:%ld]", arg.tenant_id_); + } + + if (OB_FAIL(ret)) { + fprintf(stderr, "fail to del_ss_local_major, ret=%s\n", ob_error_name(ret)); + } + COMMON_LOG(INFO, "del_ss_local_major", K(arg)); + return ret; +} + +DEF_COMMAND(SERVER, calibrate_ss_disk_space, 1, "tenant_id") +{ + int ret = OB_SUCCESS; + string arg_str; + ObCalibrateSSDiskSpaceArg arg; + if (cmd_ == action_name_) { + ret = OB_INVALID_ARGUMENT; + ADMIN_WARN("should provide tenant_id"); + } else { + arg_str = cmd_.substr(action_name_.length() + 1); + } + + const int64_t rpc_timeout = 60000000; + if (OB_FAIL(ret)) { + } else if (1 != sscanf(arg_str.c_str(), "%ld", &arg.tenant_id_)) { + ret = OB_INVALID_ARGUMENT; + COMMON_LOG(WARN, "invalid arg", K(ret), K(arg_str.c_str())); + } else if (!arg.is_valid()) { + ret = OB_INVALID_ARGUMENT; + COMMON_LOG(WARN, "argument is invalid", K(ret), K(arg)); + } else if (OB_FALSE_IT(client_->set_timeout(rpc_timeout))) { + } else if (OB_FAIL(client_->calibrate_ss_disk_space(arg))) { + COMMON_LOG(ERROR, "send req fail", K(ret)); + } else { + fprintf(stdout, "Successfully calibrate_ss_disk_space [tenant_id:%ld]", arg.tenant_id_); + } + + if (OB_FAIL(ret)) { + fprintf(stderr, "fail to calibrate_ss_disk_space, ret=%s\n", ob_error_name(ret)); + } + COMMON_LOG(INFO, "calibrate_ss_disk_space", K(arg)); + return ret; +} + +DEF_COMMAND(SERVER, del_ss_tablet_micro, 1, "tenant_id:tablet_id") +{ + int ret = OB_SUCCESS; + string arg_str; + ObDelSSTabletMicroArg arg; + if (cmd_ == action_name_) { + ret = OB_INVALID_ARGUMENT; + ADMIN_WARN("should provide tenant_id, tablet_id"); + } else { + arg_str = cmd_.substr(action_name_.length() + 1); + } + + int64_t tablet_id = 0; + const int64_t rpc_timeout = 1800000000; // 3min + if (OB_FAIL(ret)) { + } else if (2 != sscanf(arg_str.c_str(), "%ld:%ld", &arg.tenant_id_, &tablet_id)) { + ret = OB_INVALID_ARGUMENT; + COMMON_LOG(WARN, "invalid arg", K(ret), K(arg_str.c_str())); + } else { + arg.tablet_id_ = ObTabletID(tablet_id); + if (!arg.is_valid()) { + ret = OB_INVALID_ARGUMENT; + COMMON_LOG(WARN, "argument is invalid", K(ret), K(arg)); + } else if (OB_FALSE_IT(client_->set_timeout(rpc_timeout))) { + } else if (OB_FAIL(client_->del_ss_tablet_micro(arg))) { + COMMON_LOG(ERROR, "send req fail", K(ret)); + } else { + fprintf(stdout, "Successfully del_ss_tablet_micro [tenant_id:%ld, tablet_id:%ld]", arg.tenant_id_, arg.tablet_id_.id()); + } + } + + if (OB_FAIL(ret)) { + fprintf(stderr, "fail to del_ss_tablet_micro, ret=%s\n", ob_error_name(ret)); + } + COMMON_LOG(INFO, "del_ss_tablet_micro", K(arg)); + return ret; +} + DEF_COMMAND(SERVER, download_ss_macro_block, 1, "tenant_id:ver:mode:obj_type:incar_id:cg_id:second_id:third_id:fourth_id #download ss_macro_block") { int ret = OB_SUCCESS;