[CP] opt: provide a new sync interface for getting gts timestamp
This commit is contained in:
parent
7775df4c75
commit
31e5ff5f59
@ -171,6 +171,18 @@ public:
|
||||
UNUSED(tenant_id);
|
||||
return source_.get_gts(stc, task, gts, receive_gts_ts);
|
||||
}
|
||||
|
||||
virtual int get_gts_sync(const uint64_t tenant_id,
|
||||
const MonotonicTs stc,
|
||||
int64_t timeout_us,
|
||||
share::SCN >s,
|
||||
MonotonicTs &receive_gts_ts)
|
||||
{
|
||||
UNUSED(tenant_id);
|
||||
UNUSED(timeout_us);
|
||||
return source_.get_gts(stc, NULL, gts, receive_gts_ts);
|
||||
}
|
||||
|
||||
virtual int get_gts(const uint64_t tenant_id, ObTsCbTask *task, share::SCN >s)
|
||||
{
|
||||
UNUSED(tenant_id);
|
||||
|
@ -1732,6 +1732,10 @@ int ObTransService::sync_acquire_global_snapshot_(ObTxDesc &tx,
|
||||
[&]() -> bool { return tx.flags_.INTERRUPTED_; });
|
||||
tx.lock_.lock();
|
||||
bool interrupted = tx.flags_.INTERRUPTED_;
|
||||
if (interrupted) {
|
||||
ret = OB_ERR_INTERRUPTED;
|
||||
TRANS_LOG(WARN, "acquiring global snapshot has been interrupted", KR(ret), K(tx));
|
||||
}
|
||||
tx.clear_interrupt();
|
||||
tx.flags_.BLOCK_ = false;
|
||||
if (op_sn != tx.op_sn_) {
|
||||
@ -1759,33 +1763,27 @@ int ObTransService::acquire_global_snapshot__(const int64_t expire_ts,
|
||||
int ret = OB_SUCCESS;
|
||||
const MonotonicTs now0 = get_req_receive_mts_();
|
||||
const MonotonicTs now = now0 - MonotonicTs(gts_ahead);
|
||||
int retry_times = 0;
|
||||
const int MAX_RETRY_TIMES = 2000; // 2000 * 500us = 1s
|
||||
do {
|
||||
int64_t n = ObClockGenerator::getClock();
|
||||
MonotonicTs rts(0);
|
||||
if (n >= expire_ts) {
|
||||
ret = OB_TIMEOUT;
|
||||
} else if (retry_times++ > MAX_RETRY_TIMES) {
|
||||
const int64_t current_time = ObClockGenerator::getClock();
|
||||
// occupy current worker thread for at most 1s
|
||||
const int64_t MAX_WAIT_TIME_US = 1 * 1000 * 1000;
|
||||
MonotonicTs rts(0);
|
||||
|
||||
if (interrupt_checker()) {
|
||||
ret = OB_ERR_INTERRUPTED;
|
||||
} else if (current_time >= expire_ts) {
|
||||
ret = OB_TIMEOUT;
|
||||
TRANS_LOG(WARN, "get gts timeout", K(ret), K(expire_ts), K(current_time));
|
||||
} else if (OB_FAIL(ts_mgr_->get_gts_sync(tenant_id_, now, MAX_WAIT_TIME_US, snapshot, rts))) {
|
||||
TRANS_LOG(WARN, "get gts fail", K(ret), K(expire_ts), K(now));
|
||||
if (OB_TIMEOUT == ret) {
|
||||
ret = OB_GTS_NOT_READY;
|
||||
TRANS_LOG(WARN, "gts not ready", K(ret), K(retry_times));
|
||||
} else if (OB_FAIL(ts_mgr_->get_gts(tenant_id_, now, NULL, snapshot, rts))) {
|
||||
if (OB_EAGAIN == ret) {
|
||||
if (interrupt_checker()) {
|
||||
ret = OB_ERR_INTERRUPTED;
|
||||
} else {
|
||||
ob_usleep(500);
|
||||
}
|
||||
} else {
|
||||
TRANS_LOG(WARN, "get gts fail", K(now));
|
||||
}
|
||||
} else if (OB_UNLIKELY(!snapshot.is_valid())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(WARN, "invalid snapshot from gts", K(snapshot), K(now));
|
||||
} else {
|
||||
uncertain_bound = rts.mts_ + gts_ahead;
|
||||
}
|
||||
} while (OB_EAGAIN == ret);
|
||||
} else if (OB_UNLIKELY(!snapshot.is_valid())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(WARN, "invalid snapshot from gts", K(snapshot), K(now));
|
||||
} else {
|
||||
uncertain_bound = rts.mts_ + gts_ahead;
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
TRANS_LOG(WARN, "acquire global snapshot fail", K(ret),
|
||||
|
@ -107,6 +107,214 @@ ObTsSourceInfoGuard::~ObTsSourceInfoGuard()
|
||||
}
|
||||
}
|
||||
|
||||
int ObTsSyncGetTsCbTask::init(uint64_t task_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
if (is_inited_) {
|
||||
ret = OB_INIT_TWICE;
|
||||
TRANS_LOG(WARN, "ObTsSyncGetTsCbTask inited twice", KR(ret));
|
||||
} else if (OB_FAIL(cond_.init(ObWaitEventIds::SYNC_GET_GTS_WAIT))) {
|
||||
TRANS_LOG(WARN, "ObTsSyncGetTsCbTask cond init failed", K(ret));
|
||||
} else {
|
||||
task_id_ = task_id;
|
||||
is_inited_ = true;
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTsSyncGetTsCbTask::gts_callback_interrupted(const int errcode)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
TRANS_LOG(WARN, "not init", K(ret));
|
||||
} else {
|
||||
ObThreadCondGuard cond_guard(cond_);
|
||||
if (is_early_exit_) {
|
||||
ObTsSyncGetTsCbTaskPool::get_instance().recycle_task(this);
|
||||
} else {
|
||||
errcode_ = errcode;
|
||||
is_finished_ = true;
|
||||
cond_.signal();
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTsSyncGetTsCbTask::get_gts_callback(const MonotonicTs srr, const share::SCN >s,
|
||||
const MonotonicTs receive_gts_ts)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
TRANS_LOG(WARN, "not init", K(ret));
|
||||
} else if (srr < get_stc()) {
|
||||
ret = OB_EAGAIN;
|
||||
} else {
|
||||
ObThreadCondGuard cond_guard(cond_);
|
||||
if (is_early_exit_) {
|
||||
ObTsSyncGetTsCbTaskPool::get_instance().recycle_task(this);
|
||||
} else {
|
||||
gts_result_ = gts;
|
||||
is_finished_ = true;
|
||||
cond_.signal();
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTsSyncGetTsCbTask::gts_elapse_callback(const MonotonicTs srr, const share::SCN >s)
|
||||
{
|
||||
int ret = OB_NOT_SUPPORTED;
|
||||
return ret;
|
||||
}
|
||||
|
||||
MonotonicTs ObTsSyncGetTsCbTask::get_stc() const
|
||||
{
|
||||
return stc_;
|
||||
}
|
||||
|
||||
uint64_t ObTsSyncGetTsCbTask::hash() const
|
||||
{
|
||||
return task_id_;
|
||||
}
|
||||
|
||||
uint64_t ObTsSyncGetTsCbTask::get_tenant_id() const
|
||||
{
|
||||
return tenant_id_;
|
||||
}
|
||||
|
||||
int ObTsSyncGetTsCbTask::wait(const int64_t timeout_us, share::SCN &scn, bool &need_recycle_task)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool need_recycle = true;
|
||||
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
TRANS_LOG(WARN, "not init", K(ret));
|
||||
} else {
|
||||
ObThreadCondGuard cond_guard(cond_);
|
||||
// wait the condition in multiple rounds, so we can check the interrupt status every round
|
||||
if (!is_finished_) {
|
||||
if (OB_FAIL(cond_.wait_us(timeout_us))) {
|
||||
is_early_exit_ = true;
|
||||
need_recycle = false;
|
||||
TRANS_LOG(WARN, "ObTsSyncGetTsCbTask cond wait failed", K(ret));
|
||||
}
|
||||
}
|
||||
if (errcode_ != OB_SUCCESS) {
|
||||
ret = errcode_;
|
||||
TRANS_LOG(WARN, "ObTsSyncGetTsCbTask errcode", K(ret));
|
||||
} else {
|
||||
scn = gts_result_;
|
||||
}
|
||||
}
|
||||
|
||||
need_recycle_task = need_recycle;
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTsSyncGetTsCbTask::config(MonotonicTs stc, uint64_t tenant_id) {
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
TRANS_LOG(WARN, "not init", K(ret));
|
||||
} else {
|
||||
is_finished_ = false;
|
||||
is_early_exit_ = false;
|
||||
gts_result_.reset();
|
||||
errcode_ = OB_SUCCESS;
|
||||
stc_ = stc;
|
||||
tenant_id_ = tenant_id;
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTsSyncGetTsCbTaskPool::init() {
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
if (is_inited_) {
|
||||
ret = OB_INIT_TWICE;
|
||||
TRANS_LOG(WARN, "ObTsSyncGetTsCbTaskPool inited twice", KR(ret));
|
||||
} else {
|
||||
for (uint64_t i = 0; i < POOL_SIZE; i++) {
|
||||
if (OB_FAIL(tasks_[i].init(i))) {
|
||||
TRANS_LOG(WARN, "ObTsSyncGetTsCbTaskPool init failed", KR(ret));
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
is_inited_ = true;
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTsSyncGetTsCbTaskPool::get_task(MonotonicTs stc, uint64_t tenant_id,
|
||||
ObTsSyncGetTsCbTask *&task) {
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
TRANS_LOG(WARN, "not init", K(ret));
|
||||
} else {
|
||||
// try to use thread_id to find a free cbtask
|
||||
const int64_t thread_id = get_itid();
|
||||
int64_t index = thread_id % POOL_SIZE;
|
||||
int iter_count = 0;
|
||||
const int ITER_LIMIT = 8;
|
||||
|
||||
while (iter_count < ITER_LIMIT) {
|
||||
ObTsSyncGetTsCbTask *iter_task = &tasks_[index];
|
||||
if (ATOMIC_BCAS(&iter_task->is_occupied_, false, true)) {
|
||||
break;
|
||||
}
|
||||
iter_count++;
|
||||
index = (index + iter_count) % POOL_SIZE;
|
||||
}
|
||||
if (iter_count == ITER_LIMIT) {
|
||||
ret = OB_EAGAIN;
|
||||
TRANS_LOG(WARN, "ObTsSyncGetTsCbTaskPool failed to get task", K(ret), K(thread_id));
|
||||
} else {
|
||||
task = &tasks_[index];
|
||||
if (OB_FAIL(task->config(stc, tenant_id))) {
|
||||
TRANS_LOG(WARN, "failed to config ObTsSyncGetTsCbTask", K(ret), K(index));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTsSyncGetTsCbTaskPool::recycle_task(ObTsSyncGetTsCbTask *task) {
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
TRANS_LOG(WARN, "not init", K(ret));
|
||||
} else if (OB_ISNULL(task)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(WARN, "ObTsSyncGetTsCbTask is NULL", KR(ret));
|
||||
} else {
|
||||
if (!ATOMIC_BCAS(&task->is_occupied_, true, false)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(WARN, "ObTsSyncGetTsCbTask has been recycled", KR(ret));
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
////////////////////////ObTsMgr实现///////////////////////////////////
|
||||
|
||||
int ObTsMgr::init(const ObAddr &server,
|
||||
@ -140,6 +348,8 @@ int ObTsMgr::init(const ObAddr &server,
|
||||
TRANS_LOG(WARN, "response rpc init failed", KR(ret), K(server));
|
||||
} else if (OB_FAIL(lock_.init(lib::ObMemAttr(OB_SERVER_TENANT_ID, "TsMgr")))) {
|
||||
TRANS_LOG(WARN, "ObQSyncLock init failed", KR(ret), K(OB_SERVER_TENANT_ID));
|
||||
} else if (OB_FAIL(ObTsSyncGetTsCbTaskPool::get_instance().init())) {
|
||||
TRANS_LOG(WARN, "ObTsSyncGetTsCbTaskPool init failed", KR(ret));
|
||||
} else {
|
||||
server_ = server;
|
||||
location_adapter_ = &location_adapter_def_;
|
||||
@ -685,6 +895,85 @@ int ObTsMgr::get_ts_sync(const uint64_t tenant_id, const int64_t timeout_us, sha
|
||||
return get_ts_sync(tenant_id, timeout_us, scn, unused_is_external_consistent);
|
||||
}
|
||||
|
||||
int ObTsMgr::get_gts_sync(const uint64_t tenant_id,
|
||||
const MonotonicTs stc,
|
||||
const int64_t timeout_us,
|
||||
share::SCN &scn,
|
||||
MonotonicTs &receive_gts_ts)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
TRANS_LOG(WARN, "ObTsMgr is not inited", K(ret));
|
||||
} else if (OB_UNLIKELY(!is_running_)) {
|
||||
ret = OB_NOT_RUNNING;
|
||||
TRANS_LOG(WARN, "ObTsMgr is not running", K(ret));
|
||||
} else if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id)) || OB_UNLIKELY(!stc.is_valid())
|
||||
|| OB_UNLIKELY(timeout_us < 0)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
TRANS_LOG(WARN, "invalid argument", K(ret), K(tenant_id), K(stc), K(timeout_us));
|
||||
} else {
|
||||
ObTsSourceInfo *ts_source_info = NULL;
|
||||
ObGtsSource *ts_source = NULL;
|
||||
ObTsSourceInfoGuard info_guard;
|
||||
ObTsSyncGetTsCbTask *task = NULL;
|
||||
int64_t gts_result = 0;
|
||||
bool fall_back_to_sleep = false;
|
||||
if (OB_FAIL(get_ts_source_info_opt_(tenant_id, info_guard, true, true))) {
|
||||
TRANS_LOG(WARN, "get ts source info failed", K(ret), K(tenant_id));
|
||||
} else if (OB_ISNULL(ts_source_info = info_guard.get_ts_source_info())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(WARN, "ts source info is NULL", K(ret), K(tenant_id));
|
||||
} else if (OB_ISNULL(ts_source = ts_source_info->get_gts_source())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(WARN, "ts source is NULL", K(ret));
|
||||
} else if (OB_SUCC(ObTsSyncGetTsCbTaskPool::get_instance().get_task(stc, tenant_id, task))) {
|
||||
bool need_recycle_task = true;
|
||||
if (OB_FAIL(ts_source->get_gts(stc, task, gts_result, receive_gts_ts))) {
|
||||
if (OB_EAGAIN != ret) {
|
||||
TRANS_LOG(WARN, "get gts error", K(ret), K(tenant_id), K(stc));
|
||||
} else if (OB_FAIL(task->wait(timeout_us, scn, need_recycle_task))) {
|
||||
if (OB_TIMEOUT != ret) {
|
||||
fall_back_to_sleep = true;
|
||||
}
|
||||
TRANS_LOG(WARN, "failed to wait ObTsSyncGetTsCbTask", K(ret), K(tenant_id), K(timeout_us));
|
||||
}
|
||||
} else {
|
||||
scn.convert_for_gts(gts_result);
|
||||
}
|
||||
if (need_recycle_task) {
|
||||
ObTsSyncGetTsCbTaskPool::get_instance().recycle_task(task);
|
||||
}
|
||||
} else {
|
||||
fall_back_to_sleep = true;
|
||||
}
|
||||
if (fall_back_to_sleep) {
|
||||
TRANS_LOG(WARN, "failed to get ObTsSyncGetTsCbTask, fall back to sleep", K(ret));
|
||||
|
||||
int64_t expire_ts = ObClockGenerator::getClock() + timeout_us;
|
||||
int retry_times = 0;
|
||||
const int64_t SLEEP_TIME_US = 500;
|
||||
do {
|
||||
const int64_t now = ObClockGenerator::getClock();
|
||||
if (now >= expire_ts) {
|
||||
ret = OB_TIMEOUT;
|
||||
} else if (OB_FAIL(ts_source->get_gts(stc, NULL, gts_result, receive_gts_ts))) {
|
||||
if (OB_EAGAIN == ret) {
|
||||
ob_usleep(SLEEP_TIME_US);
|
||||
} else {
|
||||
TRANS_LOG(WARN, "get gts fail", K(ret), K(now));
|
||||
}
|
||||
} else {
|
||||
scn.convert_for_gts(gts_result);
|
||||
}
|
||||
} while (OB_EAGAIN == ret);
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTsMgr::get_ts_sync(const uint64_t tenant_id,
|
||||
const int64_t timeout_us,
|
||||
SCN &scn,
|
||||
|
@ -86,6 +86,12 @@ public:
|
||||
ObTsCbTask *task,
|
||||
share::SCN &scn,
|
||||
MonotonicTs &receive_gts_ts) = 0;
|
||||
virtual int get_gts_sync(const uint64_t tenant_id,
|
||||
const MonotonicTs stc,
|
||||
const int64_t timeout_us,
|
||||
share::SCN &scn,
|
||||
MonotonicTs &receive_gts_ts) = 0;
|
||||
|
||||
virtual int get_gts(const uint64_t tenant_id, ObTsCbTask *task, share::SCN &scn) = 0;
|
||||
virtual int get_ts_sync(const uint64_t tenant_id, const int64_t timeout_ts,
|
||||
share::SCN &scn, bool &is_external_consistent) = 0;
|
||||
@ -285,6 +291,64 @@ private:
|
||||
bool need_revert_;
|
||||
};
|
||||
|
||||
class ObTsSyncGetTsCbTask : public ObTsCbTask
|
||||
{
|
||||
public:
|
||||
friend class ObTsSyncGetTsCbTaskPool;
|
||||
ObTsSyncGetTsCbTask()
|
||||
:is_inited_(false), task_id_(0), is_occupied_(false), is_finished_(false),
|
||||
is_early_exit_(false), stc_(0), tenant_id_(0), errcode_(OB_SUCCESS) {}
|
||||
~ObTsSyncGetTsCbTask() {}
|
||||
int init(uint64_t task_id);
|
||||
int config(MonotonicTs stc, uint64_t tenant_id);
|
||||
int gts_callback_interrupted(const int errcode) override;
|
||||
int get_gts_callback(const MonotonicTs srr, const share::SCN >s,
|
||||
const MonotonicTs receive_gts_ts) override;
|
||||
int gts_elapse_callback(const MonotonicTs srr, const share::SCN >s) override;
|
||||
MonotonicTs get_stc() const override;
|
||||
uint64_t hash() const override;
|
||||
uint64_t get_tenant_id() const override;
|
||||
int wait(const int64_t timeout_us, share::SCN &scn, bool &need_recycle_task);
|
||||
private:
|
||||
bool is_inited_;
|
||||
uint64_t task_id_;
|
||||
// whether this callback task is being used
|
||||
bool is_occupied_ __attribute__((aligned(8)));
|
||||
// whether the callback has been invoked
|
||||
bool is_finished_;
|
||||
// whether the caller exits (due to timeout) before the callback is invoked
|
||||
bool is_early_exit_;
|
||||
share::SCN gts_result_;
|
||||
ObThreadCond cond_;
|
||||
MonotonicTs stc_;
|
||||
uint64_t tenant_id_;
|
||||
int errcode_;
|
||||
};
|
||||
|
||||
STATIC_ASSERT(sizeof(ObTsSyncGetTsCbTask) <= 256, "ObTsSyncGetTsCbTask is too large");
|
||||
/**
|
||||
* The resource pool of ObTsSyncGetTsCbTask. The pool has a fixed size of cbtasks, and the cbtasks
|
||||
* can be reused.
|
||||
*/
|
||||
class ObTsSyncGetTsCbTaskPool
|
||||
{
|
||||
public:
|
||||
static constexpr int64_t POOL_SIZE = 8000;
|
||||
ObTsSyncGetTsCbTaskPool() {}
|
||||
~ObTsSyncGetTsCbTaskPool() {}
|
||||
static ObTsSyncGetTsCbTaskPool& get_instance()
|
||||
{
|
||||
static ObTsSyncGetTsCbTaskPool pool;
|
||||
return pool;
|
||||
}
|
||||
int init();
|
||||
int get_task(MonotonicTs stc, uint64_t tenant_id, ObTsSyncGetTsCbTask *&task);
|
||||
int recycle_task(ObTsSyncGetTsCbTask *task);
|
||||
private:
|
||||
bool is_inited_;
|
||||
ObTsSyncGetTsCbTask tasks_[POOL_SIZE];
|
||||
};
|
||||
|
||||
typedef common::ObLinkHashMap<ObTsTenantInfo, ObTsSourceInfo, ObTsSourceInfoAlloc> ObTsSourceInfoMap;
|
||||
class ObTsMgr : public share::ObThreadPool, public ObITsMgr
|
||||
{
|
||||
@ -315,6 +379,20 @@ public:
|
||||
ObTsCbTask *task,
|
||||
share::SCN &scn,
|
||||
MonotonicTs &receive_gts_ts);
|
||||
/**
|
||||
* 与`get_gts`相对应的同步接口,用于同步获取合适的GTS时间戳,可传入超时时间以避免长时间等待。
|
||||
* 相较于原有同步接口`get_ts_sync`,本接口的性能更好
|
||||
* @param[in] tenant_id
|
||||
* @param[in] stc: 需要获取GTS的时间点,一般取current time
|
||||
* @param[in] timeout_us: 超时时长,单位us
|
||||
* @param[out] scn: 获取到的GTS时间戳结果
|
||||
* @param[out] receive_gts_ts: 收到GTS response的时间点
|
||||
*/
|
||||
int get_gts_sync(const uint64_t tenant_id,
|
||||
const MonotonicTs stc,
|
||||
const int64_t timeout_us,
|
||||
share::SCN &scn,
|
||||
MonotonicTs &receive_gts_ts);
|
||||
//仅仅获取本地gts cache的最新值,但可能会失败,失败之后处理逻辑如下:
|
||||
//1. 如果task == NULL,说明调用者不需要异步回调,直接返回报错,由调用者处理
|
||||
//2. 如果task != NULL,需要注册异步回调任务
|
||||
|
@ -2405,10 +2405,14 @@ TEST_F(ObTestTx, interrupt_get_read_snapshot)
|
||||
PREPARE_TX(n1, tx);
|
||||
ObTxReadSnapshot snapshot;
|
||||
n1->get_ts_mgr_().inject_get_gts_error(OB_EAGAIN);
|
||||
ASYNC_DO(acq_snapshot, n1->get_read_snapshot(tx, ObTxIsolationLevel::RC, n1->ts_after_ms(20 * 1000), snapshot));
|
||||
ASSERT_EQ(OB_SUCCESS, n1->interrupt(tx, OB_TRANS_KILLED));
|
||||
ASYNC_WAIT(acq_snapshot, 2000 * 1000, wait_ret);
|
||||
ASSERT_EQ(OB_ERR_INTERRUPTED, wait_ret);
|
||||
int ret = OB_SUCCESS;
|
||||
do {
|
||||
ASYNC_DO(acq_snapshot, n1->get_read_snapshot(tx, ObTxIsolationLevel::RC, n1->ts_after_ms(20 * 1000), snapshot));
|
||||
ASSERT_EQ(OB_SUCCESS, n1->interrupt(tx, OB_TRANS_KILLED));
|
||||
ASYNC_WAIT(acq_snapshot, 2000 * 1000, wait_ret);
|
||||
ret = wait_ret;
|
||||
} while (OB_GTS_NOT_READY == ret);
|
||||
ASSERT_EQ(OB_ERR_INTERRUPTED, ret);
|
||||
ROLLBACK_TX(n1, tx);
|
||||
}
|
||||
|
||||
|
@ -271,6 +271,30 @@ public:
|
||||
return ret;
|
||||
}
|
||||
|
||||
int get_gts_sync(const uint64_t tenant_id,
|
||||
const MonotonicTs stc,
|
||||
const int64_t timeout_us,
|
||||
share::SCN &scn,
|
||||
MonotonicTs &receive_gts_ts)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const int64_t expire_ts = ObClockGenerator::getClock() + timeout_us;
|
||||
|
||||
do {
|
||||
int64_t n = ObClockGenerator::getClock();
|
||||
if (n >= expire_ts) {
|
||||
ret = OB_TIMEOUT;
|
||||
} else if (OB_FAIL(get_gts(tenant_id, stc, NULL, scn, receive_gts_ts))) {
|
||||
if (OB_EAGAIN == ret) {
|
||||
ob_usleep(500);
|
||||
}
|
||||
}
|
||||
} while (OB_EAGAIN == ret);
|
||||
|
||||
return ret;
|
||||
return get_gts(tenant_id, stc, NULL, scn, receive_gts_ts);
|
||||
}
|
||||
|
||||
int get_gts(const uint64_t tenant_id, ObTsCbTask *task, share::SCN &scn) {
|
||||
if (get_gts_error_) { return get_gts_error_; }
|
||||
return OB_SUCCESS;
|
||||
|
Loading…
x
Reference in New Issue
Block a user