BUGFIX: fix deadlock between advance_checkpoint_by_flush and create_ls
This commit is contained in:
@ -72,13 +72,9 @@ static int advance_checkpoint_by_flush(const uint64_t tenant_id, const share::Ob
|
||||
int ret = OB_SUCCESS;
|
||||
const int64_t advance_checkpoint_timeout = GCONF._advance_checkpoint_timeout;
|
||||
LOG_INFO("backup advance checkpoint timeout", K(tenant_id), K(advance_checkpoint_timeout));
|
||||
checkpoint::ObCheckpointExecutor *checkpoint_executor = NULL;
|
||||
if (start_scn < 0) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("get invalid args", K(ret), K(start_scn));
|
||||
} else if (OB_ISNULL(checkpoint_executor = ls->get_checkpoint_executor())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("checkpoint executor should not be null", K(ret), KPC(ls));
|
||||
} else {
|
||||
ObLSMetaPackage ls_meta_package;
|
||||
int64_t i = 0;
|
||||
@ -88,7 +84,7 @@ static int advance_checkpoint_by_flush(const uint64_t tenant_id, const share::Ob
|
||||
if (cur_ts - start_ts > advance_checkpoint_timeout) {
|
||||
ret = OB_BACKUP_ADVANCE_CHECKPOINT_TIMEOUT;
|
||||
LOG_WARN("backup advance checkpoint by flush timeout", K(ret), K(tenant_id), K(ls_id), K(start_scn));
|
||||
} else if (OB_FAIL(checkpoint_executor->advance_checkpoint_by_flush(start_scn))) {
|
||||
} else if (OB_FAIL(ls->advance_checkpoint_by_flush(start_scn))) {
|
||||
if (OB_NO_NEED_UPDATE == ret) {
|
||||
// clog checkpoint ts has passed start log ts
|
||||
ret = OB_SUCCESS;
|
||||
|
||||
@ -200,48 +200,52 @@ int ObCheckpointExecutor::update_clog_checkpoint()
|
||||
|
||||
int ObCheckpointExecutor::advance_checkpoint_by_flush(int64_t recycle_ts) {
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
|
||||
// calcu recycle_ts according to clog disk situation
|
||||
if (recycle_ts == 0) {
|
||||
LSN end_lsn;
|
||||
int64_t calcu_recycle_ts = INT64_MAX;
|
||||
if (OB_FAIL(loghandler_->get_end_lsn(end_lsn))) {
|
||||
STORAGE_LOG(WARN, "get end lsn failed", K(ret), K(ls_->get_ls_id()));
|
||||
} else {
|
||||
LSN clog_checkpoint_lsn = ls_->get_clog_base_lsn();
|
||||
LSN calcu_recycle_lsn = clog_checkpoint_lsn
|
||||
+ ((end_lsn - clog_checkpoint_lsn) * CLOG_GC_PERCENT / 100);
|
||||
if (OB_FAIL(loghandler_->locate_by_lsn_coarsely(calcu_recycle_lsn, recycle_ts))) {
|
||||
STORAGE_LOG(WARN, "locate_by_lsn_coarsely failed", K(calcu_recycle_ts), K(calcu_recycle_lsn),
|
||||
K(recycle_ts), K(ls_->get_ls_id()));
|
||||
ObSpinLockGuard guard(lock_);
|
||||
if (update_checkpoint_enabled_) {
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
|
||||
// calcu recycle_ts according to clog disk situation
|
||||
if (recycle_ts == 0) {
|
||||
LSN end_lsn;
|
||||
int64_t calcu_recycle_ts = INT64_MAX;
|
||||
if (OB_FAIL(loghandler_->get_end_lsn(end_lsn))) {
|
||||
STORAGE_LOG(WARN, "get end lsn failed", K(ret), K(ls_->get_ls_id()));
|
||||
} else {
|
||||
STORAGE_LOG(INFO, "advance checkpoint by flush to avoid clog disk full",
|
||||
K(recycle_ts), K(end_lsn), K(clog_checkpoint_lsn),
|
||||
K(calcu_recycle_lsn), K(ls_->get_ls_id()));
|
||||
LSN clog_checkpoint_lsn = ls_->get_clog_base_lsn();
|
||||
LSN calcu_recycle_lsn = clog_checkpoint_lsn
|
||||
+ ((end_lsn - clog_checkpoint_lsn) * CLOG_GC_PERCENT / 100);
|
||||
if (OB_FAIL(loghandler_->locate_by_lsn_coarsely(calcu_recycle_lsn, recycle_ts))) {
|
||||
STORAGE_LOG(WARN, "locate_by_lsn_coarsely failed", K(calcu_recycle_ts), K(calcu_recycle_lsn),
|
||||
K(recycle_ts), K(ls_->get_ls_id()));
|
||||
} else {
|
||||
STORAGE_LOG(INFO, "advance checkpoint by flush to avoid clog disk full",
|
||||
K(recycle_ts), K(end_lsn), K(clog_checkpoint_lsn),
|
||||
K(calcu_recycle_lsn), K(ls_->get_ls_id()));
|
||||
}
|
||||
}
|
||||
// the log of end_log_lsn and the log of clog_checkpoint_lsn may be in a block
|
||||
if (recycle_ts < ls_->get_clog_checkpoint_ts()) {
|
||||
recycle_ts = INT64_MAX;
|
||||
}
|
||||
}
|
||||
// the log of end_log_lsn and the log of clog_checkpoint_lsn may be in a block
|
||||
if (recycle_ts < ls_->get_clog_checkpoint_ts()) {
|
||||
recycle_ts = INT64_MAX;
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
if (recycle_ts < ls_->get_clog_checkpoint_ts()) {
|
||||
ret = OB_NO_NEED_UPDATE;
|
||||
STORAGE_LOG(WARN, "recycle_ts should not smaller than checkpoint_log_ts",
|
||||
K(recycle_ts), K(ls_->get_clog_checkpoint_ts()), K(ls_->get_ls_id()));
|
||||
} else {
|
||||
STORAGE_LOG(INFO, "start flush",
|
||||
K(recycle_ts),
|
||||
K(ls_->get_clog_checkpoint_ts()),
|
||||
K(ls_->get_ls_id()));
|
||||
for (int i = 1; i < ObLogBaseType::MAX_LOG_BASE_TYPE; i++) {
|
||||
if (OB_NOT_NULL(handlers_[i])
|
||||
&& OB_SUCCESS != (tmp_ret = (handlers_[i]->flush(recycle_ts)))) {
|
||||
STORAGE_LOG(WARN, "handler flush failed", K(recycle_ts), K(tmp_ret),
|
||||
K(i), K(ls_->get_ls_id()));
|
||||
if (OB_SUCC(ret)) {
|
||||
if (recycle_ts < ls_->get_clog_checkpoint_ts()) {
|
||||
ret = OB_NO_NEED_UPDATE;
|
||||
STORAGE_LOG(WARN, "recycle_ts should not smaller than checkpoint_log_ts",
|
||||
K(recycle_ts), K(ls_->get_clog_checkpoint_ts()), K(ls_->get_ls_id()));
|
||||
} else {
|
||||
STORAGE_LOG(INFO, "start flush",
|
||||
K(recycle_ts),
|
||||
K(ls_->get_clog_checkpoint_ts()),
|
||||
K(ls_->get_ls_id()));
|
||||
for (int i = 1; i < ObLogBaseType::MAX_LOG_BASE_TYPE; i++) {
|
||||
if (OB_NOT_NULL(handlers_[i])
|
||||
&& OB_SUCCESS != (tmp_ret = (handlers_[i]->flush(recycle_ts)))) {
|
||||
STORAGE_LOG(WARN, "handler flush failed", K(recycle_ts), K(tmp_ret),
|
||||
K(i), K(ls_->get_ls_id()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1037,9 +1037,6 @@ int ObStartPrepareMigrationTask::wait_ls_checkpoint_ts_push_()
|
||||
LOG_WARN("failed to get ls saved info", K(ret), KPC(ls), KPC(ctx_));
|
||||
} else if (!saved_info.is_empty()) {
|
||||
LOG_INFO("saved info is not empty, no need wait ls checkpoint ts push", K(saved_info), KPC(ctx_));
|
||||
} else if (OB_ISNULL(checkpoint_executor = ls->get_checkpoint_executor())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("checkpoint executor should not be NULL", K(ret), KPC(ctx_), KP(checkpoint_executor));
|
||||
} else {
|
||||
const int64_t wait_checkpoint_push_start_ts = ObTimeUtility::current_time();
|
||||
while (OB_SUCC(ret)) {
|
||||
@ -1059,7 +1056,7 @@ int ObStartPrepareMigrationTask::wait_ls_checkpoint_ts_push_()
|
||||
const int64_t cost_ts = ObTimeUtility::current_time() - wait_checkpoint_push_start_ts;
|
||||
LOG_INFO("succeed wait clog checkpoint ts push", "cost", cost_ts, "ls_id", ctx_->arg_.ls_id_);
|
||||
break;
|
||||
} else if (OB_FAIL(checkpoint_executor->advance_checkpoint_by_flush(ctx_->log_sync_scn_))) {
|
||||
} else if (OB_FAIL(ls->advance_checkpoint_by_flush(ctx_->log_sync_scn_))) {
|
||||
if (OB_NO_NEED_UPDATE == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
|
||||
@ -1153,6 +1153,14 @@ int ObLS::force_tablet_freeze(const ObTabletID &tablet_id)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLS::advance_checkpoint_by_flush(int64_t recycle_ts)
|
||||
{
|
||||
int64_t read_lock = LSLOCKALL;
|
||||
int64_t write_lock = 0;
|
||||
ObLSLockGuard lock_myself(lock_, read_lock, write_lock);
|
||||
return checkpoint_executor_.advance_checkpoint_by_flush(recycle_ts);
|
||||
}
|
||||
|
||||
int ObLS::get_ls_meta_package_and_tablet_ids(ObLSMetaPackage &meta_package, common::ObIArray<common::ObTabletID> &tablet_ids)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
@ -605,6 +605,7 @@ public:
|
||||
|
||||
// ObCheckpointExecutor interface:
|
||||
DELEGATE_WITH_RET(checkpoint_executor_, get_checkpoint_info, int);
|
||||
// Not a plain delegate: the definition builds an ObLSLockGuard on lock_
// (read-lock argument LSLOCKALL, write-lock argument 0) before forwarding
// recycle_ts to checkpoint_executor_.advance_checkpoint_by_flush().
int advance_checkpoint_by_flush(int64_t recycle_ts);
|
||||
|
||||
// ObDataCheckpoint interface:
|
||||
DELEGATE_WITH_RET(data_checkpoint_, get_freezecheckpoint_info, int);
|
||||
|
||||
@ -50,10 +50,9 @@ int ObAdvanceLSCkptTask::try_advance_ls_ckpt_ts()
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
}
|
||||
TRANS_LOG(WARN, "get ls faild", K(ret), K(MTL(ObLSService *)));
|
||||
} else if (ls_handle.get_ls()->get_checkpoint_executor()->advance_checkpoint_by_flush(
|
||||
target_ckpt_ts_)) {
|
||||
} else if (ls_handle.get_ls()->advance_checkpoint_by_flush(target_ckpt_ts_)) {
|
||||
TRANS_LOG(WARN, "advance checkpoint ts failed", K(ret), K(ls_id_), K(target_ckpt_ts_));
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
TRANS_LOG(INFO, "[RetainCtxMgr] advance ls checkpoint ts success", K(ret), K(ls_id_), K(target_ckpt_ts_));
|
||||
|
||||
@ -355,10 +355,7 @@ int ObCheckPointService::do_minor_freeze()
|
||||
ObLS *ls = nullptr;
|
||||
int ls_cnt = 0;
|
||||
for (; OB_SUCC(iter->get_next(ls)); ++ls_cnt) {
|
||||
ObCheckpointExecutor *checkpoint_executor = nullptr;
|
||||
if (OB_ISNULL(checkpoint_executor = ls->get_checkpoint_executor())) {
|
||||
STORAGE_LOG(WARN, "checkpoint_executor should not be null", K(ls->get_ls_id()));
|
||||
} else if (OB_SUCCESS != (tmp_ret = (checkpoint_executor->advance_checkpoint_by_flush(INT64_MAX)))) {
|
||||
if (OB_SUCCESS != (tmp_ret = (ls->advance_checkpoint_by_flush(INT64_MAX)))) {
|
||||
STORAGE_LOG(WARN, "advance_checkpoint_by_flush failed", K(tmp_ret), K(ls->get_ls_id()));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user