add ls offline check for read by transfer_dest ls
This commit is contained in:
@ -25,6 +25,7 @@
|
||||
#include "storage/tx/ob_tx_retain_ctx_mgr.h"
|
||||
#include "logservice/ob_log_base_header.h"
|
||||
#include "share/scn.h"
|
||||
#include "storage/tx_storage/ob_ls_service.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -160,6 +161,30 @@ int ObLSTxService::get_read_store_ctx(const ObTxReadSnapshot &snapshot,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLSTxService::start_request_for_transfer()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_ISNULL(trans_service_) || OB_ISNULL(mgr_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
TRANS_LOG(WARN, "not init", K(ret), KP(trans_service_), KP(mgr_));
|
||||
} else {
|
||||
mgr_->inc_total_request_by_transfer_dest();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLSTxService::end_request_for_transfer()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_ISNULL(trans_service_) || OB_ISNULL(mgr_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
TRANS_LOG(WARN, "not init", K(ret), KP(trans_service_), KP(mgr_));
|
||||
} else {
|
||||
mgr_->dec_total_request_by_transfer_dest();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLSTxService::get_read_store_ctx(const SCN &snapshot,
|
||||
const int64_t lock_timeout,
|
||||
ObStoreCtx &store_ctx) const
|
||||
@ -221,6 +246,17 @@ int ObLSTxService::revert_store_ctx(storage::ObStoreCtx &store_ctx) const
|
||||
(void)mgr_->end_readonly_request();
|
||||
}
|
||||
}
|
||||
ObTxTableGuard src_tx_table_guard = store_ctx.mvcc_acc_ctx_.get_tx_table_guards().src_tx_table_guard_;
|
||||
if (src_tx_table_guard.is_valid()) {
|
||||
ObLSHandle ls_handle;
|
||||
// do not overrite ret
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_TMP_FAIL(MTL(ObLSService*)->get_ls(src_tx_table_guard.get_ls_id(), ls_handle, ObLSGetMod::STORAGE_MOD))) {
|
||||
TRANS_LOG(ERROR, "get_ls failed", KR(tmp_ret), K(src_tx_table_guard));
|
||||
} else if (OB_TMP_FAIL(ls_handle.get_ls()->get_tx_svr()->end_request_for_transfer())) {
|
||||
TRANS_LOG(ERROR, "end request for transfer", KR(tmp_ret), K(src_tx_table_guard));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -257,6 +293,7 @@ int ObLSTxService::check_all_readonly_tx_clean_up() const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t active_readonly_request_count = 0;
|
||||
int64_t total_request_by_transfer_dest = 0;
|
||||
if (OB_ISNULL(mgr_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
TRANS_LOG(WARN, "not init", KR(ret), K_(ls_id));
|
||||
@ -266,6 +303,11 @@ int ObLSTxService::check_all_readonly_tx_clean_up() const
|
||||
mgr_->dump_readonly_request(3);
|
||||
}
|
||||
ret = OB_EAGAIN;
|
||||
} else if ((total_request_by_transfer_dest = mgr_->get_total_request_by_transfer_dest()) > 0) {
|
||||
if (REACH_TIME_INTERVAL(5000000)) {
|
||||
TRANS_LOG(INFO, "readonly requests are active", K(total_request_by_transfer_dest));
|
||||
}
|
||||
ret = OB_EAGAIN;
|
||||
} else {
|
||||
TRANS_LOG(INFO, "wait_all_readonly_tx_cleaned_up cleaned up success", K_(ls_id));
|
||||
}
|
||||
@ -710,11 +752,17 @@ int ObLSTxService::prepare_offline(const int64_t start_ts)
|
||||
// dont care readonly request
|
||||
} else {
|
||||
const int64_t readonly_request_cnt = mgr_->get_total_active_readonly_request_count();
|
||||
const int64_t request_by_transfer_dest = mgr_->get_total_request_by_transfer_dest();
|
||||
if (readonly_request_cnt > 0) {
|
||||
ret = OB_EAGAIN;
|
||||
if (REACH_TIME_INTERVAL(PRINT_LOG_INTERVAL)) {
|
||||
TRANS_LOG(WARN, "readonly requests are active", K(ret), KP(mgr_), K_(ls_id), K(readonly_request_cnt));
|
||||
}
|
||||
} else if (request_by_transfer_dest > 0) {
|
||||
ret = OB_EAGAIN;
|
||||
if (REACH_TIME_INTERVAL(PRINT_LOG_INTERVAL)) {
|
||||
TRANS_LOG(WARN, "request by transfer_dest", K(ret), KP(mgr_), K_(ls_id), K(request_by_transfer_dest));
|
||||
}
|
||||
}
|
||||
}
|
||||
TRANS_LOG(INFO, "prepare offline ls", K(ret), K(start_ts), KP(mgr_), K_(ls_id));
|
||||
|
||||
@ -192,6 +192,8 @@ public:
|
||||
ObIArray<ObTxCtxMoveArg> &args);
|
||||
int move_tx_op(const ObTransferMoveTxParam &move_tx_param,
|
||||
const ObIArray<ObTxCtxMoveArg> &arg);
|
||||
int start_request_for_transfer();
|
||||
int end_request_for_transfer();
|
||||
public:
|
||||
transaction::ObTransService *get_trans_service() { return trans_service_; }
|
||||
|
||||
|
||||
@ -222,6 +222,7 @@ void ObLSTxCtxMgr::reset()
|
||||
total_tx_ctx_count_ = 0;
|
||||
active_tx_count_ = 0;
|
||||
total_active_readonly_request_count_ = 0;
|
||||
total_request_by_transfer_dest_ = 0;
|
||||
leader_takeover_ts_.reset();
|
||||
max_replay_commit_version_.reset();
|
||||
aggre_rec_scn_.reset();
|
||||
|
||||
@ -403,6 +403,11 @@ public:
|
||||
}
|
||||
int64_t get_total_active_readonly_request_count() { return ATOMIC_LOAD(&total_active_readonly_request_count_); }
|
||||
|
||||
void inc_total_request_by_transfer_dest() { (void)ATOMIC_AAF(&total_request_by_transfer_dest_, 1); }
|
||||
void dec_total_request_by_transfer_dest() { (void)ATOMIC_AAF(&total_request_by_transfer_dest_, -1); }
|
||||
int64_t get_total_request_by_transfer_dest() {
|
||||
return total_request_by_transfer_dest_;
|
||||
}
|
||||
// Get all tx obj lock information in this ObLSTxCtxMgr
|
||||
// @param [out] iter: all tx obj lock op information
|
||||
int iterate_tx_obj_lock_op(ObLockOpIterator &iter);
|
||||
@ -855,6 +860,9 @@ private:
|
||||
|
||||
int64_t active_tx_count_;
|
||||
|
||||
// for transfer dest_ls depend src_ls
|
||||
int64_t total_request_by_transfer_dest_;
|
||||
|
||||
// It is used to record the time point of leader takeover
|
||||
// gts must be refreshed to the newest before the leader provides services
|
||||
MonotonicTs leader_takeover_ts_;
|
||||
|
||||
@ -414,6 +414,8 @@ int ObAccessService::get_source_ls_tx_table_guard_(
|
||||
} else if (!user_data.transfer_scn_.is_valid() || !src_tx_table_guard.is_valid()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("transfer_scn or source ls tx_table_guard is invalid", K(ret), K(src_tx_table_guard), K(user_data));
|
||||
} else if (OB_FAIL(src_ls->get_tx_svr()->start_request_for_transfer())) {
|
||||
LOG_WARN("start request for transfer failed", KR(ret), K(user_data));
|
||||
} else {
|
||||
ObStoreCtx &ctx = ctx_guard.get_store_ctx();
|
||||
ctx.mvcc_acc_ctx_.set_src_tx_table_guard(src_tx_table_guard);
|
||||
|
||||
Reference in New Issue
Block a user