Fix: switching to follower forcedly could fail when memory allocation fails

This commit is contained in:
chinaxing
2024-02-08 08:36:43 +00:00
committed by ob-robot
parent 4b67451a71
commit 01d8fbf351
12 changed files with 183 additions and 87 deletions

View File

@ -214,6 +214,19 @@ int ObTransCtx::defer_callback_scheduler_(const int retcode, const SCN &commit_v
return ret; return ret;
} }
int ObTransCtx::prepare_commit_cb_for_role_change_(const int cb_ret, ObTxCommitCallback *&cb_list)
{
int ret = OB_SUCCESS;
if (OB_FAIL(commit_cb_.init(trans_service_, trans_id_, cb_ret, share::SCN()))) {
TRANS_LOG(WARN, "init commit cb fail", K(ret), KPC(this));
} else if (OB_FAIL(commit_cb_.link(this, cb_list))) {
TRANS_LOG(WARN, "link commit cb fail", K(ret), KPC(this));
} else {
cb_list = &commit_cb_;
}
return ret;
}
void ObTransCtx::generate_request_id_() void ObTransCtx::generate_request_id_()
{ {
const int64_t request_id = ObClockGenerator::getClock(); const int64_t request_id = ObClockGenerator::getClock();
@ -337,5 +350,10 @@ int ObTransCtx::set_app_trace_id_(const ObString &app_trace_id)
return ret; return ret;
} }
// Give this context back to its owning ls_tx_ctx_mgr_ through
// revert_tx_ctx_without_lock(). ObTxCommitCallback uses this as the
// counterpart of acquire_ctx_ref(): link() acquires, callback()/reset()
// release.
// NOTE(review): ls_tx_ctx_mgr_ is dereferenced unchecked; presumably it is
// always set while a reference is held -- confirm.
void ObTransCtx::release_ctx_ref()
{
  // any result of the revert call is deliberately discarded
  (void)ls_tx_ctx_mgr_->revert_tx_ctx_without_lock(this);
}
} // transaction } // transaction
} // oceanbase } // oceanbase

View File

