remove backup io logic in sync rpc

This commit is contained in:
oceanoverflow 2023-04-13 06:14:21 +00:00 committed by ob-robot
parent b1c3cda97d
commit 952e5eaa68
2 changed files with 20 additions and 9 deletions

View File

@ -498,8 +498,6 @@ ObLSBackupDataDagNet::~ObLSBackupDataDagNet()
int ObLSBackupDataDagNet::init_by_param(const ObIDagInitParam *param)
{
int ret = OB_SUCCESS;
ObLSBackupParam backup_param;
int64_t batch_size = 0;
if (IS_INIT) {
ret = OB_INIT_TWICE;
LOG_WARN("dag net init twice", K(ret));
@ -508,9 +506,24 @@ int ObLSBackupDataDagNet::init_by_param(const ObIDagInitParam *param)
LOG_WARN("get invalid args", K(ret), KP(param));
} else if (OB_FAIL(param_.assign(*(static_cast<const ObLSBackupDagNetInitParam *>(param))))) {
LOG_WARN("failed to assign param", K(ret));
} else if (OB_FAIL(param_.convert_to(backup_param))) {
} else if (OB_FAIL(this->set_dag_id(param_.job_desc_.trace_id_))) {
LOG_WARN("failed to set dag id", K(ret), K_(param));
} else {
is_inited_ = true;
index_kv_cache_ = &OB_BACKUP_INDEX_CACHE;
backup_data_type_ = param_.backup_data_type_;
report_ctx_ = param_.report_ctx_;
}
return ret;
}
int ObLSBackupDataDagNet::inner_init_before_run_()
{
int ret = OB_SUCCESS;
ObLSBackupParam backup_param;
int64_t batch_size = 0;
if (OB_FAIL(param_.convert_to(backup_param))) {
LOG_WARN("failed to convert param", K(param_));
} else if (FALSE_IT(backup_data_type_ = param_.backup_data_type_)) {
} else if (OB_FAIL(ls_backup_ctx_.open(backup_param, backup_data_type_, *param_.report_ctx_.sql_proxy_))) {
LOG_WARN("failed to open log stream backup ctx", K(ret), K(backup_param));
} else if (OB_FAIL(prepare_backup_tablet_provider_(backup_param, backup_data_type_, ls_backup_ctx_,
@ -520,13 +533,8 @@ int ObLSBackupDataDagNet::init_by_param(const ObIDagInitParam *param)
LOG_WARN("failed to get batch size", K(ret));
} else if (OB_FAIL(task_mgr_.init(backup_data_type_, batch_size))) {
LOG_WARN("failed to init task mgr", K(ret), K(batch_size));
} else if (OB_FAIL(this->set_dag_id(param_.job_desc_.trace_id_))) {
LOG_WARN("failed to set dag id", K(ret), K_(param));
} else {
is_inited_ = true;
ls_backup_ctx_.stat_mgr_.mark_begin(backup_data_type_);
index_kv_cache_ = &OB_BACKUP_INDEX_CACHE;
report_ctx_ = param_.report_ctx_;
}
return ret;
}
@ -546,6 +554,8 @@ int ObLSBackupDataDagNet::start_running()
} else if (OB_ISNULL(scheduler)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null MTL scheduler", K(ret), KP(scheduler));
} else if (OB_FAIL(inner_init_before_run_())) {
LOG_WARN("failed to inner init before run", K(ret));
} else if (OB_FAIL(param_.convert_to(init_param))) {
LOG_WARN("failed to convert to param", K(ret), K_(param));
} else if (FALSE_IT(init_param.backup_stage_ = start_stage_)) {

View File

@ -159,6 +159,7 @@ public:
INHERIT_TO_STRING_KV("ObIDagNet", share::ObIDagNet, K_(param));
private:
int inner_init_before_run_();
int get_batch_size_(int64_t &batch_size);
int prepare_backup_tablet_provider_(const ObLSBackupParam &param, const share::ObBackupDataType &backup_data_type,
ObLSBackupCtx &ls_backup_ctx, ObBackupIndexKVCache &index_kv_cache, common::ObMySQLProxy &sql_proxy,