adaptation interface get_valid_sts_after
This commit is contained in:
@ -137,25 +137,25 @@ void ObTenantInfoLoader::run2()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObTenantInfoLoader::get_valid_sts_after(const int64_t specified_time, share::SCN &standby_scn)
|
int ObTenantInfoLoader::get_valid_sts_after(const int64_t specified_time_us, share::SCN &standby_scn)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
standby_scn.set_min();
|
standby_scn.set_min();
|
||||||
share::ObAllTenantInfo tenant_info;
|
share::ObAllTenantInfo tenant_info;
|
||||||
int64_t refresh_time_us = OB_INVALID_TIMESTAMP;
|
int64_t refresh_time_us = OB_INVALID_TIMESTAMP;
|
||||||
|
|
||||||
if (OB_INVALID_TIMESTAMP == specified_time) {
|
if (OB_INVALID_TIMESTAMP == specified_time_us) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("invalid argument", KR(ret), K(specified_time));
|
LOG_WARN("invalid argument", KR(ret), K(specified_time_us));
|
||||||
} else if (OB_FAIL(tenant_info_cache_.get_tenant_info(tenant_info, refresh_time_us))) {
|
} else if (OB_FAIL(tenant_info_cache_.get_tenant_info(tenant_info, refresh_time_us))) {
|
||||||
if (OB_NEED_WAIT == ret) {
|
if (OB_NEED_WAIT == ret) {
|
||||||
LOG_TRACE("tenant info cache is not refreshed, need wait", KR(ret));
|
LOG_TRACE("tenant info cache is not refreshed, need wait", KR(ret));
|
||||||
} else {
|
} else {
|
||||||
LOG_WARN("failed to get tenant info", KR(ret));
|
LOG_WARN("failed to get tenant info", KR(ret));
|
||||||
}
|
}
|
||||||
} else if (refresh_time_us < specified_time) {
|
} else if (refresh_time_us < specified_time_us) {
|
||||||
ret = OB_NEED_WAIT;
|
ret = OB_NEED_WAIT;
|
||||||
LOG_TRACE("tenant info cache is old, need wait", KR(ret), K(refresh_time_us), K(specified_time), K(tenant_info));
|
LOG_TRACE("tenant info cache is old, need wait", KR(ret), K(refresh_time_us), K(specified_time_us), K(tenant_info));
|
||||||
wakeup();
|
wakeup();
|
||||||
} else if (!tenant_info.is_sts_ready()) {
|
} else if (!tenant_info.is_sts_ready()) {
|
||||||
ret = OB_NEED_WAIT;
|
ret = OB_NEED_WAIT;
|
||||||
@ -166,7 +166,7 @@ int ObTenantInfoLoader::get_valid_sts_after(const int64_t specified_time, share:
|
|||||||
|
|
||||||
const int64_t PRINT_INTERVAL = 3 * 1000 * 1000L;
|
const int64_t PRINT_INTERVAL = 3 * 1000 * 1000L;
|
||||||
if (REACH_TIME_INTERVAL(PRINT_INTERVAL)) {
|
if (REACH_TIME_INTERVAL(PRINT_INTERVAL)) {
|
||||||
LOG_INFO("get_valid_sts_after", KR(ret), K(specified_time), K(refresh_time_us), K(tenant_info));
|
LOG_INFO("get_valid_sts_after", KR(ret), K(specified_time_us), K(refresh_time_us), K(tenant_info));
|
||||||
}
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
|
|||||||
@ -89,7 +89,7 @@ public:
|
|||||||
* 2. tenant info cache is old, need wait
|
* 2. tenant info cache is old, need wait
|
||||||
* 3. sts can not work for current tenant status
|
* 3. sts can not work for current tenant status
|
||||||
*/
|
*/
|
||||||
int get_valid_sts_after(const int64_t specified_time, share::SCN &standby_scn);
|
int get_valid_sts_after(const int64_t specified_time_us, share::SCN &standby_scn);
|
||||||
int refresh_tenant_info();
|
int refresh_tenant_info();
|
||||||
private:
|
private:
|
||||||
|
|
||||||
|
|||||||
@ -101,6 +101,7 @@ void ObStandbyTimestampService::destroy()
|
|||||||
//TODO(SCN):zhaoxing last_id should be uint64_t
|
//TODO(SCN):zhaoxing last_id should be uint64_t
|
||||||
last_id_ = OB_INVALID_VERSION;
|
last_id_ = OB_INVALID_VERSION;
|
||||||
epoch_ = OB_INVALID_TIMESTAMP;
|
epoch_ = OB_INVALID_TIMESTAMP;
|
||||||
|
switch_to_leader_ts_ = OB_INVALID_TIMESTAMP;
|
||||||
TG_DESTROY(tg_id_);
|
TG_DESTROY(tg_id_);
|
||||||
rpc_.destroy();
|
rpc_.destroy();
|
||||||
TRANS_LOG(INFO, "standby timestamp service destroy", K_(tenant_id));
|
TRANS_LOG(INFO, "standby timestamp service destroy", K_(tenant_id));
|
||||||
@ -109,23 +110,22 @@ void ObStandbyTimestampService::destroy()
|
|||||||
int ObStandbyTimestampService::query_and_update_last_id()
|
int ObStandbyTimestampService::query_and_update_last_id()
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
share::ObAllTenantInfo tenant_info;
|
SCN standby_scn;
|
||||||
if (OB_FAIL(MTL(rootserver::ObTenantInfoLoader *)->get_tenant_info(tenant_info))) {
|
int64_t switch_to_leader_ts = ATOMIC_LOAD(&switch_to_leader_ts_);
|
||||||
if (REACH_TIME_INTERVAL(3 * 1000 * 1000)) {
|
int64_t query_ts = OB_INVALID_TIMESTAMP != switch_to_leader_ts ? switch_to_leader_ts : 0;
|
||||||
TRANS_LOG(WARN, "failed to get tenant info", K(ret), K(tenant_info));
|
if (OB_FAIL(MTL(rootserver::ObTenantInfoLoader *)->get_valid_sts_after(query_ts, standby_scn))) {
|
||||||
}
|
|
||||||
} else if (OB_UNLIKELY(!tenant_info.is_valid())) {
|
|
||||||
ret = OB_ERR_UNEXPECTED;
|
|
||||||
TRANS_LOG(WARN, "invalid tenant info", K(ret), K(tenant_info));
|
|
||||||
} else if (!tenant_info.is_primary()) {
|
|
||||||
if (last_id_ > 0 && (tenant_info.get_standby_scn().get_val_for_gts() < last_id_)) {
|
|
||||||
TRANS_LOG(ERROR, "snapshot rolls back ", K(tenant_info), K_(last_id));
|
|
||||||
} else {
|
|
||||||
inc_update(&last_id_, (int64_t)tenant_info.get_standby_scn().get_val_for_gts());
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (print_error_log_interval_.reach()) {
|
if (print_error_log_interval_.reach()) {
|
||||||
TRANS_LOG(INFO, "tenant role isn't standby", K(ret), K(tenant_info));
|
TRANS_LOG(INFO, "tenant info is invalid", K(ret), K(query_ts), K(standby_scn), KPC(this));
|
||||||
|
}
|
||||||
|
} else if (!standby_scn.is_valid()) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
TRANS_LOG(WARN, "invalid argument", K(ret), K(query_ts), K(standby_scn), KPC(this));
|
||||||
|
} else {
|
||||||
|
if (last_id_ > 0 && (standby_scn.get_val_for_gts() < last_id_)) {
|
||||||
|
TRANS_LOG(ERROR, "snapshot rolls back ", K_(switch_to_leader_ts), K(standby_scn), K_(last_id));
|
||||||
|
} else {
|
||||||
|
inc_update(&last_id_, (int64_t)standby_scn.get_val_for_gts());
|
||||||
|
ATOMIC_BCAS(&switch_to_leader_ts_, switch_to_leader_ts, OB_INVALID_TIMESTAMP);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (print_id_log_interval_.reach()) {
|
if (print_id_log_interval_.reach()) {
|
||||||
@ -174,17 +174,13 @@ void ObStandbyTimestampService::switch_to_follower_forcedly()
|
|||||||
|
|
||||||
int ObStandbyTimestampService::resume_leader()
|
int ObStandbyTimestampService::resume_leader()
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int64_t type = MTL(ObTimestampAccess *)->get_service_type();
|
||||||
if (OB_FAIL(query_and_update_last_id())) {
|
if (ObTimestampAccess::ServiceType::FOLLOWER == type) {
|
||||||
TRANS_LOG(WARN, "query and update last id fail", KR(ret));
|
MTL(ObTimestampAccess *)->set_service_type(ObTimestampAccess::ServiceType::STS_LEADER);
|
||||||
} else {
|
|
||||||
int64_t type = MTL(ObTimestampAccess *)->get_service_type();
|
|
||||||
if (ObTimestampAccess::ServiceType::FOLLOWER == type) {
|
|
||||||
MTL(ObTimestampAccess *)->set_service_type(ObTimestampAccess::ServiceType::STS_LEADER);
|
|
||||||
}
|
|
||||||
TRANS_LOG(INFO, "ObStandbyTimestampService resume leader success", K(type), "service_type", MTL(ObTimestampAccess *)->get_service_type(), KPC(this));
|
|
||||||
}
|
}
|
||||||
return ret;
|
(void)query_and_update_last_id();
|
||||||
|
TRANS_LOG(INFO, "ObStandbyTimestampService resume leader success", K(type), "service_type", MTL(ObTimestampAccess *)->get_service_type(), KPC(this));
|
||||||
|
return OB_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObStandbyTimestampService::switch_to_leader()
|
int ObStandbyTimestampService::switch_to_leader()
|
||||||
@ -194,14 +190,14 @@ int ObStandbyTimestampService::switch_to_leader()
|
|||||||
int64_t tmp_epoch = OB_INVALID_TIMESTAMP;
|
int64_t tmp_epoch = OB_INVALID_TIMESTAMP;
|
||||||
if (OB_FAIL(MTL(logservice::ObLogService *)->get_palf_role(share::GTS_LS, role, tmp_epoch))) {
|
if (OB_FAIL(MTL(logservice::ObLogService *)->get_palf_role(share::GTS_LS, role, tmp_epoch))) {
|
||||||
TRANS_LOG(WARN, "get ObStandbyTimestampService role fail", KR(ret));
|
TRANS_LOG(WARN, "get ObStandbyTimestampService role fail", KR(ret));
|
||||||
} else if (OB_FAIL(query_and_update_last_id())) {
|
|
||||||
TRANS_LOG(WARN, "query and update last id fail", KR(ret));
|
|
||||||
} else {
|
} else {
|
||||||
|
ATOMIC_STORE(&switch_to_leader_ts_, ObTimeUtility::current_time());
|
||||||
epoch_ = tmp_epoch;
|
epoch_ = tmp_epoch;
|
||||||
int64_t type = MTL(ObTimestampAccess *)->get_service_type();
|
int64_t type = MTL(ObTimestampAccess *)->get_service_type();
|
||||||
if (ObTimestampAccess::ServiceType::FOLLOWER == type) {
|
if (ObTimestampAccess::ServiceType::FOLLOWER == type) {
|
||||||
MTL(ObTimestampAccess *)->set_service_type(ObTimestampAccess::ServiceType::STS_LEADER);
|
MTL(ObTimestampAccess *)->set_service_type(ObTimestampAccess::ServiceType::STS_LEADER);
|
||||||
}
|
}
|
||||||
|
(void)query_and_update_last_id();
|
||||||
TRANS_LOG(INFO, "ObStandbyTimestampService switch to leader success", K(type), "service_type", MTL(ObTimestampAccess *)->get_service_type(), KPC(this));
|
TRANS_LOG(INFO, "ObStandbyTimestampService switch to leader success", K(type), "service_type", MTL(ObTimestampAccess *)->get_service_type(), KPC(this));
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
@ -279,6 +275,11 @@ int ObStandbyTimestampService::get_number(int64_t >s)
|
|||||||
if (EXECUTE_COUNT_PER_SEC(10)) {
|
if (EXECUTE_COUNT_PER_SEC(10)) {
|
||||||
TRANS_LOG(WARN, "ObStandbyTimestampService is not leader", K(ret), KPC(this));
|
TRANS_LOG(WARN, "ObStandbyTimestampService is not leader", K(ret), KPC(this));
|
||||||
}
|
}
|
||||||
|
} else if (OB_INVALID_TIMESTAMP != ATOMIC_LOAD(&switch_to_leader_ts_)) {
|
||||||
|
ret = OB_GTS_NOT_READY;
|
||||||
|
if (EXECUTE_COUNT_PER_SEC(10)) {
|
||||||
|
TRANS_LOG(WARN, "ObStandbyTimestampService is not serving", K(ret), KPC(this));
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
gts = ATOMIC_LOAD(&last_id_);
|
gts = ATOMIC_LOAD(&last_id_);
|
||||||
if (OB_INVALID_VERSION == gts) {
|
if (OB_INVALID_VERSION == gts) {
|
||||||
|
|||||||
@ -47,6 +47,7 @@ class ObStandbyTimestampService : public logservice::ObIRoleChangeSubHandler, pu
|
|||||||
public:
|
public:
|
||||||
ObStandbyTimestampService() : inited_(false), last_id_(OB_INVALID_VERSION), tenant_id_(OB_INVALID_ID),
|
ObStandbyTimestampService() : inited_(false), last_id_(OB_INVALID_VERSION), tenant_id_(OB_INVALID_ID),
|
||||||
epoch_(OB_INVALID_TIMESTAMP), tg_id_(-1),
|
epoch_(OB_INVALID_TIMESTAMP), tg_id_(-1),
|
||||||
|
switch_to_leader_ts_(OB_INVALID_TIMESTAMP),
|
||||||
print_error_log_interval_(3 * 1000 * 1000),
|
print_error_log_interval_(3 * 1000 * 1000),
|
||||||
print_id_log_interval_(3 * 1000 * 1000) {}
|
print_id_log_interval_(3 * 1000 * 1000) {}
|
||||||
virtual ~ObStandbyTimestampService() { destroy(); }
|
virtual ~ObStandbyTimestampService() { destroy(); }
|
||||||
@ -69,7 +70,7 @@ public:
|
|||||||
int check_leader(bool &leader);
|
int check_leader(bool &leader);
|
||||||
int get_number(int64_t >s);
|
int get_number(int64_t >s);
|
||||||
int64_t get_last_id() const { return last_id_; }
|
int64_t get_last_id() const { return last_id_; }
|
||||||
TO_STRING_KV(K_(inited), K_(last_id), K_(tenant_id), K_(epoch), K_(self));
|
TO_STRING_KV(K_(inited), K_(last_id), K_(tenant_id), K_(epoch), K_(self), K_(switch_to_leader_ts));
|
||||||
private:
|
private:
|
||||||
int query_and_update_last_id();
|
int query_and_update_last_id();
|
||||||
int handle_local_request_(const ObGtsRequest &request, obrpc::ObGtsRpcResult &result);
|
int handle_local_request_(const ObGtsRequest &request, obrpc::ObGtsRpcResult &result);
|
||||||
@ -81,6 +82,7 @@ private:
|
|||||||
int tg_id_;
|
int tg_id_;
|
||||||
ObGtsResponseRpc rpc_;
|
ObGtsResponseRpc rpc_;
|
||||||
common::ObAddr self_;
|
common::ObAddr self_;
|
||||||
|
int64_t switch_to_leader_ts_;
|
||||||
common::ObTimeInterval print_error_log_interval_;
|
common::ObTimeInterval print_error_log_interval_;
|
||||||
common::ObTimeInterval print_id_log_interval_;
|
common::ObTimeInterval print_id_log_interval_;
|
||||||
};
|
};
|
||||||
|
|||||||
Reference in New Issue
Block a user