[CP] fix: gts service advancing too slowly after switching leader
This commit is contained in:
@ -289,7 +289,7 @@ int ObGtsSource::get_gts_from_local_timestamp_service_(ObAddr &leader,
|
||||
if (OB_ISNULL(timestamp_access)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "timestamp access is null", KR(ret), KP(timestamp_access), K_(tenant_id), K(leader));
|
||||
} else if (OB_FAIL(timestamp_access->get_number(ObTimeUtility::current_time_ns(), tmp_gts))) {
|
||||
} else if (OB_FAIL(timestamp_access->get_number(tmp_gts))) {
|
||||
if (EXECUTE_COUNT_PER_SEC(100)) {
|
||||
TRANS_LOG(WARN, "global_timestamp_service get gts fail", K(leader), K(tmp_gts), KR(ret));
|
||||
}
|
||||
|
||||
@ -35,12 +35,11 @@ int ObTimestampAccess::handle_request(const ObGtsRequest &request, obrpc::ObGtsR
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTimestampAccess::get_number(const int64_t base_id, int64_t >s)
|
||||
int ObTimestampAccess::get_number(int64_t >s)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (GTS_LEADER == service_type_) {
|
||||
int64_t unused_id = 0;
|
||||
ret = MTL(ObTimestampService *)->get_number(1, base_id, gts, unused_id);
|
||||
ret = MTL(ObTimestampService *)->get_timestamp(gts);
|
||||
} else if (STS_LEADER == service_type_) {
|
||||
ret = MTL(ObStandbyTimestampService *)->get_number(gts);
|
||||
} else {
|
||||
|
||||
@ -48,7 +48,7 @@ public:
|
||||
void set_service_type(const ServiceType service_type) { service_type_ = service_type; }
|
||||
ServiceType get_service_type() const { return service_type_; }
|
||||
int handle_request(const ObGtsRequest &request, obrpc::ObGtsRpcResult &result);
|
||||
// Fetch a timestamp into `gts`, using `base_id` as the base value forwarded to the
// underlying timestamp service (pre-change overload that takes an explicit base).
int get_number(const int64_t base_id, int64_t &gts);
|
||||
// Fetch a timestamp into `gts` from whichever timestamp service matches the current
// service_type_ (GTS leader or standby-timestamp leader).
int get_number(int64_t &gts);
|
||||
void get_virtual_info(int64_t &ts_value,
|
||||
ServiceType &service_type,
|
||||
common::ObRole &role,
|
||||
|
||||
@ -33,6 +33,9 @@ int ObTimestampService::init(rpc::frame::ObReqTransport *req_transport)
|
||||
self_ = self;
|
||||
service_type_ = ServiceType::TimestampService;
|
||||
pre_allocated_range_ = TIMESTAMP_PREALLOCATED_RANGE;
|
||||
ATOMIC_STORE(&last_gts_, 0);
|
||||
ATOMIC_STORE(&last_request_ts_, 0);
|
||||
ATOMIC_STORE(&check_gts_speed_lock_, 0);
|
||||
return rpc_.init(req_transport, self);
|
||||
}
|
||||
|
||||
@ -49,6 +52,70 @@ int ObTimestampService::mtl_init(ObTimestampService *&timestamp_service)
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
// The interface for getting gts timestamp, actually a wrapper of ObIDService::get_number.
|
||||
//
|
||||
// For most cases, the gts service uses the machine clock's time as gts timestamp, which means
|
||||
// the gts service advances as fast as the machine clock. Howerver, when the gts service switches
|
||||
// leader, the new leader will pre-allocate a range of timestamps, and this can lead to the gts
|
||||
// timestamp becoming larger than the machine clock's time. Then the gts service will slow down to
|
||||
// wait the machine clock. But we don't want the gts service to advance too slowly(when the request
|
||||
// rate is low), since the observer may wait too long before the gts timestamp crosses log SCN.
|
||||
// So we periodically check the gts service's advancing speed, and if it's far slower than the
|
||||
// machine clock, we manually push the gts ahead.
|
||||
int ObTimestampService::get_timestamp(int64_t >s)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t unused_id;
|
||||
// 100ms
|
||||
const int64_t CHECK_INTERVAL = 100000000;
|
||||
const int64_t current_time = ObTimeUtility::current_time_ns();
|
||||
int64_t last_request_ts = ATOMIC_LOAD(&last_request_ts_);
|
||||
int64_t time_delta = current_time - last_request_ts;
|
||||
|
||||
ret = get_number(1, current_time, gts, unused_id);
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
if ((last_request_ts == 0 || time_delta < 0) && ATOMIC_BCAS(&check_gts_speed_lock_, 0, 1)) {
|
||||
last_request_ts = ATOMIC_LOAD(&last_request_ts_);
|
||||
time_delta = current_time - last_request_ts;
|
||||
// before, we only do a fast check, and we should check again after we get the lock
|
||||
if (last_request_ts == 0 || time_delta < 0) {
|
||||
ATOMIC_STORE(&last_request_ts_, current_time);
|
||||
ATOMIC_STORE(&last_gts_, gts);
|
||||
}
|
||||
ATOMIC_STORE(&check_gts_speed_lock_, 0);
|
||||
} else if (time_delta > CHECK_INTERVAL && ATOMIC_BCAS(&check_gts_speed_lock_, 0, 1)) {
|
||||
last_request_ts = ATOMIC_LOAD(&last_request_ts_);
|
||||
time_delta = current_time - last_request_ts;
|
||||
// before, we only do a fast check, and we should check again after we get the lock
|
||||
if (time_delta > CHECK_INTERVAL) {
|
||||
const int64_t last_gts = ATOMIC_LOAD(&last_gts_);
|
||||
const int64_t gts_delta = gts - last_gts;
|
||||
const int64_t compensation_threshold = time_delta / 2;
|
||||
const int64_t compensation_value = time_delta / 10;
|
||||
// if the gts service advanced too slowly, then we add it up with `compensation_value`
|
||||
if (time_delta - gts_delta > compensation_threshold) {
|
||||
ret = get_number(compensation_value, current_time, gts, unused_id);
|
||||
TRANS_LOG(WARN, "the gts service advanced too slowly", K(ret), K(current_time),
|
||||
K(last_request_ts), K(time_delta), K(last_gts), K(gts), K(gts_delta),
|
||||
K(compensation_value));
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
ATOMIC_STORE(&last_request_ts_, current_time);
|
||||
ATOMIC_STORE(&last_gts_, gts);
|
||||
}
|
||||
TRANS_LOG(DEBUG, "check the gts service advancing speed", K(ret), K(current_time),
|
||||
K(last_request_ts), K(time_delta), K(last_gts), K(gts), K(gts_delta),
|
||||
K(compensation_value));
|
||||
}
|
||||
ATOMIC_STORE(&check_gts_speed_lock_, 0);
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTimestampService::handle_request(const ObGtsRequest &request, ObGtsRpcResult &result)
|
||||
{
|
||||
static int64_t total_cnt = 0;
|
||||
@ -66,12 +133,11 @@ int ObTimestampService::handle_request(const ObGtsRequest &request, ObGtsRpcResu
|
||||
const MonotonicTs srr = request.get_srr();
|
||||
const uint64_t tenant_id = request.get_tenant_id();
|
||||
const ObAddr &requester = request.get_sender();
|
||||
int64_t end_id = 0;
|
||||
if (requester == self_) {
|
||||
// Go local call to get gts
|
||||
TRANS_LOG(DEBUG, "handle local gts request", K(requester));
|
||||
ret = handle_local_request_(request, result);
|
||||
} else if (OB_FAIL(get_number(1, ObTimeUtility::current_time_ns(), gts, end_id))) {
|
||||
} else if (OB_FAIL(get_timestamp(gts))) {
|
||||
if (EXECUTE_COUNT_PER_SEC(10)) {
|
||||
TRANS_LOG(WARN, "get timestamp failed", KR(ret));
|
||||
}
|
||||
@ -120,8 +186,7 @@ int ObTimestampService::handle_local_request_(const ObGtsRequest &request, obrpc
|
||||
int64_t gts = 0;
|
||||
const uint64_t tenant_id = request.get_tenant_id();
|
||||
const MonotonicTs srr = request.get_srr();
|
||||
int64_t end_id = 0;
|
||||
if (OB_FAIL(get_number(1, ObTimeUtility::current_time_ns(), gts, end_id))) {
|
||||
if (OB_FAIL(get_timestamp(gts))) {
|
||||
if (EXECUTE_COUNT_PER_SEC(10)) {
|
||||
TRANS_LOG(WARN, "get timestamp failed", KR(ret));
|
||||
}
|
||||
|
||||
@ -56,6 +56,7 @@ public:
|
||||
static const int64_t TIMESTAMP_PREALLOCATED_RANGE = palf::election::MAX_LEASE_TIME * 1000;
|
||||
static const int64_t PREALLOCATE_RANGE_FOR_SWITHOVER = 2 * TIMESTAMP_PREALLOCATED_RANGE;
|
||||
int handle_request(const ObGtsRequest &request, obrpc::ObGtsRpcResult &result);
|
||||
// Get a gts timestamp into `gts`; also periodically checks the gts advancing speed and
// pushes the gts ahead when it lags far behind the machine clock.
int get_timestamp(int64_t &gts);
|
||||
int switch_to_follower_gracefully();
|
||||
void switch_to_follower_forcedly();
|
||||
int resume_leader();
|
||||
@ -66,6 +67,13 @@ public:
|
||||
void get_virtual_info(int64_t &ts_value, common::ObRole &role, int64_t &proposal_id);
|
||||
private:
|
||||
ObGtsResponseRpc rpc_;
|
||||
// last timestamp retrieved from gts leader, updated periodically, nanosecond
|
||||
int64_t last_gts_;
|
||||
// the time of last request, updated periodically, nanosecond
|
||||
int64_t last_request_ts_;
|
||||
// the lock of checking the gts service's advancing speed, used in get_timestamp to avoid
|
||||
// concurrent threads all pushing the gts ahead
|
||||
int64_t check_gts_speed_lock_;
|
||||
int handle_local_request_(const ObGtsRequest &request, obrpc::ObGtsRpcResult &result);
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user