@ -161,14 +161,14 @@ public:
const ObAddr &get_addr() const { return addr_; } const ObAddr &get_addr() const { return addr_; }
virtual int64_t get_part_trans_action() const { return part_trans_action_; } virtual int64_t get_part_trans_action() const { return part_trans_action_; }
int acquire_ctx_ref() { return acquire_ctx_ref_(); } int acquire_ctx_ref() { return acquire_ctx_ref_(); }
void release_ctx_ref();
ObITransRpc *get_trans_rpc() const { return rpc_; } ObITransRpc *get_trans_rpc() const { return rpc_; }
public: public:
virtual bool is_inited() const = 0; virtual bool is_inited() const = 0;
virtual int handle_timeout(const int64_t delay) = 0; virtual int handle_timeout(const int64_t delay) = 0;
virtual int kill(const KillTransArg &arg, ObIArray<ObTxCommitCallback> &cb_array) = 0; virtual int kill(const KillTransArg &arg, ObTxCommitCallback *&cb_list) = 0;
// thread unsafe // thread unsafe
VIRTUAL_TO_STRING_KV(KP(this), VIRTUAL_TO_STRING_KV(KP(this), K_(ref),
K_(trans_id), K_(trans_id),
K_(tenant_id), K_(tenant_id),
K_(is_exiting), K_(is_exiting),
@ -189,6 +189,7 @@ protected:
ObITsMgr *get_ts_mgr_(); ObITsMgr *get_ts_mgr_();
bool has_callback_scheduler_(); bool has_callback_scheduler_();
int defer_callback_scheduler_(const int ret, const share::SCN &commit_version); int defer_callback_scheduler_(const int ret, const share::SCN &commit_version);
int prepare_commit_cb_for_role_change_(const int cb_ret, ObTxCommitCallback *&cb_list);
int64_t get_remaining_wait_interval_us_() int64_t get_remaining_wait_interval_us_()
{ {
return trans_need_wait_wrap_.get_remaining_wait_interval_us(); return trans_need_wait_wrap_.get_remaining_wait_interval_us();
@ -210,7 +211,6 @@ protected:
ObLightHashLink::dec_ref(1); ObLightHashLink::dec_ref(1);
TRANS_LOG(DEBUG, "dec tx ctx ref", KPC(this)); TRANS_LOG(DEBUG, "dec tx ctx ref", KPC(this));
} }
virtual int register_timeout_task_(const int64_t interval_us); virtual int register_timeout_task_(const int64_t interval_us);
virtual int unregister_timeout_task_(); virtual int unregister_timeout_task_();
void generate_request_id_(); void generate_request_id_();

View File

@ -246,11 +246,13 @@ int ObLSTxCtxMgr::offline()
return ret; return ret;
} }
int ObLSTxCtxMgr::process_callback_(ObIArray<ObTxCommitCallback> &cb_array) const int ObLSTxCtxMgr::process_callback_(ObTxCommitCallback *&cb_list) const
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
for (int64_t i = 0; i < cb_array.count(); i++) { ObTxCommitCallback *next = NULL;
cb_array.at(i).callback(); for (ObTxCommitCallback *iter = cb_list; iter != NULL; iter = next) {
ObTxCommitCallback *next = iter->get_link_next();
iter->callback();
} }
return ret; return ret;
} }
@ -727,7 +729,7 @@ int ObLSTxCtxMgr::switch_to_follower_forcedly()
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
ObTimeGuard timeguard("ObLSTxCtxMgr::switch_to_follower_forcedly"); ObTimeGuard timeguard("ObLSTxCtxMgr::switch_to_follower_forcedly");
ObSEArray<ObTxCommitCallback, 4> cb_array; ObTxCommitCallback *cb_list = NULL;
{ {
WLockGuardWithRetryInterval guard(rwlock_, TRY_THRESOLD_US, RETRY_INTERVAL_US); WLockGuardWithRetryInterval guard(rwlock_, TRY_THRESOLD_US, RETRY_INTERVAL_US);
StateHelper state_helper(ls_id_, state_); StateHelper state_helper(ls_id_, state_);
@ -736,13 +738,10 @@ int ObLSTxCtxMgr::switch_to_follower_forcedly()
ret = OB_NOT_INIT; ret = OB_NOT_INIT;
} else if (is_follower_()) { } else if (is_follower_()) {
// already follower, do nothing // already follower, do nothing
} else if (OB_FAIL(cb_array.reserve(ls_tx_ctx_map_.count()))) {
TRANS_LOG(ERROR, "reserve callback array error", KR(ret));
ret = OB_EAGAIN;
} else if (OB_FAIL(state_helper.switch_state(Ops::LEADER_REVOKE))) { } else if (OB_FAIL(state_helper.switch_state(Ops::LEADER_REVOKE))) {
TRANS_LOG(ERROR, "switch state error", KR(ret), "manager", *this); TRANS_LOG(ERROR, "switch state error", KR(ret), "manager", *this);
} else { } else {
SwitchToFollowerForcedlyFunctor fn(cb_array); SwitchToFollowerForcedlyFunctor fn(cb_list);
if (OB_FAIL(ls_tx_ctx_map_.for_each(fn))) { if (OB_FAIL(ls_tx_ctx_map_.for_each(fn))) {
TRANS_LOG(ERROR, "for each transaction context error", KR(ret), "manager", *this); TRANS_LOG(ERROR, "for each transaction context error", KR(ret), "manager", *this);
} else { } else {
@ -756,7 +755,7 @@ int ObLSTxCtxMgr::switch_to_follower_forcedly()
} }
timeguard.click(); timeguard.click();
// run callback out of lock, ignore ret // run callback out of lock, ignore ret
(void)process_callback_(cb_array); (void)process_callback_(cb_list);
if (timeguard.get_diff() > 3 * 1000000) { if (timeguard.get_diff() > 3 * 1000000) {
TRANS_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "switch_to_follower_forcedly use too much time", K(timeguard), "manager", *this); TRANS_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "switch_to_follower_forcedly use too much time", K(timeguard), "manager", *this);
} }
@ -838,7 +837,7 @@ int ObLSTxCtxMgr::switch_to_follower_gracefully()
} }
timeguard.click(); timeguard.click();
ObSEArray<ObTxCommitCallback, 4> cb_array; ObTxCommitCallback *cb_list = NULL;
{ {
WLockGuardWithRetryInterval guard(rwlock_, TRY_THRESOLD_US, RETRY_INTERVAL_US); WLockGuardWithRetryInterval guard(rwlock_, TRY_THRESOLD_US, RETRY_INTERVAL_US);
timeguard.click(); timeguard.click();
@ -850,14 +849,11 @@ int ObLSTxCtxMgr::switch_to_follower_gracefully()
TRANS_LOG(WARN, "not init", KR(ret), K(ls_id_)); TRANS_LOG(WARN, "not init", KR(ret), K(ls_id_));
} else if (OB_FAIL(state_helper.switch_state(Ops::LEADER_REVOKE))) { } else if (OB_FAIL(state_helper.switch_state(Ops::LEADER_REVOKE))) {
TRANS_LOG(WARN, "switch state error", KR(ret), K(tenant_id_), K(ls_id_), K(state_)); TRANS_LOG(WARN, "switch state error", KR(ret), K(tenant_id_), K(ls_id_), K(state_));
} else if (OB_FAIL(cb_array.reserve(ls_tx_ctx_map_.count()))) {
TRANS_LOG(ERROR, "reserve callback array error", KR(ret));
ret = OB_EAGAIN;
} else { } else {
timeguard.click(); timeguard.click();
// TODO // TODO
const int64_t abs_expired_time = INT64_MAX; const int64_t abs_expired_time = INT64_MAX;
SwitchToFollowerGracefullyFunctor fn(abs_expired_time, cb_array); SwitchToFollowerGracefullyFunctor fn(abs_expired_time, cb_list);
if (OB_FAIL(ls_tx_ctx_map_.for_each(fn))) { if (OB_FAIL(ls_tx_ctx_map_.for_each(fn))) {
TRANS_LOG(WARN, "for each tx ctx error", KR(ret), "manager", *this); TRANS_LOG(WARN, "for each tx ctx error", KR(ret), "manager", *this);
ret = fn.get_ret(); ret = fn.get_ret();
@ -882,7 +878,7 @@ int ObLSTxCtxMgr::switch_to_follower_gracefully()
timeguard.click(); timeguard.click();
} }
} }
(void)process_callback_(cb_array); (void)process_callback_(cb_list);
timeguard.click(); timeguard.click();
TRANS_LOG(INFO, "[LsTxCtxMgr] switch_to_follower_gracefully", K(ret), KPC(this), K(process_count)); TRANS_LOG(INFO, "[LsTxCtxMgr] switch_to_follower_gracefully", K(ret), KPC(this), K(process_count));
if (timeguard.get_diff() > 1000000) { if (timeguard.get_diff() > 1000000) {
@ -941,7 +937,7 @@ int ObLSTxCtxMgr::stop(const bool graceful)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
StateHelper state_helper(ls_id_, state_); StateHelper state_helper(ls_id_, state_);
ObSEArray<ObTxCommitCallback, 4> cb_array; ObTxCommitCallback *cb_list = NULL;
const KillTransArg arg(graceful); const KillTransArg arg(graceful);
ObTimeGuard timeguard("ctxmgr stop"); ObTimeGuard timeguard("ctxmgr stop");
{ {
@ -961,7 +957,7 @@ int ObLSTxCtxMgr::stop(const bool graceful)
} }
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
KillTxCtxFunctor fn(arg, cb_array); KillTxCtxFunctor fn(arg, cb_list);
fn.set_release_audit_mgr_lock(true); fn.set_release_audit_mgr_lock(true);
if (OB_FAIL(ls_retain_ctx_mgr_.force_gc_retain_ctx())) { if (OB_FAIL(ls_retain_ctx_mgr_.force_gc_retain_ctx())) {
TRANS_LOG(WARN, "force gc retain ctx mgr", K(ret)); TRANS_LOG(WARN, "force gc retain ctx mgr", K(ret));
@ -977,7 +973,7 @@ int ObLSTxCtxMgr::stop(const bool graceful)
if (timeguard.get_diff() > 3 * 1000000) { if (timeguard.get_diff() > 3 * 1000000) {
TRANS_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "stop trans use too much time", K(timeguard), "manager", *this); TRANS_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "stop trans use too much time", K(timeguard), "manager", *this);
} }
process_callback_(cb_array); process_callback_(cb_list);
TRANS_LOG(INFO, "[LsTxCtxMgr] stop done", K(timeguard), "manager", *this); TRANS_LOG(INFO, "[LsTxCtxMgr] stop done", K(timeguard), "manager", *this);
return ret; return ret;
} }
@ -986,12 +982,12 @@ int ObLSTxCtxMgr::kill_all_tx(const bool graceful, bool &is_all_tx_cleaned_up)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
ObTimeGuard timeguard("ctxmgr kill_all_tx"); ObTimeGuard timeguard("ctxmgr kill_all_tx");
ObSEArray<ObTxCommitCallback, 4> cb_array; ObTxCommitCallback *cb_list = NULL;
const KillTransArg arg(graceful); const KillTransArg arg(graceful);
{ {
WLockGuardWithRetryInterval guard(rwlock_, TRY_THRESOLD_US, RETRY_INTERVAL_US); WLockGuardWithRetryInterval guard(rwlock_, TRY_THRESOLD_US, RETRY_INTERVAL_US);
const int64_t total_active_readonly_request_count = get_total_active_readonly_request_count(); const int64_t total_active_readonly_request_count = get_total_active_readonly_request_count();
KillTxCtxFunctor fn(arg, cb_array); KillTxCtxFunctor fn(arg, cb_list);
if (OB_FAIL(ls_retain_ctx_mgr_.force_gc_retain_ctx())) { if (OB_FAIL(ls_retain_ctx_mgr_.force_gc_retain_ctx())) {
TRANS_LOG(WARN, "force gc retain ctx mgr", K(ret)); TRANS_LOG(WARN, "force gc retain ctx mgr", K(ret));
} else if (OB_FAIL(ls_tx_ctx_map_.for_each(fn))) { } else if (OB_FAIL(ls_tx_ctx_map_.for_each(fn))) {
@ -1002,7 +998,7 @@ int ObLSTxCtxMgr::kill_all_tx(const bool graceful, bool &is_all_tx_cleaned_up)
if (timeguard.get_diff() > 3 * 1000000) { if (timeguard.get_diff() > 3 * 1000000) {
TRANS_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "kill_all_tx use too much time", K(timeguard), "manager", *this); TRANS_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "kill_all_tx use too much time", K(timeguard), "manager", *this);
} }
(void)process_callback_(cb_array); (void)process_callback_(cb_list);
TRANS_LOG(INFO, "[LsTxCtxMgr] kill_all_tx done", K(timeguard), "manager", *this); TRANS_LOG(INFO, "[LsTxCtxMgr] kill_all_tx done", K(timeguard), "manager", *this);
return ret; return ret;
} }

