From 15a066cf26012c9219bf1dd299653e3eb3d4088f Mon Sep 17 00:00:00 2001 From: fforkboat Date: Tue, 26 Dec 2023 09:16:21 +0000 Subject: [PATCH] [CP] opt: provide a new sync interface for getting gts timestamp --- mittest/mtlenv/mock_tenant_module_env.h | 12 + src/storage/tx/ob_trans_service_v4.cpp | 48 ++- src/storage/tx/ob_ts_mgr.cpp | 289 ++++++++++++++++++ src/storage/tx/ob_ts_mgr.h | 78 +++++ unittest/storage/tx/it/test_tx.cpp | 12 +- .../storage/tx/mock_utils/basic_fake_define.h | 24 ++ 6 files changed, 434 insertions(+), 29 deletions(-) diff --git a/mittest/mtlenv/mock_tenant_module_env.h b/mittest/mtlenv/mock_tenant_module_env.h index e4e71996d..a9e838c40 100644 --- a/mittest/mtlenv/mock_tenant_module_env.h +++ b/mittest/mtlenv/mock_tenant_module_env.h @@ -171,6 +171,18 @@ public: UNUSED(tenant_id); return source_.get_gts(stc, task, gts, receive_gts_ts); } + + virtual int get_gts_sync(const uint64_t tenant_id, + const MonotonicTs stc, + int64_t timeout_us, + share::SCN &gts, + MonotonicTs &receive_gts_ts) + { + UNUSED(tenant_id); + UNUSED(timeout_us); + return source_.get_gts(stc, NULL, gts, receive_gts_ts); + } + + virtual int get_gts(const uint64_t tenant_id, ObTsCbTask *task, share::SCN &gts) { + UNUSED(tenant_id); diff --git a/src/storage/tx/ob_trans_service_v4.cpp b/src/storage/tx/ob_trans_service_v4.cpp index b2fdf6d25..e286524f0 100644 --- a/src/storage/tx/ob_trans_service_v4.cpp +++ b/src/storage/tx/ob_trans_service_v4.cpp @@ -1732,6 +1732,10 @@ int ObTransService::sync_acquire_global_snapshot_(ObTxDesc &tx, [&]() -> bool { return tx.flags_.INTERRUPTED_; }); tx.lock_.lock(); bool interrupted = tx.flags_.INTERRUPTED_; + if (interrupted) { + ret = OB_ERR_INTERRUPTED; + TRANS_LOG(WARN, "acquiring global snapshot has been interrupted", KR(ret), K(tx)); + } tx.clear_interrupt(); tx.flags_.BLOCK_ = false; if (op_sn != tx.op_sn_) { @@ -1759,33 +1763,27 @@ int ObTransService::acquire_global_snapshot__(const int64_t expire_ts, int ret = OB_SUCCESS; const MonotonicTs 
now0 = get_req_receive_mts_(); const MonotonicTs now = now0 - MonotonicTs(gts_ahead); - int retry_times = 0; - const int MAX_RETRY_TIMES = 2000; // 2000 * 500us = 1s - do { - int64_t n = ObClockGenerator::getClock(); - MonotonicTs rts(0); - if (n >= expire_ts) { - ret = OB_TIMEOUT; - } else if (retry_times++ > MAX_RETRY_TIMES) { + const int64_t current_time = ObClockGenerator::getClock(); + // occupy current worker thread for at most 1s + const int64_t MAX_WAIT_TIME_US = 1 * 1000 * 1000; + MonotonicTs rts(0); + + if (interrupt_checker()) { + ret = OB_ERR_INTERRUPTED; + } else if (current_time >= expire_ts) { + ret = OB_TIMEOUT; + TRANS_LOG(WARN, "get gts timeout", K(ret), K(expire_ts), K(current_time)); + } else if (OB_FAIL(ts_mgr_->get_gts_sync(tenant_id_, now, MAX_WAIT_TIME_US, snapshot, rts))) { + TRANS_LOG(WARN, "get gts fail", K(ret), K(expire_ts), K(now)); + if (OB_TIMEOUT == ret) { ret = OB_GTS_NOT_READY; - TRANS_LOG(WARN, "gts not ready", K(ret), K(retry_times)); - } else if (OB_FAIL(ts_mgr_->get_gts(tenant_id_, now, NULL, snapshot, rts))) { - if (OB_EAGAIN == ret) { - if (interrupt_checker()) { - ret = OB_ERR_INTERRUPTED; - } else { - ob_usleep(500); - } - } else { - TRANS_LOG(WARN, "get gts fail", K(now)); - } - } else if (OB_UNLIKELY(!snapshot.is_valid())) { - ret = OB_ERR_UNEXPECTED; - TRANS_LOG(WARN, "invalid snapshot from gts", K(snapshot), K(now)); - } else { - uncertain_bound = rts.mts_ + gts_ahead; } - } while (OB_EAGAIN == ret); + } else if (OB_UNLIKELY(!snapshot.is_valid())) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(WARN, "invalid snapshot from gts", K(snapshot), K(now)); + } else { + uncertain_bound = rts.mts_ + gts_ahead; + } if (OB_FAIL(ret)) { TRANS_LOG(WARN, "acquire global snapshot fail", K(ret), diff --git a/src/storage/tx/ob_ts_mgr.cpp b/src/storage/tx/ob_ts_mgr.cpp index 2dddb0de7..d6f06e44e 100644 --- a/src/storage/tx/ob_ts_mgr.cpp +++ b/src/storage/tx/ob_ts_mgr.cpp @@ -107,6 +107,214 @@ ObTsSourceInfoGuard::~ObTsSourceInfoGuard() } } 
+int ObTsSyncGetTsCbTask::init(uint64_t task_id) +{ + int ret = OB_SUCCESS; + + if (is_inited_) { + ret = OB_INIT_TWICE; + TRANS_LOG(WARN, "ObTsSyncGetTsCbTask inited twice", KR(ret)); + } else if (OB_FAIL(cond_.init(ObWaitEventIds::SYNC_GET_GTS_WAIT))) { + TRANS_LOG(WARN, "ObTsSyncGetTsCbTask cond init failed", K(ret)); + } else { + task_id_ = task_id; + is_inited_ = true; + } + + return ret; +} + +int ObTsSyncGetTsCbTask::gts_callback_interrupted(const int errcode) +{ + int ret = OB_SUCCESS; + + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + TRANS_LOG(WARN, "not init", K(ret)); + } else { + ObThreadCondGuard cond_guard(cond_); + if (is_early_exit_) { + ObTsSyncGetTsCbTaskPool::get_instance().recycle_task(this); + } else { + errcode_ = errcode; + is_finished_ = true; + cond_.signal(); + } + } + + return ret; +} + +int ObTsSyncGetTsCbTask::get_gts_callback(const MonotonicTs srr, const share::SCN &gts, + const MonotonicTs receive_gts_ts) +{ + int ret = OB_SUCCESS; + + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + TRANS_LOG(WARN, "not init", K(ret)); + } else if (srr < get_stc()) { + ret = OB_EAGAIN; + } else { + ObThreadCondGuard cond_guard(cond_); + if (is_early_exit_) { + ObTsSyncGetTsCbTaskPool::get_instance().recycle_task(this); + } else { + gts_result_ = gts; + is_finished_ = true; + cond_.signal(); + } + } + + return ret; +} + +int ObTsSyncGetTsCbTask::gts_elapse_callback(const MonotonicTs srr, const share::SCN &gts) +{ + int ret = OB_NOT_SUPPORTED; + return ret; +} + +MonotonicTs ObTsSyncGetTsCbTask::get_stc() const +{ + return stc_; +} + +uint64_t ObTsSyncGetTsCbTask::hash() const +{ + return task_id_; +} + +uint64_t ObTsSyncGetTsCbTask::get_tenant_id() const +{ + return tenant_id_; +} + +int ObTsSyncGetTsCbTask::wait(const int64_t timeout_us, share::SCN &scn, bool &need_recycle_task) +{ + int ret = OB_SUCCESS; + bool need_recycle = true; + + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + TRANS_LOG(WARN, "not init", K(ret)); + } else { + ObThreadCondGuard cond_guard(cond_); + 
// wait the condition in multiple rounds, so we can check the interrupt status every round + if (!is_finished_) { + if (OB_FAIL(cond_.wait_us(timeout_us))) { + is_early_exit_ = true; + need_recycle = false; + TRANS_LOG(WARN, "ObTsSyncGetTsCbTask cond wait failed", K(ret)); + } + } + if (errcode_ != OB_SUCCESS) { + ret = errcode_; + TRANS_LOG(WARN, "ObTsSyncGetTsCbTask errcode", K(ret)); + } else { + scn = gts_result_; + } + } + + need_recycle_task = need_recycle; + return ret; +} + +int ObTsSyncGetTsCbTask::config(MonotonicTs stc, uint64_t tenant_id) { + int ret = OB_SUCCESS; + + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + TRANS_LOG(WARN, "not init", K(ret)); + } else { + is_finished_ = false; + is_early_exit_ = false; + gts_result_.reset(); + errcode_ = OB_SUCCESS; + stc_ = stc; + tenant_id_ = tenant_id; + } + + return ret; +} + +int ObTsSyncGetTsCbTaskPool::init() { + int ret = OB_SUCCESS; + + if (is_inited_) { + ret = OB_INIT_TWICE; + TRANS_LOG(WARN, "ObTsSyncGetTsCbTaskPool inited twice", KR(ret)); + } else { + for (uint64_t i = 0; i < POOL_SIZE; i++) { + if (OB_FAIL(tasks_[i].init(i))) { + TRANS_LOG(WARN, "ObTsSyncGetTsCbTaskPool init failed", KR(ret)); + break; + } + } + + if (OB_SUCC(ret)) { + is_inited_ = true; + } + } + + return ret; +} + +int ObTsSyncGetTsCbTaskPool::get_task(MonotonicTs stc, uint64_t tenant_id, + ObTsSyncGetTsCbTask *&task) { + int ret = OB_SUCCESS; + + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + TRANS_LOG(WARN, "not init", K(ret)); + } else { + // try to use thread_id to find a free cbtask + const int64_t thread_id = get_itid(); + int64_t index = thread_id % POOL_SIZE; + int iter_count = 0; + const int ITER_LIMIT = 8; + + while (iter_count < ITER_LIMIT) { + ObTsSyncGetTsCbTask *iter_task = &tasks_[index]; + if (ATOMIC_BCAS(&iter_task->is_occupied_, false, true)) { + break; + } + iter_count++; + index = (index + iter_count) % POOL_SIZE; + } + if (iter_count == ITER_LIMIT) { + ret = OB_EAGAIN; + TRANS_LOG(WARN, "ObTsSyncGetTsCbTaskPool failed 
to get task", K(ret), K(thread_id)); + } else { + task = &tasks_[index]; + if (OB_FAIL(task->config(stc, tenant_id))) { + TRANS_LOG(WARN, "failed to config ObTsSyncGetTsCbTask", K(ret), K(index)); + } + } + } + + return ret; +} + +int ObTsSyncGetTsCbTaskPool::recycle_task(ObTsSyncGetTsCbTask *task) { + int ret = OB_SUCCESS; + + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + TRANS_LOG(WARN, "not init", K(ret)); + } else if (OB_ISNULL(task)) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(WARN, "ObTsSyncGetTsCbTask is NULL", KR(ret)); + } else { + if (!ATOMIC_BCAS(&task->is_occupied_, true, false)) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(WARN, "ObTsSyncGetTsCbTask has been recycled", KR(ret)); + } + } + + return ret; +} + ////////////////////////ObTsMgr实现/////////////////////////////////// int ObTsMgr::init(const ObAddr &server, @@ -140,6 +348,8 @@ int ObTsMgr::init(const ObAddr &server, TRANS_LOG(WARN, "response rpc init failed", KR(ret), K(server)); } else if (OB_FAIL(lock_.init(lib::ObMemAttr(OB_SERVER_TENANT_ID, "TsMgr")))) { TRANS_LOG(WARN, "ObQSyncLock init failed", KR(ret), K(OB_SERVER_TENANT_ID)); + } else if (OB_FAIL(ObTsSyncGetTsCbTaskPool::get_instance().init())) { + TRANS_LOG(WARN, "ObTsSyncGetTsCbTaskPool init failed", KR(ret)); } else { server_ = server; location_adapter_ = &location_adapter_def_; @@ -685,6 +895,85 @@ int ObTsMgr::get_ts_sync(const uint64_t tenant_id, const int64_t timeout_us, sha return get_ts_sync(tenant_id, timeout_us, scn, unused_is_external_consistent); } +int ObTsMgr::get_gts_sync(const uint64_t tenant_id, + const MonotonicTs stc, + const int64_t timeout_us, + share::SCN &scn, + MonotonicTs &receive_gts_ts) +{ + int ret = OB_SUCCESS; + + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + TRANS_LOG(WARN, "ObTsMgr is not inited", K(ret)); + } else if (OB_UNLIKELY(!is_running_)) { + ret = OB_NOT_RUNNING; + TRANS_LOG(WARN, "ObTsMgr is not running", K(ret)); + } else if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id)) || 
OB_UNLIKELY(!stc.is_valid()) + || OB_UNLIKELY(timeout_us < 0)) { + ret = OB_INVALID_ARGUMENT; + TRANS_LOG(WARN, "invalid argument", K(ret), K(tenant_id), K(stc), K(timeout_us)); + } else { + ObTsSourceInfo *ts_source_info = NULL; + ObGtsSource *ts_source = NULL; + ObTsSourceInfoGuard info_guard; + ObTsSyncGetTsCbTask *task = NULL; + int64_t gts_result = 0; + bool fall_back_to_sleep = false; + if (OB_FAIL(get_ts_source_info_opt_(tenant_id, info_guard, true, true))) { + TRANS_LOG(WARN, "get ts source info failed", K(ret), K(tenant_id)); + } else if (OB_ISNULL(ts_source_info = info_guard.get_ts_source_info())) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(WARN, "ts source info is NULL", K(ret), K(tenant_id)); + } else if (OB_ISNULL(ts_source = ts_source_info->get_gts_source())) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(WARN, "ts source is NULL", K(ret)); + } else if (OB_SUCC(ObTsSyncGetTsCbTaskPool::get_instance().get_task(stc, tenant_id, task))) { + bool need_recycle_task = true; + if (OB_FAIL(ts_source->get_gts(stc, task, gts_result, receive_gts_ts))) { + if (OB_EAGAIN != ret) { + TRANS_LOG(WARN, "get gts error", K(ret), K(tenant_id), K(stc)); + } else if (OB_FAIL(task->wait(timeout_us, scn, need_recycle_task))) { + if (OB_TIMEOUT != ret) { + fall_back_to_sleep = true; + } + TRANS_LOG(WARN, "failed to wait ObTsSyncGetTsCbTask", K(ret), K(tenant_id), K(timeout_us)); + } + } else { + scn.convert_for_gts(gts_result); + } + if (need_recycle_task) { + ObTsSyncGetTsCbTaskPool::get_instance().recycle_task(task); + } + } else { + fall_back_to_sleep = true; + } + if (fall_back_to_sleep) { + TRANS_LOG(WARN, "failed to get ObTsSyncGetTsCbTask, fall back to sleep", K(ret)); + + int64_t expire_ts = ObClockGenerator::getClock() + timeout_us; + int retry_times = 0; + const int64_t SLEEP_TIME_US = 500; + do { + const int64_t now = ObClockGenerator::getClock(); + if (now >= expire_ts) { + ret = OB_TIMEOUT; + } else if (OB_FAIL(ts_source->get_gts(stc, NULL, gts_result, receive_gts_ts))) { 
+ if (OB_EAGAIN == ret) { + ob_usleep(SLEEP_TIME_US); + } else { + TRANS_LOG(WARN, "get gts fail", K(ret), K(now)); + } + } else { + scn.convert_for_gts(gts_result); + } + } while (OB_EAGAIN == ret); + } + } + + return ret; +} + int ObTsMgr::get_ts_sync(const uint64_t tenant_id, const int64_t timeout_us, SCN &scn, diff --git a/src/storage/tx/ob_ts_mgr.h b/src/storage/tx/ob_ts_mgr.h index 6a1247d36..2e2e437a6 100644 --- a/src/storage/tx/ob_ts_mgr.h +++ b/src/storage/tx/ob_ts_mgr.h @@ -86,6 +86,12 @@ public: ObTsCbTask *task, share::SCN &scn, MonotonicTs &receive_gts_ts) = 0; + virtual int get_gts_sync(const uint64_t tenant_id, + const MonotonicTs stc, + const int64_t timeout_us, + share::SCN &scn, + MonotonicTs &receive_gts_ts) = 0; + virtual int get_gts(const uint64_t tenant_id, ObTsCbTask *task, share::SCN &scn) = 0; virtual int get_ts_sync(const uint64_t tenant_id, const int64_t timeout_ts, share::SCN &scn, bool &is_external_consistent) = 0; @@ -285,6 +291,64 @@ private: bool need_revert_; }; +class ObTsSyncGetTsCbTask : public ObTsCbTask +{ +public: + friend class ObTsSyncGetTsCbTaskPool; + ObTsSyncGetTsCbTask() + :is_inited_(false), task_id_(0), is_occupied_(false), is_finished_(false), + is_early_exit_(false), stc_(0), tenant_id_(0), errcode_(OB_SUCCESS) {} + ~ObTsSyncGetTsCbTask() {} + int init(uint64_t task_id); + int config(MonotonicTs stc, uint64_t tenant_id); + int gts_callback_interrupted(const int errcode) override; + int get_gts_callback(const MonotonicTs srr, const share::SCN &gts, + const MonotonicTs receive_gts_ts) override; + int gts_elapse_callback(const MonotonicTs srr, const share::SCN &gts) override; + MonotonicTs get_stc() const override; + uint64_t hash() const override; + uint64_t get_tenant_id() const override; + int wait(const int64_t timeout_us, share::SCN &scn, bool &need_recycle_task); +private: + bool is_inited_; + uint64_t task_id_; + // whether this callback task is being used + bool is_occupied_ __attribute__((aligned(8))); + // whether 
the callback has been invoked + bool is_finished_; + // whether the caller exits (due to timeout) before the callback is invoked + bool is_early_exit_; + share::SCN gts_result_; + ObThreadCond cond_; + MonotonicTs stc_; + uint64_t tenant_id_; + int errcode_; +}; + +STATIC_ASSERT(sizeof(ObTsSyncGetTsCbTask) <= 256, "ObTsSyncGetTsCbTask is too large"); +/** + * The resource pool of ObTsSyncGetTsCbTask. The pool has a fixed size of cbtasks, and the cbtasks + * can be reused. + */ +class ObTsSyncGetTsCbTaskPool +{ +public: + static constexpr int64_t POOL_SIZE = 8000; + ObTsSyncGetTsCbTaskPool() {} + ~ObTsSyncGetTsCbTaskPool() {} + static ObTsSyncGetTsCbTaskPool& get_instance() + { + static ObTsSyncGetTsCbTaskPool pool; + return pool; + } + int init(); + int get_task(MonotonicTs stc, uint64_t tenant_id, ObTsSyncGetTsCbTask *&task); + int recycle_task(ObTsSyncGetTsCbTask *task); +private: + bool is_inited_; + ObTsSyncGetTsCbTask tasks_[POOL_SIZE]; +}; + typedef common::ObLinkHashMap ObTsSourceInfoMap; class ObTsMgr : public share::ObThreadPool, public ObITsMgr { @@ -315,6 +379,20 @@ public: ObTsCbTask *task, share::SCN &scn, MonotonicTs &receive_gts_ts); + /** + * 与`get_gts`相对应的同步接口,用于同步获取合适的GTS时间戳,可传入超时时间以避免长时间等待。 + * 相较于原有同步接口`get_ts_sync`,本接口的性能更好 + * @param[in] tenant_id + * @param[in] stc: 需要获取GTS的时间点,一般取current time + * @param[in] timeout_us: 超时时长,单位us + * @param[out] scn: 获取到的GTS时间戳结果 + * @param[out] receive_gts_ts: 收到GTS response的时间点 + */ + int get_gts_sync(const uint64_t tenant_id, + const MonotonicTs stc, + const int64_t timeout_us, + share::SCN &scn, + MonotonicTs &receive_gts_ts); //仅仅获取本地gts cache的最新值,但可能会失败,失败之后处理逻辑如下: //1. 如果task == NULL,说明调用者不需要异步回调,直接返回报错,由调用者处理 //2. 
如果task != NULL,需要注册异步回调任务 diff --git a/unittest/storage/tx/it/test_tx.cpp b/unittest/storage/tx/it/test_tx.cpp index 87d09c37f..f00c901a3 100644 --- a/unittest/storage/tx/it/test_tx.cpp +++ b/unittest/storage/tx/it/test_tx.cpp @@ -2405,10 +2405,14 @@ TEST_F(ObTestTx, interrupt_get_read_snapshot) PREPARE_TX(n1, tx); ObTxReadSnapshot snapshot; n1->get_ts_mgr_().inject_get_gts_error(OB_EAGAIN); - ASYNC_DO(acq_snapshot, n1->get_read_snapshot(tx, ObTxIsolationLevel::RC, n1->ts_after_ms(20 * 1000), snapshot)); - ASSERT_EQ(OB_SUCCESS, n1->interrupt(tx, OB_TRANS_KILLED)); - ASYNC_WAIT(acq_snapshot, 2000 * 1000, wait_ret); - ASSERT_EQ(OB_ERR_INTERRUPTED, wait_ret); + int ret = OB_SUCCESS; + do { + ASYNC_DO(acq_snapshot, n1->get_read_snapshot(tx, ObTxIsolationLevel::RC, n1->ts_after_ms(20 * 1000), snapshot)); + ASSERT_EQ(OB_SUCCESS, n1->interrupt(tx, OB_TRANS_KILLED)); + ASYNC_WAIT(acq_snapshot, 2000 * 1000, wait_ret); + ret = wait_ret; + } while (OB_GTS_NOT_READY == ret); + ASSERT_EQ(OB_ERR_INTERRUPTED, ret); ROLLBACK_TX(n1, tx); } diff --git a/unittest/storage/tx/mock_utils/basic_fake_define.h b/unittest/storage/tx/mock_utils/basic_fake_define.h index 51097a372..8e5b6e5ec 100644 --- a/unittest/storage/tx/mock_utils/basic_fake_define.h +++ b/unittest/storage/tx/mock_utils/basic_fake_define.h @@ -271,6 +271,29 @@ public: return ret; } + int get_gts_sync(const uint64_t tenant_id, + const MonotonicTs stc, + const int64_t timeout_us, + share::SCN &scn, + MonotonicTs &receive_gts_ts) + { + int ret = OB_SUCCESS; + const int64_t expire_ts = ObClockGenerator::getClock() + timeout_us; + + do { + int64_t n = ObClockGenerator::getClock(); + if (n >= expire_ts) { + ret = OB_TIMEOUT; + } else if (OB_FAIL(get_gts(tenant_id, stc, NULL, scn, receive_gts_ts))) { + if (OB_EAGAIN == ret) { + ob_usleep(500); + } + } + } while (OB_EAGAIN == ret); + + return ret; + } + int get_gts(const uint64_t tenant_id, ObTsCbTask *task, share::SCN 
&scn) { if (get_gts_error_) { return get_gts_error_; } return OB_SUCCESS;