[CP] [gts] refine gts callback interrupt

This commit is contained in:
jw-guo 2024-03-26 05:20:18 +00:00 committed by ob-robot
parent 7f73a27bdd
commit 611b488e79
12 changed files with 159 additions and 63 deletions

View File

@ -263,6 +263,12 @@ public:
UNUSED(tenant_id);
return OB_SUCCESS;
}
// Stub: there is nothing to interrupt in this implementation.
// Both arguments are ignored and OB_SUCCESS is always returned.
virtual int interrupt_gts_callback_for_ls_offline(const uint64_t tenant_id, const share::ObLSID ls_id)
{
  int ret = OB_SUCCESS;
  UNUSED(ls_id);
  UNUSED(tenant_id);
  return ret;
}
private:
MockObGtsSource &source_;
};

View File

@ -805,6 +805,9 @@ int ObLSTxService::offline()
TRANS_LOG(WARN, "block all failed", K_(ls_id));
} else if (OB_FAIL(mgr_->kill_all_tx(graceful, unused_is_all_tx_clean_up))) {
TRANS_LOG(WARN, "kill_all_tx failed", K_(ls_id));
} else if (OB_FAIL(MTL(ObTransService *)->get_ts_mgr()->interrupt_gts_callback_for_ls_offline(MTL_ID(),
ls_id_))) {
TRANS_LOG(WARN, "interrupt gts callback failed", KR(ret), K_(ls_id));
} else if (mgr_->get_tx_ctx_count() > 0) {
ret = OB_EAGAIN;
if (REACH_TIME_INTERVAL(PRINT_LOG_INTERVAL)) {

View File

@ -357,16 +357,21 @@ int64_t ObGtsSource::get_task_count() const
return task_count;
}
// Interrupt the pending gts callback tasks held in every task queue of this gts source.
//
// NOTE(review): this span comes from a diff rendering without +/- markers, so the
// previous and the updated form of each modified line appear back to back (for
// example the two signatures below). Read the second of each pair as the current code.
//
// @param errcode  reason for the interruption, forwarded to each queue
// @param ls_id    log stream id, forwarded to each queue
// @return OB_EAGAIN when errcode is not OB_LS_OFFLINE and tasks are still queued
//         after the pass; OB_SUCCESS otherwise.
int ObGtsSource::gts_callback_interrupted(const int errcode)
int ObGtsSource::gts_callback_interrupted(const int errcode, const share::ObLSID ls_id)
{
int ret = OB_SUCCESS;
// total number of tasks still queued after every queue has been visited
int64_t task_count = 0;
for (int64_t i = 0; i < TOTAL_GTS_QUEUE_COUNT; i++) {
// the per-queue return value is not checked here; only the remaining task count is used
queue_[i].gts_callback_interrupted(errcode);
queue_[i].gts_callback_interrupted(errcode, ls_id);
task_count += queue_[i].get_task_count();
}
if (task_count > 0) {
ret = OB_EAGAIN;
if (OB_LS_OFFLINE != errcode) {
// in this case, all callback tasks of this tenant need to be cleared,
// so leftover tasks are reported as OB_EAGAIN.
if (task_count > 0) {
ret = OB_EAGAIN;
}
} else {
// if OB_LS_OFFLINE, return OB_SUCCESS even when tasks are still queued
}
return ret;
}

View File

@ -82,7 +82,7 @@ public:
int get_srr(MonotonicTs &srr);
int get_latest_srr(MonotonicTs &latest_srr);
int64_t get_task_count() const;
int gts_callback_interrupted(const int errcode);
int gts_callback_interrupted(const int errcode, const share::ObLSID ls_id);
public:
int update_gts(const int64_t gts, bool &update);
int get_gts(const MonotonicTs stc, ObTsCbTask *task, int64_t &gts, MonotonicTs &receive_gts_ts);

View File

@ -153,15 +153,16 @@ int ObGTSTaskQueue::push(ObTsCbTask *task)
return ret;
}
int ObGTSTaskQueue::gts_callback_interrupted(const int errcode)
int ObGTSTaskQueue::gts_callback_interrupted(const int errcode, const share::ObLSID ls_id)
{
int ret = OB_SUCCESS;
if (!is_inited_) {
ret = OB_NOT_INIT;
} else {
int64_t last_tenant_id = OB_INVALID_TENANT_ID;
MAKE_TENANT_SWITCH_SCOPE_GUARD(ts_guard);
int64_t count = queue_.size();
int64_t again_count = 0;
int64_t error_count = 0;
TRANS_LOG(INFO, "interrupt gts callback start", K(count), K(errcode), K(ls_id));
while (OB_SUCC(ret) && count > 0) {
common::ObLink *data = NULL;
(void)queue_.pop(data);
@ -171,31 +172,34 @@ int ObGTSTaskQueue::gts_callback_interrupted(const int errcode)
break;
} else {
const uint64_t tenant_id = task->get_tenant_id();
if (tenant_id != last_tenant_id) {
if (OB_FAIL(ts_guard.switch_to(tenant_id))) {
TRANS_LOG(ERROR, "switch tenant failed", K(ret), K(tenant_id));
} else {
last_tenant_id = tenant_id;
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(task->gts_callback_interrupted(errcode))) {
MTL_SWITCH(tenant_id) {
if (OB_FAIL(task->gts_callback_interrupted(errcode, ls_id))) {
if (OB_EAGAIN != ret) {
TRANS_LOG(WARN, "gts callback interrupted fail", KR(ret), KP(task));
error_count++;
// in this case, need restart the observer
TRANS_LOG(ERROR, "gts callback interrupted fail", KR(ret), KP(task));
} else {
again_count++;
// case 1, errcode is OB_LS_OFFLINE and ls_id is not equal to ls_id in part ctx
// case 2, errcode is OB_LS_OFFLINE and task is an object of ObTsSyncGetTsCbTask
// return OB_EAGAIN
if (OB_FAIL(queue_.push(task))) {
// since task is not null, this failure is impossible
TRANS_LOG(ERROR, "push gts task failed", KR(ret), KP(task));
} else {
TRANS_LOG(DEBUG, "push back gts task", KP(task));
break;
}
}
} else {
TRANS_LOG(DEBUG, "gts callback interrupted success", KP(task));
}
} else {
error_count++;
TRANS_LOG(ERROR, "switch tenant failed", KR(ret), K(errcode), K(ls_id));
}
}
}
TRANS_LOG(INFO, "interrupt gts callback end", K(again_count), K(error_count));
}
return ret;
}

View File

@ -38,7 +38,7 @@ public:
const MonotonicTs receive_gts_ts);
int push(ObTsCbTask *task);
int64_t get_task_count() const { return queue_.size(); }
int gts_callback_interrupted(const int errcode);
int gts_callback_interrupted(const int errcode, const share::ObLSID ls_id);
private:
static const int64_t TOTAL_WAIT_TASK_NUM = 500 * 1000;
private:

View File

@ -180,13 +180,27 @@ int ObTimestampService::handle_request(const ObGtsRequest &request, ObGtsRpcResu
return ret;
}
#ifndef ERRSIM
ERRSIM_POINT_DEF(EN_GTS_HANDLE_REQUEST)
#endif
int ObTimestampService::handle_local_request_(const ObGtsRequest &request, obrpc::ObGtsRpcResult &result)
{
int ret = OB_SUCCESS;
int64_t gts = 0;
const uint64_t tenant_id = request.get_tenant_id();
const MonotonicTs srr = request.get_srr();
if (OB_FAIL(get_timestamp(gts))) {
#ifndef ERRSIM
ret = EN_GTS_HANDLE_REQUEST;
#endif
// the first case for errsim
if (OB_SUCCESS != ret) {
TRANS_LOG(WARN, "errsim for gts handle local request", KR(ret));
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = result.init(tenant_id, ret, srr, 0, 0))) {
TRANS_LOG(WARN, "gts result init failed", K(tmp_ret), K(request));
}
} else if (OB_FAIL(get_timestamp(gts))) {
if (EXECUTE_COUNT_PER_SEC(10)) {
TRANS_LOG(WARN, "get timestamp failed", KR(ret));
}

View File

@ -1185,10 +1185,10 @@ bool ObPartTransCtx::has_persisted_log_() const
(is_follower_() && replay_completeness_.is_unknown());
}
int ObPartTransCtx::gts_callback_interrupted(const int errcode)
int ObPartTransCtx::gts_callback_interrupted(const int errcode,
const share::ObLSID target_ls_id)
{
int ret = OB_SUCCESS;
UNUSED(errcode);
bool need_revert_ctx = false;
{
@ -1196,15 +1196,19 @@ int ObPartTransCtx::gts_callback_interrupted(const int errcode)
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
TRANS_LOG(ERROR, "ObPartTransCtx not inited", K(ret));
} else if (OB_UNLIKELY(!is_exiting_)) {
// at this time, ObTxCtxMgr should already be stopped,
// so ObPartTransCtx should already be killed
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "ObPartTransCtx is not exiting", K(ret));
} else if (OB_LS_OFFLINE == errcode) {
if (target_ls_id != ls_id_) {
ret = OB_EAGAIN;
} else {
need_revert_ctx = true;
sub_state_.clear_gts_waiting();
TRANS_LOG(INFO, "transaction is interruputed gts callback", KR(ret), K(errcode), "context", *this);
}
} else {
// for OB_TENANT_NOT_EXIST
need_revert_ctx = true;
sub_state_.clear_gts_waiting();
TRANS_LOG(INFO, "transaction is interruputed gts callback", KR(ret), "context", *this);
TRANS_LOG(INFO, "transaction is interruputed gts callback", KR(ret), K(errcode), "context", *this);
}
}
if (need_revert_ctx) {

View File

@ -206,7 +206,7 @@ public:
int get_prepare_version_if_prepared(bool &is_prepared, share::SCN &prepare_version);
const share::SCN get_commit_version() const { return ctx_tx_data_.get_commit_version(); }
uint64_t hash() const { return trans_id_.hash(); }
int gts_callback_interrupted(const int errcode);
int gts_callback_interrupted(const int errcode, const share::ObLSID target_ls_id);
int get_gts_callback(const MonotonicTs srr, const share::SCN &gts, const MonotonicTs receive_gts_ts);
int gts_elapse_callback(const MonotonicTs srr, const share::SCN &gts);
MonotonicTs get_stc() const { return stc_; }

View File

@ -90,12 +90,12 @@ int ObTsSourceInfo::check_if_tenant_has_been_dropped(const uint64_t tenant_id, b
return ret;
}
// Forward a gts callback interruption to the gts source owned by this object.
//
// NOTE(review): this span comes from a diff rendering without +/- markers, so the
// previous and the updated form of each modified line appear back to back. Read the
// second of each pair as the current code.
//
// @param errcode  reason for the interruption, passed through unchanged
// @param ls_id    log stream id, passed through unchanged
// @return OB_SUCCESS when the gts source has no queued task (nothing is called);
//         otherwise whatever gts_source_.gts_callback_interrupted() returns.
int ObTsSourceInfo::gts_callback_interrupted(const int errcode)
int ObTsSourceInfo::gts_callback_interrupted(const int errcode, const share::ObLSID ls_id)
{
int ret = OB_SUCCESS;
const int64_t task_count = gts_source_.get_task_count();
// skip the call entirely when there is nothing queued
if (0 != task_count) {
ret = gts_source_.gts_callback_interrupted(errcode);
ret = gts_source_.gts_callback_interrupted(errcode, ls_id);
}
return ret;
}
@ -124,13 +124,15 @@ int ObTsSyncGetTsCbTask::init(uint64_t task_id)
return ret;
}
int ObTsSyncGetTsCbTask::gts_callback_interrupted(const int errcode)
int ObTsSyncGetTsCbTask::gts_callback_interrupted(const int errcode, const share::ObLSID ls_id)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
TRANS_LOG(WARN, "not init", K(ret));
} else if (OB_LS_OFFLINE == errcode) {
ret = OB_EAGAIN;
} else {
ObThreadCondGuard cond_guard(cond_);
if (is_early_exit_) {
@ -486,10 +488,8 @@ void ObTsMgr::run1()
{
int ret = OB_SUCCESS;
ObSEArray<uint64_t, 1> ids;
ObSEArray<uint64_t, 1> check_ids;
ObGtsRefreshFunctor gts_refresh_funtor;
GetObsoleteTenantFunctor get_obsolete_tenant_functor(TS_SOURCE_INFO_OBSOLETE_TIME, ids);
CheckTenantFunctor check_tenant_functor(check_ids);
// cluster版本小于2.0不会更新gts
lib::set_thread_name("TsMgr");
while (!has_set_stop()) {
@ -497,25 +497,27 @@ void ObTsMgr::run1()
ob_usleep(REFRESH_GTS_INTERVEL_US);
ts_source_info_map_.for_each(gts_refresh_funtor);
ts_source_info_map_.for_each(get_obsolete_tenant_functor);
ts_source_info_map_.for_each(check_tenant_functor);
for (int64_t i = 0; i < ids.count(); i++) {
const uint64_t tenant_id = ids.at(i);
if (OB_FAIL(delete_tenant_(tenant_id))) {
TRANS_LOG(WARN, "delete tenant failed", K(ret), K(tenant_id));
// ignore ret
ret = OB_SUCCESS;
MTL_SWITCH(tenant_id) {
TRANS_LOG(WARN, "gts is not used for a long time", K(tenant_id));
} else {
if (OB_TENANT_NOT_IN_SERVER == ret) {
if (OB_FAIL(delete_tenant_(tenant_id))) {
TRANS_LOG(WARN, "delete tenant failed", K(ret), K(tenant_id));
// ignore ret
ret = OB_SUCCESS;
} else {
TRANS_LOG(INFO, "delete tenant success", K(tenant_id));
}
} else {
TRANS_LOG(WARN, "switch tenant failed", K(ret), K(tenant_id));
// ignore ret
ret = OB_SUCCESS;
}
}
}
ids.reset();
for (int64_t i = 0; i < check_ids.count(); i++) {
const uint64_t tenant_id = check_ids.at(i);
if (OB_FAIL(remove_dropped_tenant(tenant_id))) {
TRANS_LOG(WARN, "remove dropped tenant failed", K(ret), K(tenant_id));
// ignore ret
ret = OB_SUCCESS;
}
}
check_ids.reset();
}
}
@ -746,18 +748,32 @@ int ObTsMgr::delete_tenant_(const uint64_t tenant_id)
// Interrupt the gts callback tasks of a tenant with errcode OB_TENANT_NOT_EXIST.
//
// NOTE(review): this span comes from a diff rendering without +/- markers, so removed
// and added lines are interleaved (the first lookup chain below is the previous body,
// the chain nested under the argument checks is the updated one).
//
// @param tenant_id  tenant whose ts source should be cleaned up
// @return in the updated body: OB_NOT_INIT if ObTsMgr is not inited, OB_INVALID_ARGUMENT
//         for an invalid tenant_id, OB_SUCCESS when no ts source exists for the tenant
//         (OB_ENTRY_NOT_EXIST is swallowed), OB_ERR_UNEXPECTED when the guard holds NULL,
//         otherwise the result of ObTsSourceInfo::gts_callback_interrupted().
int ObTsMgr::remove_dropped_tenant(const uint64_t tenant_id)
{
int ret = OB_SUCCESS;
ObTsTenantInfo tenant_info(tenant_id);
ObTsSourceInfo *ts_source_info = NULL;
ObTsSourceInfoGuard info_guard;
if (OB_FAIL(get_ts_source_info_opt_(tenant_id, info_guard, false, false))) {
TRANS_LOG(WARN, "get ts source info failed", K(ret), K(tenant_id));
} else if (OB_ISNULL(ts_source_info = info_guard.get_ts_source_info())) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "ts source info is NULL", KR(ret), K(tenant_id));
} else if (OB_FAIL(ts_source_info->gts_callback_interrupted(OB_TENANT_NOT_EXIST))) {
TRANS_LOG(WARN, "interrupt gts callback failed", KR(ret), K(tenant_id));
// default-constructed log stream id: it is passed along with OB_TENANT_NOT_EXIST below
share::ObLSID ls_id;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
TRANS_LOG(WARN, "ObTsMgr is not inited", K(ret));
} else if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id))) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid argument", KR(ret), K(tenant_id));
} else {
TRANS_LOG(INFO, "interrupt gts callback success", K(tenant_id));
ObTsSourceInfo *ts_source_info = NULL;
// NOTE(review): tenant_info is not read anywhere in this function
ObTsTenantInfo tenant_info(tenant_id);
ObTsSourceInfoGuard info_guard;
if (OB_FAIL(get_ts_source_info_opt_(tenant_id, info_guard, false, false))) {
if (OB_ENTRY_NOT_EXIST == ret) {
// a tenant without a ts source has nothing to interrupt; treat as success
ret = OB_SUCCESS;
TRANS_LOG(INFO, "no need cleanup for empty ts resource", K(tenant_id));
} else {
TRANS_LOG(WARN, "get ts source info failed", K(ret), K(tenant_id));
}
} else if (OB_ISNULL(ts_source_info = info_guard.get_ts_source_info())) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "ts source info is NULL", KR(ret), K(tenant_id));
} else if (OB_FAIL(ts_source_info->gts_callback_interrupted(OB_TENANT_NOT_EXIST, ls_id))) {
TRANS_LOG(WARN, "interrupt gts callback failed", KR(ret), K(tenant_id));
} else {
TRANS_LOG(INFO, "remove ts resource success", K(tenant_id));
}
}
return ret;
}
@ -1288,5 +1304,41 @@ int ObTsMgr::add_tenant_(const uint64_t tenant_id)
return ret;
}
// Interrupt the pending gts callback tasks that belong to one log stream, using
// errcode OB_LS_OFFLINE.
//
// @param tenant_id  tenant whose ts source is looked up; must be a valid tenant id
// @param ls_id      log stream whose callbacks should be interrupted; must be valid
// @return OB_NOT_INIT          ObTsMgr has not been initialized
//         OB_INVALID_ARGUMENT  ls_id or tenant_id is invalid
//         OB_SUCCESS           the tenant has no ts source (OB_ENTRY_NOT_EXIST is
//                              swallowed: nothing to interrupt), or the interrupt succeeded
//         OB_ERR_UNEXPECTED    the guard was filled but holds a NULL ts source
//         otherwise            the error returned by the lookup or by
//                              ObTsSourceInfo::gts_callback_interrupted()
int ObTsMgr::interrupt_gts_callback_for_ls_offline(const uint64_t tenant_id,
                                                   const share::ObLSID ls_id)
{
  int ret = OB_SUCCESS;
  if (OB_UNLIKELY(!is_inited_)) {
    ret = OB_NOT_INIT;
    TRANS_LOG(WARN, "ObTsMgr is not inited", KR(ret));
  } else if (OB_UNLIKELY(!ls_id.is_valid())) {
    ret = OB_INVALID_ARGUMENT;
    TRANS_LOG(WARN, "invalid argument", KR(ret), K(ls_id));
  } else if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id))) {
    ret = OB_INVALID_ARGUMENT;
    TRANS_LOG(WARN, "invalid argument", KR(ret), K(tenant_id));
  } else {
    ObTsSourceInfo *ts_source_info = NULL;
    // the guard is declared in this scope so it stays alive while ts_source_info is used
    ObTsSourceInfoGuard info_guard;
    if (OB_FAIL(get_ts_source_info_opt_(tenant_id, info_guard, false, false))) {
      if (OB_ENTRY_NOT_EXIST == ret) {
        // no ts source for this tenant means no queued callbacks to interrupt
        ret = OB_SUCCESS;
        TRANS_LOG(INFO, "no need cleanup for empty ts resource", K(tenant_id));
      } else {
        TRANS_LOG(WARN, "get ts source info failed", KR(ret), K(tenant_id));
      }
    } else if (OB_ISNULL(ts_source_info = info_guard.get_ts_source_info())) {
      ret = OB_ERR_UNEXPECTED;
      TRANS_LOG(WARN, "ts source info is NULL", KR(ret), K(tenant_id));
    } else if (OB_FAIL(ts_source_info->gts_callback_interrupted(OB_LS_OFFLINE, ls_id))) {
      TRANS_LOG(WARN, "interrupt gts callback failed", KR(ret), K(tenant_id), K(ls_id));
    } else {
      TRANS_LOG(INFO, "interrupt gts callback success", K(tenant_id), K(ls_id));
    }
  }
  return ret;
}
} // transaction
} // oceanbase