View File

@ -585,7 +585,7 @@ private:
static const int64_t RETRY_INTERVAL_US = 10 *1000; static const int64_t RETRY_INTERVAL_US = 10 *1000;
private: private:
int process_callback_(ObIArray<ObTxCommitCallback> &cb_array) const; int process_callback_(ObTxCommitCallback *&cb_list) const;
void print_all_tx_ctx_(const int64_t max_print, const bool verbose); void print_all_tx_ctx_(const int64_t max_print, const bool verbose);
int64_t get_tx_ctx_count_() const { return ATOMIC_LOAD(&total_tx_ctx_count_); } int64_t get_tx_ctx_count_() const { return ATOMIC_LOAD(&total_tx_ctx_count_); }
int create_tx_ctx_(const ObTxCreateArg &arg, int create_tx_ctx_(const ObTxCreateArg &arg,

View File

@ -27,10 +27,35 @@ void ObTxCommitCallback::reset()
enable_ = false; enable_ = false;
inited_ = false; inited_ = false;
callback_count_ = 0; callback_count_ = 0;
if (linked_) {
TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "should not be linked", KP(tx_ctx_), K(tx_id_), K(ret_));
if (tx_ctx_ && tx_ctx_->get_ref() > 0) {
tx_ctx_->release_ctx_ref();
}
linked_ = false;
}
tx_ctx_ = NULL;
txs_ = NULL; txs_ = NULL;
tx_id_.reset(); tx_id_.reset();
ret_ = OB_ERR_UNEXPECTED; ret_ = OB_ERR_UNEXPECTED;
commit_version_.reset(); commit_version_.reset();
link_next_ = NULL;
}
// Attach this callback to `tx_ctx` and chain it in front of `link_next`.
//
// On success a reference on `tx_ctx` is held until callback() (or reset())
// releases it.
//
// @param tx_ctx     owning transaction context; must not be NULL
// @param link_next  next node of the intrusive list, NULL for the tail
// @return OB_SUCCESS;
//         OB_ERR_UNEXPECTED if this callback is already linked;
//         OB_INVALID_ARGUMENT if tx_ctx is NULL;
//         otherwise the error returned by tx_ctx->acquire_ctx_ref().
//         On any failure no member is modified.
int ObTxCommitCallback::link(ObTransCtx *tx_ctx, ObTxCommitCallback *link_next)
{
  int ret = OB_SUCCESS;
  TRANS_LOG(DEBUG, "", KPC(tx_ctx), KP(link_next));
  if (linked_) {
    ret = OB_ERR_UNEXPECTED;
    TRANS_LOG(ERROR, "already linked", KPC(this), KPC(tx_ctx), KP(link_next));
  } else if (OB_ISNULL(tx_ctx)) {
    ret = OB_INVALID_ARGUMENT;
    TRANS_LOG(WARN, "tx ctx is null, can not link commit cb", K(ret), KPC(this), KP(link_next));
  } else if (OB_FAIL(tx_ctx->acquire_ctx_ref())) {
    // Do not mark the node as linked without the reference: callback() and
    // reset() would otherwise release a reference that was never taken.
    TRANS_LOG(WARN, "acquire tx ctx ref fail", K(ret), KPC(this), KPC(tx_ctx));
  } else {
    tx_ctx_ = tx_ctx;
    link_next_ = link_next;
    linked_ = true;
  }
  return ret;
}
int ObTxCommitCallback::callback() int ObTxCommitCallback::callback()
@ -46,6 +71,16 @@ int ObTxCommitCallback::callback()
++callback_count_; ++callback_count_;
txs_->handle_tx_commit_result(tx_id_, ret_, commit_version_); txs_->handle_tx_commit_result(tx_id_, ret_, commit_version_);
} }
if (linked_) {
TRANS_LOG(DEBUG, "linked commit cb", KPC(tx_ctx_), K(ret_));
if (OB_ISNULL(tx_ctx_)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "tx ctx should not be null for linked commit cb", K(ret), KPC(this));
} else {
tx_ctx_->release_ctx_ref();
}
linked_ = false;
}
return ret; return ret;
} }

