[CP] opt: provide a new sync interface for getting gts timestamp

This commit is contained in:
fforkboat 2023-12-26 09:16:21 +00:00 committed by ob-robot
parent b1f8e690ea
commit 15a066cf26
6 changed files with 434 additions and 29 deletions

View File

@ -171,6 +171,18 @@ public:
UNUSED(tenant_id);
return source_.get_gts(stc, task, gts, receive_gts_ts);
}
virtual int get_gts_sync(const uint64_t tenant_id,
const MonotonicTs stc,
int64_t timeout_us,
share::SCN &gts,
MonotonicTs &receive_gts_ts)
{
UNUSED(tenant_id);
UNUSED(timeout_us);
return source_.get_gts(stc, NULL, gts, receive_gts_ts);
}
virtual int get_gts(const uint64_t tenant_id, ObTsCbTask *task, share::SCN &gts)
{
UNUSED(tenant_id);

View File

@ -1732,6 +1732,10 @@ int ObTransService::sync_acquire_global_snapshot_(ObTxDesc &tx,
[&]() -> bool { return tx.flags_.INTERRUPTED_; });
tx.lock_.lock();
bool interrupted = tx.flags_.INTERRUPTED_;
if (interrupted) {
ret = OB_ERR_INTERRUPTED;
TRANS_LOG(WARN, "acquiring global snapshot has been interrupted", KR(ret), K(tx));
}
tx.clear_interrupt();
tx.flags_.BLOCK_ = false;
if (op_sn != tx.op_sn_) {
@ -1759,33 +1763,27 @@ int ObTransService::acquire_global_snapshot__(const int64_t expire_ts,
int ret = OB_SUCCESS;
const MonotonicTs now0 = get_req_receive_mts_();
const MonotonicTs now = now0 - MonotonicTs(gts_ahead);
int retry_times = 0;
const int MAX_RETRY_TIMES = 2000; // 2000 * 500us = 1s
do {
int64_t n = ObClockGenerator::getClock();
MonotonicTs rts(0);
if (n >= expire_ts) {
ret = OB_TIMEOUT;
} else if (retry_times++ > MAX_RETRY_TIMES) {
const int64_t current_time = ObClockGenerator::getClock();
// occupy current worker thread for at most 1s
const int64_t MAX_WAIT_TIME_US = 1 * 1000 * 1000;
MonotonicTs rts(0);
if (interrupt_checker()) {
ret = OB_ERR_INTERRUPTED;
} else if (current_time >= expire_ts) {
ret = OB_TIMEOUT;
TRANS_LOG(WARN, "get gts timeout", K(ret), K(expire_ts), K(current_time));
} else if (OB_FAIL(ts_mgr_->get_gts_sync(tenant_id_, now, MAX_WAIT_TIME_US, snapshot, rts))) {
TRANS_LOG(WARN, "get gts fail", K(ret), K(expire_ts), K(now));
if (OB_TIMEOUT == ret) {
ret = OB_GTS_NOT_READY;
TRANS_LOG(WARN, "gts not ready", K(ret), K(retry_times));
} else if (OB_FAIL(ts_mgr_->get_gts(tenant_id_, now, NULL, snapshot, rts))) {
if (OB_EAGAIN == ret) {
if (interrupt_checker()) {
ret = OB_ERR_INTERRUPTED;
} else {
ob_usleep(500);
}
} else {
TRANS_LOG(WARN, "get gts fail", K(now));
}
} else if (OB_UNLIKELY(!snapshot.is_valid())) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "invalid snapshot from gts", K(snapshot), K(now));
} else {
uncertain_bound = rts.mts_ + gts_ahead;
}
} while (OB_EAGAIN == ret);
} else if (OB_UNLIKELY(!snapshot.is_valid())) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "invalid snapshot from gts", K(snapshot), K(now));
} else {
uncertain_bound = rts.mts_ + gts_ahead;
}
if (OB_FAIL(ret)) {
TRANS_LOG(WARN, "acquire global snapshot fail", K(ret),

View File

@ -107,6 +107,214 @@ ObTsSourceInfoGuard::~ObTsSourceInfoGuard()
}
}
int ObTsSyncGetTsCbTask::init(uint64_t task_id)
{
int ret = OB_SUCCESS;
if (is_inited_) {
ret = OB_INIT_TWICE;
TRANS_LOG(WARN, "ObTsSyncGetTsCbTask inited twice", KR(ret));
} else if (OB_FAIL(cond_.init(ObWaitEventIds::SYNC_GET_GTS_WAIT))) {
TRANS_LOG(WARN, "ObTsSyncGetTsCbTask cond init failed", K(ret));
} else {
task_id_ = task_id;
is_inited_ = true;
}
return ret;
}
int ObTsSyncGetTsCbTask::gts_callback_interrupted(const int errcode)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
TRANS_LOG(WARN, "not init", K(ret));
} else {
ObThreadCondGuard cond_guard(cond_);
if (is_early_exit_) {
ObTsSyncGetTsCbTaskPool::get_instance().recycle_task(this);
} else {
errcode_ = errcode;
is_finished_ = true;
cond_.signal();
}
}
return ret;
}
int ObTsSyncGetTsCbTask::get_gts_callback(const MonotonicTs srr, const share::SCN &gts,
const MonotonicTs receive_gts_ts)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
TRANS_LOG(WARN, "not init", K(ret));
} else if (srr < get_stc()) {
ret = OB_EAGAIN;
} else {
ObThreadCondGuard cond_guard(cond_);
if (is_early_exit_) {
ObTsSyncGetTsCbTaskPool::get_instance().recycle_task(this);
} else {
gts_result_ = gts;
is_finished_ = true;
cond_.signal();
}
}
return ret;
}
int ObTsSyncGetTsCbTask::gts_elapse_callback(const MonotonicTs srr, const share::SCN &gts)
{
int ret = OB_NOT_SUPPORTED;
return ret;
}
MonotonicTs ObTsSyncGetTsCbTask::get_stc() const
{
return stc_;
}
uint64_t ObTsSyncGetTsCbTask::hash() const
{
return task_id_;
}
uint64_t ObTsSyncGetTsCbTask::get_tenant_id() const
{
return tenant_id_;
}
int ObTsSyncGetTsCbTask::wait(const int64_t timeout_us, share::SCN &scn, bool &need_recycle_task)
{
int ret = OB_SUCCESS;
bool need_recycle = true;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
TRANS_LOG(WARN, "not init", K(ret));
} else {
ObThreadCondGuard cond_guard(cond_);
// wait the condition in multiple rounds, so we can check the interrupt status every round
if (!is_finished_) {
if (OB_FAIL(cond_.wait_us(timeout_us))) {
is_early_exit_ = true;
need_recycle = false;
TRANS_LOG(WARN, "ObTsSyncGetTsCbTask cond wait failed", K(ret));
}
}
if (errcode_ != OB_SUCCESS) {
ret = errcode_;
TRANS_LOG(WARN, "ObTsSyncGetTsCbTask errcode", K(ret));
} else {
scn = gts_result_;
}
}
need_recycle_task = need_recycle;
return ret;
}
int ObTsSyncGetTsCbTask::config(MonotonicTs stc, uint64_t tenant_id) {
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
TRANS_LOG(WARN, "not init", K(ret));
} else {
is_finished_ = false;
is_early_exit_ = false;
gts_result_.reset();
errcode_ = OB_SUCCESS;
stc_ = stc;
tenant_id_ = tenant_id;
}
return ret;
}
int ObTsSyncGetTsCbTaskPool::init() {
int ret = OB_SUCCESS;
if (is_inited_) {
ret = OB_INIT_TWICE;
TRANS_LOG(WARN, "ObTsSyncGetTsCbTaskPool inited twice", KR(ret));
} else {
for (uint64_t i = 0; i < POOL_SIZE; i++) {
if (OB_FAIL(tasks_[i].init(i))) {
TRANS_LOG(WARN, "ObTsSyncGetTsCbTaskPool init failed", KR(ret));
break;
}
}
if (OB_SUCC(ret)) {
is_inited_ = true;
}
}
return ret;
}
int ObTsSyncGetTsCbTaskPool::get_task(MonotonicTs stc, uint64_t tenant_id,
ObTsSyncGetTsCbTask *&task) {
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
TRANS_LOG(WARN, "not init", K(ret));
} else {
// try to use thread_id to find a free cbtask
const int64_t thread_id = get_itid();
int64_t index = thread_id % POOL_SIZE;
int iter_count = 0;
const int ITER_LIMIT = 8;
while (iter_count < ITER_LIMIT) {
ObTsSyncGetTsCbTask *iter_task = &tasks_[index];
if (ATOMIC_BCAS(&iter_task->is_occupied_, false, true)) {
break;
}
iter_count++;
index = (index + iter_count) % POOL_SIZE;
}
if (iter_count == ITER_LIMIT) {
ret = OB_EAGAIN;
TRANS_LOG(WARN, "ObTsSyncGetTsCbTaskPool failed to get task", K(ret), K(thread_id));
} else {
task = &tasks_[index];
if (OB_FAIL(task->config(stc, tenant_id))) {
TRANS_LOG(WARN, "failed to config ObTsSyncGetTsCbTask", K(ret), K(index));
}
}
}
return ret;
}
int ObTsSyncGetTsCbTaskPool::recycle_task(ObTsSyncGetTsCbTask *task) {
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
TRANS_LOG(WARN, "not init", K(ret));
} else if (OB_ISNULL(task)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "ObTsSyncGetTsCbTask is NULL", KR(ret));
} else {
if (!ATOMIC_BCAS(&task->is_occupied_, true, false)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "ObTsSyncGetTsCbTask has been recycled", KR(ret));
}
}
return ret;
}
////////////////////////ObTsMgr实现///////////////////////////////////
int ObTsMgr::init(const ObAddr &server,
@ -140,6 +348,8 @@ int ObTsMgr::init(const ObAddr &server,
TRANS_LOG(WARN, "response rpc init failed", KR(ret), K(server));
} else if (OB_FAIL(lock_.init(lib::ObMemAttr(OB_SERVER_TENANT_ID, "TsMgr")))) {
TRANS_LOG(WARN, "ObQSyncLock init failed", KR(ret), K(OB_SERVER_TENANT_ID));
} else if (OB_FAIL(ObTsSyncGetTsCbTaskPool::get_instance().init())) {
TRANS_LOG(WARN, "ObTsSyncGetTsCbTaskPool init failed", KR(ret));
} else {
server_ = server;
location_adapter_ = &location_adapter_def_;
@ -685,6 +895,85 @@ int ObTsMgr::get_ts_sync(const uint64_t tenant_id, const int64_t timeout_us, sha
return get_ts_sync(tenant_id, timeout_us, scn, unused_is_external_consistent);
}
int ObTsMgr::get_gts_sync(const uint64_t tenant_id,
const MonotonicTs stc,
const int64_t timeout_us,
share::SCN &scn,
MonotonicTs &receive_gts_ts)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
TRANS_LOG(WARN, "ObTsMgr is not inited", K(ret));
} else if (OB_UNLIKELY(!is_running_)) {
ret = OB_NOT_RUNNING;
TRANS_LOG(WARN, "ObTsMgr is not running", K(ret));
} else if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id)) || OB_UNLIKELY(!stc.is_valid())
|| OB_UNLIKELY(timeout_us < 0)) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid argument", K(ret), K(tenant_id), K(stc), K(timeout_us));
} else {
ObTsSourceInfo *ts_source_info = NULL;
ObGtsSource *ts_source = NULL;
ObTsSourceInfoGuard info_guard;
ObTsSyncGetTsCbTask *task = NULL;
int64_t gts_result = 0;
bool fall_back_to_sleep = false;
if (OB_FAIL(get_ts_source_info_opt_(tenant_id, info_guard, true, true))) {
TRANS_LOG(WARN, "get ts source info failed", K(ret), K(tenant_id));
} else if (OB_ISNULL(ts_source_info = info_guard.get_ts_source_info())) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "ts source info is NULL", K(ret), K(tenant_id));
} else if (OB_ISNULL(ts_source = ts_source_info->get_gts_source())) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "ts source is NULL", K(ret));
} else if (OB_SUCC(ObTsSyncGetTsCbTaskPool::get_instance().get_task(stc, tenant_id, task))) {
bool need_recycle_task = true;
if (OB_FAIL(ts_source->get_gts(stc, task, gts_result, receive_gts_ts))) {
if (OB_EAGAIN != ret) {
TRANS_LOG(WARN, "get gts error", K(ret), K(tenant_id), K(stc));
} else if (OB_FAIL(task->wait(timeout_us, scn, need_recycle_task))) {
if (OB_TIMEOUT != ret) {
fall_back_to_sleep = true;
}
TRANS_LOG(WARN, "failed to wait ObTsSyncGetTsCbTask", K(ret), K(tenant_id), K(timeout_us));
}
} else {
scn.convert_for_gts(gts_result);
}
if (need_recycle_task) {
ObTsSyncGetTsCbTaskPool::get_instance().recycle_task(task);
}
} else {
fall_back_to_sleep = true;
}
if (fall_back_to_sleep) {
TRANS_LOG(WARN, "failed to get ObTsSyncGetTsCbTask, fall back to sleep", K(ret));
int64_t expire_ts = ObClockGenerator::getClock() + timeout_us;
int retry_times = 0;
const int64_t SLEEP_TIME_US = 500;
do {
const int64_t now = ObClockGenerator::getClock();
if (now >= expire_ts) {
ret = OB_TIMEOUT;
} else if (OB_FAIL(ts_source->get_gts(stc, NULL, gts_result, receive_gts_ts))) {
if (OB_EAGAIN == ret) {
ob_usleep(SLEEP_TIME_US);
} else {
TRANS_LOG(WARN, "get gts fail", K(ret), K(now));
}
} else {
scn.convert_for_gts(gts_result);
}
} while (OB_EAGAIN == ret);
}
}
return ret;
}
int ObTsMgr::get_ts_sync(const uint64_t tenant_id,
const int64_t timeout_us,
SCN &scn,

View File

@ -86,6 +86,12 @@ public:
ObTsCbTask *task,
share::SCN &scn,
MonotonicTs &receive_gts_ts) = 0;
virtual int get_gts_sync(const uint64_t tenant_id,
const MonotonicTs stc,
const int64_t timeout_us,
share::SCN &scn,
MonotonicTs &receive_gts_ts) = 0;
virtual int get_gts(const uint64_t tenant_id, ObTsCbTask *task, share::SCN &scn) = 0;
virtual int get_ts_sync(const uint64_t tenant_id, const int64_t timeout_ts,
share::SCN &scn, bool &is_external_consistent) = 0;
@ -285,6 +291,64 @@ private:
bool need_revert_;
};
class ObTsSyncGetTsCbTask : public ObTsCbTask
{
public:
friend class ObTsSyncGetTsCbTaskPool;
ObTsSyncGetTsCbTask()
:is_inited_(false), task_id_(0), is_occupied_(false), is_finished_(false),
is_early_exit_(false), stc_(0), tenant_id_(0), errcode_(OB_SUCCESS) {}
~ObTsSyncGetTsCbTask() {}
int init(uint64_t task_id);
int config(MonotonicTs stc, uint64_t tenant_id);
int gts_callback_interrupted(const int errcode) override;
int get_gts_callback(const MonotonicTs srr, const share::SCN &gts,
const MonotonicTs receive_gts_ts) override;
int gts_elapse_callback(const MonotonicTs srr, const share::SCN &gts) override;
MonotonicTs get_stc() const override;
uint64_t hash() const override;
uint64_t get_tenant_id() const override;
int wait(const int64_t timeout_us, share::SCN &scn, bool &need_recycle_task);
private:
bool is_inited_;
uint64_t task_id_;
// whether this callback task is being used
bool is_occupied_ __attribute__((aligned(8)));
// whether the callback has been invoked
bool is_finished_;
// whether the caller exits (due to timeout) before the callback is invoked
bool is_early_exit_;
share::SCN gts_result_;
ObThreadCond cond_;
MonotonicTs stc_;
uint64_t tenant_id_;
int errcode_;
};
STATIC_ASSERT(sizeof(ObTsSyncGetTsCbTask) <= 256, "ObTsSyncGetTsCbTask is too large");
/**
* The resource pool of ObTsSyncGetTsCbTask. The pool has a fixed size of cbtasks, and the cbtasks
* can be reused.
*/
class ObTsSyncGetTsCbTaskPool
{
public:
static constexpr int64_t POOL_SIZE = 8000;
ObTsSyncGetTsCbTaskPool() {}
~ObTsSyncGetTsCbTaskPool() {}
static ObTsSyncGetTsCbTaskPool& get_instance()
{
static ObTsSyncGetTsCbTaskPool pool;
return pool;
}
int init();
int get_task(MonotonicTs stc, uint64_t tenant_id, ObTsSyncGetTsCbTask *&task);
int recycle_task(ObTsSyncGetTsCbTask *task);
private:
bool is_inited_;
ObTsSyncGetTsCbTask tasks_[POOL_SIZE];
};
typedef common::ObLinkHashMap<ObTsTenantInfo, ObTsSourceInfo, ObTsSourceInfoAlloc> ObTsSourceInfoMap;
class ObTsMgr : public share::ObThreadPool, public ObITsMgr
{
@ -315,6 +379,20 @@ public:
ObTsCbTask *task,
share::SCN &scn,
MonotonicTs &receive_gts_ts);
/**
* `get_gts`GTS时间戳
* `get_ts_sync`
* @param[in] tenant_id
* @param[in] stc: GTS的时间点current time
* @param[in] timeout_us: us
* @param[out] scn: GTS时间戳结果
* @param[out] receive_gts_ts: GTS response的时间点
*/
int get_gts_sync(const uint64_t tenant_id,
const MonotonicTs stc,
const int64_t timeout_us,
share::SCN &scn,
MonotonicTs &receive_gts_ts);
//仅仅获取本地gts cache的最新值,但可能会失败,失败之后处理逻辑如下:
//1. 如果task == NULL,说明调用者不需要异步回调,直接返回报错,由调用者处理
//2. 如果task != NULL,需要注册异步回调任务

