[BUG] quit the lock-for-read loop when the gc handler finds an error

This commit is contained in:
Handora 2023-09-21 10:40:12 +00:00 committed by ob-robot
parent cd903a45b4
commit 9e888053c3
3 changed files with 63 additions and 6 deletions

View File

@ -375,6 +375,7 @@ void ObGCHandler::reset()
ls_ = NULL;
gc_start_ts_ = OB_INVALID_TIMESTAMP;
block_tx_ts_ = OB_INVALID_TIMESTAMP;
log_sync_stopped_ = false;
is_inited_ = false;
}
@ -395,6 +396,11 @@ int ObGCHandler::init(ObLS *ls)
return ret;
}
void ObGCHandler::set_log_sync_stopped()
{
ATOMIC_SET(&log_sync_stopped_, true);
CLOG_LOG(INFO, "set log_sync_stopped_ to true", K(ls_->get_ls_id()));
}
int ObGCHandler::execute_pre_remove()
{
int ret = OB_SUCCESS;
@ -533,8 +539,7 @@ int ObGCHandler::check_ls_can_offline(const share::ObLSStatus &ls_status)
return ret;
}
int ObGCHandler::gc_check_invalid_member_seq(const int64_t gc_seq,
bool &need_gc)
int ObGCHandler::gc_check_invalid_member_seq(const int64_t gc_seq, bool &need_gc)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
@ -1621,6 +1626,22 @@ void ObGarbageCollector::execute_gc_(ObGCCandidateArray &gc_candidates)
tmp_ret = OB_ERR_UNEXPECTED;
CLOG_LOG(ERROR, "gc_handler is NULL", K(tmp_ret), K(id));
} else if (is_need_gc_ls_status_(ls_status)) {
//this replica may not be able to synchronize complete logs
if (GCReason::LS_STATUS_ENTRY_NOT_EXIST == gc_reason) {
SCN offline_scn;
if (OB_SUCCESS != (tmp_ret = (ls->get_offline_scn(offline_scn)))) {
CLOG_LOG(ERROR, "get_offline_scn failed", K(id));
} else if (!offline_scn.is_valid()) {
gc_handler->set_log_sync_stopped();
}
} else if (NOT_IN_LEADER_MEMBER_LIST == gc_reason) {
ObLogHandler *log_handler = NULL;
if (OB_ISNULL(log_handler = ls->get_log_handler())) {
CLOG_LOG(ERROR, "log_handler is NULL", K(tmp_ret), K(id));
} else if (!log_handler->is_sync_enabled() || !log_handler->is_replay_enabled()) {
gc_handler->set_log_sync_stopped();
}
}
ObSwitchLeaderAdapter switch_leader_adapter;
if (OB_SUCCESS != (tmp_ret = (gc_handler->execute_pre_remove()))) {
CLOG_LOG(WARN, "failed to execute_pre_remove", K(tmp_ret), K(id), K_(self_addr));

View File

@ -252,6 +252,8 @@ public:
int gc_check_invalid_member_seq(const int64_t gc_seq, bool &need_gc);
static bool is_valid_ls_gc_state(const LSGCState &state);
static bool is_ls_offline_gc_state(const LSGCState &state);
void set_log_sync_stopped();
// Returns the current value of log_sync_stopped_, read with ATOMIC_LOAD.
bool is_log_sync_stopped() const
{
  const bool stopped = ATOMIC_LOAD(&log_sync_stopped_);
  return stopped;
}
int diagnose(GCDiagnoseInfo &diagnose_info) const;
@ -346,6 +348,7 @@ private:
int64_t gc_seq_invalid_member_; //缓存gc检查当前ls不在成员列表时的轮次
int64_t gc_start_ts_;
int64_t block_tx_ts_;
bool log_sync_stopped_;//used for trans_service to kill trx, True means this replica may not be able to fully synchronize the logs.
};
} // namespace logservice

View File

@ -21,6 +21,7 @@
#include "observer/ob_server_struct.h"
#include "logservice/leader_coordinator/ob_failure_detector.h"
#include "observer/virtual_table/ob_all_virtual_tx_data.h"
#include "logservice/ob_garbage_collector.h"
namespace oceanbase
{
@ -319,12 +320,12 @@ bool LockForReadFunctor::recheck()
int LockForReadFunctor::operator()(const ObTxData &tx_data, ObTxCCCtx *tx_cc_ctx)
{
int ret = OB_ERR_SHARED_LOCK_CONFLICT;
const int64_t MAX_RETRY_CNT = 1000;
const int64_t MAX_SLEEP_US = 1000;
ObMvccAccessCtx &acc_ctx = lock_for_read_arg_.mvcc_acc_ctx_;
int64_t lock_expire_ts = acc_ctx.eval_lock_expire_ts();
// check lock_for_read blocked or not every 1ms * 100 = 100ms
// check lock_for_read blocked or not every 1ms * 1000 = 1s
int64_t retry_cnt = 0;
const int64_t MAX_RETRY_CNT = 100;
const int32_t state = ATOMIC_LOAD(&tx_data.state_);
@ -352,12 +353,44 @@ int LockForReadFunctor::operator()(const ObTxData &tx_data, ObTxCCCtx *tx_cc_ctx
ob_usleep((i < MAX_SLEEP_US ? i : MAX_SLEEP_US));
}
if (retry_cnt == MAX_RETRY_CNT) {
retry_cnt = 0;
logservice::coordinator::ObFailureDetector *detector = MTL(logservice::coordinator::ObFailureDetector *);
int tmp_ret = OB_SUCCESS;
// Opt1: Check the failure detector for clog disk full
logservice::coordinator::ObFailureDetector *detector =
MTL(logservice::coordinator::ObFailureDetector *);
if (NULL != detector && detector->is_clog_disk_has_fatal_error()) {
ret = OB_IO_ERROR;
TRANS_LOG(ERROR, "unexpected io error", K(ret), K(tx_data), KPC(tx_cc_ctx), KPC(this));
}
// Opt2: Check the gc handler for log sync status
logservice::ObGCHandler *gc_handler = NULL;
ObLSService *ls_service = MTL(ObLSService *);
ObLSHandle ls_handle;
ObLS *ls = NULL;
if (OB_FAIL(ret)) {
// pass
} else if (NULL == ls_service) {
tmp_ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "fail to get ls service", K(tmp_ret), KPC(this));
} else if (OB_TMP_FAIL(ls_service->get_ls(ls_id_,
ls_handle,
ObLSGetMod::TRANS_MOD))) {
TRANS_LOG(WARN, "fail to get ls handle", K(tmp_ret), KPC(this));
} else if (OB_ISNULL(ls = ls_handle.get_ls())) {
tmp_ret = OB_ERR_UNEXPECTED;
CLOG_LOG(ERROR, "ls not exist", K(tmp_ret), KPC(this));
} else if (OB_ISNULL(gc_handler = ls->get_gc_handler())) {
tmp_ret = OB_ERR_UNEXPECTED;
CLOG_LOG(ERROR, "gc_handler is NULL", K(tmp_ret), KPC(this));
} else if (gc_handler->is_log_sync_stopped()) {
ret = OB_REPLICA_NOT_READABLE;
TRANS_LOG(WARN, "log sync has been stopped, so we need giveup retry",
K(ret), KPC(this));
}
// reset the counter
retry_cnt = 0;
}
}
}