notify recovery_ls_servce while has balance task

This commit is contained in:
maosy 2024-02-08 02:44:53 +00:00 committed by ob-robot
parent 1400476e92
commit acb558a7f8
10 changed files with 140 additions and 4 deletions

View File

@ -1076,7 +1076,7 @@ PCODE_DEF(OB_CLONE_TENANT, 0x160A)
PCODE_DEF(OB_CLONE_RESOURCE_POOL, 0x160B)
//160Cfor notify tenant thread
//PCODE_DEF(OB_NOTIFY_TENANT_THREAD, 0x160C)
PCODE_DEF(OB_NOTIFY_TENANT_THREAD, 0x160C)
PCODE_DEF(OB_CREATE_TRIGGER_WITH_RES, 0x160D)

View File

@ -63,7 +63,6 @@
#include "storage/slog_ckpt/ob_tenant_checkpoint_slog_handler.h"
#include "observer/ob_req_time_service.h"
#include "observer/ob_server_event_history_table_operator.h"
#include "rootserver/ob_primary_ls_service.h"//PrimaryLSService
#include "rootserver/ob_tenant_transfer_service.h" // ObTenantTransferService
#include "storage/high_availability/ob_transfer_service.h" // ObTransferService
#include "sql/udr/ob_udr_mgr.h"
@ -80,6 +79,8 @@
#include "observer/table/ttl/ob_ttl_service.h"
#include "storage/tenant_snapshot/ob_tenant_snapshot_service.h"
#include "storage/high_availability/ob_storage_ha_utils.h"
#include "share/ob_rpc_struct.h"
#include "rootserver/ob_recovery_ls_service.h"
namespace oceanbase
{
@ -3025,6 +3026,35 @@ int ObRpcNotifyCloneSchedulerP::process()
return ret;
}
int ObRpcNotifyTenantThreadP::process()
{
int ret = OB_SUCCESS;
LOG_INFO("receive notify tenant thread", K(arg_));
if (OB_UNLIKELY(!arg_.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(arg_));
} else {
MTL_SWITCH(arg_.get_tenant_id()) {
if (obrpc::ObNotifyTenantThreadArg::RECOVERY_LS_SERVICE == arg_.get_thread_type()) {
rootserver::ObRecoveryLSService *ls_service =
MTL(rootserver::ObRecoveryLSService *);
if (OB_ISNULL(ls_service)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ls service is null", KR(ret), K(arg_));
} else {
ls_service->wakeup();
}
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected thread type", KR(ret), K(arg_));
}
}
}
return ret;
}
int ObKillClientSessionP::process()
{
int ret = OB_SUCCESS;

View File

@ -262,6 +262,7 @@ OB_DEFINE_PROCESSOR_S(Srv, OB_INNER_CREATE_TENANT_SNAPSHOT, ObRpcInnerCreateTena
OB_DEFINE_PROCESSOR_S(Srv, OB_INNER_DROP_TENANT_SNAPSHOT, ObRpcInnerDropTenantSnapshotP);
OB_DEFINE_PROCESSOR_S(Srv, OB_FLUSH_LS_ARCHIVE, ObRpcFlushLSArchiveP);
OB_DEFINE_PROCESSOR_S(Srv, OB_NOTIFY_CLONE_SCHEDULER, ObRpcNotifyCloneSchedulerP);
OB_DEFINE_PROCESSOR_S(Srv, OB_NOTIFY_TENANT_THREAD, ObRpcNotifyTenantThreadP);
OB_DEFINE_PROCESSOR_S(Srv, OB_TABLET_MAJOR_FREEZE, ObRpcTabletMajorFreezeP);
// OB_DEFINE_PROCESSOR_S(Srv, OB_KILL_CLIENT_SESSION, ObKillClientSessionP);
// OB_DEFINE_PROCESSOR_S(Srv, OB_CLIENT_SESSION_CONNECT_TIME, ObClientSessionConnectTimeP);

View File

@ -125,6 +125,7 @@ void oceanbase::observer::init_srv_xlator_for_storage(ObSrvRpcXlator *xlator) {
RPC_PROCESSOR(ObRpcInnerDropTenantSnapshotP, gctx_);
RPC_PROCESSOR(ObRpcFlushLSArchiveP, gctx_);
RPC_PROCESSOR(ObRpcNotifyCloneSchedulerP, gctx_);
RPC_PROCESSOR(ObRpcNotifyTenantThreadP, gctx_);
RPC_PROCESSOR(ObRpcTabletMajorFreezeP, gctx_);
RPC_PROCESSOR(ObCancelGatherStatsP, gctx_);
}

View File

@ -106,7 +106,8 @@ void ObLSRecoveryReportor::wait()
void ObLSRecoveryReportor::wakeup()
{
if (OB_NOT_INIT) {
if (IS_NOT_INIT) {
LOG_WARN_RET(OB_NOT_INIT, "not init no need wakeup");
} else {
ObThreadCondGuard guard(get_cond());
get_cond().broadcast();

View File

@ -528,6 +528,7 @@ int ObTenantRoleTransitionService::wait_ls_balance_task_finish_()
ObBalanceTaskHelper ls_balance_task;
ObBalanceTaskArray balance_task_array;
share::ObAllTenantInfo cur_tenant_info;
int tmp_ret = OB_SUCCESS;
while (!THIS_WORKER.is_timeout() && OB_SUCC(ret) && !is_finish) {
if (FALSE_IT(ret = ObBalanceTaskHelperTableOperator::pop_task(tenant_id_,
*sql_proxy_, ls_balance_task))) {
@ -574,6 +575,9 @@ int ObTenantRoleTransitionService::wait_ls_balance_task_finish_()
}
if (OB_SUCC(ret) && !is_finish) {
if (OB_TMP_FAIL(notify_recovery_ls_service_())) {
LOG_WARN("failed to notify recovery ls service", KR(tmp_ret));
}
usleep(100L * 1000L);
LOG_INFO("has balance task not finish", K(ls_balance_task),
K(balance_task_array), K(cur_tenant_info));
@ -590,6 +594,44 @@ int ObTenantRoleTransitionService::wait_ls_balance_task_finish_()
return ret;
}
int ObTenantRoleTransitionService::notify_recovery_ls_service_()
{
int ret = OB_SUCCESS;
ObTimeoutCtx ctx;
if (OB_FAIL(check_inner_stat())) {
LOG_WARN("error unexpected", KR(ret), K(tenant_id_), KP(sql_proxy_), KP(rpc_proxy_));
} else if (OB_ISNULL(GCTX.location_service_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("location service is null", KR(ret));
} else if (OB_FAIL(ObShareUtil::set_default_timeout_ctx(ctx, GCONF.rpc_timeout))) {
LOG_WARN("fail to set timeout ctx", KR(ret));
} else {
ObAddr leader;
ObNotifyTenantThreadArg arg;
const int64_t timeout = ctx.get_timeout();
if (OB_FAIL(GCTX.location_service_->get_leader(
GCONF.cluster_id, tenant_id_, SYS_LS, false, leader))) {
LOG_WARN("failed to get leader", KR(ret), K(tenant_id_));
} else if (OB_FAIL(arg.init(tenant_id_, obrpc::ObNotifyTenantThreadArg::RECOVERY_LS_SERVICE))) {
LOG_WARN("failed to init arg", KR(ret), K(tenant_id_));
} else if (OB_FAIL(rpc_proxy_->to(leader).timeout(timeout)
.group_id(share::OBCG_DBA_COMMAND).by(tenant_id_)
.notify_tenant_thread(arg))) {
LOG_WARN("failed to notify tenant thread", KR(ret),
K(leader), K(tenant_id_), K(timeout), K(arg));
}
if (OB_FAIL(ret)) {
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(GCTX.location_service_->nonblock_renew(
GCONF.cluster_id, tenant_id_, SYS_LS))) {
LOG_WARN("failed to renew location", KR(ret), KR(tmp_ret), K(tenant_id_));
}
}
}
return ret;
}
int ObTenantRoleTransitionService::do_switch_access_mode_to_raw_rw(
const share::ObAllTenantInfo &tenant_info)
{

View File

@ -226,6 +226,7 @@ private:
const SCN &max_sys_ls_sync_scn/* SYS LS real max sync scn */,
const SCN &target_tenant_sync_scn/* tenant target sync scn in switchover */);
int wait_ls_balance_task_finish_();
int notify_recovery_ls_service_();
int get_all_ls_status_and_change_access_mode_(
const palf::AccessMode target_access_mode,
const share::SCN &ref_scn,

View File

@ -8385,6 +8385,32 @@ const char* ObNotifySwitchLeaderArg::comment_to_str() const
OB_SERIALIZE_MEMBER(ObNotifySwitchLeaderArg, tenant_id_, ls_id_, advise_leader_, comment_);
int ObNotifyTenantThreadArg::init(
const uint64_t tenant_id, const TenantThreadType thread_type)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) ||
INVALID_TYPE == thread_type)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(thread_type));
} else {
tenant_id_ = tenant_id;
thread_type_ = thread_type;
}
return ret;
}
int ObNotifyTenantThreadArg::assign(const ObNotifyTenantThreadArg &other)
{
int ret = OB_SUCCESS;
if (this != &other) {
tenant_id_ = other.tenant_id_;
thread_type_ = other.thread_type_;
}
return ret;
}
OB_SERIALIZE_MEMBER(ObNotifyTenantThreadArg, tenant_id_, thread_type_);
bool ObBatchRemoveTabletArg::is_valid() const
{
bool is_valid = id_.is_valid();

View File

@ -3742,7 +3742,7 @@ private:
DISALLOW_COPY_AND_ASSIGN(ObNotifySwitchLeaderArg);
private:
//tenant_id is invalid weakup all tenant
//teenant_id is valid weakup target tenant
//tenant_id is valid weakup target tenant
uint64_t tenant_id_;
//ls_id is invalid iterator all ls
//ls_id is valid, only check target ls
@ -3752,6 +3752,39 @@ private:
SwitchLeaderComment comment_;
};
struct ObNotifyTenantThreadArg
{
OB_UNIS_VERSION(1);
public:
enum TenantThreadType
{
INVALID_TYPE = -1,
RECOVERY_LS_SERVICE,
};
ObNotifyTenantThreadArg() : tenant_id_(OB_INVALID_TENANT_ID), thread_type_(INVALID_TYPE) {}
~ObNotifyTenantThreadArg() {}
TO_STRING_KV(K_(tenant_id), K_(thread_type));
int init(const uint64_t tenant_id, const TenantThreadType thread_type);
int assign(const ObNotifyTenantThreadArg &other);
bool is_valid() const
{
return is_valid_tenant_id(tenant_id_) && INVALID_TYPE != thread_type_;
}
uint64_t get_tenant_id() const
{
return tenant_id_;
}
TenantThreadType get_thread_type() const
{
return thread_type_;
}
private:
DISALLOW_COPY_AND_ASSIGN(ObNotifyTenantThreadArg);
private:
uint64_t tenant_id_;
TenantThreadType thread_type_;
};
struct ObCreateTabletInfo
{
OB_UNIS_VERSION(1);

View File

@ -244,6 +244,7 @@ public:
RPC_AP(PR5 inner_drop_tenant_snapshot, OB_INNER_DROP_TENANT_SNAPSHOT, (obrpc::ObInnerDropTenantSnapshotArg), obrpc::ObInnerDropTenantSnapshotResult);
RPC_AP(PR5 flush_ls_archive, OB_FLUSH_LS_ARCHIVE, (obrpc::ObFlushLSArchiveArg), obrpc::Int64);
RPC_S(PR5 notify_clone_scheduler, OB_NOTIFY_CLONE_SCHEDULER, (obrpc::ObNotifyCloneSchedulerArg), obrpc::ObNotifyCloneSchedulerResult);
RPC_S(PR5 notify_tenant_thread, OB_NOTIFY_TENANT_THREAD, (obrpc::ObNotifyTenantThreadArg));
RPC_AP(PR5 tablet_major_freeze, OB_TABLET_MAJOR_FREEZE, (ObTabletMajorFreezeArg), obrpc::Int64);
RPC_AP(PR5 kill_client_session, OB_KILL_CLIENT_SESSION, (ObKillClientSessionArg), ObKillClientSessionRes);
RPC_S(PR5 client_session_create_time, OB_CLIENT_SESSION_CONNECT_TIME, (ObClientSessionCreateTimeAndAuthArg), ObClientSessionCreateTimeAndAuthRes);