diff --git a/src/storage/tx/ob_gts_source.cpp b/src/storage/tx/ob_gts_source.cpp index eb586db70d..c9f86f204f 100644 --- a/src/storage/tx/ob_gts_source.cpp +++ b/src/storage/tx/ob_gts_source.cpp @@ -289,7 +289,7 @@ int ObGtsSource::get_gts_from_local_timestamp_service_(ObAddr &leader, if (OB_ISNULL(timestamp_access)) { ret = OB_ERR_UNEXPECTED; TRANS_LOG(ERROR, "timestamp access is null", KR(ret), KP(timestamp_access), K_(tenant_id), K(leader)); - } else if (OB_FAIL(timestamp_access->get_number(ObTimeUtility::current_time_ns(), tmp_gts))) { + } else if (OB_FAIL(timestamp_access->get_number(tmp_gts))) { if (EXECUTE_COUNT_PER_SEC(100)) { TRANS_LOG(WARN, "global_timestamp_service get gts fail", K(leader), K(tmp_gts), KR(ret)); } diff --git a/src/storage/tx/ob_timestamp_access.cpp b/src/storage/tx/ob_timestamp_access.cpp index 8feec8b36c..cd85672b3a 100644 --- a/src/storage/tx/ob_timestamp_access.cpp +++ b/src/storage/tx/ob_timestamp_access.cpp @@ -35,12 +35,11 @@ int ObTimestampAccess::handle_request(const ObGtsRequest &request, obrpc::ObGtsR return ret; } -int ObTimestampAccess::get_number(const int64_t base_id, int64_t >s) +int ObTimestampAccess::get_number(int64_t >s) { int ret = OB_SUCCESS; if (GTS_LEADER == service_type_) { - int64_t unused_id = 0; - ret = MTL(ObTimestampService *)->get_number(1, base_id, gts, unused_id); + ret = MTL(ObTimestampService *)->get_timestamp(gts); } else if (STS_LEADER == service_type_) { ret = MTL(ObStandbyTimestampService *)->get_number(gts); } else { diff --git a/src/storage/tx/ob_timestamp_access.h b/src/storage/tx/ob_timestamp_access.h index 6e65478ff0..f5582be3f2 100644 --- a/src/storage/tx/ob_timestamp_access.h +++ b/src/storage/tx/ob_timestamp_access.h @@ -48,7 +48,7 @@ public: void set_service_type(const ServiceType service_type) { service_type_ = service_type; } ServiceType get_service_type() const { return service_type_; } int handle_request(const ObGtsRequest &request, obrpc::ObGtsRpcResult &result); - int get_number(const int64_t base_id, int64_t >s); + int get_number(int64_t >s); void get_virtual_info(int64_t &ts_value, ServiceType &service_type, common::ObRole &role, diff --git a/src/storage/tx/ob_timestamp_service.cpp b/src/storage/tx/ob_timestamp_service.cpp index 881d326f2b..2472575eae 100644 --- a/src/storage/tx/ob_timestamp_service.cpp +++ b/src/storage/tx/ob_timestamp_service.cpp @@ -33,6 +33,9 @@ int ObTimestampService::init(rpc::frame::ObReqTransport *req_transport) self_ = self; service_type_ = ServiceType::TimestampService; pre_allocated_range_ = TIMESTAMP_PREALLOCATED_RANGE; + ATOMIC_STORE(&last_gts_, 0); + ATOMIC_STORE(&last_request_ts_, 0); + ATOMIC_STORE(&check_gts_speed_lock_, 0); return rpc_.init(req_transport, self); } @@ -49,6 +52,70 @@ int ObTimestampService::mtl_init(ObTimestampService *×tamp_service) return ret; } + +// The interface for getting gts timestamp, actually a wrapper of ObIDService::get_number. +// +// For most cases, the gts service uses the machine clock's time as gts timestamp, which means +// the gts service advances as fast as the machine clock. Howerver, when the gts service switches +// leader, the new leader will pre-allocate a range of timestamps, and this can lead to the gts +// timestamp becoming larger than the machine clock's time. Then the gts service will slow down to +// wait the machine clock. But we don't want the gts service to advance too slowly(when the request +// rate is low), since the observer may wait too long before the gts timestamp crosses log SCN. +// So we periodically check the gts service's advancing speed, and if it's far slower than the +// machine clock, we manually push the gts ahead. +int ObTimestampService::get_timestamp(int64_t >s) +{ + int ret = OB_SUCCESS; + int64_t unused_id; + // 100ms + const int64_t CHECK_INTERVAL = 100000000; + const int64_t current_time = ObTimeUtility::current_time_ns(); + int64_t last_request_ts = ATOMIC_LOAD(&last_request_ts_); + int64_t time_delta = current_time - last_request_ts; + + ret = get_number(1, current_time, gts, unused_id); + + if (OB_SUCC(ret)) { + if ((last_request_ts == 0 || time_delta < 0) && ATOMIC_BCAS(&check_gts_speed_lock_, 0, 1)) { + last_request_ts = ATOMIC_LOAD(&last_request_ts_); + time_delta = current_time - last_request_ts; + // before, we only do a fast check, and we should check again after we get the lock + if (last_request_ts == 0 || time_delta < 0) { + ATOMIC_STORE(&last_request_ts_, current_time); + ATOMIC_STORE(&last_gts_, gts); + } + ATOMIC_STORE(&check_gts_speed_lock_, 0); + } else if (time_delta > CHECK_INTERVAL && ATOMIC_BCAS(&check_gts_speed_lock_, 0, 1)) { + last_request_ts = ATOMIC_LOAD(&last_request_ts_); + time_delta = current_time - last_request_ts; + // before, we only do a fast check, and we should check again after we get the lock + if (time_delta > CHECK_INTERVAL) { + const int64_t last_gts = ATOMIC_LOAD(&last_gts_); + const int64_t gts_delta = gts - last_gts; + const int64_t compensation_threshold = time_delta / 2; + const int64_t compensation_value = time_delta / 10; + // if the gts service advanced too slowly, then we add it up with `compensation_value` + if (time_delta - gts_delta > compensation_threshold) { + ret = get_number(compensation_value, current_time, gts, unused_id); + TRANS_LOG(WARN, "the gts service advanced too slowly", K(ret), K(current_time), + K(last_request_ts), K(time_delta), K(last_gts), K(gts), K(gts_delta), + K(compensation_value)); + } + if (OB_SUCC(ret)) { + ATOMIC_STORE(&last_request_ts_, current_time); + ATOMIC_STORE(&last_gts_, gts); + } + TRANS_LOG(DEBUG, "check the gts service advancing speed", K(ret), K(current_time), + K(last_request_ts), K(time_delta), K(last_gts), K(gts), K(gts_delta), + K(compensation_value)); + } + ATOMIC_STORE(&check_gts_speed_lock_, 0); + } + } + + return ret; +} + int ObTimestampService::handle_request(const ObGtsRequest &request, ObGtsRpcResult &result) { static int64_t total_cnt = 0; @@ -66,12 +133,11 @@ int ObTimestampService::handle_request(const ObGtsRequest &request, ObGtsRpcResu const MonotonicTs srr = request.get_srr(); const uint64_t tenant_id = request.get_tenant_id(); const ObAddr &requester = request.get_sender(); - int64_t end_id = 0; if (requester == self_) { // Go local call to get gts TRANS_LOG(DEBUG, "handle local gts request", K(requester)); ret = handle_local_request_(request, result); - } else if (OB_FAIL(get_number(1, ObTimeUtility::current_time_ns(), gts, end_id))) { + } else if (OB_FAIL(get_timestamp(gts))) { if (EXECUTE_COUNT_PER_SEC(10)) { TRANS_LOG(WARN, "get timestamp failed", KR(ret)); } @@ -120,8 +186,7 @@ int ObTimestampService::handle_local_request_(const ObGtsRequest &request, obrpc int64_t gts = 0; const uint64_t tenant_id = request.get_tenant_id(); const MonotonicTs srr = request.get_srr(); - int64_t end_id = 0; - if (OB_FAIL(get_number(1, ObTimeUtility::current_time_ns(), gts, end_id))) { + if (OB_FAIL(get_timestamp(gts))) { if (EXECUTE_COUNT_PER_SEC(10)) { TRANS_LOG(WARN, "get timestamp failed", KR(ret)); } @@ -213,4 +278,4 @@ void ObTimestampService::get_virtual_info(int64_t &ts_value, common::ObRole &rol } } -} +} \ No newline at end of file diff --git a/src/storage/tx/ob_timestamp_service.h b/src/storage/tx/ob_timestamp_service.h index c03d401008..4c299944e6 100644 --- a/src/storage/tx/ob_timestamp_service.h +++ b/src/storage/tx/ob_timestamp_service.h @@ -56,6 +56,7 @@ public: static const int64_t TIMESTAMP_PREALLOCATED_RANGE = palf::election::MAX_LEASE_TIME * 1000; static const int64_t PREALLOCATE_RANGE_FOR_SWITHOVER = 2 * TIMESTAMP_PREALLOCATED_RANGE; int handle_request(const ObGtsRequest &request, obrpc::ObGtsRpcResult &result); + int get_timestamp(int64_t >s); int switch_to_follower_gracefully(); void switch_to_follower_forcedly(); int resume_leader(); @@ -66,6 +67,13 @@ public: void get_virtual_info(int64_t &ts_value, common::ObRole &role, int64_t &proposal_id); private: ObGtsResponseRpc rpc_; + // last timestamp retrieved from gts leader,updated periodically, nanosecond + int64_t last_gts_; + // the time of last request,updated periodically, nanosecond + int64_t last_request_ts_; + // the lock of checking the gts service's advancing speed, used in get_timestamp to avoid + // concurrent threads all pushing the gts ahead + int64_t check_gts_speed_lock_; int handle_local_request_(const ObGtsRequest &request, obrpc::ObGtsRpcResult &result); };