Bug Fix:archive checkpoint info is overwritten in the scenario of concurrently writing file
This commit is contained in:
@ -32,7 +32,7 @@ using namespace obrpc;
|
||||
* ------------------------------ObArchiveSchedulerService---------------------
|
||||
*/
|
||||
ObArchiveSchedulerService::ObArchiveSchedulerService()
|
||||
: is_inited_(false), tenant_id_(OB_INVALID_TENANT_ID), rpc_proxy_(nullptr), sql_proxy_(nullptr), schema_service_(nullptr)
|
||||
: is_inited_(false), tenant_id_(OB_INVALID_TENANT_ID), rpc_proxy_(nullptr), sql_proxy_(nullptr), schema_service_(nullptr), switch_leader_timestamp_(0)
|
||||
{
|
||||
}
|
||||
|
||||
@ -288,6 +288,10 @@ int ObArchiveSchedulerService::process_()
|
||||
const uint64_t tenant_id = tenant_id_;
|
||||
if (OB_FAIL(tenant_scheduler.init(tenant_id, schema_service_, *rpc_proxy_, *sql_proxy_))) {
|
||||
LOG_WARN("failed to init tenant archive scheduler", K(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(can_do_schedule_())) {
|
||||
if (REACH_TIME_INTERVAL(60 * 1000 * 1000)) {
|
||||
LOG_WARN("failed to determine if scheduling is possible", K(ret), K(tenant_id));
|
||||
}
|
||||
} else if (OB_TMP_FAIL(tenant_scheduler.checkpoint())) {
|
||||
LOG_WARN("failed to checkpoint", K(tmp_ret), K(tenant_id));
|
||||
}
|
||||
@ -527,3 +531,63 @@ int ObArchiveSchedulerService::close_tenant_archive_mode_(const uint64_t tenant_
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObArchiveSchedulerService::can_do_schedule_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
share::ObArchivePersistHelper archive_table_op;
|
||||
ObArray<ObTenantArchiveRoundAttr> rounds;
|
||||
const int64_t current_timestamp = ObTimeUtility::current_time();
|
||||
int64_t write_checkpoint_wait_interval = 300 * 1000 * 1000; // 5 min
|
||||
#ifdef ERRSIM
|
||||
int64_t tmp = 60 * 1000 * 1000; // 1 min
|
||||
if (tmp != write_checkpoint_wait_interval) {
|
||||
write_checkpoint_wait_interval = tmp;
|
||||
}
|
||||
#endif
|
||||
if (OB_FAIL(archive_table_op.init(tenant_id_))) {
|
||||
LOG_WARN("failed to init archive table operator", K(ret), K(tenant_id_));
|
||||
} else if (OB_FAIL(archive_table_op.get_all_active_rounds(*sql_proxy_, rounds))) {
|
||||
LOG_WARN("failed to get all active rounds", K(ret), K(tenant_id_));
|
||||
} else if (OB_FAIL(check_leader())) {
|
||||
LOG_WARN("follower is not allowed to work", K(ret), K(tenant_id_));
|
||||
} else if (rounds.size() <= 0) {
|
||||
} else {
|
||||
if (rounds.at(0).state_.is_prepare()) {
|
||||
switch_leader_timestamp_ = 0;
|
||||
LOG_INFO("archive state is prepare, can not prohibit schedule", K(ret), K(rounds), K(switch_leader_timestamp_));
|
||||
}
|
||||
|
||||
if (current_timestamp - switch_leader_timestamp_ < write_checkpoint_wait_interval) { // New leader takes over and needs to wait for 5 minutes before they can start working.
|
||||
ret = OB_EAGAIN;
|
||||
if (REACH_TIME_INTERVAL(60 * 1000 * 1000)) {
|
||||
LOG_INFO("can not do schedule", K(ret), K(current_timestamp), K(switch_leader_timestamp_), K(write_checkpoint_wait_interval));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObArchiveSchedulerService::switch_to_leader()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(ObBackupBaseService::switch_to_leader())) {
|
||||
LOG_WARN("backup base service switch to leader fail", K(ret), K(tenant_id_));
|
||||
} else {
|
||||
switch_leader_timestamp_ = ObTimeUtility::current_time();
|
||||
LOG_INFO("archive scheduler service switch to leader", K(ret), K(switch_leader_timestamp_));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObArchiveSchedulerService::resume_leader()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(ObBackupBaseService::resume_leader())) {
|
||||
LOG_WARN("backup base service resume leader fail", K(ret), K(tenant_id_));
|
||||
} else {
|
||||
switch_leader_timestamp_ = ObTimeUtility::current_time();
|
||||
LOG_INFO("archive scheduler service resume leader", K(ret), K(switch_leader_timestamp_));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user