replica is not readable when ls blocked and retry cost too much time.
This commit is contained in:
@ -1571,6 +1571,19 @@ int ObLSTxCtxMgr::start_readonly_request()
|
||||
TRANS_LOG(WARN, "ObLSTxCtxMgr not inited", K(this));
|
||||
ret = OB_NOT_INIT;
|
||||
} else if (is_all_blocked_()) {
|
||||
// blocked ls add to black list
|
||||
ObAddr server = txs_->get_server();
|
||||
ObBLKey bl_key;
|
||||
if(OB_FAIL(bl_key.init(server, tenant_id_, ls_id_))) {
|
||||
TRANS_LOG(WARN, "bl_key init fail, add block ls to black list fail", K(server), K(tenant_id_), K(ls_id_), K(ret));
|
||||
} else {
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_TMP_FAIL(ObBLService::get_instance().add(bl_key))) {
|
||||
TRANS_LOG(WARN, "add block ls to black list fail", K(bl_key), K(tmp_ret));
|
||||
} else {
|
||||
TRANS_LOG(INFO, "add block ls to black list success", K(bl_key));
|
||||
}
|
||||
}
|
||||
ret = OB_PARTITION_IS_BLOCKED;
|
||||
// readonly read must be blocked, because trx may be killed forcely
|
||||
TRANS_LOG(WARN, "logstream is blocked", K(ret));
|
||||
@ -2448,6 +2461,33 @@ int ObTxCtxMgr::check_scheduler_status(share::ObLSID ls_id)
|
||||
return ret;
|
||||
}
|
||||
|
||||
// check ls status in trans layer
|
||||
int ObTxCtxMgr::check_ls_status(const share::ObLSID &ls_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObLSTxCtxMgr *ls_tx_ctx_mgr = NULL;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
TRANS_LOG(WARN, "ObTxCtxMgr not inited", K(ret), K(ls_id));
|
||||
} else if (OB_UNLIKELY(!ls_id.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
TRANS_LOG(WARN, "invalid argument", K(ret), K(ls_id));
|
||||
} else if (OB_FAIL(get_ls_tx_ctx_mgr(ls_id, ls_tx_ctx_mgr))) {
|
||||
ret = OB_PARTITION_NOT_EXIST;
|
||||
TRANS_LOG(WARN, "get ls_tx_ctx_mgr failed", K(ret), K(ls_id));
|
||||
} else if(ls_tx_ctx_mgr->is_stopped()) {
|
||||
ret = OB_PARTITION_IS_BLOCKED;
|
||||
TRANS_LOG(WARN, "ls_tx_ctx_mgr is stopped", K(ret), K(ls_id));
|
||||
} else if (ls_tx_ctx_mgr->is_all_blocked()) {
|
||||
ret = OB_PARTITION_IS_BLOCKED;
|
||||
TRANS_LOG(WARN, "logstream is blocked", K(ret), K(ls_id));
|
||||
}
|
||||
if (OB_NOT_NULL(ls_tx_ctx_mgr)) {
|
||||
revert_ls_tx_ctx_mgr(ls_tx_ctx_mgr);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTxCtxMgr::do_all_ls_standby_cleanup(ObTimeGuard &cleanup_timeguard)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
@ -548,6 +548,9 @@ public:
|
||||
// check is blocked
|
||||
bool is_tx_blocked() const { return is_tx_blocked_(); }
|
||||
|
||||
// check all blocked
|
||||
bool is_all_blocked() const { return is_all_blocked_(); }
|
||||
|
||||
// Switch the prev_aggre_log_ts and aggre_log_ts during dump starts
|
||||
int refresh_aggre_rec_scn();
|
||||
|
||||
@ -1105,6 +1108,8 @@ public:
|
||||
int get_max_decided_scn(const share::ObLSID &ls_id, share::SCN & scn);
|
||||
|
||||
int do_all_ls_standby_cleanup(ObTimeGuard &cleanup_timeguard);
|
||||
|
||||
int check_ls_status(const share::ObLSID &ls_id);
|
||||
private:
|
||||
int create_ls_(const int64_t tenant_id,
|
||||
const share::ObLSID &ls_id,
|
||||
|
@ -2113,6 +2113,29 @@ int ObTransService::handle_sp_rollback_response(ObTxRollbackSPRespMsg &msg,
|
||||
result.init(ret, msg.get_timestamp());
|
||||
return ret;
|
||||
}
|
||||
|
||||
// check ls status in trans layer
|
||||
int ObTransService::check_ls_status(const share::ObLSID &ls_id){
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
if (IS_NOT_INIT) {
|
||||
TRANS_LOG(WARN, "ObTransService not inited");
|
||||
ret = OB_NOT_INIT;
|
||||
} else if (OB_UNLIKELY(!is_running_)) {
|
||||
TRANS_LOG(WARN, "ObTransService is not running");
|
||||
ret = OB_NOT_RUNNING;
|
||||
} else if (!ls_id.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
TRANS_LOG(WARN, "invalid argument", KR(ret), K(ls_id));
|
||||
} else if (OB_FAIL(tx_ctx_mgr_.check_ls_status(ls_id))) {
|
||||
TRANS_LOG(WARN, "check_ls_status error", KR(ret), K(ls_id));
|
||||
} else {
|
||||
TRANS_LOG(DEBUG, "check_ls_status success", K(ls_id));
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTransService::check_ls_status_(const share::ObLSID &ls_id, bool &leader)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
@ -189,6 +189,8 @@ int ask_tx_state_for_4377(const ObLSID ls_id,
|
||||
int handle_ask_tx_state_for_4377(const ObAskTxStateFor4377Msg &msg,
|
||||
bool &is_alive);
|
||||
|
||||
int check_ls_status(const share::ObLSID &ls_id);
|
||||
|
||||
TO_STRING_KV(K(is_inited_), K(tenant_id_), KP(this));
|
||||
|
||||
private:
|
||||
|
Reference in New Issue
Block a user