adapt scn for migration

Author: oceanoverflow
Date: 2022-11-28 01:59:59 +00:00
Committed by: ob-robot
Parent: fd4a35f42a
Commit: a4c9f3e4d4
34 changed files with 177 additions and 166 deletions


@@ -30,7 +30,7 @@ ObLSPrepareMigrationCtx::ObLSPrepareMigrationCtx()
task_id_(),
start_ts_(0),
finish_ts_(0),
-log_sync_scn_(0)
+log_sync_scn_(palf::SCN::min_scn())
{
}
@@ -41,7 +41,7 @@ ObLSPrepareMigrationCtx::~ObLSPrepareMigrationCtx()
bool ObLSPrepareMigrationCtx::is_valid() const
{
return arg_.is_valid() && !task_id_.is_invalid()
-&& tenant_id_ != 0 && tenant_id_ != OB_INVALID_ID && log_sync_scn_ >= 0;
+&& tenant_id_ != 0 && tenant_id_ != OB_INVALID_ID && log_sync_scn_.is_valid();
}
void ObLSPrepareMigrationCtx::reset()
@@ -51,7 +51,7 @@ void ObLSPrepareMigrationCtx::reset()
task_id_.reset();
start_ts_ = 0;
finish_ts_ = 0;
-log_sync_scn_ = 0;
+log_sync_scn_.set_min();
ObIHADagNetCtx::reset();
}
@@ -78,7 +78,7 @@ int ObLSPrepareMigrationCtx::fill_comment(char *buf, const int64_t buf_len) cons
void ObLSPrepareMigrationCtx::reuse()
{
ObIHADagNetCtx::reuse();
-log_sync_scn_ = 0;
+log_sync_scn_.set_min();
}
/******************ObLSPrepareMigrationDagNet*********************/
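
Note: the four hunks above make the same change: log_sync_scn_ goes from a raw int64_t
(initialized to 0, reset to 0, and validated with ">= 0") to a palf::SCN value that is
constructed at the minimum SCN, reset with set_min(), and validated with is_valid(). A
minimal, self-contained sketch of that lifecycle, using an illustrative MiniScn stand-in
rather than the real palf::SCN:

    #include <cstdint>

    // Illustrative stand-in for an SCN-like value type; not the real palf::SCN.
    struct MiniScn {
      static constexpr uint64_t INVALID = UINT64_MAX;
      uint64_t v_;
      MiniScn() : v_(INVALID) {}                                   // default-constructed SCN is invalid
      static MiniScn min_scn() { MiniScn s; s.v_ = 0; return s; }  // smallest legal SCN
      bool is_valid() const { return INVALID != v_; }
      void set_min() { v_ = 0; }
    };

    // Same lifecycle as ObLSPrepareMigrationCtx after this commit (sketch only).
    struct MigrationCtxSketch {
      MiniScn log_sync_scn_;
      MigrationCtxSketch() : log_sync_scn_(MiniScn::min_scn()) {}  // ctor: start at min_scn()
      bool is_valid() const { return log_sync_scn_.is_valid(); }   // replaces "log_sync_scn_ >= 0"
      void reset() { log_sync_scn_.set_min(); }                    // reset()/reuse(): back to min
    };
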
@@ -736,7 +736,7 @@ int ObStartPrepareMigrationTask::process()
LOG_WARN("failed to wait log replay sync", K(ret), KPC(ctx_));
} else if (OB_FAIL(remove_local_incomplete_tablets_())) {
LOG_WARN("failed to remove local incomplete tablets", K(ret), KPC(ctx_));
-} else if (OB_FAIL(wait_ls_checkpoint_ts_push_())) {
+} else if (OB_FAIL(wait_ls_checkpoint_scn_push_())) {
LOG_WARN("failed to wait ls checkpoint ts push", K(ret), KPC(ctx_));
} else if (OB_FAIL(generate_prepare_migration_dags_())) {
LOG_WARN("failed to generate prepare migration dags", K(ret), KPC(ctx_));
@@ -798,8 +798,8 @@ int ObStartPrepareMigrationTask::deal_with_local_ls_()
} else if (OB_FAIL(ls->get_saved_info(saved_info))) {
LOG_WARN("failed to get saved info", K(ret), KPC(ls));
} else if (!saved_info.is_empty()) {
-ctx_->log_sync_scn_ = saved_info.clog_checkpoint_scn_.get_val_for_lsn_allocator();
-} else if (OB_FAIL(ls->get_end_ts_ns(ctx_->log_sync_scn_))) {
+ctx_->log_sync_scn_ = saved_info.clog_checkpoint_scn_;
+} else if (OB_FAIL(ls->get_end_scn(ctx_->log_sync_scn_))) {
LOG_WARN("failed to get end ts ns", K(ret), KPC(ctx_));
}
return ret;
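
Note: deal_with_local_ls_() follows the same direction. The old code flattened the saved
checkpoint SCN into a raw integer (get_val_for_lsn_allocator()) or filled an int64_t
out-parameter (get_end_ts_ns()); the new code keeps the value in SCN form all the way into
the context. A small before/after sketch with illustrative stand-in types, not the real
OceanBase declarations:

    #include <cstdint>

    // Illustrative stand-ins only.
    struct ScnLike {
      uint64_t v_ = 0;
      uint64_t get_val_for_lsn_allocator() const { return v_; }  // old-style raw extraction
    };
    struct SavedInfoSketch { ScnLike clog_checkpoint_scn_; };
    struct OldCtx { int64_t log_sync_scn_ = 0; };
    struct NewCtx { ScnLike log_sync_scn_; };

    // Pre-commit shape: the SCN is collapsed to a raw integer before it reaches the context.
    void fill_old(OldCtx &ctx, const SavedInfoSketch &info) {
      ctx.log_sync_scn_ =
          static_cast<int64_t>(info.clog_checkpoint_scn_.get_val_for_lsn_allocator());
    }

    // Post-commit shape: the SCN object is copied as-is, so nothing is lost or reinterpreted.
    void fill_new(NewCtx &ctx, const SavedInfoSketch &info) {
      ctx.log_sync_scn_ = info.clog_checkpoint_scn_;
    }
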
@@ -813,8 +813,8 @@ int ObStartPrepareMigrationTask::wait_log_replay_sync_()
logservice::ObLogService *log_service = nullptr;
bool wait_log_replay_success = false;
bool is_cancel = false;
-int64_t current_replay_log_ts_ns = 0;
-int64_t last_replay_log_ts_ns = 0;
+palf::SCN current_replay_scn;
+palf::SCN last_replay_scn;
const int64_t OB_CHECK_LOG_SYNC_INTERVAL = 200 * 1000; // 200ms
const int64_t CLOG_IN_SYNC_DELAY_TIMEOUT = 30 * 60 * 1000 * 1000L; // 30 min
ObLSSavedInfo saved_info;
@@ -845,9 +845,9 @@ int ObStartPrepareMigrationTask::wait_log_replay_sync_()
} else if (is_cancel) {
ret = OB_CANCELED;
STORAGE_LOG(WARN, "task is cancelled", K(ret), K(*this));
-} else if (OB_FAIL(ls->get_max_decided_log_ts_ns(current_replay_log_ts_ns))) {
-LOG_WARN("failed to get current replay log ts", K(ret), KPC(ctx_));
-} else if (current_replay_log_ts_ns >= ctx_->log_sync_scn_) {
+} else if (OB_FAIL(ls->get_max_decided_scn(current_replay_scn))) {
+LOG_WARN("failed to get current replay log scn", K(ret), KPC(ctx_));
+} else if (current_replay_scn >= ctx_->log_sync_scn_) {
wait_log_replay_success = true;
const int64_t cost_ts = ObTimeUtility::current_time() - wait_replay_start_ts;
LOG_INFO("wait replay log ts ns success, stop wait", "arg", ctx_->arg_, K(cost_ts));
@@ -856,11 +856,11 @@ int ObStartPrepareMigrationTask::wait_log_replay_sync_()
bool is_timeout = false;
if (REACH_TIME_INTERVAL(60 * 1000 * 1000)) {
LOG_INFO("log is not sync, retry next loop", "arg", ctx_->arg_,
"current_replay_log_ts_ns", current_replay_log_ts_ns,
"current_replay_scn", current_replay_scn,
"log_sync_scn", ctx_->log_sync_scn_);
}
-if (current_replay_log_ts_ns == last_replay_log_ts_ns) {
+if (current_replay_scn == last_replay_scn) {
if (current_ts - last_replay_ts > CLOG_IN_SYNC_DELAY_TIMEOUT) {
is_timeout = true;
@@ -873,15 +873,15 @@ int ObStartPrepareMigrationTask::wait_log_replay_sync_()
ret = OB_TIMEOUT;
STORAGE_LOG(WARN, "failed to check log replay sync. timeout, stop migration task",
K(ret), K(*ctx_), K(CLOG_IN_SYNC_DELAY_TIMEOUT), K(wait_replay_start_ts),
-K(current_ts), K(current_replay_log_ts_ns));
+K(current_ts), K(current_replay_scn));
}
}
-} else if (last_replay_log_ts_ns > current_replay_log_ts_ns) {
+} else if (last_replay_scn > current_replay_scn) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("last end log ts should not smaller than current end log ts", K(ret),
-K(last_replay_log_ts_ns), K(current_replay_log_ts_ns));
+K(last_replay_scn), K(current_replay_scn));
} else {
-last_replay_log_ts_ns = current_replay_log_ts_ns;
+last_replay_scn = current_replay_scn;
last_replay_ts = current_ts;
}
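
Note: the loop above (wait_log_replay_sync_) polls the max decided SCN until it reaches the
target log_sync_scn_, treating a stretch with no progress as a timeout and a backwards move
as an error. A condensed, self-contained sketch of that control flow; plain uint64_t stands
in for palf::SCN (which is compared the same way), and the callback parameters are
hypothetical stand-ins for ls->get_max_decided_scn() and the clock:

    #include <cstdint>

    int wait_replay_catch_up(uint64_t target_scn,
                             uint64_t (*get_max_decided_scn)(),
                             int64_t (*now_us)(),
                             void (*sleep_us)(int64_t)) {
      const int64_t CHECK_INTERVAL_US = 200 * 1000;               // 200ms, as in the diff
      const int64_t STALL_TIMEOUT_US = 30LL * 60 * 1000 * 1000;   // 30 min without progress
      uint64_t last_scn = 0;
      int64_t last_progress_ts = now_us();
      while (true) {
        const uint64_t cur_scn = get_max_decided_scn();
        if (cur_scn >= target_scn) {
          return 0;                            // replay caught up with log_sync_scn_
        }
        const int64_t cur_ts = now_us();
        if (cur_scn == last_scn) {             // no progress since the last check
          if (cur_ts - last_progress_ts > STALL_TIMEOUT_US) {
            return -1;                         // corresponds to OB_TIMEOUT in the diff
          }
        } else if (last_scn > cur_scn) {
          return -2;                           // decided SCN moved backwards: unexpected
        } else {
          last_scn = cur_scn;                  // progress: remember the new high-water mark
          last_progress_ts = cur_ts;
        }
        sleep_us(CHECK_INTERVAL_US);
      }
    }
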
@@ -997,13 +997,13 @@ int ObStartPrepareMigrationTask::generate_prepare_migration_dags_()
return ret;
}
-int ObStartPrepareMigrationTask::wait_ls_checkpoint_ts_push_()
+int ObStartPrepareMigrationTask::wait_ls_checkpoint_scn_push_()
{
int ret = OB_SUCCESS;
ObLSHandle ls_handle;
ObLS *ls = nullptr;
checkpoint::ObCheckpointExecutor *checkpoint_executor = NULL;
-int64_t checkpoint_ts = 0;
+palf::SCN checkpoint_scn;
const int64_t MAX_WAIT_INTERVAL_BY_CHECKPOINT_BY_FLUSH = GCONF._advance_checkpoint_timeout;
const int64_t MAX_SLEEP_INTERVAL_MS = 5 * 1000 * 1000; //5s
bool is_cancel = false;
@@ -1026,8 +1026,6 @@ int ObStartPrepareMigrationTask::wait_ls_checkpoint_ts_push_()
LOG_WARN("checkpoint executor should not be NULL", K(ret), KPC(ctx_), KP(checkpoint_executor));
} else {
const int64_t wait_checkpoint_push_start_ts = ObTimeUtility::current_time();
-palf::SCN tmp;
-tmp.convert_for_lsn_allocator(ctx_->log_sync_scn_);
while (OB_SUCC(ret)) {
if (ctx_->is_failed()) {
ret = OB_CANCELED;
@@ -1038,12 +1036,12 @@ int ObStartPrepareMigrationTask::wait_ls_checkpoint_ts_push_()
} else if (is_cancel) {
ret = OB_CANCELED;
STORAGE_LOG(WARN, "task is cancelled", K(ret), K(*this));
-} else if (FALSE_IT(checkpoint_ts = ls->get_clog_checkpoint_ts())) {
-} else if (checkpoint_ts >= ctx_->log_sync_scn_) {
+} else if (FALSE_IT(checkpoint_scn = ls->get_clog_checkpoint_scn())) {
+} else if (checkpoint_scn >= ctx_->log_sync_scn_) {
const int64_t cost_ts = ObTimeUtility::current_time() - wait_checkpoint_push_start_ts;
LOG_INFO("succeed wait clog checkpoint ts push", "cost", cost_ts, "ls_id", ctx_->arg_.ls_id_);
break;
-} else if (OB_FAIL(checkpoint_executor->advance_checkpoint_by_flush(tmp))) {
+} else if (OB_FAIL(checkpoint_executor->advance_checkpoint_by_flush(ctx_->log_sync_scn_))) {
if (OB_NO_NEED_UPDATE == ret) {
ret = OB_SUCCESS;
} else {
@@ -1055,10 +1053,10 @@ int ObStartPrepareMigrationTask::wait_ls_checkpoint_ts_push_()
const int64_t current_ts = ObTimeUtility::current_time();
if (current_ts - wait_checkpoint_push_start_ts >= MAX_WAIT_INTERVAL_BY_CHECKPOINT_BY_FLUSH) {
ret = OB_TIMEOUT;
LOG_WARN("wait ls checkpoint ts push time out",
"ls_checkpoint_ts", checkpoint_ts, "need_checkpoint_ts", ctx_->log_sync_scn_, "ls_id", ctx_->arg_.ls_id_);
LOG_WARN("wait ls checkpoint scn push time out",
"ls_checkpoint_scn", checkpoint_scn, "need_checkpoint_ts", ctx_->log_sync_scn_, "ls_id", ctx_->arg_.ls_id_);
} else {
LOG_INFO("wait ls checkpoint ts push", "ls_checkpoint_ts", checkpoint_ts,
LOG_INFO("wait ls checkpoint ts push", "ls_checkpoint_scn", checkpoint_scn,
"need_checkpoint_ts", ctx_->log_sync_scn_, "ls_id", ctx_->arg_.ls_id_);
ob_usleep(MAX_SLEEP_INTERVAL_MS);
}
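
Note: wait_ls_checkpoint_scn_push_() (above) now compares SCNs directly and passes
ctx_->log_sync_scn_ straight to advance_checkpoint_by_flush(), dropping the temporary
convert_for_lsn_allocator() step. A condensed sketch of the loop's shape; plain uint64_t
again stands in for palf::SCN, the timeout constant is a placeholder for
GCONF._advance_checkpoint_timeout, and the callback parameters are hypothetical stand-ins
for the ObLS / checkpoint-executor calls in the diff:

    #include <cstdint>

    int wait_checkpoint_reaches(uint64_t target_scn,
                                uint64_t (*get_clog_checkpoint_scn)(),
                                int (*advance_checkpoint_by_flush)(uint64_t recycle_scn),
                                int64_t (*now_us)(),
                                void (*sleep_us)(int64_t)) {
      const int64_t ADVANCE_TIMEOUT_US = 5LL * 60 * 1000 * 1000;  // placeholder timeout (5 min)
      const int64_t SLEEP_US = 5 * 1000 * 1000;                   // 5s between rounds, as in the diff
      const int64_t start_ts = now_us();
      while (true) {
        if (get_clog_checkpoint_scn() >= target_scn) {
          return 0;                            // checkpoint pushed past log_sync_scn_
        }
        // Ask for a flush so the checkpoint can advance toward the target; the real code
        // additionally treats OB_NO_NEED_UPDATE from this call as success and keeps waiting.
        const int flush_ret = advance_checkpoint_by_flush(target_scn);
        if (0 != flush_ret) {
          return flush_ret;
        }
        if (now_us() - start_ts >= ADVANCE_TIMEOUT_US) {
          return -1;                           // corresponds to OB_TIMEOUT in the diff
        }
        sleep_us(SLEEP_US);
      }
    }
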