View File

@ -26,33 +26,74 @@ class ObTransService;
struct ObTxCommitCallback struct ObTxCommitCallback
{ {
public: public:
ObTxCommitCallback() { reset(); } ObTxCommitCallback() :
enable_(false),
inited_(false),
linked_(false),
callback_count_(0),
txs_(NULL),
tx_ctx_(NULL),
tx_id_(),
ret_(OB_ERR_UNEXPECTED),
commit_version_(),
link_next_(NULL)
{}
~ObTxCommitCallback() { reset(); } ~ObTxCommitCallback() { reset(); }
int init(ObTransService* txs, const ObTransID tx_id, const int ret, const share::SCN commit_version) int init(ObTransService* txs, const ObTransID tx_id, const int cb_ret, const share::SCN commit_version)
{ {
int ret = OB_SUCCESS;
if (inited_) {
ret = OB_INIT_TWICE;
} else {
txs_ = txs; txs_ = txs;
tx_id_ = tx_id; tx_id_ = tx_id;
ret_ = ret; ret_ = cb_ret;
commit_version_ = commit_version; commit_version_ = commit_version;
inited_ = true; inited_ = true;
return OB_SUCCESS;
} }
return ret;
}
int link(ObTransCtx *tx_ctx, ObTxCommitCallback *link_next);
bool is_inited() { return inited_; } bool is_inited() { return inited_; }
void disable() { enable_ = false; } void disable() { enable_ = false; }
void enable() { enable_ = true; } void enable() { enable_ = true; }
bool is_enabled() { return enable_; } bool is_enabled() { return enable_; }
int get_cb_ret() const { return ret_; }
ObTxCommitCallback *get_link_next() const { return link_next_; }
void reset(); void reset();
void destroy() { reset(); } void destroy() { reset(); }
int callback(); int callback();
TO_STRING_KV(K_(inited), K_(enable), KP_(txs), K_(tx_id), K_(ret), K_(commit_version), K_(callback_count)); ObTxCommitCallback &operator=(const ObTxCommitCallback &right)
{
enable_ = right.enable_;
inited_ = right.inited_;
callback_count_ = right.callback_count_;
txs_ = right.txs_;
tx_id_ = right.tx_id_;
ret_ = right.ret_;
return *this;
}
TO_STRING_KV(K_(inited),
K_(enable),
K_(linked),
KP_(txs),
KP_(tx_ctx),
K_(tx_id),
K_(ret),
K_(commit_version),
K_(callback_count),
KP_(link_next));
public: public:
bool enable_; bool enable_;
bool inited_; bool inited_;
bool linked_;
int64_t callback_count_; int64_t callback_count_;
ObTransService* txs_; ObTransService* txs_;
ObTransCtx *tx_ctx_;
ObTransID tx_id_; ObTransID tx_id_;
int ret_; int ret_;
share::SCN commit_version_; share::SCN commit_version_;
ObTxCommitCallback *link_next_;
}; };
class ObTxCommitCallbackTask : public ObTransTask class ObTxCommitCallbackTask : public ObTransTask

View File