View File

@ -68,7 +68,7 @@ class ObTsCbTask : public common::ObLink
public:
ObTsCbTask() {}
virtual ~ObTsCbTask() {}
virtual int gts_callback_interrupted(const int errcode) = 0;
virtual int gts_callback_interrupted(const int errcode, const share::ObLSID ls_id) = 0;
virtual int get_gts_callback(const MonotonicTs srr, const share::SCN &gts, const MonotonicTs receive_gts_ts) = 0;
virtual int gts_elapse_callback(const MonotonicTs srr, const share::SCN &gts) = 0;
virtual MonotonicTs get_stc() const = 0;
@ -101,6 +101,7 @@ public:
virtual int wait_gts_elapse(const uint64_t tenant_id, const share::SCN &scn) = 0;
virtual bool is_external_consistent(const uint64_t tenant_id) = 0;
virtual int remove_dropped_tenant(const uint64_t tenant_id) = 0;
virtual int interrupt_gts_callback_for_ls_offline(const uint64_t tenant_id, const share::ObLSID ls_id) = 0;
public:
VIRTUAL_TO_STRING_KV("", "");
};
@ -120,7 +121,7 @@ public:
void update_last_access_ts() { last_access_ts_ = common::ObClockGenerator::getClock(); }
int64_t get_last_access_ts() const { return last_access_ts_; }
int check_if_tenant_has_been_dropped(const uint64_t tenant_id, bool &has_dropped);
int gts_callback_interrupted(const int errcode);
int gts_callback_interrupted(const int errcode, const share::ObLSID ls_id);
private:
bool is_inited_;
uint64_t tenant_id_;
@ -302,7 +303,7 @@ public:
~ObTsSyncGetTsCbTask() {}
int init(uint64_t task_id);
int config(MonotonicTs stc, uint64_t tenant_id);
int gts_callback_interrupted(const int errcode) override;
int gts_callback_interrupted(const int errcode, const share::ObLSID ls_id) override;
int get_gts_callback(const MonotonicTs srr, const share::SCN &gts,
const MonotonicTs receive_gts_ts) override;
int gts_elapse_callback(const MonotonicTs srr, const share::SCN &gts) override;
@ -372,6 +373,7 @@ public:
int handle_gts_result(const uint64_t tenant_id, const int64_t queue_index, const int ts_type);
int update_gts(const uint64_t tenant_id, const MonotonicTs srr, const int64_t gts, const int ts_type, bool &update);
int delete_tenant(const uint64_t tenant_id);
int interrupt_gts_callback_for_ls_offline(const uint64_t tenant_id, const share::ObLSID ls_id);
public:
int update_gts(const uint64_t tenant_id, const int64_t gts, bool &update);
//根据stc获取合适的gts值,如果条件不满足需要注册gts task,等异步回调

View File

@ -331,6 +331,12 @@ public:
return OB_SUCCESS;
}
// Stub: there is nothing to interrupt in this implementation.
// Both arguments are ignored and OB_SUCCESS is always returned.
int interrupt_gts_callback_for_ls_offline(const uint64_t tenant_id, const share::ObLSID ls_id)
{
  int ret = OB_SUCCESS;
  UNUSED(ls_id);
  UNUSED(tenant_id);
  return ret;
}
int update_base_ts(const int64_t base_ts) { return OB_SUCCESS; }
int get_base_ts(int64_t &base_ts) { return OB_SUCCESS; }
bool is_external_consistent(const uint64_t tenant_id) { return true; }