Fix: switching to follower forcedly could fail due to insufficient memory
This commit is contained in:
parent
17034202b9
commit
bbc7b62785
@ -214,6 +214,19 @@ int ObTransCtx::defer_callback_scheduler_(const int retcode, const SCN &commit_v
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTransCtx::prepare_commit_cb_for_role_change_(const int cb_ret, ObTxCommitCallback *&cb_list)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(commit_cb_.init(trans_service_, trans_id_, cb_ret, share::SCN()))) {
|
||||
TRANS_LOG(WARN, "init commit cb fail", K(ret), KPC(this));
|
||||
} else if (OB_FAIL(commit_cb_.link(this, cb_list))) {
|
||||
TRANS_LOG(WARN, "link commit cb fail", K(ret), KPC(this));
|
||||
} else {
|
||||
cb_list = &commit_cb_;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObTransCtx::generate_request_id_()
|
||||
{
|
||||
const int64_t request_id = ObClockGenerator::getClock();
|
||||
@ -337,5 +350,10 @@ int ObTransCtx::set_app_trace_id_(const ObString &app_trace_id)
|
||||
return ret;
|
||||
}
|
||||
|
||||
// Hands this ctx back to its owning LS tx-ctx manager through
// revert_tx_ctx_without_lock(). ObTxCommitCallback calls this to undo the
// acquire_ctx_ref() it took in link().
// NOTE(review): presumably this drops one reference on the ctx — confirm in
// ObLSTxCtxMgr::revert_tx_ctx_without_lock(). ls_tx_ctx_mgr_ is dereferenced
// without a null check; verify it is always set before this can be reached.
void ObTransCtx::release_ctx_ref()
|
||||
{
|
||||
ls_tx_ctx_mgr_->revert_tx_ctx_without_lock(this);
|
||||
}
|
||||
|
||||
} // transaction
|
||||
} // oceanbase
|
||||
|
@ -161,14 +161,14 @@ public:
|
||||
const ObAddr &get_addr() const { return addr_; }
|
||||
virtual int64_t get_part_trans_action() const { return part_trans_action_; }
|
||||
int acquire_ctx_ref() { return acquire_ctx_ref_(); }
|
||||
|
||||
void release_ctx_ref();
|
||||
ObITransRpc *get_trans_rpc() const { return rpc_; }
|
||||
public:
|
||||
virtual bool is_inited() const = 0;
|
||||
virtual int handle_timeout(const int64_t delay) = 0;
|
||||
virtual int kill(const KillTransArg &arg, ObIArray<ObTxCommitCallback> &cb_array) = 0;
|
||||
virtual int kill(const KillTransArg &arg, ObTxCommitCallback *&cb_list) = 0;
|
||||
// thread unsafe
|
||||
VIRTUAL_TO_STRING_KV(KP(this),
|
||||
VIRTUAL_TO_STRING_KV(KP(this), K_(ref),
|
||||
K_(trans_id),
|
||||
K_(tenant_id),
|
||||
K_(is_exiting),
|
||||
@ -189,6 +189,7 @@ protected:
|
||||
ObITsMgr *get_ts_mgr_();
|
||||
bool has_callback_scheduler_();
|
||||
int defer_callback_scheduler_(const int ret, const share::SCN &commit_version);
|
||||
int prepare_commit_cb_for_role_change_(const int cb_ret, ObTxCommitCallback *&cb_list);
|
||||
int64_t get_remaining_wait_interval_us_()
|
||||
{
|
||||
return trans_need_wait_wrap_.get_remaining_wait_interval_us();
|
||||
@ -210,7 +211,6 @@ protected:
|
||||
ObLightHashLink::dec_ref(1);
|
||||
TRANS_LOG(DEBUG, "dec tx ctx ref", KPC(this));
|
||||
}
|
||||
|
||||
virtual int register_timeout_task_(const int64_t interval_us);
|
||||
virtual int unregister_timeout_task_();
|
||||
void generate_request_id_();
|
||||
|
@ -246,11 +246,13 @@ int ObLSTxCtxMgr::offline()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLSTxCtxMgr::process_callback_(ObIArray<ObTxCommitCallback> &cb_array) const
|
||||
int ObLSTxCtxMgr::process_callback_(ObTxCommitCallback *&cb_list) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
for (int64_t i = 0; i < cb_array.count(); i++) {
|
||||
cb_array.at(i).callback();
|
||||
ObTxCommitCallback *next = NULL;
|
||||
for (ObTxCommitCallback *iter = cb_list; iter != NULL; iter = next) {
|
||||
ObTxCommitCallback *next = iter->get_link_next();
|
||||
iter->callback();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -727,7 +729,7 @@ int ObLSTxCtxMgr::switch_to_follower_forcedly()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTimeGuard timeguard("ObLSTxCtxMgr::switch_to_follower_forcedly");
|
||||
ObSEArray<ObTxCommitCallback, 4> cb_array;
|
||||
ObTxCommitCallback *cb_list = NULL;
|
||||
{
|
||||
WLockGuardWithRetryInterval guard(rwlock_, TRY_THRESOLD_US, RETRY_INTERVAL_US);
|
||||
StateHelper state_helper(ls_id_, state_);
|
||||
@ -736,13 +738,10 @@ int ObLSTxCtxMgr::switch_to_follower_forcedly()
|
||||
ret = OB_NOT_INIT;
|
||||
} else if (is_follower_()) {
|
||||
// already follower, do nothing
|
||||
} else if (OB_FAIL(cb_array.reserve(ls_tx_ctx_map_.count()))) {
|
||||
TRANS_LOG(ERROR, "reserve callback array error", KR(ret));
|
||||
ret = OB_EAGAIN;
|
||||
} else if (OB_FAIL(state_helper.switch_state(Ops::LEADER_REVOKE))) {
|
||||
TRANS_LOG(ERROR, "switch state error", KR(ret), "manager", *this);
|
||||
} else {
|
||||
SwitchToFollowerForcedlyFunctor fn(cb_array);
|
||||
SwitchToFollowerForcedlyFunctor fn(cb_list);
|
||||
if (OB_FAIL(ls_tx_ctx_map_.for_each(fn))) {
|
||||
TRANS_LOG(ERROR, "for each transaction context error", KR(ret), "manager", *this);
|
||||
} else {
|
||||
@ -756,7 +755,7 @@ int ObLSTxCtxMgr::switch_to_follower_forcedly()
|
||||
}
|
||||
timeguard.click();
|
||||
// run callback out of lock, ignore ret
|
||||
(void)process_callback_(cb_array);
|
||||
(void)process_callback_(cb_list);
|
||||
if (timeguard.get_diff() > 3 * 1000000) {
|
||||
TRANS_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "switch_to_follower_forcedly use too much time", K(timeguard), "manager", *this);
|
||||
}
|
||||
@ -838,7 +837,7 @@ int ObLSTxCtxMgr::switch_to_follower_gracefully()
|
||||
}
|
||||
timeguard.click();
|
||||
|
||||
ObSEArray<ObTxCommitCallback, 4> cb_array;
|
||||
ObTxCommitCallback *cb_list = NULL;
|
||||
{
|
||||
WLockGuardWithRetryInterval guard(rwlock_, TRY_THRESOLD_US, RETRY_INTERVAL_US);
|
||||
timeguard.click();
|
||||
@ -850,14 +849,11 @@ int ObLSTxCtxMgr::switch_to_follower_gracefully()
|
||||
TRANS_LOG(WARN, "not init", KR(ret), K(ls_id_));
|
||||
} else if (OB_FAIL(state_helper.switch_state(Ops::LEADER_REVOKE))) {
|
||||
TRANS_LOG(WARN, "switch state error", KR(ret), K(tenant_id_), K(ls_id_), K(state_));
|
||||
} else if (OB_FAIL(cb_array.reserve(ls_tx_ctx_map_.count()))) {
|
||||
TRANS_LOG(ERROR, "reserve callback array error", KR(ret));
|
||||
ret = OB_EAGAIN;
|
||||
} else {
|
||||
timeguard.click();
|
||||
// TODO
|
||||
const int64_t abs_expired_time = INT64_MAX;
|
||||
SwitchToFollowerGracefullyFunctor fn(abs_expired_time, cb_array);
|
||||
SwitchToFollowerGracefullyFunctor fn(abs_expired_time, cb_list);
|
||||
if (OB_FAIL(ls_tx_ctx_map_.for_each(fn))) {
|
||||
TRANS_LOG(WARN, "for each tx ctx error", KR(ret), "manager", *this);
|
||||
ret = fn.get_ret();
|
||||
@ -882,7 +878,7 @@ int ObLSTxCtxMgr::switch_to_follower_gracefully()
|
||||
timeguard.click();
|
||||
}
|
||||
}
|
||||
(void)process_callback_(cb_array);
|
||||
(void)process_callback_(cb_list);
|
||||
timeguard.click();
|
||||
TRANS_LOG(INFO, "[LsTxCtxMgr] switch_to_follower_gracefully", K(ret), KPC(this), K(process_count));
|
||||
if (timeguard.get_diff() > 1000000) {
|
||||
@ -941,7 +937,7 @@ int ObLSTxCtxMgr::stop(const bool graceful)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
StateHelper state_helper(ls_id_, state_);
|
||||
ObSEArray<ObTxCommitCallback, 4> cb_array;
|
||||
ObTxCommitCallback *cb_list = NULL;
|
||||
const KillTransArg arg(graceful);
|
||||
ObTimeGuard timeguard("ctxmgr stop");
|
||||
{
|
||||
@ -961,7 +957,7 @@ int ObLSTxCtxMgr::stop(const bool graceful)
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
KillTxCtxFunctor fn(arg, cb_array);
|
||||
KillTxCtxFunctor fn(arg, cb_list);
|
||||
fn.set_release_audit_mgr_lock(true);
|
||||
if (OB_FAIL(ls_retain_ctx_mgr_.force_gc_retain_ctx())) {
|
||||
TRANS_LOG(WARN, "force gc retain ctx mgr", K(ret));
|
||||
@ -977,7 +973,7 @@ int ObLSTxCtxMgr::stop(const bool graceful)
|
||||
if (timeguard.get_diff() > 3 * 1000000) {
|
||||
TRANS_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "stop trans use too much time", K(timeguard), "manager", *this);
|
||||
}
|
||||
process_callback_(cb_array);
|
||||
process_callback_(cb_list);
|
||||
TRANS_LOG(INFO, "[LsTxCtxMgr] stop done", K(timeguard), "manager", *this);
|
||||
return ret;
|
||||
}
|
||||
@ -986,12 +982,12 @@ int ObLSTxCtxMgr::kill_all_tx(const bool graceful, bool &is_all_tx_cleaned_up)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTimeGuard timeguard("ctxmgr kill_all_tx");
|
||||
ObSEArray<ObTxCommitCallback, 4> cb_array;
|
||||
ObTxCommitCallback *cb_list = NULL;
|
||||
const KillTransArg arg(graceful);
|
||||
{
|
||||
WLockGuardWithRetryInterval guard(rwlock_, TRY_THRESOLD_US, RETRY_INTERVAL_US);
|
||||
const int64_t total_active_readonly_request_count = get_total_active_readonly_request_count();
|
||||
KillTxCtxFunctor fn(arg, cb_array);
|
||||
KillTxCtxFunctor fn(arg, cb_list);
|
||||
if (OB_FAIL(ls_retain_ctx_mgr_.force_gc_retain_ctx())) {
|
||||
TRANS_LOG(WARN, "force gc retain ctx mgr", K(ret));
|
||||
} else if (OB_FAIL(ls_tx_ctx_map_.for_each(fn))) {
|
||||
@ -1002,7 +998,7 @@ int ObLSTxCtxMgr::kill_all_tx(const bool graceful, bool &is_all_tx_cleaned_up)
|
||||
if (timeguard.get_diff() > 3 * 1000000) {
|
||||
TRANS_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "kill_all_tx use too much time", K(timeguard), "manager", *this);
|
||||
}
|
||||
(void)process_callback_(cb_array);
|
||||
(void)process_callback_(cb_list);
|
||||
TRANS_LOG(INFO, "[LsTxCtxMgr] kill_all_tx done", K(timeguard), "manager", *this);
|
||||
return ret;
|
||||
}
|
||||
|
@ -585,7 +585,7 @@ private:
|
||||
static const int64_t RETRY_INTERVAL_US = 10 *1000;
|
||||
|
||||
private:
|
||||
int process_callback_(ObIArray<ObTxCommitCallback> &cb_array) const;
|
||||
int process_callback_(ObTxCommitCallback *&cb_list) const;
|
||||
void print_all_tx_ctx_(const int64_t max_print, const bool verbose);
|
||||
int64_t get_tx_ctx_count_() const { return ATOMIC_LOAD(&total_tx_ctx_count_); }
|
||||
int create_tx_ctx_(const ObTxCreateArg &arg,
|
||||
|
@ -27,10 +27,35 @@ void ObTxCommitCallback::reset()
|
||||
enable_ = false;
|
||||
inited_ = false;
|
||||
callback_count_ = 0;
|
||||
if (linked_) {
|
||||
TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "should not be linked", KP(tx_ctx_), K(tx_id_), K(ret_));
|
||||
if (tx_ctx_ && tx_ctx_->get_ref() > 0) {
|
||||
tx_ctx_->release_ctx_ref();
|
||||
}
|
||||
linked_ = false;
|
||||
}
|
||||
tx_ctx_ = NULL;
|
||||
txs_ = NULL;
|
||||
tx_id_.reset();
|
||||
ret_ = OB_ERR_UNEXPECTED;
|
||||
commit_version_.reset();
|
||||
link_next_ = NULL;
|
||||
}
|
||||
|
||||
// Attaches this callback to `tx_ctx` and chains it in front of `link_next`.
//
// On success a reference on `tx_ctx` is held until callback() (or reset())
// releases it, and linked_ is set.
//
// @param tx_ctx     ctx that owns this callback; must not be NULL
// @param link_next  current head of the callback list (may be NULL)
// @return OB_SUCCESS;
//         OB_ERR_UNEXPECTED if this callback is already linked;
//         OB_INVALID_ARGUMENT if tx_ctx is NULL;
//         otherwise the error returned by acquire_ctx_ref().
int ObTxCommitCallback::link(ObTransCtx *tx_ctx, ObTxCommitCallback *link_next)
{
  int ret = OB_SUCCESS;
  TRANS_LOG(DEBUG, "", KPC(tx_ctx), KP(link_next));
  if (linked_) {
    ret = OB_ERR_UNEXPECTED;
    TRANS_LOG(ERROR, "already linked", KPC(this), KPC(tx_ctx), KP(link_next));
  } else if (OB_ISNULL(tx_ctx)) {
    ret = OB_INVALID_ARGUMENT;
    TRANS_LOG(WARN, "tx ctx is null, can not link commit cb", K(ret), KPC(this), KP(link_next));
  } else if (OB_FAIL(tx_ctx->acquire_ctx_ref())) {
    // Do not mark the callback linked when no reference was taken, otherwise
    // callback()/reset() would release a reference that was never acquired.
    TRANS_LOG(WARN, "acquire tx ctx ref fail", K(ret), KPC(this), KPC(tx_ctx));
  } else {
    tx_ctx_ = tx_ctx;
    link_next_ = link_next;
    linked_ = true;
  }
  return ret;
}
|
||||
|
||||
int ObTxCommitCallback::callback()
|
||||
@ -46,6 +71,16 @@ int ObTxCommitCallback::callback()
|
||||
++callback_count_;
|
||||
txs_->handle_tx_commit_result(tx_id_, ret_, commit_version_);
|
||||
}
|
||||
if (linked_) {
|
||||
TRANS_LOG(DEBUG, "linked commit cb", KPC(tx_ctx_), K(ret_));
|
||||
if (OB_ISNULL(tx_ctx_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "tx ctx should not be null for linked commit cb", K(ret), KPC(this));
|
||||
} else {
|
||||
tx_ctx_->release_ctx_ref();
|
||||
}
|
||||
linked_ = false;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -26,33 +26,74 @@ class ObTransService;
|
||||
// Commit-result callback for one transaction. It stores the transaction
// service (txs_), the transaction id, the result code (ret_) and the commit
// version; callback() passes them to txs_->handle_tx_commit_result().
// A callback can also be chained into an intrusive singly-linked list via
// link(): link_next_ is the next node, tx_ctx_ is the owning ctx, and linked_
// records that a ctx reference is held until callback()/reset() releases it.
// NOTE(review): this view interleaves pre-change and post-change lines of the
// same declarations (two default constructors, two init() signatures and
// bodies, two TO_STRING_KV lists). Only one of each pair can exist in the real
// header.
struct ObTxCommitCallback
|
||||
{
|
||||
public:
|
||||
ObTxCommitCallback() { reset(); }
|
||||
// Member-initializing constructor: starts disabled, not inited, not linked,
// with ret_ preset to OB_ERR_UNEXPECTED.
ObTxCommitCallback() :
|
||||
enable_(false),
|
||||
inited_(false),
|
||||
linked_(false),
|
||||
callback_count_(0),
|
||||
txs_(NULL),
|
||||
tx_ctx_(NULL),
|
||||
tx_id_(),
|
||||
ret_(OB_ERR_UNEXPECTED),
|
||||
commit_version_(),
|
||||
link_next_(NULL)
|
||||
{}
|
||||
~ObTxCommitCallback() { reset(); }
|
||||
int init(ObTransService* txs, const ObTransID tx_id, const int ret, const share::SCN commit_version)
|
||||
// Stores the callback arguments and marks the object inited.
// The second body below returns OB_INIT_TWICE, without modifying any field,
// when the callback is already inited.
int init(ObTransService* txs, const ObTransID tx_id, const int cb_ret, const share::SCN commit_version)
|
||||
{
|
||||
txs_ = txs;
|
||||
tx_id_ = tx_id;
|
||||
ret_ = ret;
|
||||
commit_version_ = commit_version;
|
||||
inited_ = true;
|
||||
return OB_SUCCESS;
|
||||
int ret = OB_SUCCESS;
|
||||
if (inited_) {
|
||||
ret = OB_INIT_TWICE;
|
||||
} else {
|
||||
txs_ = txs;
|
||||
tx_id_ = tx_id;
|
||||
ret_ = cb_ret;
|
||||
commit_version_ = commit_version;
|
||||
inited_ = true;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
// Attach to tx_ctx and chain in front of link_next (defined out of line).
int link(ObTransCtx *tx_ctx, ObTxCommitCallback *link_next);
|
||||
bool is_inited() { return inited_; }
|
||||
void disable() { enable_ = false; }
|
||||
void enable() { enable_ = true; }
|
||||
bool is_enabled() { return enable_; }
|
||||
// Result code that callback() reports.
int get_cb_ret() const { return ret_; }
|
||||
// Next node in the linked callback list, or NULL.
ObTxCommitCallback *get_link_next() const { return link_next_; }
|
||||
void reset();
|
||||
void destroy() { reset(); }
|
||||
int callback();
|
||||
TO_STRING_KV(K_(inited), K_(enable), KP_(txs), K_(tx_id), K_(ret), K_(commit_version), K_(callback_count));
|
||||
// Copies enable_, inited_, callback_count_, txs_, tx_id_ and ret_ only.
// NOTE(review): commit_version_ and the link state (linked_, tx_ctx_,
// link_next_) are not copied. Leaving out the link state looks deliberate (a
// copy would not own a ctx reference), but confirm whether commit_version_
// should be copied as well.
ObTxCommitCallback &operator=(const ObTxCommitCallback &right)
|
||||
{
|
||||
enable_ = right.enable_;
|
||||
inited_ = right.inited_;
|
||||
callback_count_ = right.callback_count_;
|
||||
txs_ = right.txs_;
|
||||
tx_id_ = right.tx_id_;
|
||||
ret_ = right.ret_;
|
||||
return *this;
|
||||
}
|
||||
TO_STRING_KV(K_(inited),
|
||||
K_(enable),
|
||||
K_(linked),
|
||||
KP_(txs),
|
||||
KP_(tx_ctx),
|
||||
K_(tx_id),
|
||||
K_(ret),
|
||||
K_(commit_version),
|
||||
K_(callback_count),
|
||||
KP_(link_next));
|
||||
public:
|
||||
bool enable_;
|
||||
bool inited_;
|
||||
// true while link() holds a reference on tx_ctx_
bool linked_;
|
||||
int64_t callback_count_;
|
||||
ObTransService* txs_;
|
||||
ObTransCtx *tx_ctx_;
|
||||
ObTransID tx_id_;
|
||||
int ret_;
|
||||
share::SCN commit_version_;
|
||||
// next callback in the intrusive list built by link()
ObTxCommitCallback *link_next_;
|
||||
};
|
||||
|
||||
class ObTxCommitCallbackTask : public ObTransTask
|
||||
|
@ -160,7 +160,7 @@ private:
|
||||
class SwitchToFollowerForcedlyFunctor
|
||||
{
|
||||
public:
|
||||
SwitchToFollowerForcedlyFunctor(ObIArray<ObTxCommitCallback> &cb_array) : cb_array_(cb_array)
|
||||
SwitchToFollowerForcedlyFunctor(ObTxCommitCallback *&cb_list) : cb_list_(cb_list)
|
||||
{
|
||||
SET_EXPIRED_LIMIT(100 * 1000 /*100ms*/, 3 * 1000 * 1000 /*3s*/)
|
||||
}
|
||||
@ -172,7 +172,7 @@ public:
|
||||
if (!tx_id.is_valid() || OB_ISNULL(tx_ctx)) {
|
||||
tmp_ret = common::OB_INVALID_ARGUMENT;
|
||||
TRANS_LOG_RET(WARN, tmp_ret, "invalid argument", K(tx_id), "ctx", OB_P(tx_ctx));
|
||||
} else if (common::OB_SUCCESS != (tmp_ret = tx_ctx->switch_to_follower_forcedly(cb_array_))) {
|
||||
} else if (common::OB_SUCCESS != (tmp_ret = tx_ctx->switch_to_follower_forcedly(cb_list_))) {
|
||||
TRANS_LOG_RET(ERROR, tmp_ret, "leader revoke failed", K(tx_id), K(*tx_ctx));
|
||||
}
|
||||
|
||||
@ -180,7 +180,7 @@ public:
|
||||
}
|
||||
|
||||
private:
|
||||
ObIArray<ObTxCommitCallback> &cb_array_;
|
||||
ObTxCommitCallback *&cb_list_;
|
||||
};
|
||||
|
||||
class SwitchToLeaderFunctor
|
||||
@ -217,9 +217,8 @@ private:
|
||||
class SwitchToFollowerGracefullyFunctor
|
||||
{
|
||||
public:
|
||||
SwitchToFollowerGracefullyFunctor(const int64_t abs_expired_time,
|
||||
ObIArray<ObTxCommitCallback> &cb_array)
|
||||
: abs_expired_time_(abs_expired_time), count_(0), ret_(OB_SUCCESS), cb_array_(cb_array)
|
||||
SwitchToFollowerGracefullyFunctor(const int64_t abs_expired_time, ObTxCommitCallback *&cb_list)
|
||||
: abs_expired_time_(abs_expired_time), count_(0), ret_(OB_SUCCESS), cb_list_(cb_list)
|
||||
{
|
||||
SET_EXPIRED_LIMIT(100 * 1000 /*100ms*/, 3 * 1000 * 1000 /*3s*/);
|
||||
}
|
||||
@ -242,7 +241,7 @@ public:
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(tx_ctx->switch_to_follower_gracefully(cb_array_))) {
|
||||
if (OB_FAIL(tx_ctx->switch_to_follower_gracefully(cb_list_))) {
|
||||
TRANS_LOG(WARN, "switch to follower gracefully failed", KR(ret), K(*tx_ctx));
|
||||
ret_ = ret;
|
||||
} else {
|
||||
@ -259,7 +258,7 @@ private:
|
||||
int64_t abs_expired_time_;
|
||||
int64_t count_;
|
||||
int ret_;
|
||||
ObIArray<ObTxCommitCallback> &cb_array_;
|
||||
ObTxCommitCallback *&cb_list_;
|
||||
};
|
||||
|
||||
class ResumeLeaderFunctor
|
||||
@ -320,8 +319,8 @@ private:
|
||||
class KillTxCtxFunctor
|
||||
{
|
||||
public:
|
||||
KillTxCtxFunctor(const KillTransArg &arg, ObIArray<ObTxCommitCallback> &cb_array)
|
||||
: arg_(arg), release_audit_mgr_lock_(false), cb_array(cb_array)
|
||||
KillTxCtxFunctor(const KillTransArg &arg, ObTxCommitCallback *&cb_list)
|
||||
: arg_(arg), release_audit_mgr_lock_(false), cb_list_(cb_list)
|
||||
{
|
||||
|
||||
SET_EXPIRED_LIMIT(100 * 1000 /*100ms*/, 3 * 1000 * 1000 /*3s*/);
|
||||
@ -340,7 +339,7 @@ public:
|
||||
TRANS_LOG(WARN, "invalid argument", K(tx_id), "ctx", OB_P(tx_ctx));
|
||||
tmp_ret = common::OB_INVALID_ARGUMENT;
|
||||
} else {
|
||||
if (OB_SUCC(tx_ctx->kill(arg_, cb_array))) {
|
||||
if (OB_SUCC(tx_ctx->kill(arg_, cb_list_))) {
|
||||
TRANS_LOG(INFO, "kill transaction success", K(tx_id), K_(arg));
|
||||
} else if (common::OB_TRANS_CANNOT_BE_KILLED == ret) {
|
||||
TRANS_LOG(INFO, "transaction can not be killed", K(tx_id), "context", *tx_ctx);
|
||||
@ -355,7 +354,7 @@ public:
|
||||
private:
|
||||
KillTransArg arg_;
|
||||
bool release_audit_mgr_lock_;
|
||||
ObIArray<ObTxCommitCallback> &cb_array;
|
||||
ObTxCommitCallback *&cb_list_;
|
||||
};
|
||||
|
||||
class TransferOutTxOpFunctor
|
||||
|
@ -694,7 +694,7 @@ int ObPartTransCtx::handle_timeout(const int64_t delay)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObPartTransCtx::kill(const KillTransArg &arg, ObIArray<ObTxCommitCallback> &cb_array)
|
||||
int ObPartTransCtx::kill(const KillTransArg &arg, ObTxCommitCallback *&cb_list)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
@ -743,12 +743,9 @@ int ObPartTransCtx::kill(const KillTransArg &arg, ObIArray<ObTxCommitCallback> &
|
||||
// notify scheduler only if commit callback has not been armed
|
||||
if (commit_cb_.is_enabled() && !commit_cb_.is_inited()) {
|
||||
if (exec_info_.scheduler_ == addr_) {
|
||||
ObTxCommitCallback cb;
|
||||
cb.init(trans_service_, trans_id_, cb_param, SCN());
|
||||
if (OB_FAIL(cb_array.push_back(cb))) {
|
||||
TRANS_LOG(WARN, "push commit callback fail", K(ret), KPC(this));
|
||||
} else {
|
||||
commit_cb_.disable();
|
||||
if (OB_TMP_FAIL(prepare_commit_cb_for_role_change_(cb_param, cb_list))) {
|
||||
TRANS_LOG(WARN, "prepare commit cb fail", K(tmp_ret), K(cb_param), KPC(this));
|
||||
ret = (ret == OB_SUCCESS) ? tmp_ret : ret;
|
||||
}
|
||||
} else {
|
||||
post_tx_commit_resp_(cb_param);
|
||||
@ -5813,7 +5810,7 @@ inline bool ObPartTransCtx::need_callback_scheduler_() {
|
||||
// 3. resume_leader is called
|
||||
// 4. resume_leader fails
|
||||
// 5. revoke, and still in follower state
|
||||
int ObPartTransCtx::switch_to_follower_forcedly(ObIArray<ObTxCommitCallback> &cb_array)
|
||||
int ObPartTransCtx::switch_to_follower_forcedly(ObTxCommitCallback *&cb_list_head)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
common::ObTimeGuard timeguard("switch_to_follower_forcely", 10 * 1000);
|
||||
@ -5842,10 +5839,8 @@ int ObPartTransCtx::switch_to_follower_forcedly(ObIArray<ObTxCommitCallback> &cb
|
||||
if (OB_FAIL(do_local_tx_end_(TxEndAction::ABORT_TX))) {
|
||||
TRANS_LOG(WARN, "do local tx abort failed", K(ret));
|
||||
} else if (need_cb_scheduler) {
|
||||
ObTxCommitCallback cb;
|
||||
cb.init(trans_service_, trans_id_, OB_TRANS_KILLED, SCN());
|
||||
if (OB_FAIL(cb_array.push_back(cb))) {
|
||||
TRANS_LOG(WARN, "push back callback failed", K(ret), "context", *this);
|
||||
if (OB_FAIL(prepare_commit_cb_for_role_change_(OB_TRANS_KILLED, cb_list_head))) {
|
||||
TRANS_LOG(WARN, "prepare commit cb fail", K(ret), KPC(this));
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret) && need_cb_scheduler) {
|
||||
@ -5889,18 +5884,21 @@ int ObPartTransCtx::switch_to_follower_forcedly(ObIArray<ObTxCommitCallback> &cb
|
||||
// special handle commit triggered by local call: coordinator colocate with scheduler
|
||||
// let scheduler retry commit with RPC if required
|
||||
if (need_callback_scheduler_()) {
|
||||
ObTxCommitCallback cb;
|
||||
int commit_ret = OB_SUCCESS;
|
||||
// no CommitInfoLog has been submitted, txn must abort
|
||||
if (exec_info_.state_ == ObTxState::INIT && !sub_state_.is_info_log_submitted()) {
|
||||
cb.init(trans_service_, trans_id_, OB_TRANS_KILLED, SCN());
|
||||
commit_ret = OB_TRANS_KILLED;
|
||||
} else {
|
||||
// otherwise, txn either continue commit or abort, need retry to get final result
|
||||
cb.init(trans_service_, trans_id_, OB_NOT_MASTER, SCN());
|
||||
commit_ret = OB_NOT_MASTER;
|
||||
}
|
||||
TRANS_LOG(INFO, "switch to follower forcely, notify txn commit result to scheduler",
|
||||
"commit_result", cb.ret_, KPC(this));
|
||||
if (OB_FAIL(cb_array.push_back(cb))) {
|
||||
TRANS_LOG(WARN, "push back callback failed", K(ret), "context", *this);
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_TMP_FAIL(prepare_commit_cb_for_role_change_(commit_ret, cb_list_head))) {
|
||||
TRANS_LOG(WARN, "prepare commit cb fail", K(tmp_ret), KPC(this));
|
||||
ret = (OB_SUCCESS == ret) ? tmp_ret : ret;
|
||||
} else {
|
||||
TRANS_LOG(INFO, "switch to follower forcely, notify txn commit result to scheduler",
|
||||
"commit_result", commit_ret, KPC(this));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -5921,7 +5919,7 @@ int ObPartTransCtx::switch_to_follower_forcedly(ObIArray<ObTxCommitCallback> &cb
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObPartTransCtx::switch_to_follower_gracefully(ObIArray<ObTxCommitCallback> &cb_array)
|
||||
int ObPartTransCtx::switch_to_follower_gracefully(ObTxCommitCallback *&cb_list_head)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool need_submit_log = false;
|
||||
@ -5965,11 +5963,10 @@ int ObPartTransCtx::switch_to_follower_gracefully(ObIArray<ObTxCommitCallback> &
|
||||
log_type = ObTxLogType::TX_COMMIT_INFO_LOG;
|
||||
}
|
||||
if (need_callback_scheduler_()) {
|
||||
ObTxCommitCallback cb;
|
||||
cb.init(trans_service_, trans_id_, OB_SWITCHING_TO_FOLLOWER_GRACEFULLY, SCN());
|
||||
TRANS_LOG(INFO, "swtich to follower gracefully, notify scheduler retry", KPC(this));
|
||||
if (OB_FAIL(cb_array.push_back(cb))) {
|
||||
TRANS_LOG(WARN, "push back callback failed", K(ret), "context", *this);
|
||||
if (OB_FAIL(prepare_commit_cb_for_role_change_(OB_SWITCHING_TO_FOLLOWER_GRACEFULLY, cb_list_head))) {
|
||||
TRANS_LOG(WARN, "prepare commit cb fail", K(ret), KPC(this));
|
||||
} else {
|
||||
TRANS_LOG(INFO, "swtich to follower gracefully, notify scheduler retry", KPC(this));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -6025,10 +6022,10 @@ int ObPartTransCtx::switch_to_follower_gracefully(ObIArray<ObTxCommitCallback> &
|
||||
OB_ID(used), timeguard.get_diff(),
|
||||
OB_ID(ref), get_ref());
|
||||
if (OB_FAIL(ret)) {
|
||||
TRANS_LOG(WARN, "switch to follower gracefully failed", KR(ret), K(ret), KPC(this), K(cb_array));
|
||||
TRANS_LOG(WARN, "switch to follower gracefully failed", KR(ret), KPC(this));
|
||||
} else {
|
||||
#ifndef NDEBUG
|
||||
TRANS_LOG(INFO, "switch to follower gracefully succeed", KPC(this), K(cb_array));
|
||||
TRANS_LOG(INFO, "switch to follower gracefully succeed", KPC(this));
|
||||
#endif
|
||||
}
|
||||
|
||||
|
@ -192,7 +192,7 @@ public:
|
||||
/*
|
||||
* graceful kill: wait trx finish logging
|
||||
*/
|
||||
int kill(const KillTransArg &arg, ObIArray<ObTxCommitCallback> &cb_array);
|
||||
int kill(const KillTransArg &arg, ObTxCommitCallback *&cb_list);
|
||||
memtable::ObMemtableCtx *get_memtable_ctx() { return &mt_ctx_; }
|
||||
int commit(const ObTxCommitParts &parts,
|
||||
const MonotonicTs &commit_time,
|
||||
@ -298,7 +298,6 @@ private:
|
||||
"2pc_role",
|
||||
get_2pc_role(),
|
||||
K_(collected),
|
||||
K_(ref),
|
||||
K_(rec_log_ts),
|
||||
K_(prev_rec_log_ts),
|
||||
K_(lastest_snapshot),
|
||||
@ -443,9 +442,9 @@ public:
|
||||
|
||||
// leader switch related
|
||||
bool need_callback_scheduler_();
|
||||
int switch_to_follower_forcedly(ObIArray<ObTxCommitCallback> &cb_array);
|
||||
int switch_to_follower_forcedly(ObTxCommitCallback *&cb_list);
|
||||
int switch_to_leader(const share::SCN &start_working_ts);
|
||||
int switch_to_follower_gracefully(ObIArray<ObTxCommitCallback> &cb_array);
|
||||
int switch_to_follower_gracefully(ObTxCommitCallback *&cb_list);
|
||||
int resume_leader(const share::SCN &start_working_ts);
|
||||
int supplement_undo_actions_if_exist_();
|
||||
|
||||
|
@ -1104,8 +1104,17 @@ int ObPartTransCtx::post_tx_commit_resp_(const int status)
|
||||
TRANS_LOG(INFO, "report tx commit result to local scheduler succeed", K(status), KP(this));
|
||||
#endif
|
||||
}
|
||||
} else { has_skip = true; }
|
||||
} else {
|
||||
} else if (commit_cb_.get_cb_ret() == status) {
|
||||
has_skip = true;
|
||||
} else {
|
||||
// the scheduler may already have been called back because of a switch to follower
|
||||
// the callback status is not final commit status:
|
||||
// either OB_NOT_MASTER or OB_SWITCHING_TO_FOLLOWER_GRACEFULLY
|
||||
// in these cases the scheduler should be called back again with the final status
|
||||
use_rpc = true;
|
||||
}
|
||||
}
|
||||
if (use_rpc) {
|
||||
ObTxCommitRespMsg msg;
|
||||
build_tx_common_msg_(SCHEDULER_LS, msg);
|
||||
msg.commit_version_ = commit_version;
|
||||
|
@ -1953,9 +1953,11 @@ TEST_F(ObTestTx, distributed_tx_coordinator_switch_to_follower_forcedly_in_prepa
|
||||
n1->add_drop_msg_type(TX_2PC_PREPARE_RESP);
|
||||
|
||||
int commit_ret = OB_SUCCESS;
|
||||
// async start commit
|
||||
std::thread t(do_async_commit, n1, std::ref(tx), std::ref(commit_ret));
|
||||
usleep(100 * 1000);
|
||||
|
||||
// wait coordinator into prepare state
|
||||
ObPartTransCtx *n1_ctx = NULL;
|
||||
ASSERT_EQ(OB_SUCCESS, n1->get_tx_ctx(n1->ls_id_, tx.tx_id_, n1_ctx));
|
||||
int i = 0;
|
||||
@ -1965,28 +1967,27 @@ TEST_F(ObTestTx, distributed_tx_coordinator_switch_to_follower_forcedly_in_prepa
|
||||
ASSERT_NE(i, 1001);
|
||||
ASSERT_EQ(OB_SUCCESS, n1->revert_tx_ctx(n1_ctx));
|
||||
|
||||
// switch coordinator to follower forcedly
|
||||
ObLSTxCtxMgr *ls_tx_ctx_mgr1 = NULL;
|
||||
ASSERT_EQ(OB_SUCCESS, n1->txs_.tx_ctx_mgr_.get_ls_tx_ctx_mgr(n1->ls_id_, ls_tx_ctx_mgr1));
|
||||
ASSERT_EQ(OB_SUCCESS, ls_tx_ctx_mgr1->switch_to_follower_forcedly());
|
||||
n1->wait_all_redolog_applied();
|
||||
|
||||
// n3 takeover as leader
|
||||
ReplayLogEntryFunctor functor(n3);
|
||||
ASSERT_EQ(OB_SUCCESS, n3->fake_tx_log_adapter_->replay_all(functor));
|
||||
|
||||
ObLSTxCtxMgr *ls_tx_ctx_mgr3 = NULL;
|
||||
ASSERT_EQ(OB_SUCCESS, n3->txs_.tx_ctx_mgr_.get_ls_tx_ctx_mgr(n3->ls_id_, ls_tx_ctx_mgr3));
|
||||
|
||||
ObTxNode::get_location_adapter_().update_localtion(n3->ls_id_, n3->addr_);
|
||||
|
||||
ASSERT_EQ(OB_SUCCESS, ls_tx_ctx_mgr3->switch_to_leader());
|
||||
n3->wait_all_redolog_applied();
|
||||
|
||||
ASSERT_EQ(OB_SUCCESS, n2->wait_all_tx_ctx_is_destoryed());
|
||||
|
||||
// wait commit complete on scheduler
|
||||
t.join();
|
||||
ASSERT_EQ(OB_SUCCESS, commit_ret);
|
||||
|
||||
n3->del_drop_msg_type(TX_2PC_CLEAR_REQ);
|
||||
ASSERT_EQ(OB_SUCCESS, n3->wait_all_tx_ctx_is_destoryed());
|
||||
|
||||
ASSERT_EQ(OB_SUCCESS, n1->release_tx(tx));
|
||||
|
@ -221,13 +221,14 @@ private:
|
||||
ObLSTxCtxMgr *ls_tx_ctx_mgr = NULL;
|
||||
OZ(txs_.tx_ctx_mgr_.get_ls_tx_ctx_mgr(ls_id_, ls_tx_ctx_mgr));
|
||||
int i = 0;
|
||||
for (i = 0; i < 2000; ++i) {
|
||||
if (0 == ls_tx_ctx_mgr->get_tx_ctx_count()) break;
|
||||
int tx_count = ls_tx_ctx_mgr->get_tx_ctx_count();
|
||||
for (i = 0; tx_count > 0 && i < 2000; ++i) {
|
||||
tx_count = ls_tx_ctx_mgr->get_tx_ctx_count();
|
||||
usleep(500);
|
||||
}
|
||||
if (2000 == i) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_INFO("print all tx begin", K(ret));
|
||||
LOG_INFO("wait all tx ctx destoryed fail, print all tx:", K(tx_count));
|
||||
const bool verbose = true;
|
||||
ls_tx_ctx_mgr->print_all_tx_ctx(ObLSTxCtxMgr::MAX_HASH_ITEM_PRINT, verbose);
|
||||
LOG_INFO("print all tx end", K(ret));
|
||||
|
Loading…
x
Reference in New Issue
Block a user