View File

@ -2405,10 +2405,14 @@ TEST_F(ObTestTx, interrupt_get_read_snapshot)
PREPARE_TX(n1, tx);
ObTxReadSnapshot snapshot;
n1->get_ts_mgr_().inject_get_gts_error(OB_EAGAIN);
ASYNC_DO(acq_snapshot, n1->get_read_snapshot(tx, ObTxIsolationLevel::RC, n1->ts_after_ms(20 * 1000), snapshot));
ASSERT_EQ(OB_SUCCESS, n1->interrupt(tx, OB_TRANS_KILLED));
ASYNC_WAIT(acq_snapshot, 2000 * 1000, wait_ret);
ASSERT_EQ(OB_ERR_INTERRUPTED, wait_ret);
int ret = OB_SUCCESS;
do {
ASYNC_DO(acq_snapshot, n1->get_read_snapshot(tx, ObTxIsolationLevel::RC, n1->ts_after_ms(20 * 1000), snapshot));
ASSERT_EQ(OB_SUCCESS, n1->interrupt(tx, OB_TRANS_KILLED));
ASYNC_WAIT(acq_snapshot, 2000 * 1000, wait_ret);
ret = wait_ret;
} while (OB_GTS_NOT_READY == ret);
ASSERT_EQ(OB_ERR_INTERRUPTED, ret);
ROLLBACK_TX(n1, tx);
}

View File

@ -271,6 +271,30 @@ public:
return ret;
}
int get_gts_sync(const uint64_t tenant_id,
const MonotonicTs stc,
const int64_t timeout_us,
share::SCN &scn,
MonotonicTs &receive_gts_ts)
{
int ret = OB_SUCCESS;
const int64_t expire_ts = ObClockGenerator::getClock() + timeout_us;
do {
int64_t n = ObClockGenerator::getClock();
if (n >= expire_ts) {
ret = OB_TIMEOUT;
} else if (OB_FAIL(get_gts(tenant_id, stc, NULL, scn, receive_gts_ts))) {
if (OB_EAGAIN == ret) {
ob_usleep(500);
}
}
} while (OB_EAGAIN == ret);
return ret;
return get_gts(tenant_id, stc, NULL, scn, receive_gts_ts);
}
int get_gts(const uint64_t tenant_id, ObTsCbTask *task, share::SCN &scn) {
if (get_gts_error_) { return get_gts_error_; }
return OB_SUCCESS;