Fix archive suspend hang
This commit is contained in:
@ -367,7 +367,9 @@ int ObArchiveFetcher::handle_log_fetch_task_(ObArchiveLogFetchTask &task)
|
|||||||
|
|
||||||
DEBUG_SYNC(BEFORE_ARCHIVE_FETCH_LOG);
|
DEBUG_SYNC(BEFORE_ARCHIVE_FETCH_LOG);
|
||||||
|
|
||||||
if (! in_normal_status_(key)) {
|
// Only handle task in archive doing status
|
||||||
|
// Status includes: doing / suspend / interrupt / stop
|
||||||
|
if (! in_doing_status_(key)) {
|
||||||
// skip
|
// skip
|
||||||
} else if (OB_UNLIKELY(! task.is_valid())) {
|
} else if (OB_UNLIKELY(! task.is_valid())) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
@ -898,6 +900,11 @@ bool ObArchiveFetcher::is_retry_ret_(const int ret_code) const
|
|||||||
}
|
}
|
||||||
|
|
||||||
bool ObArchiveFetcher::in_normal_status_(const ArchiveKey &key) const
|
bool ObArchiveFetcher::in_normal_status_(const ArchiveKey &key) const
|
||||||
|
{
|
||||||
|
return round_mgr_->is_in_archive_status(key) || round_mgr_->is_in_suspend_status(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool ObArchiveFetcher::in_doing_status_(const ArchiveKey &key) const
|
||||||
{
|
{
|
||||||
return round_mgr_->is_in_archive_status(key);
|
return round_mgr_->is_in_archive_status(key);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -224,6 +224,8 @@ private:
|
|||||||
|
|
||||||
bool in_normal_status_(const ArchiveKey &key) const;
|
bool in_normal_status_(const ArchiveKey &key) const;
|
||||||
|
|
||||||
|
bool in_doing_status_(const ArchiveKey &key) const;
|
||||||
|
|
||||||
void statistic(const int64_t log_size, const int64_t ts);
|
void statistic(const int64_t log_size, const int64_t ts);
|
||||||
private:
|
private:
|
||||||
class TmpMemoryHelper
|
class TmpMemoryHelper
|
||||||
|
|||||||
@ -199,6 +199,12 @@ bool ObArchiveRoundMgr::is_in_archive_status(const ArchiveKey &key) const
|
|||||||
return key == key_ && log_archive_state_.is_doing();
|
return key == key_ && log_archive_state_.is_doing();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool ObArchiveRoundMgr::is_in_suspend_status(const ArchiveKey &key) const
|
||||||
|
{
|
||||||
|
RLockGuard guard(rwlock_);
|
||||||
|
return key == key_ && log_archive_state_.is_suspend();
|
||||||
|
}
|
||||||
|
|
||||||
bool ObArchiveRoundMgr::is_in_archive_stopping_status(const ArchiveKey &key) const
|
bool ObArchiveRoundMgr::is_in_archive_stopping_status(const ArchiveKey &key) const
|
||||||
{
|
{
|
||||||
RLockGuard guard(rwlock_);
|
RLockGuard guard(rwlock_);
|
||||||
|
|||||||
@ -59,6 +59,7 @@ public:
|
|||||||
int get_archive_start_scn(const ArchiveKey &key, share::SCN &scn);
|
int get_archive_start_scn(const ArchiveKey &key, share::SCN &scn);
|
||||||
void get_archive_round_compatible(ArchiveKey &key, bool &compatible);
|
void get_archive_round_compatible(ArchiveKey &key, bool &compatible);
|
||||||
bool is_in_archive_status(const ArchiveKey &key) const;
|
bool is_in_archive_status(const ArchiveKey &key) const;
|
||||||
|
bool is_in_suspend_status(const ArchiveKey &key) const;
|
||||||
bool is_in_archive_stopping_status(const ArchiveKey &key) const;
|
bool is_in_archive_stopping_status(const ArchiveKey &key) const;
|
||||||
bool is_in_archive_stop_status(const ArchiveKey &key) const;
|
bool is_in_archive_stop_status(const ArchiveKey &key) const;
|
||||||
void update_log_archive_status(const ObArchiveRoundState::Status status);
|
void update_log_archive_status(const ObArchiveRoundState::Status status);
|
||||||
|
|||||||
@ -421,7 +421,7 @@ int ObArchiveSender::do_free_send_task_()
|
|||||||
|
|
||||||
bool ObArchiveSender::in_normal_status_(const ArchiveKey &key) const
|
bool ObArchiveSender::in_normal_status_(const ArchiveKey &key) const
|
||||||
{
|
{
|
||||||
return round_mgr_->is_in_archive_status(key);
|
return round_mgr_->is_in_archive_status(key) || round_mgr_->is_in_suspend_status(key);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 仅有需要重试的任务返回错误码
|
// 仅有需要重试的任务返回错误码
|
||||||
@ -436,6 +436,8 @@ void ObArchiveSender::handle(ObArchiveSendTask &task, TaskConsumeStatus &consume
|
|||||||
ARCHIVE_LOG(WARN, "invalid argument", K(ret), K(task));
|
ARCHIVE_LOG(WARN, "invalid argument", K(ret), K(task));
|
||||||
} else if (OB_UNLIKELY(! in_normal_status_(station.get_round()))) {
|
} else if (OB_UNLIKELY(! in_normal_status_(station.get_round()))) {
|
||||||
// not in normal status, just skip
|
// not in normal status, just skip
|
||||||
|
// normal status include DOING / SUSPEND
|
||||||
|
// other status include INTERRUPT / STOP
|
||||||
consume_status = TaskConsumeStatus::STALE_TASK;
|
consume_status = TaskConsumeStatus::STALE_TASK;
|
||||||
} else if (OB_FAIL(round_mgr_->get_backup_dest(station.get_round(), backup_dest))) {
|
} else if (OB_FAIL(round_mgr_->get_backup_dest(station.get_round(), backup_dest))) {
|
||||||
ARCHIVE_LOG(WARN, "get backup dest failed", K(ret), K(task));
|
ARCHIVE_LOG(WARN, "get backup dest failed", K(ret), K(task));
|
||||||
|
|||||||
Reference in New Issue
Block a user