@ -160,7 +160,7 @@ private:
class SwitchToFollowerForcedlyFunctor class SwitchToFollowerForcedlyFunctor
{ {
public: public:
SwitchToFollowerForcedlyFunctor(ObIArray<ObTxCommitCallback> &cb_array) : cb_array_(cb_array) SwitchToFollowerForcedlyFunctor(ObTxCommitCallback *&cb_list) : cb_list_(cb_list)
{ {
SET_EXPIRED_LIMIT(100 * 1000 /*100ms*/, 3 * 1000 * 1000 /*3s*/) SET_EXPIRED_LIMIT(100 * 1000 /*100ms*/, 3 * 1000 * 1000 /*3s*/)
} }
@ -172,7 +172,7 @@ public:
if (!tx_id.is_valid() || OB_ISNULL(tx_ctx)) { if (!tx_id.is_valid() || OB_ISNULL(tx_ctx)) {
tmp_ret = common::OB_INVALID_ARGUMENT; tmp_ret = common::OB_INVALID_ARGUMENT;
TRANS_LOG_RET(WARN, tmp_ret, "invalid argument", K(tx_id), "ctx", OB_P(tx_ctx)); TRANS_LOG_RET(WARN, tmp_ret, "invalid argument", K(tx_id), "ctx", OB_P(tx_ctx));
} else if (common::OB_SUCCESS != (tmp_ret = tx_ctx->switch_to_follower_forcedly(cb_array_))) { } else if (common::OB_SUCCESS != (tmp_ret = tx_ctx->switch_to_follower_forcedly(cb_list_))) {
TRANS_LOG_RET(ERROR, tmp_ret, "leader revoke failed", K(tx_id), K(*tx_ctx)); TRANS_LOG_RET(ERROR, tmp_ret, "leader revoke failed", K(tx_id), K(*tx_ctx));
} }
@ -180,7 +180,7 @@ public:
} }
private: private:
ObIArray<ObTxCommitCallback> &cb_array_; ObTxCommitCallback *&cb_list_;
}; };
class SwitchToLeaderFunctor class SwitchToLeaderFunctor
@ -217,9 +217,8 @@ private:
class SwitchToFollowerGracefullyFunctor class SwitchToFollowerGracefullyFunctor
{ {
public: public:
SwitchToFollowerGracefullyFunctor(const int64_t abs_expired_time, SwitchToFollowerGracefullyFunctor(const int64_t abs_expired_time, ObTxCommitCallback *&cb_list)
ObIArray<ObTxCommitCallback> &cb_array) : abs_expired_time_(abs_expired_time), count_(0), ret_(OB_SUCCESS), cb_list_(cb_list)
: abs_expired_time_(abs_expired_time), count_(0), ret_(OB_SUCCESS), cb_array_(cb_array)
{ {
SET_EXPIRED_LIMIT(100 * 1000 /*100ms*/, 3 * 1000 * 1000 /*3s*/); SET_EXPIRED_LIMIT(100 * 1000 /*100ms*/, 3 * 1000 * 1000 /*3s*/);
} }
@ -242,7 +241,7 @@ public:
} }
} }
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
if (OB_FAIL(tx_ctx->switch_to_follower_gracefully(cb_array_))) { if (OB_FAIL(tx_ctx->switch_to_follower_gracefully(cb_list_))) {
TRANS_LOG(WARN, "switch to follower gracefully failed", KR(ret), K(*tx_ctx)); TRANS_LOG(WARN, "switch to follower gracefully failed", KR(ret), K(*tx_ctx));
ret_ = ret; ret_ = ret;
} else { } else {
@ -259,7 +258,7 @@ private:
int64_t abs_expired_time_; int64_t abs_expired_time_;
int64_t count_; int64_t count_;
int ret_; int ret_;
ObIArray<ObTxCommitCallback> &cb_array_; ObTxCommitCallback *&cb_list_;
}; };
class ResumeLeaderFunctor class ResumeLeaderFunctor
@ -320,8 +319,8 @@ private:
class KillTxCtxFunctor class KillTxCtxFunctor
{ {
public: public:
KillTxCtxFunctor(const KillTransArg &arg, ObIArray<ObTxCommitCallback> &cb_array) KillTxCtxFunctor(const KillTransArg &arg, ObTxCommitCallback *&cb_list)
: arg_(arg), release_audit_mgr_lock_(false), cb_array(cb_array) : arg_(arg), release_audit_mgr_lock_(false), cb_list_(cb_list)
{ {
SET_EXPIRED_LIMIT(100 * 1000 /*100ms*/, 3 * 1000 * 1000 /*3s*/); SET_EXPIRED_LIMIT(100 * 1000 /*100ms*/, 3 * 1000 * 1000 /*3s*/);
@ -340,7 +339,7 @@ public:
TRANS_LOG(WARN, "invalid argument", K(tx_id), "ctx", OB_P(tx_ctx)); TRANS_LOG(WARN, "invalid argument", K(tx_id), "ctx", OB_P(tx_ctx));
tmp_ret = common::OB_INVALID_ARGUMENT; tmp_ret = common::OB_INVALID_ARGUMENT;
} else { } else {
if (OB_SUCC(tx_ctx->kill(arg_, cb_array))) { if (OB_SUCC(tx_ctx->kill(arg_, cb_list_))) {
TRANS_LOG(INFO, "kill transaction success", K(tx_id), K_(arg)); TRANS_LOG(INFO, "kill transaction success", K(tx_id), K_(arg));
} else if (common::OB_TRANS_CANNOT_BE_KILLED == ret) { } else if (common::OB_TRANS_CANNOT_BE_KILLED == ret) {
TRANS_LOG(INFO, "transaction can not be killed", K(tx_id), "context", *tx_ctx); TRANS_LOG(INFO, "transaction can not be killed", K(tx_id), "context", *tx_ctx);
@ -355,7 +354,7 @@ public:
private: private:
KillTransArg arg_; KillTransArg arg_;
bool release_audit_mgr_lock_; bool release_audit_mgr_lock_;
ObIArray<ObTxCommitCallback> &cb_array; ObTxCommitCallback *&cb_list_;
}; };
class TransferOutTxOpFunctor class TransferOutTxOpFunctor

View File

@ -694,7 +694,7 @@ int ObPartTransCtx::handle_timeout(const int64_t delay)
return ret; return ret;
} }
int ObPartTransCtx::kill(const KillTransArg &arg, ObIArray<ObTxCommitCallback> &cb_array) int ObPartTransCtx::kill(const KillTransArg &arg, ObTxCommitCallback *&cb_list)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS;
@ -743,12 +743,9 @@ int ObPartTransCtx::kill(const KillTransArg &arg, ObIArray<ObTxCommitCallback> &
// notify scheduler only if commit callback has not been armed // notify scheduler only if commit callback has not been armed
if (commit_cb_.is_enabled() && !commit_cb_.is_inited()) { if (commit_cb_.is_enabled() && !commit_cb_.is_inited()) {
if (exec_info_.scheduler_ == addr_) { if (exec_info_.scheduler_ == addr_) {
ObTxCommitCallback cb; if (OB_TMP_FAIL(prepare_commit_cb_for_role_change_(cb_param, cb_list))) {
cb.init(trans_service_, trans_id_, cb_param, SCN()); TRANS_LOG(WARN, "prepare commit cb fail", K(tmp_ret), K(cb_param), KPC(this));
if (OB_FAIL(cb_array.push_back(cb))) { ret = (ret == OB_SUCCESS) ? tmp_ret : ret;
TRANS_LOG(WARN, "push commit callback fail", K(ret), KPC(this));
} else {
commit_cb_.disable();
} }
} else { } else {
post_tx_commit_resp_(cb_param); post_tx_commit_resp_(cb_param);
@ -5813,7 +5810,7 @@ inline bool ObPartTransCtx::need_callback_scheduler_() {
// 3. resume_leader is called // 3. resume_leader is called
// 4. resume_leader fails // 4. resume_leader fails
// 5. revoke, and still in follower state // 5. revoke, and still in follower state
int ObPartTransCtx::switch_to_follower_forcedly(ObIArray<ObTxCommitCallback> &cb_array) int ObPartTransCtx::switch_to_follower_forcedly(ObTxCommitCallback *&cb_list_head)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
common::ObTimeGuard timeguard("switch_to_follower_forcely", 10 * 1000); common::ObTimeGuard timeguard("switch_to_follower_forcely", 10 * 1000);
@ -5842,10 +5839,8 @@ int ObPartTransCtx::switch_to_follower_forcedly(ObIArray<ObTxCommitCallback> &cb
if (OB_FAIL(do_local_tx_end_(TxEndAction::ABORT_TX))) { if (OB_FAIL(do_local_tx_end_(TxEndAction::ABORT_TX))) {
TRANS_LOG(WARN, "do local tx abort failed", K(ret)); TRANS_LOG(WARN, "do local tx abort failed", K(ret));
} else if (need_cb_scheduler) { } else if (need_cb_scheduler) {
ObTxCommitCallback cb; if (OB_FAIL(prepare_commit_cb_for_role_change_(OB_TRANS_KILLED, cb_list_head))) {
cb.init(trans_service_, trans_id_, OB_TRANS_KILLED, SCN()); TRANS_LOG(WARN, "prepare commit cb fail", K(ret), KPC(this));
if (OB_FAIL(cb_array.push_back(cb))) {
TRANS_LOG(WARN, "push back callback failed", K(ret), "context", *this);
} }
} }
if (OB_FAIL(ret) && need_cb_scheduler) { if (OB_FAIL(ret) && need_cb_scheduler) {
@ -5889,18 +5884,21 @@ int ObPartTransCtx::switch_to_follower_forcedly(ObIArray<ObTxCommitCallback> &cb
// special handle commit triggered by local call: coordinator colocate with scheduler // special handle commit triggered by local call: coordinator colocate with scheduler
// let scheduler retry commit with RPC if required // let scheduler retry commit with RPC if required
if (need_callback_scheduler_()) { if (need_callback_scheduler_()) {
ObTxCommitCallback cb; int commit_ret = OB_SUCCESS;
// no CommitInfoLog has been submitted, txn must abort // no CommitInfoLog has been submitted, txn must abort
if (exec_info_.state_ == ObTxState::INIT && !sub_state_.is_info_log_submitted()) { if (exec_info_.state_ == ObTxState::INIT && !sub_state_.is_info_log_submitted()) {
cb.init(trans_service_, trans_id_, OB_TRANS_KILLED, SCN()); commit_ret = OB_TRANS_KILLED;
} else { } else {
// otherwise, txn either continue commit or abort, need retry to get final result // otherwise, txn either continue commit or abort, need retry to get final result
cb.init(trans_service_, trans_id_, OB_NOT_MASTER, SCN()); commit_ret = OB_NOT_MASTER;
} }
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(prepare_commit_cb_for_role_change_(commit_ret, cb_list_head))) {
TRANS_LOG(WARN, "prepare commit cb fail", K(tmp_ret), KPC(this));
ret = (OB_SUCCESS == ret) ? tmp_ret : ret;
} else {
TRANS_LOG(INFO, "switch to follower forcely, notify txn commit result to scheduler", TRANS_LOG(INFO, "switch to follower forcely, notify txn commit result to scheduler",
"commit_result", cb.ret_, KPC(this)); "commit_result", commit_ret, KPC(this));
if (OB_FAIL(cb_array.push_back(cb))) {
TRANS_LOG(WARN, "push back callback failed", K(ret), "context", *this);
} }
} }
} }
@ -5921,7 +5919,7 @@ int ObPartTransCtx::switch_to_follower_forcedly(ObIArray<ObTxCommitCallback> &cb
return ret; return ret;
} }
int ObPartTransCtx::switch_to_follower_gracefully(ObIArray<ObTxCommitCallback> &cb_array) int ObPartTransCtx::switch_to_follower_gracefully(ObTxCommitCallback *&cb_list_head)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
bool need_submit_log = false; bool need_submit_log = false;
@ -5965,11 +5963,10 @@ int ObPartTransCtx::switch_to_follower_gracefully(ObIArray<ObTxCommitCallback> &
log_type = ObTxLogType::TX_COMMIT_INFO_LOG; log_type = ObTxLogType::TX_COMMIT_INFO_LOG;
} }
if (need_callback_scheduler_()) { if (need_callback_scheduler_()) {
ObTxCommitCallback cb; if (OB_FAIL(prepare_commit_cb_for_role_change_(OB_SWITCHING_TO_FOLLOWER_GRACEFULLY, cb_list_head))) {
cb.init(trans_service_, trans_id_, OB_SWITCHING_TO_FOLLOWER_GRACEFULLY, SCN()); TRANS_LOG(WARN, "prepare commit cb fail", K(ret), KPC(this));
} else {
TRANS_LOG(INFO, "swtich to follower gracefully, notify scheduler retry", KPC(this)); TRANS_LOG(INFO, "swtich to follower gracefully, notify scheduler retry", KPC(this));
if (OB_FAIL(cb_array.push_back(cb))) {
TRANS_LOG(WARN, "push back callback failed", K(ret), "context", *this);
} }
} }
} }
@ -6025,10 +6022,10 @@ int ObPartTransCtx::switch_to_follower_gracefully(ObIArray<ObTxCommitCallback> &
OB_ID(used), timeguard.get_diff(), OB_ID(used), timeguard.get_diff(),
OB_ID(ref), get_ref()); OB_ID(ref), get_ref());
if (OB_FAIL(ret)) { if (OB_FAIL(ret)) {
TRANS_LOG(WARN, "switch to follower gracefully failed", KR(ret), K(ret), KPC(this), K(cb_array)); TRANS_LOG(WARN, "switch to follower gracefully failed", KR(ret), KPC(this));
} else { } else {
#ifndef NDEBUG #ifndef NDEBUG
TRANS_LOG(INFO, "switch to follower gracefully succeed", KPC(this), K(cb_array)); TRANS_LOG(INFO, "switch to follower gracefully succeed", KPC(this));
#endif #endif
} }

View File

@ -192,7 +192,7 @@ public:
/* /*
* graceful kill: wait trx finish logging * graceful kill: wait trx finish logging
*/ */
int kill(const KillTransArg &arg, ObIArray<ObTxCommitCallback> &cb_array); int kill(const KillTransArg &arg, ObTxCommitCallback *&cb_list);
memtable::ObMemtableCtx *get_memtable_ctx() { return &mt_ctx_; } memtable::ObMemtableCtx *get_memtable_ctx() { return &mt_ctx_; }
int commit(const ObTxCommitParts &parts, int commit(const ObTxCommitParts &parts,
const MonotonicTs &commit_time, const MonotonicTs &commit_time,
@ -298,7 +298,6 @@ private:
"2pc_role", "2pc_role",
get_2pc_role(), get_2pc_role(),
K_(collected), K_(collected),
K_(ref),
K_(rec_log_ts), K_(rec_log_ts),
K_(prev_rec_log_ts), K_(prev_rec_log_ts),
K_(lastest_snapshot), K_(lastest_snapshot),
@ -443,9 +442,9 @@ public:
// leader switch related // leader switch related
bool need_callback_scheduler_(); bool need_callback_scheduler_();
int switch_to_follower_forcedly(ObIArray<ObTxCommitCallback> &cb_array); int switch_to_follower_forcedly(ObTxCommitCallback *&cb_list);
int switch_to_leader(const share::SCN &start_working_ts); int switch_to_leader(const share::SCN &start_working_ts);
int switch_to_follower_gracefully(ObIArray<ObTxCommitCallback> &cb_array); int switch_to_follower_gracefully(ObTxCommitCallback *&cb_list);
int resume_leader(const share::SCN &start_working_ts); int resume_leader(const share::SCN &start_working_ts);
int supplement_undo_actions_if_exist_(); int supplement_undo_actions_if_exist_();

View File

@ -1104,8 +1104,17 @@ int ObPartTransCtx::post_tx_commit_resp_(const int status)
TRANS_LOG(INFO, "report tx commit result to local scheduler succeed", K(status), KP(this)); TRANS_LOG(INFO, "report tx commit result to local scheduler succeed", K(status), KP(this));
#endif #endif
} }
} else { has_skip = true; } } else if (commit_cb_.get_cb_ret() == status) {
has_skip = true;
} else { } else {
// The scheduler may already have been called back because of a switch to follower.
// In that case the status it received was not the final commit status: it was
// either OB_NOT_MASTER or OB_SWITCHING_TO_FOLLOWER_GRACEFULLY.
// So call back the scheduler again, this time with the final status.
use_rpc = true;
}
}
if (use_rpc) {
ObTxCommitRespMsg msg; ObTxCommitRespMsg msg;
build_tx_common_msg_(SCHEDULER_LS, msg); build_tx_common_msg_(SCHEDULER_LS, msg);
msg.commit_version_ = commit_version; msg.commit_version_ = commit_version;

View File

@ -1953,9 +1953,11 @@ TEST_F(ObTestTx, distributed_tx_coordinator_switch_to_follower_forcedly_in_prepa
n1->add_drop_msg_type(TX_2PC_PREPARE_RESP); n1->add_drop_msg_type(TX_2PC_PREPARE_RESP);
int commit_ret = OB_SUCCESS; int commit_ret = OB_SUCCESS;
// async start commit
std::thread t(do_async_commit, n1, std::ref(tx), std::ref(commit_ret)); std::thread t(do_async_commit, n1, std::ref(tx), std::ref(commit_ret));
usleep(100 * 1000); usleep(100 * 1000);
// wait coordinator into prepare state
ObPartTransCtx *n1_ctx = NULL; ObPartTransCtx *n1_ctx = NULL;
ASSERT_EQ(OB_SUCCESS, n1->get_tx_ctx(n1->ls_id_, tx.tx_id_, n1_ctx)); ASSERT_EQ(OB_SUCCESS, n1->get_tx_ctx(n1->ls_id_, tx.tx_id_, n1_ctx));
int i = 0; int i = 0;
@ -1965,28 +1967,27 @@ TEST_F(ObTestTx, distributed_tx_coordinator_switch_to_follower_forcedly_in_prepa
ASSERT_NE(i, 1001); ASSERT_NE(i, 1001);
ASSERT_EQ(OB_SUCCESS, n1->revert_tx_ctx(n1_ctx)); ASSERT_EQ(OB_SUCCESS, n1->revert_tx_ctx(n1_ctx));
// switch coordinator to follower forcedly
ObLSTxCtxMgr *ls_tx_ctx_mgr1 = NULL; ObLSTxCtxMgr *ls_tx_ctx_mgr1 = NULL;
ASSERT_EQ(OB_SUCCESS, n1->txs_.tx_ctx_mgr_.get_ls_tx_ctx_mgr(n1->ls_id_, ls_tx_ctx_mgr1)); ASSERT_EQ(OB_SUCCESS, n1->txs_.tx_ctx_mgr_.get_ls_tx_ctx_mgr(n1->ls_id_, ls_tx_ctx_mgr1));
ASSERT_EQ(OB_SUCCESS, ls_tx_ctx_mgr1->switch_to_follower_forcedly()); ASSERT_EQ(OB_SUCCESS, ls_tx_ctx_mgr1->switch_to_follower_forcedly());
n1->wait_all_redolog_applied(); n1->wait_all_redolog_applied();
// n3 takeover as leader
ReplayLogEntryFunctor functor(n3); ReplayLogEntryFunctor functor(n3);
ASSERT_EQ(OB_SUCCESS, n3->fake_tx_log_adapter_->replay_all(functor)); ASSERT_EQ(OB_SUCCESS, n3->fake_tx_log_adapter_->replay_all(functor));
ObLSTxCtxMgr *ls_tx_ctx_mgr3 = NULL; ObLSTxCtxMgr *ls_tx_ctx_mgr3 = NULL;
ASSERT_EQ(OB_SUCCESS, n3->txs_.tx_ctx_mgr_.get_ls_tx_ctx_mgr(n3->ls_id_, ls_tx_ctx_mgr3)); ASSERT_EQ(OB_SUCCESS, n3->txs_.tx_ctx_mgr_.get_ls_tx_ctx_mgr(n3->ls_id_, ls_tx_ctx_mgr3));
ObTxNode::get_location_adapter_().update_localtion(n3->ls_id_, n3->addr_); ObTxNode::get_location_adapter_().update_localtion(n3->ls_id_, n3->addr_);
ASSERT_EQ(OB_SUCCESS, ls_tx_ctx_mgr3->switch_to_leader()); ASSERT_EQ(OB_SUCCESS, ls_tx_ctx_mgr3->switch_to_leader());
n3->wait_all_redolog_applied(); n3->wait_all_redolog_applied();
ASSERT_EQ(OB_SUCCESS, n2->wait_all_tx_ctx_is_destoryed()); ASSERT_EQ(OB_SUCCESS, n2->wait_all_tx_ctx_is_destoryed());
// wait commit complete on scheduler
t.join(); t.join();
ASSERT_EQ(OB_SUCCESS, commit_ret); ASSERT_EQ(OB_SUCCESS, commit_ret);
n3->del_drop_msg_type(TX_2PC_CLEAR_REQ);
ASSERT_EQ(OB_SUCCESS, n3->wait_all_tx_ctx_is_destoryed()); ASSERT_EQ(OB_SUCCESS, n3->wait_all_tx_ctx_is_destoryed());
ASSERT_EQ(OB_SUCCESS, n1->release_tx(tx)); ASSERT_EQ(OB_SUCCESS, n1->release_tx(tx));

View File

@ -221,13 +221,14 @@ private:
ObLSTxCtxMgr *ls_tx_ctx_mgr = NULL; ObLSTxCtxMgr *ls_tx_ctx_mgr = NULL;
OZ(txs_.tx_ctx_mgr_.get_ls_tx_ctx_mgr(ls_id_, ls_tx_ctx_mgr)); OZ(txs_.tx_ctx_mgr_.get_ls_tx_ctx_mgr(ls_id_, ls_tx_ctx_mgr));
int i = 0; int i = 0;
for (i = 0; i < 2000; ++i) { int tx_count = ls_tx_ctx_mgr->get_tx_ctx_count();
if (0 == ls_tx_ctx_mgr->get_tx_ctx_count()) break; for (i = 0; tx_count > 0 && i < 2000; ++i) {
tx_count = ls_tx_ctx_mgr->get_tx_ctx_count();
usleep(500); usleep(500);
} }
if (2000 == i) { if (2000 == i) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
LOG_INFO("print all tx begin", K(ret)); LOG_INFO("wait all tx ctx destoryed fail, print all tx:", K(tx_count));
const bool verbose = true; const bool verbose = true;
ls_tx_ctx_mgr->print_all_tx_ctx(ObLSTxCtxMgr::MAX_HASH_ITEM_PRINT, verbose); ls_tx_ctx_mgr->print_all_tx_ctx(ObLSTxCtxMgr::MAX_HASH_ITEM_PRINT, verbose);
LOG_INFO("print all tx end", K(ret)); LOG_INFO("print all tx end", K(ret));