revert gts cache leader optimization
This commit is contained in:
parent
80ea36d9ab
commit
d1b473dd29
@ -200,6 +200,7 @@ int ObGtsSource::get_gts(const MonotonicTs stc,
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
int64_t tmp_gts = 0;
|
||||
bool need_send_rpc = false;
|
||||
ObAddr leader;
|
||||
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
@ -219,18 +220,18 @@ int ObGtsSource::get_gts(const MonotonicTs stc,
|
||||
TRANS_LOG(DEBUG, "query_gts", KR(ret), K(need_send_rpc), K(stc),
|
||||
K(gts_local_cache_.get_latest_srr()));
|
||||
// When getting gts, if the global timestamp service is locally, get gts directly
|
||||
if (!gts_cache_leader_.is_valid() && OB_SUCCESS != (tmp_ret = refresh_gts_cache_leader_())) {
|
||||
TRANS_LOG(WARN, "refresh gts leader fail", K(tmp_ret), K_(tenant_id));
|
||||
if (OB_SUCCESS != (tmp_ret = get_gts_leader_(leader))) {
|
||||
TRANS_LOG(WARN, "get gts leader fail", K(tmp_ret), K_(tenant_id));
|
||||
(void)refresh_gts_location_();
|
||||
} else if (gts_cache_leader_ == server_) {
|
||||
} else if (leader == server_) {
|
||||
MTL_SWITCH(tenant_id_) {
|
||||
ret = OB_EAGAIN;
|
||||
// When getting gts, if the global timestamp service is locally, get gts directly
|
||||
// Here the error code is overwritten by the result of the local call
|
||||
if (OB_SUCCESS != (tmp_ret = get_gts_from_local_timestamp_service_(gts, receive_gts_ts))) {
|
||||
if (OB_SUCCESS != (tmp_ret = get_gts_from_local_timestamp_service_(leader, gts, receive_gts_ts))) {
|
||||
if (OB_EAGAIN != tmp_ret) {
|
||||
if (EXECUTE_COUNT_PER_SEC(16)) {
|
||||
TRANS_LOG(WARN, "get_gts_from_local_timestamp_service fail", K_(gts_cache_leader), K_(server), K(tmp_ret));
|
||||
TRANS_LOG(WARN, "get_gts_from_local_timestamp_service fail", K(leader), K_(server), K(tmp_ret));
|
||||
}
|
||||
refresh_gts_location();
|
||||
}
|
||||
@ -244,11 +245,11 @@ int ObGtsSource::get_gts(const MonotonicTs stc,
|
||||
} else {
|
||||
// If not in local, refresh gts
|
||||
if (need_send_rpc) {
|
||||
if (OB_SUCCESS != (tmp_ret = query_gts_(gts_cache_leader_))) {
|
||||
TRANS_LOG(WARN, "query gts fail", K(tmp_ret), K_(gts_cache_leader));
|
||||
if (OB_SUCCESS != (tmp_ret = query_gts_(leader))) {
|
||||
TRANS_LOG(WARN, "query gts fail", K(tmp_ret), K(leader));
|
||||
}
|
||||
}
|
||||
TRANS_LOG(DEBUG, "after query gts", KR(tmp_ret), K_(gts_cache_leader), K(need_send_rpc));
|
||||
TRANS_LOG(DEBUG, "after query gts", KR(tmp_ret), K(leader), K(need_send_rpc));
|
||||
}
|
||||
// If ret is not OB_SUCCESS, it means that an asynchronous task needs to be added to wait for the subsequent gts value
|
||||
if (OB_FAIL(ret) && NULL != task) {
|
||||
@ -274,7 +275,8 @@ int ObGtsSource::get_gts(const MonotonicTs stc,
|
||||
}
|
||||
|
||||
// Get the timestamp from the local timestamp service
|
||||
int ObGtsSource::get_gts_from_local_timestamp_service_(int64_t >s,
|
||||
int ObGtsSource::get_gts_from_local_timestamp_service_(ObAddr &leader,
|
||||
int64_t >s,
|
||||
MonotonicTs &receive_gts_ts)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -284,15 +286,18 @@ int ObGtsSource::get_gts_from_local_timestamp_service_(int64_t >s,
|
||||
ObTimestampAccess *timestamp_access = MTL(ObTimestampAccess *);
|
||||
if (OB_ISNULL(timestamp_access)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "timestamp access is null", KR(ret), KP(timestamp_access), K_(tenant_id));
|
||||
TRANS_LOG(ERROR, "timestamp access is null", KR(ret), KP(timestamp_access), K_(tenant_id), K(leader));
|
||||
} else if (OB_FAIL(timestamp_access->get_number(ObTimeUtility::current_time_ns(), tmp_gts))) {
|
||||
if (EXECUTE_COUNT_PER_SEC(100)) {
|
||||
TRANS_LOG(WARN, "global_timestamp_service get gts fail", K(tmp_gts), KR(ret));
|
||||
TRANS_LOG(WARN, "global_timestamp_service get gts fail", K(leader), K(tmp_gts), KR(ret));
|
||||
}
|
||||
if (OB_NOT_MASTER == ret) {
|
||||
gts_cache_leader_.reset();
|
||||
}
|
||||
} else {
|
||||
if (leader.is_valid()) {
|
||||
gts_cache_leader_ = leader;
|
||||
}
|
||||
const MonotonicTs tmp_receive_gts_ts = MonotonicTs::current_time();
|
||||
if (OB_FAIL(gts_local_cache_.update_gts_and_check_barrier(srr,
|
||||
tmp_gts,
|
||||
@ -308,10 +313,11 @@ int ObGtsSource::get_gts_from_local_timestamp_service_(int64_t >s,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObGtsSource::get_gts_from_local_timestamp_service_(int64_t >s)
|
||||
int ObGtsSource::get_gts_from_local_timestamp_service_(ObAddr &leader,
|
||||
int64_t >s)
|
||||
{
|
||||
MonotonicTs unused_receive_gts_ts;
|
||||
return get_gts_from_local_timestamp_service_(gts, unused_receive_gts_ts);
|
||||
return get_gts_from_local_timestamp_service_(leader, gts, unused_receive_gts_ts);
|
||||
}
|
||||
|
||||
int ObGtsSource::get_srr(MonotonicTs &srr)
|
||||
@ -376,6 +382,7 @@ int ObGtsSource::wait_gts_elapse(const int64_t ts, ObTsCbTask *task, bool &need_
|
||||
} else {
|
||||
int64_t gts = 0;
|
||||
bool tmp_need_wait = false;
|
||||
ObAddr leader;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_FAIL(gts_local_cache_.get_gts(gts))) {
|
||||
if (OB_UNLIKELY(OB_EAGAIN != ret)) {
|
||||
@ -392,14 +399,14 @@ int ObGtsSource::wait_gts_elapse(const int64_t ts, ObTsCbTask *task, bool &need_
|
||||
}
|
||||
if (OB_SUCCESS == ret && tmp_need_wait) {
|
||||
// When getting gts, if the global timestamp service is locally, get gts directly
|
||||
if (!gts_cache_leader_.is_valid() && OB_SUCCESS != (tmp_ret = refresh_gts_cache_leader_())) {
|
||||
TRANS_LOG(WARN, "refresh gts leader fail", K(tmp_ret), K_(tenant_id));
|
||||
if (OB_SUCCESS != (tmp_ret = get_gts_leader_(leader))) {
|
||||
TRANS_LOG(WARN, "get gts leader fail", K(tmp_ret), K_(tenant_id));
|
||||
(void)refresh_gts_location_();
|
||||
} else if (gts_cache_leader_ == server_) {
|
||||
} else if (leader == server_) {
|
||||
// When getting gts, if the global timestamp service is locally, get gts directly
|
||||
if (OB_SUCCESS != (tmp_ret = get_gts_from_local_timestamp_service_(gts))) {
|
||||
if (OB_SUCCESS != (tmp_ret = get_gts_from_local_timestamp_service_(leader, gts))) {
|
||||
if (OB_EAGAIN != tmp_ret && EXECUTE_COUNT_PER_SEC(100)) {
|
||||
TRANS_LOG(WARN, "get_gts_from_local_timestamp_service fail", K_(gts_cache_leader), K_(server), K(tmp_ret));
|
||||
TRANS_LOG(WARN, "get_gts_from_local_timestamp_service fail", K(leader), K_(server), K(tmp_ret));
|
||||
}
|
||||
} else if (ts <= gts) {
|
||||
tmp_need_wait = false;
|
||||
@ -452,6 +459,7 @@ int ObGtsSource::wait_gts_elapse(const int64_t ts)
|
||||
TRANS_LOG(WARN, "invalid argument", KR(ret), K(ts));
|
||||
} else {
|
||||
int64_t gts = 0;
|
||||
ObAddr leader;
|
||||
if (OB_FAIL(gts_local_cache_.get_gts(gts))) {
|
||||
if (OB_UNLIKELY(OB_EAGAIN != ret)) {
|
||||
TRANS_LOG(WARN, "get gts failed", K(ret));
|
||||
@ -465,14 +473,14 @@ int ObGtsSource::wait_gts_elapse(const int64_t ts)
|
||||
if (OB_FAIL(ret)) {
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
// When getting gts, if the global timestamp service is locally, get gts directly
|
||||
if (!gts_cache_leader_.is_valid() && OB_SUCCESS != (tmp_ret = refresh_gts_cache_leader_())) {
|
||||
if (OB_SUCCESS != (tmp_ret = get_gts_leader_(leader))) {
|
||||
TRANS_LOG(WARN, "get gts leader fail", K(tmp_ret), K_(tenant_id));
|
||||
(void)refresh_gts_location_();
|
||||
} else if (gts_cache_leader_ == server_) {
|
||||
} else if (leader == server_) {
|
||||
// When getting gts, if the global timestamp service is locally, get gts directly
|
||||
if (OB_SUCCESS != (tmp_ret = get_gts_from_local_timestamp_service_(gts))) {
|
||||
if (OB_SUCCESS != (tmp_ret = get_gts_from_local_timestamp_service_(leader, gts))) {
|
||||
if (OB_EAGAIN != tmp_ret) {
|
||||
TRANS_LOG(WARN, "get_gts_from_local_timestamp_service fail", K_(gts_cache_leader), K_(server), K(tmp_ret));
|
||||
TRANS_LOG(WARN, "get_gts_from_local_timestamp_service fail", K(leader), K_(server), K(tmp_ret));
|
||||
}
|
||||
} else if (ts <= gts) {
|
||||
ret = OB_SUCCESS;
|
||||
@ -481,7 +489,7 @@ int ObGtsSource::wait_gts_elapse(const int64_t ts)
|
||||
}
|
||||
} else {
|
||||
// If the leader is not in local, gts needs to be refreshed
|
||||
if (OB_SUCCESS != (tmp_ret = query_gts_(gts_cache_leader_))) {
|
||||
if (OB_SUCCESS != (tmp_ret = query_gts_(leader))) {
|
||||
TRANS_LOG(WARN, "refresh gts failed", K(tmp_ret));
|
||||
}
|
||||
}
|
||||
@ -508,7 +516,7 @@ int ObGtsSource::refresh_gts(const bool need_refresh)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObGtsSource::refresh_gts_cache_leader_()
|
||||
int ObGtsSource::get_gts_leader_(ObAddr &leader)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const int64_t cluster_id = GCONF.cluster_id;
|
||||
@ -519,15 +527,19 @@ int ObGtsSource::refresh_gts_cache_leader_()
|
||||
return ret;
|
||||
}
|
||||
#endif
|
||||
if (OB_FAIL(location_adapter_->nonblock_get_leader(cluster_id, tenant_id_, GTS_LS, gts_cache_leader_))) {
|
||||
if (gts_cache_leader_.is_valid()) {
|
||||
leader = gts_cache_leader_;
|
||||
} else if (OB_FAIL(location_adapter_->nonblock_get_leader(cluster_id, tenant_id_, GTS_LS, leader))) {
|
||||
if (EXECUTE_COUNT_PER_SEC(16)) {
|
||||
TRANS_LOG(WARN, "gts nonblock get leader failed", K(ret), K_(tenant_id), K(GTS_LS));
|
||||
}
|
||||
} else {
|
||||
gts_cache_leader_ = leader;
|
||||
}
|
||||
if (OB_SUCC(ret) && !gts_cache_leader_.is_valid()) {
|
||||
if (OB_SUCC(ret) && !leader.is_valid()) {
|
||||
ret = OB_LS_LOCATION_LEADER_NOT_EXIST;
|
||||
if (EXECUTE_COUNT_PER_SEC(16)) {
|
||||
TRANS_LOG(WARN, "gts cache leader is invalid", KR(ret), K_(gts_cache_leader), K(*this));
|
||||
TRANS_LOG(WARN, "gts cache leader is invalid", KR(ret), K(leader), K(*this));
|
||||
}
|
||||
}
|
||||
|
||||
@ -572,15 +584,16 @@ int ObGtsSource::refresh_gts_location_()
|
||||
int ObGtsSource::refresh_gts_(const bool need_refresh)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObAddr leader;
|
||||
bool need_refresh_gts_location = need_refresh;
|
||||
|
||||
if (!gts_cache_leader_.is_valid() && OB_FAIL(refresh_gts_cache_leader_())) {
|
||||
if (OB_FAIL(get_gts_leader_(leader))) {
|
||||
if (EXECUTE_COUNT_PER_SEC(16)) {
|
||||
TRANS_LOG(WARN, "get gts leader failed", KR(ret), K_(tenant_id));
|
||||
}
|
||||
need_refresh_gts_location = true;
|
||||
} else {
|
||||
ret = query_gts_(gts_cache_leader_);
|
||||
ret = query_gts_(leader);
|
||||
}
|
||||
if (need_refresh_gts_location) {
|
||||
(void)refresh_gts_location_();
|
||||
|
@ -94,14 +94,16 @@ public:
|
||||
int refresh_gts_location() { return refresh_gts_location_(); }
|
||||
TO_STRING_KV(K_(tenant_id), K_(gts_local_cache), K_(server), K_(gts_cache_leader));
|
||||
private:
|
||||
int refresh_gts_cache_leader_();
|
||||
int get_gts_leader_(common::ObAddr &leader);
|
||||
int refresh_gts_location_();
|
||||
int refresh_gts_(const bool need_refresh);
|
||||
int query_gts_(const common::ObAddr &leader);
|
||||
void statistics_();
|
||||
int get_gts_from_local_timestamp_service_(int64_t >s,
|
||||
int get_gts_from_local_timestamp_service_(common::ObAddr &leader,
|
||||
int64_t >s,
|
||||
MonotonicTs &receive_gts_ts);
|
||||
int get_gts_from_local_timestamp_service_(int64_t >s);
|
||||
int get_gts_from_local_timestamp_service_(common::ObAddr &leader,
|
||||
int64_t >s);
|
||||
public:
|
||||
static const int64_t GET_GTS_QUEUE_COUNT = 1;
|
||||
static const int64_t WAIT_GTS_QUEUE_COUNT = 1;
|
||||
|
@ -19,7 +19,7 @@
|
||||
#include "storage/ob_storage_struct.h" // ObMigrateStatus
|
||||
|
||||
// 定期更新黑名单的时间间隔(us)
|
||||
#define BLACK_LIST_REFRESH_INTERVAL 1000000 // 1s
|
||||
#define BLACK_LIST_REFRESH_INTERVAL 5000000 // 5s
|
||||
// 判断时间戳是否赶上/落后的缓冲时间(ns),避免阈值附近的日志流反复加入/移出黑名单
|
||||
#define BLACK_LIST_WHITEWASH_INTERVAL_NS 1000000000 // 1s
|
||||
// 黑名单信息打印时间间隔(us)
|
||||
|
Loading…
x
Reference in New Issue
Block a user