We should not skip the check when 6004

This commit is contained in:
Handora
2023-10-20 02:09:46 +00:00
committed by ob-robot
parent fbcc7b9e37
commit 77272424b0
2 changed files with 52 additions and 31 deletions

View File

@ -356,37 +356,11 @@ int LockForReadFunctor::operator()(const ObTxData &tx_data, ObTxCCCtx *tx_cc_ctx
int tmp_ret = OB_SUCCESS;
// Opt1: Check the failure detector for clog disk full
logservice::coordinator::ObFailureDetector *detector =
MTL(logservice::coordinator::ObFailureDetector *);
if (NULL != detector && detector->is_clog_disk_has_fatal_error()) {
ret = OB_IO_ERROR;
TRANS_LOG(ERROR, "unexpected io error", K(ret), K(tx_data), KPC(tx_cc_ctx), KPC(this));
}
if (OB_TMP_FAIL(check_clog_disk_full_())) {
ret = tmp_ret;
// Opt2: Check the gc handler for log sync status
logservice::ObGCHandler *gc_handler = NULL;
ObLSService *ls_service = MTL(ObLSService *);
ObLSHandle ls_handle;
ObLS *ls = NULL;
if (OB_FAIL(ret)) {
// pass
} else if (NULL == ls_service) {
tmp_ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "fail to get ls service", K(tmp_ret), KPC(this));
} else if (OB_TMP_FAIL(ls_service->get_ls(ls_id_,
ls_handle,
ObLSGetMod::TRANS_MOD))) {
TRANS_LOG(WARN, "fail to get ls handle", K(tmp_ret), KPC(this));
} else if (OB_ISNULL(ls = ls_handle.get_ls())) {
tmp_ret = OB_ERR_UNEXPECTED;
CLOG_LOG(ERROR, "ls not exist", K(tmp_ret), KPC(this));
} else if (OB_ISNULL(gc_handler = ls->get_gc_handler())) {
tmp_ret = OB_ERR_UNEXPECTED;
CLOG_LOG(ERROR, "gc_handler is NULL", K(tmp_ret), KPC(this));
} else if (gc_handler->is_log_sync_stopped()) {
ret = OB_REPLICA_NOT_READABLE;
TRANS_LOG(WARN, "log sync has been stopped, so we need giveup retry",
K(ret), KPC(this));
} else if (OB_TMP_FAIL(check_gc_handler_())) {
ret = tmp_ret;
}
// reset the counter
@ -406,6 +380,52 @@ int LockForReadFunctor::operator()(const ObTxData &tx_data, ObTxCCCtx *tx_cc_ctx
return ret;
}
int LockForReadFunctor::check_clog_disk_full_()
{
int ret = OB_SUCCESS;
logservice::coordinator::ObFailureDetector *detector =
MTL(logservice::coordinator::ObFailureDetector *);
if (NULL != detector && detector->is_clog_disk_has_fatal_error()) {
ret = OB_IO_ERROR;
TRANS_LOG(ERROR, "unexpected io error", K(ret), KPC(this));
}
return ret;
}
int LockForReadFunctor::check_gc_handler_()
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
logservice::ObGCHandler *gc_handler = NULL;
ObLSService *ls_service = MTL(ObLSService *);
ObLSHandle ls_handle;
ObLS *ls = NULL;
if (NULL == ls_service) {
tmp_ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "fail to get ls service", K(tmp_ret), KPC(this));
} else if (OB_TMP_FAIL(ls_service->get_ls(ls_id_,
ls_handle,
ObLSGetMod::TRANS_MOD))) {
TRANS_LOG(WARN, "fail to get ls handle", K(tmp_ret), KPC(this));
} else if (OB_ISNULL(ls = ls_handle.get_ls())) {
tmp_ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "ls not exist", K(tmp_ret), KPC(this));
} else if (OB_ISNULL(gc_handler = ls->get_gc_handler())) {
tmp_ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "gc_handler is NULL", K(tmp_ret), KPC(this));
} else if (gc_handler->is_log_sync_stopped()) {
ret = OB_REPLICA_NOT_READABLE;
TRANS_LOG(WARN, "log sync has been stopped, so we need giveup retry",
K(ret), KPC(this));
}
return ret;
}
int LockForReadFunctor::check_for_standby(const transaction::ObTransID &tx_id)
{
int ret = OB_SUCCESS;

View File

@ -200,7 +200,8 @@ public:
K(trans_version_), K(ls_id_));
private:
int inner_lock_for_read(const ObTxData &tx_data, ObTxCCCtx *tx_cc_ctx);
int check_clog_disk_full_();
int check_gc_handler_();
public:
const transaction::ObLockForReadArg &lock_for_read_arg_;
bool &can_read_;