[CP] support alter sys tenant log disk via ob_admin
This commit is contained in:
parent
bb56b615fd
commit
04da26071a
@ -989,6 +989,7 @@ PCODE_DEF(OB_LOG_GET_ARB_MEMBER_INFO, 0x1522)
|
||||
PCODE_DEF(OB_LOG_BATCH_FETCH_RESP, 0X1523)
|
||||
PCODE_DEF(OB_LOG_GET_LS_CKPT, 0x1524)
|
||||
PCODE_DEF(OB_LOG_ACQUIRE_REBUILD_INFO, 0x1525)
|
||||
// for ob_admin
|
||||
PCODE_DEF(OB_LOG_FORCE_SET_TENANT_LOG_DISK, 0x1526)
|
||||
PCODE_DEF(OB_LOG_SYNC_BASE_LSN_REQ, 0x1527)
|
||||
|
||||
|
@ -1556,5 +1556,35 @@ DEFINE_GET_SERIALIZE_SIZE(ObServerLogBlockMgr::LogPoolMetaEntry)
|
||||
size += serialization::encoded_length_i64(sizeof(checksum_));
|
||||
return size;
|
||||
}
|
||||
|
||||
int ObServerLogBlockMgr::force_update_tenant_log_disk(const uint64_t tenant_id,
|
||||
const int64_t new_log_disk_size)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
MTL_SWITCH(tenant_id) {
|
||||
int64_t unused_size = 0;
|
||||
int64_t old_log_disk_size = 0;
|
||||
int64_t allowed_new_log_disk_size = 0;
|
||||
ObLogService *log_service = MTL(ObLogService*);
|
||||
if (NULL == log_service) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
CLOG_LOG(ERROR, "unexpected error, ObLogService is nullptr", KR(ret), KP(log_service));
|
||||
} else if (OB_FAIL(log_service->get_palf_stable_disk_usage(unused_size, old_log_disk_size))) {
|
||||
CLOG_LOG(ERROR, "get_palf_stable_disk_usage failed", KR(ret), KP(log_service));
|
||||
} else if (OB_FAIL(update_tenant(old_log_disk_size, new_log_disk_size, allowed_new_log_disk_size, log_service))) {
|
||||
CLOG_LOG(WARN, "update_tenant failed", KR(ret), KP(log_service));
|
||||
} else if (allowed_new_log_disk_size != new_log_disk_size) {
|
||||
ret = OB_STATE_NOT_MATCH;
|
||||
CLOG_LOG(WARN, "can not force update tenant log disk, force_update_tenant_log_disk failed", KR(ret), KP(log_service), K(new_log_disk_size),
|
||||
K(allowed_new_log_disk_size), K(old_log_disk_size));
|
||||
} else {
|
||||
}
|
||||
CLOG_LOG(INFO, "force_update_tenant_log_disk finished", KR(ret), KP(log_service), K(new_log_disk_size),
|
||||
K(allowed_new_log_disk_size), K(old_log_disk_size));
|
||||
} else {
|
||||
CLOG_LOG(WARN, "force_update_tenant_log_disk failed, no such tenant", KR(ret), K(tenant_id), K(new_log_disk_size));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
} // namespace logservice
|
||||
} // namespace oceanbase
|
||||
|
@ -202,6 +202,8 @@ public:
|
||||
// NB: accurately, when tenant not exist in 'omt_', we can remove it from ObServerLogBlockMgr
|
||||
int remove_tenant(const int64_t log_disk_size);
|
||||
|
||||
int force_update_tenant_log_disk(const uint64_t tenant_id,
|
||||
const int64_t new_log_disk_size);
|
||||
TO_STRING_KV("dir:",
|
||||
log_pool_path_, K_(dir_fd), K_(meta_fd), K_(log_pool_meta),
|
||||
K_(min_block_id), K_(max_block_id), K(min_log_disk_size_for_all_tenants_),
|
||||
@ -338,6 +340,7 @@ private:
|
||||
private:
|
||||
DISALLOW_COPY_AND_ASSIGN(ObServerLogBlockMgr);
|
||||
};
|
||||
|
||||
} // namespace logservice
|
||||
} // namespace oceanbase
|
||||
#endif
|
||||
|
@ -82,6 +82,7 @@
|
||||
#include "storage/high_availability/ob_storage_ha_utils.h"
|
||||
#include "share/ob_rpc_struct.h"
|
||||
#include "rootserver/ob_recovery_ls_service.h"
|
||||
#include "logservice/ob_server_log_block_mgr.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -521,6 +522,7 @@ int ObRpcSetTenantConfigP::process()
|
||||
{
|
||||
LOG_INFO("process set tenant config", K(arg_));
|
||||
OTC_MGR.add_extra_config(arg_);
|
||||
OTC_MGR.notify_tenant_config_changed(arg_.tenant_id_);
|
||||
return OB_SUCCESS;
|
||||
}
|
||||
|
||||
@ -3184,5 +3186,61 @@ int ObCancelGatherStatsP::process()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObForceSetTenantLogDiskP::process()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (!arg_.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid arg", K(ret));
|
||||
} else if (OB_FAIL(GCTX.log_block_mgr_->force_update_tenant_log_disk(arg_.tenant_id_, arg_.log_disk_size_))) {
|
||||
LOG_WARN("force_update_sys_tenant_log_disk failed", K(ret), "tenant_id", arg_.tenant_id_, "new_log_disk", arg_.log_disk_size_);
|
||||
} else {
|
||||
LOG_WARN("force_update_sys_tenant_log_disk success", K(ret), "tenant_id", arg_.tenant_id_, "new_log_disk", arg_.log_disk_size_);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
class ObDumpUnitInfoFunctor {
|
||||
public:
|
||||
ObDumpUnitInfoFunctor(ObSArray<ObDumpServerUsageResult::ObUnitInfo> &result) : result_(result) {}
|
||||
int operator()()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObDumpServerUsageResult::ObUnitInfo info;
|
||||
info.tenant_id_ = MTL_ID();
|
||||
logservice::ObLogService *log_service = MTL(logservice::ObLogService*);
|
||||
int64_t log_disk_size = 0, log_disk_in_use = 0;
|
||||
if (OB_ISNULL(log_service)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("log_service is nullptr", KP(log_service));
|
||||
} else if (OB_FAIL(log_service->get_palf_stable_disk_usage(log_disk_in_use, log_disk_size))) {
|
||||
LOG_WARN("get_palf_stable_disk_usage failed", KP(log_service));
|
||||
} else {
|
||||
info.log_disk_in_use_ = log_disk_in_use;
|
||||
info.log_disk_size_ = log_disk_size;
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(result_.push_back(info))) {
|
||||
LOG_WARN("push_back failed", KR(ret), K(info));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
private:
|
||||
ObSArray<ObDumpServerUsageResult::ObUnitInfo> &result_;
|
||||
};
|
||||
int ObForceDumpServerUsageP::process()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t &log_disk_assigned = result_.server_info_.log_disk_assigned_;
|
||||
int64_t &log_disk_capacity = result_.server_info_.log_disk_capacity_;
|
||||
ObSArray<ObDumpServerUsageResult::ObUnitInfo> &result = result_.unit_info_;
|
||||
ObDumpUnitInfoFunctor dump_unit_info(result);
|
||||
if (OB_FAIL(GCTX.omt_->operate_in_each_tenant(dump_unit_info))) {
|
||||
CLOG_LOG(WARN, "operate_in_each_tenant failed", KR(ret));
|
||||
} else if (OB_FAIL(GCTX.log_block_mgr_->get_disk_usage(log_disk_assigned, log_disk_capacity))) {
|
||||
CLOG_LOG(WARN, "get_disk_usage failed", KR(ret));
|
||||
} else {}
|
||||
return ret;
|
||||
}
|
||||
} // end of namespace observer
|
||||
} // end of namespace oceanbase
|
||||
|
@ -268,6 +268,8 @@ OB_DEFINE_PROCESSOR_S(Srv, OB_TABLET_MAJOR_FREEZE, ObRpcTabletMajorFreezeP);
|
||||
// OB_DEFINE_PROCESSOR_S(Srv, OB_CLIENT_SESSION_CONNECT_TIME, ObClientSessionConnectTimeP);
|
||||
OB_DEFINE_PROCESSOR_S(Srv, OB_TABLET_LOCATION_BROADCAST, ObTabletLocationReceiveP);
|
||||
OB_DEFINE_PROCESSOR_S(Srv, OB_CANCEL_GATHER_STATS, ObCancelGatherStatsP);
|
||||
OB_DEFINE_PROCESSOR_OBADMIN(Srv, OB_LOG_FORCE_SET_TENANT_LOG_DISK, ObForceSetTenantLogDiskP);
|
||||
OB_DEFINE_PROCESSOR_OBADMIN(Srv, OB_FORCE_DUMP_SERVER_USAGE, ObForceDumpServerUsageP);
|
||||
|
||||
OB_DEFINE_PROCESSOR_S(Srv, OB_KILL_CLIENT_SESSION, ObKillClientSessionP);
|
||||
OB_DEFINE_PROCESSOR_S(Srv, OB_CLIENT_SESSION_CONNECT_TIME, ObClientSessionConnectTimeP);
|
||||
|
@ -120,6 +120,8 @@ void oceanbase::observer::init_srv_xlator_for_partition(ObSrvRpcXlator *xlator)
|
||||
RPC_PROCESSOR(ObRpcGetLSAccessModeP, gctx_);
|
||||
RPC_PROCESSOR(ObRpcChangeLSAccessModeP, gctx_);
|
||||
RPC_PROCESSOR(ObTabletLocationReceiveP, gctx_);
|
||||
RPC_PROCESSOR(ObForceSetTenantLogDiskP, gctx_);
|
||||
RPC_PROCESSOR(ObForceDumpServerUsageP, gctx_);
|
||||
}
|
||||
|
||||
void oceanbase::observer::init_srv_xlator_for_migrator(ObSrvRpcXlator *xlator) {
|
||||
|
@ -10815,5 +10815,58 @@ int ObCancelGatherStatsArg::assign(const ObCancelGatherStatsArg &other)
|
||||
}
|
||||
OB_SERIALIZE_MEMBER(ObCancelGatherStatsArg, tenant_id_, task_id_);
|
||||
|
||||
OB_SERIALIZE_MEMBER(ObForceSetTenantLogDiskArg, tenant_id_, log_disk_size_);
|
||||
|
||||
int ObForceSetTenantLogDiskArg::set(const uint64_t tenant_id,
|
||||
const int64_t log_disk_size)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (!is_valid_tenant_id(tenant_id) || log_disk_size <= 0) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argumetn", K(tenant_id), K(log_disk_size));
|
||||
} else {
|
||||
tenant_id_ = tenant_id;
|
||||
log_disk_size_ = log_disk_size;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
int ObForceSetTenantLogDiskArg::assign(const ObForceSetTenantLogDiskArg &arg)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (!arg.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argumnt", K(arg));
|
||||
} else {
|
||||
tenant_id_ = arg.tenant_id_;
|
||||
log_disk_size_ = arg.log_disk_size_;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool ObForceSetTenantLogDiskArg::is_valid() const
|
||||
{
|
||||
return is_valid_tenant_id(tenant_id_) && 0 < log_disk_size_;
|
||||
}
|
||||
|
||||
void ObForceSetTenantLogDiskArg::reset()
|
||||
{
|
||||
tenant_id_ = OB_INVALID_TENANT_ID;
|
||||
log_disk_size_ = -1;
|
||||
}
|
||||
|
||||
OB_SERIALIZE_MEMBER(ObDumpServerUsageRequest, tenant_id_);
|
||||
OB_SERIALIZE_MEMBER(ObDumpServerUsageResult::ObServerInfo, log_disk_capacity_, log_disk_assigned_);
|
||||
OB_SERIALIZE_MEMBER(ObDumpServerUsageResult::ObUnitInfo, tenant_id_, log_disk_size_, log_disk_in_use_);
|
||||
OB_SERIALIZE_MEMBER(ObDumpServerUsageResult, server_info_, unit_info_);
|
||||
int ObDumpServerUsageResult::assign(const ObDumpServerUsageResult &rhs)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(unit_info_.assign(rhs.unit_info_))) {
|
||||
LOG_WARN("assign failed", KR(ret));
|
||||
} else {
|
||||
server_info_ = rhs.server_info_;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
}//end namespace obrpc
|
||||
}//end namespace oceanbase
|
||||
|
@ -11242,6 +11242,68 @@ public:
|
||||
TO_STRING_KV(K(tenant_id_), K(task_id_));
|
||||
};
|
||||
|
||||
struct ObForceSetTenantLogDiskArg final
|
||||
{
|
||||
OB_UNIS_VERSION(1);
|
||||
public:
|
||||
ObForceSetTenantLogDiskArg() { reset(); }
|
||||
~ObForceSetTenantLogDiskArg() { reset(); }
|
||||
int set(const uint64_t tenant_id,
|
||||
const int64_t log_disk_size);
|
||||
int assign(const ObForceSetTenantLogDiskArg &arg);
|
||||
bool is_valid() const;
|
||||
void reset();
|
||||
TO_STRING_KV(K_(tenant_id), K_(log_disk_size));
|
||||
public:
|
||||
uint64_t tenant_id_;
|
||||
int64_t log_disk_size_;
|
||||
};
|
||||
|
||||
struct ObDumpServerUsageRequest final
|
||||
{
|
||||
OB_UNIS_VERSION(1);
|
||||
public:
|
||||
ObDumpServerUsageRequest() : tenant_id_(OB_SYS_TENANT_ID) {}
|
||||
~ObDumpServerUsageRequest() { tenant_id_ = OB_INVALID_TENANT_ID; }
|
||||
uint64_t tenant_id_;
|
||||
};
|
||||
|
||||
struct ObDumpServerUsageResult final
|
||||
{
|
||||
OB_UNIS_VERSION(1);
|
||||
public:
|
||||
struct ObServerInfo {
|
||||
OB_UNIS_VERSION(1);
|
||||
public:
|
||||
ObServerInfo() : log_disk_capacity_(0), log_disk_assigned_(0)
|
||||
{}
|
||||
~ObServerInfo() {}
|
||||
int64_t log_disk_capacity_;
|
||||
int64_t log_disk_assigned_;
|
||||
TO_STRING_KV(K_(log_disk_capacity), K_(log_disk_assigned));
|
||||
};
|
||||
struct ObUnitInfo {
|
||||
OB_UNIS_VERSION(1);
|
||||
public:
|
||||
ObUnitInfo() : tenant_id_(OB_INVALID_TENANT_ID), log_disk_size_(0), log_disk_in_use_(0)
|
||||
{}
|
||||
~ObUnitInfo() {}
|
||||
public:
|
||||
uint64_t tenant_id_;
|
||||
int64_t log_disk_size_;
|
||||
int64_t log_disk_in_use_;
|
||||
TO_STRING_KV(K_(tenant_id), K_(log_disk_size), K_(log_disk_in_use));
|
||||
};
|
||||
public:
|
||||
ObDumpServerUsageResult() : server_info_(), unit_info_() {}
|
||||
~ObDumpServerUsageResult() {}
|
||||
int assign(const ObDumpServerUsageResult &rhs);
|
||||
TO_STRING_KV(K_(server_info), K_(unit_info));
|
||||
// server info
|
||||
ObServerInfo server_info_;
|
||||
// unit info
|
||||
ObSArray<ObUnitInfo> unit_info_;
|
||||
};
|
||||
}//end namespace obrpc
|
||||
}//end namespace oceanbase
|
||||
#endif
|
||||
|
@ -250,6 +250,8 @@ public:
|
||||
RPC_S(PR5 client_session_create_time, OB_CLIENT_SESSION_CONNECT_TIME, (ObClientSessionCreateTimeAndAuthArg), ObClientSessionCreateTimeAndAuthRes);
|
||||
RPC_AP(PR5 tablet_location_send, OB_TABLET_LOCATION_BROADCAST, (obrpc::ObTabletLocationSendArg), obrpc::ObTabletLocationSendResult);
|
||||
RPC_S(PR5 cancel_gather_stats, OB_CANCEL_GATHER_STATS, (ObCancelGatherStatsArg));
|
||||
RPC_S(PR5 force_set_tenant_log_disk, OB_LOG_FORCE_SET_TENANT_LOG_DISK, (obrpc::ObForceSetTenantLogDiskArg));
|
||||
RPC_S(PR5 dump_server_usage, OB_FORCE_DUMP_SERVER_USAGE, (obrpc::ObDumpServerUsageRequest), obrpc::ObDumpServerUsageResult);
|
||||
}; // end of class ObSrvRpcProxy
|
||||
|
||||
} // end of namespace rpc
|
||||
|
@ -20,6 +20,7 @@
|
||||
#include "ob_admin_utils.h"
|
||||
#include "share/ob_rpc_struct.h"
|
||||
#include "storage/tablelock/ob_table_lock_rpc_struct.h"
|
||||
#include "logservice/palf/log_define.h"
|
||||
|
||||
using namespace std;
|
||||
|
||||
@ -864,3 +865,64 @@ DEF_COMMAND(SERVER, unlock_member_list, 1, "tenant_id:ls_id:lock_id # unlock_mem
|
||||
return ret;
|
||||
}
|
||||
|
||||
DEF_COMMAND(SERVER, force_set_sys_tenant_log_disk, 1, "log_disk_size=xx# set sys log disk")
|
||||
{
|
||||
string arg_str;
|
||||
int ret = OB_SUCCESS;
|
||||
if (cmd_ == action_name_) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
CLOG_LOG(WARN, "invalid argument, should provide new log disk size");
|
||||
} else {
|
||||
arg_str = cmd_.substr(action_name_.length() + 1);
|
||||
const char *arg_cstr = arg_str.c_str();
|
||||
int64_t log_disk_size = 0;
|
||||
uint64_t tenant_id = OB_SYS_TENANT_ID;
|
||||
if (1 != sscanf(arg_str.c_str(), "log_disk_size=%ld", &log_disk_size)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
CLOG_LOG(WARN, "invalid argument", K(arg_cstr));
|
||||
} else {
|
||||
ObForceSetTenantLogDiskArg arg;
|
||||
arg.set(tenant_id, log_disk_size);
|
||||
if (OB_FAIL(client_->force_set_tenant_log_disk(arg))) {
|
||||
CLOG_LOG(WARN, "force_set_tenant_log_disk failed", K(arg_cstr));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
DEF_COMMAND(SERVER, dump_server_usage, 1, "output: [server_info]\n\
|
||||
log_disk_assigned=xx\n\
|
||||
log_disk_capacity=xx\n\
|
||||
[unit_info]\n\
|
||||
tenant_id=xx\n\
|
||||
log_disk_in_use=xx\n\
|
||||
log_disk_size=xx")
|
||||
{
|
||||
string arg_str;
|
||||
int ret = OB_SUCCESS;
|
||||
if (cmd_ != action_name_) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
CLOG_LOG(WARN, "invalid argument, no need provide argument");
|
||||
} else {
|
||||
ObDumpServerUsageRequest arg;
|
||||
ObDumpServerUsageResult result;
|
||||
if (OB_FAIL(client_->dump_server_usage(arg, result))) {
|
||||
CLOG_LOG(WARN, "dump_tenant_log_disk failed");
|
||||
} else {
|
||||
ObDumpServerUsageResult::ObServerInfo &server_info = result.server_info_;
|
||||
ObSArray<ObDumpServerUsageResult::ObUnitInfo> &unit_info = result.unit_info_;
|
||||
fprintf(stdout, "[server info]\n");
|
||||
fprintf(stdout, "log_disk_assigned=%ld\n", server_info.log_disk_assigned_);
|
||||
fprintf(stdout, "log_disk_capacity=%ld\n", server_info.log_disk_capacity_);
|
||||
for (int i = 0; i < unit_info.count(); i++) {
|
||||
ObDumpServerUsageResult::ObUnitInfo &info = unit_info[i];
|
||||
fprintf(stdout, "[unit info]\n");
|
||||
fprintf(stdout, "tenant_id=%ld\n", info.tenant_id_);
|
||||
fprintf(stdout, "log_disk_assigned=%ld\n", info.log_disk_in_use_);
|
||||
fprintf(stdout, "log_disk_capacity=%ld\n", info.log_disk_size_);
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user