BUGFIX: fix compatibility issues of ls.
This commit is contained in:
parent
fcfd20c623
commit
fb489e0727
@ -772,6 +772,11 @@ void ObLS::destroy()
|
||||
}
|
||||
transaction::ObTransService *txs_svr = MTL(transaction::ObTransService *);
|
||||
FLOG_INFO("ObLS destroy", K(this), K(*this), K(lbt()));
|
||||
if (running_state_.is_running()) {
|
||||
if (OB_TMP_FAIL(offline_(start_ts))) {
|
||||
LOG_WARN("offline a running ls failed", K(tmp_ret), K(ls_meta_.ls_id_));
|
||||
}
|
||||
}
|
||||
if (OB_TMP_FAIL(stop_())) {
|
||||
LOG_WARN("ls stop failed.", K(tmp_ret), K(ls_meta_.ls_id_));
|
||||
} else {
|
||||
|
@ -322,7 +322,7 @@ public:
|
||||
int create_ls(const share::ObTenantRole tenant_role,
|
||||
const palf::PalfBaseInfo &palf_base_info,
|
||||
const common::ObReplicaType &replica_type,
|
||||
const bool allow_log_sync);
|
||||
const bool allow_log_sync = false);
|
||||
// load ls info from disk
|
||||
// @param[in] tenant_role, role of tenant, which determains palf access mode
|
||||
// @param[in] palf_base_info, all the info that palf needed
|
||||
|
@ -96,32 +96,14 @@ int ObLSMeta::set_start_work_state()
|
||||
|
||||
int ObLSMeta::set_start_ha_state()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
uint64_t tenant_data_version = 0;
|
||||
ObReentrantWLockGuard guard(lock_);
|
||||
if (OB_FAIL(GET_MIN_DATA_VERSION(MTL_ID(), tenant_data_version))) {
|
||||
LOG_WARN("get min data version failed", K(ret), K(MTL_ID()));
|
||||
} else if (tenant_data_version > DATA_VERSION_4_2_1_0) {
|
||||
ret = ls_persistent_state_.start_ha(ls_id_);
|
||||
} else {
|
||||
ret = ls_persistent_state_.start_work(ls_id_);
|
||||
}
|
||||
return ret;
|
||||
return ls_persistent_state_.start_ha(ls_id_);
|
||||
}
|
||||
|
||||
int ObLSMeta::set_finish_ha_state()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
uint64_t tenant_data_version = 0;
|
||||
ObReentrantWLockGuard guard(lock_);
|
||||
if (OB_FAIL(GET_MIN_DATA_VERSION(MTL_ID(), tenant_data_version))) {
|
||||
LOG_WARN("get min data version failed", K(ret), K(MTL_ID()));
|
||||
} else if (tenant_data_version > DATA_VERSION_4_2_1_0) {
|
||||
ret = ls_persistent_state_.finish_ha(ls_id_);
|
||||
} else {
|
||||
ret = ls_persistent_state_.start_work(ls_id_);
|
||||
}
|
||||
return ret;
|
||||
return ls_persistent_state_.finish_ha(ls_id_);
|
||||
}
|
||||
|
||||
int ObLSMeta::set_remove_state()
|
||||
@ -298,7 +280,6 @@ int ObLSMeta::set_migration_status(const ObMigrationStatus &migration_status,
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool can_change = false;
|
||||
uint64_t tenant_data_version = 0;
|
||||
ObReentrantWLockGuard guard(lock_);
|
||||
if (OB_FAIL(check_can_update_())) {
|
||||
LOG_WARN("ls meta cannot update", K(ret), K(*this));
|
||||
@ -316,16 +297,12 @@ int ObLSMeta::set_migration_status(const ObMigrationStatus &migration_status,
|
||||
ret = OB_OP_NOT_ALLOW;
|
||||
LOG_WARN("ls can not change to migrate status", K(ret), K(migration_status_),
|
||||
K(migration_status));
|
||||
} else if (OB_FAIL(GET_MIN_DATA_VERSION(MTL_ID(), tenant_data_version))) {
|
||||
LOG_WARN("get min data version failed", K(ret), K(MTL_ID()));
|
||||
} else {
|
||||
ObLSMeta tmp(*this);
|
||||
tmp.migration_status_ = migration_status;
|
||||
tmp.ls_persistent_state_ = (OB_MIGRATION_STATUS_NONE == migration_status &&
|
||||
ObLSRestoreStatus::RESTORE_NONE == restore_status_ ?
|
||||
ObLSPersistentState::State::LS_NORMAL : ls_persistent_state_);
|
||||
tmp.ls_persistent_state_ = (tenant_data_version > DATA_VERSION_4_2_1_0 ?
|
||||
tmp.ls_persistent_state_ : ObLSPersistentState::State::LS_NORMAL);
|
||||
ObLSRestoreStatus::RESTORE_NONE == restore_status_ ?
|
||||
ObLSPersistentState::State::LS_NORMAL : ls_persistent_state_);
|
||||
|
||||
if (write_slog && OB_FAIL(write_slog_(tmp))) {
|
||||
LOG_WARN("migration_status write slog failed", K(ret));
|
||||
@ -406,7 +383,6 @@ int ObLSMeta::get_offline_scn(SCN &offline_scn)
|
||||
int ObLSMeta::set_restore_status(const ObLSRestoreStatus &restore_status)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
uint64_t tenant_data_version = 0;
|
||||
ObReentrantWLockGuard guard(lock_);
|
||||
if (OB_FAIL(check_can_update_())) {
|
||||
LOG_WARN("ls meta cannot update", K(ret), K(*this));
|
||||
@ -415,16 +391,12 @@ int ObLSMeta::set_restore_status(const ObLSRestoreStatus &restore_status)
|
||||
LOG_WARN("invalid restore status", K(ret), K(restore_status_), K(restore_status));
|
||||
} else if (restore_status_ == restore_status) {
|
||||
//do nothing
|
||||
} else if (OB_FAIL(GET_MIN_DATA_VERSION(MTL_ID(), tenant_data_version))) {
|
||||
LOG_WARN("get min data version failed", K(ret), K(MTL_ID()));
|
||||
} else {
|
||||
ObLSMeta tmp(*this);
|
||||
tmp.restore_status_ = restore_status;
|
||||
tmp.ls_persistent_state_ = (ObLSRestoreStatus::RESTORE_NONE == restore_status &&
|
||||
OB_MIGRATION_STATUS_NONE == migration_status_ ?
|
||||
ObLSPersistentState::State::LS_NORMAL : ls_persistent_state_);
|
||||
tmp.ls_persistent_state_ = (tenant_data_version > DATA_VERSION_4_2_1_0 ?
|
||||
tmp.ls_persistent_state_ : ObLSPersistentState::State::LS_NORMAL);
|
||||
OB_MIGRATION_STATUS_NONE == migration_status_ ?
|
||||
ObLSPersistentState::State::LS_NORMAL : ls_persistent_state_);
|
||||
if (OB_FAIL(write_slog_(tmp))) {
|
||||
LOG_WARN("restore_status write slog failed", K(ret));
|
||||
} else if ((ObLSRestoreStatus::RESTORE_NONE == restore_status && OB_MIGRATION_STATUS_NONE == migration_status_)
|
||||
@ -551,7 +523,6 @@ int ObLSMeta::set_ls_rebuild()
|
||||
const ObMigrationStatus change_status = ObMigrationStatus::OB_MIGRATION_STATUS_REBUILD;
|
||||
const ObLSPersistentState persistent_state = ObLSPersistentState::State::LS_HA;
|
||||
bool can_change = false;
|
||||
uint64_t tenant_data_version = 0;
|
||||
|
||||
ObReentrantWLockGuard guard(lock_);
|
||||
if (OB_FAIL(check_can_update_())) {
|
||||
@ -565,14 +536,10 @@ int ObLSMeta::set_ls_rebuild()
|
||||
} else if (!can_change) {
|
||||
ret = OB_OP_NOT_ALLOW;
|
||||
LOG_WARN("ls can not change to rebuild status", K(ret), K(tmp), K(change_status));
|
||||
} else if (OB_FAIL(GET_MIN_DATA_VERSION(MTL_ID(), tenant_data_version))) {
|
||||
LOG_WARN("get min data version failed", K(ret), K(MTL_ID()));
|
||||
} else {
|
||||
tmp.migration_status_ = change_status;
|
||||
tmp.rebuild_seq_++;
|
||||
tmp.ls_persistent_state_ = persistent_state;
|
||||
tmp.ls_persistent_state_ = (tenant_data_version > DATA_VERSION_4_2_1_0 ?
|
||||
tmp.ls_persistent_state_ : ObLSPersistentState::State::LS_NORMAL);
|
||||
if (OB_FAIL(write_slog_(tmp))) {
|
||||
LOG_WARN("clog_checkpoint write slog failed", K(ret));
|
||||
} else if (OB_FAIL(set_start_ha_state())) {
|
||||
|
@ -33,6 +33,10 @@ public:
|
||||
State::LS_OFFLINED == state_ ||
|
||||
State::LS_STOPPED == state_);
|
||||
}
|
||||
bool is_running() const
|
||||
{
|
||||
return State::LS_RUNNING == state_;
|
||||
}
|
||||
bool is_stopped() const
|
||||
{
|
||||
return (State::LS_INIT == state_ ||
|
||||
|
@ -439,125 +439,36 @@ int ObLSService::create_ls(const obrpc::ObCreateLSArg &arg)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
int64_t abs_timeout_ts = INT64_MAX;
|
||||
ObLS *ls = NULL;
|
||||
ObStorageLogger *slogger = MTL(ObStorageLogger*);
|
||||
ObLSCreateState state = ObLSCreateState::CREATE_STATE_INIT;
|
||||
int64_t create_type = ObLSCreateType::NORMAL;
|
||||
bool is_commit = true;
|
||||
bool need_retry = true;
|
||||
bool ls_exist = false;
|
||||
bool waiting_destroy = false;
|
||||
const SCN create_scn = arg.get_create_scn();
|
||||
palf::PalfBaseInfo palf_base_info;
|
||||
const ObMigrationStatus migration_status = ObMigrationStatus::OB_MIGRATION_STATUS_NONE;
|
||||
LOG_INFO("create_ls begin", K(arg));
|
||||
DEBUG_SYNC(BEFORE_CREATE_USER_LS);
|
||||
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("the ls service has not been inited", K(ret));
|
||||
} else if (OB_UNLIKELY(!ObServerCheckpointSlogHandler::get_instance().is_started())) {
|
||||
ret = OB_NOT_RUNNING;
|
||||
LOG_WARN("ls service does not service before slog replay finished", K(ret));
|
||||
} else if (OB_UNLIKELY(!arg.is_valid())) {
|
||||
if (OB_UNLIKELY(!arg.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(arg));
|
||||
} else if (OB_FAIL(ObShareUtil::get_abs_timeout(DEFAULT_LOCK_TIMEOUT /* default timeout */,
|
||||
abs_timeout_ts))) {
|
||||
LOG_WARN("get timeout ts failed", KR(ret));
|
||||
} else {
|
||||
ObMutexGuardWithTimeout change_guard(change_lock_, abs_timeout_ts);
|
||||
if (OB_UNLIKELY(!is_running_)) {
|
||||
ret = OB_NOT_RUNNING;
|
||||
LOG_WARN("ls service is not running.", K(ret));
|
||||
} else if (OB_FAIL(change_guard.get_ret())) {
|
||||
LOG_WARN("lock failed, try again later", K(ret));
|
||||
ret = OB_EAGAIN;
|
||||
} else if (OB_FAIL(check_ls_exist(arg.get_ls_id(), ls_exist))) {
|
||||
LOG_WARN("check ls exist failed", K(ret), K(arg));
|
||||
} else if (ls_exist) {
|
||||
LOG_WARN("ls exist, does not need create again, just return success", K(arg));
|
||||
} else if (OB_FAIL(check_ls_waiting_safe_destroy(arg.get_ls_id(),
|
||||
waiting_destroy))) {
|
||||
LOG_WARN("check ls waiting safe destroy failed", K(ret), K(arg));
|
||||
} else if (waiting_destroy) {
|
||||
ret = OB_LS_WAITING_SAFE_DESTROY;
|
||||
LOG_WARN("ls waiting for destroy, need retry later", K(ret), K(arg));
|
||||
} else if (OB_FAIL(check_tenant_ls_num_())) {
|
||||
LOG_WARN("too many ls", K(ret));
|
||||
} else if (OB_FAIL(inner_create_ls_(arg.get_ls_id(),
|
||||
migration_status,
|
||||
(is_ls_to_restore_(arg) ?
|
||||
ObLSRestoreStatus(ObLSRestoreStatus::RESTORE_START) :
|
||||
ObLSRestoreStatus(ObLSRestoreStatus::RESTORE_NONE)),
|
||||
create_scn,
|
||||
ls))) {
|
||||
LOG_WARN("inner create log stream failed.", K(ret), K(arg), K(migration_status));
|
||||
} else {
|
||||
create_type = is_ls_to_restore_(arg) ? ObLSCreateType::RESTORE : ObLSCreateType::NORMAL;
|
||||
state = ObLSCreateState::CREATE_STATE_INNER_CREATED;
|
||||
// disable sync at default
|
||||
const bool allow_log_sync = false;
|
||||
prepare_palf_base_info(arg, palf_base_info);
|
||||
ObLSLockGuard lock_ls(ls);
|
||||
const ObLSMeta &ls_meta = ls->get_ls_meta();
|
||||
if (OB_FAIL(add_ls_to_map_(ls))) {
|
||||
LOG_WARN("add log stream to map failed.", K(ret));
|
||||
} else if (FALSE_IT(state = ObLSCreateState::CREATE_STATE_ADDED_TO_MAP)) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(write_prepare_create_ls_slog_(ls_meta))) {
|
||||
LOG_WARN("fail to write create log stream slog", K(ls_meta));
|
||||
} else if (FALSE_IT(state = ObLSCreateState::CREATE_STATE_WRITE_PREPARE_SLOG)) {
|
||||
} else if (OB_FAIL(ls->create_ls(arg.get_tenant_info().get_tenant_role(),
|
||||
palf_base_info,
|
||||
arg.get_replica_type(),
|
||||
allow_log_sync))) {
|
||||
LOG_WARN("enable ls palf failed", K(ret), K(arg), K(palf_base_info));
|
||||
} else if (FALSE_IT(state = ObLSCreateState::CREATE_STATE_PALF_ENABLED)) {
|
||||
// only restore ls with base will not need create inner tablet
|
||||
} else if (need_create_inner_tablets_(arg) &&
|
||||
OB_FAIL(ls->create_ls_inner_tablet(arg.get_compat_mode(),
|
||||
arg.get_create_scn()))) {
|
||||
LOG_WARN("create ls inner tablet failed", K(ret), K(arg));
|
||||
} else if (OB_FAIL(write_commit_create_ls_slog_(ls->get_ls_id(), create_type))) {
|
||||
LOG_WARN("fail to write create log stream commit slog", K(ret), K(ls_meta));
|
||||
} else if (FALSE_IT(state = ObLSCreateState::CREATE_STATE_FINISH)) {
|
||||
} else if (OB_FAIL(ls->finish_create_ls())) {
|
||||
LOG_WARN("finish create ls failed", KR(ret));
|
||||
} else if (OB_FAIL(post_create_ls_(create_type, ls))) {
|
||||
LOG_WARN("post create ls failed", K(ret), K(arg));
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
do {
|
||||
need_retry = false;
|
||||
if (state >= ObLSCreateState::CREATE_STATE_WRITE_PREPARE_SLOG) {
|
||||
if (OB_TMP_FAIL(ls->set_remove_state())) {
|
||||
need_retry = true;
|
||||
LOG_ERROR("ls set remove state failed", KR(tmp_ret), K(ls_meta));
|
||||
} else if (OB_TMP_FAIL(write_abort_create_ls_slog_(ls->get_ls_id()))) {
|
||||
need_retry = true;
|
||||
LOG_ERROR("fail to write create log stream abort slog", K(tmp_ret), K(ls_meta));
|
||||
}
|
||||
}
|
||||
} while (need_retry);
|
||||
do {
|
||||
// TODO: yanyuan.cxf every remove disable or stop function need be re-entrant
|
||||
need_retry = false;
|
||||
if (state >= ObLSCreateState::CREATE_STATE_PALF_ENABLED) {
|
||||
if (OB_SUCCESS != (tmp_ret = ls->remove_ls())) {
|
||||
need_retry = true;
|
||||
LOG_WARN("ls disable palf failed", K(tmp_ret));
|
||||
}
|
||||
}
|
||||
} while (need_retry);
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
del_ls_after_create_ls_failed_(state, ls);
|
||||
palf::PalfBaseInfo palf_base_info;
|
||||
prepare_palf_base_info(arg, palf_base_info);
|
||||
|
||||
ObMigrationOpArg mig_arg;
|
||||
ObCreateLSCommonArg common_arg;
|
||||
common_arg.ls_id_ = arg.get_ls_id();
|
||||
common_arg.create_scn_ = arg.get_create_scn();
|
||||
common_arg.palf_base_info_ = palf_base_info;
|
||||
common_arg.tenant_role_ = arg.get_tenant_info().get_tenant_role();
|
||||
common_arg.replica_type_ = arg.get_replica_type();
|
||||
common_arg.compat_mode_ = arg.get_compat_mode();
|
||||
common_arg.create_type_ = is_ls_to_restore_(arg) ? ObLSCreateType::RESTORE : ObLSCreateType::NORMAL;
|
||||
common_arg.migration_status_ = ObMigrationStatus::OB_MIGRATION_STATUS_NONE;
|
||||
common_arg.restore_status_ = (is_ls_to_restore_(arg) ?
|
||||
ObLSRestoreStatus(ObLSRestoreStatus::RESTORE_START) :
|
||||
ObLSRestoreStatus(ObLSRestoreStatus::RESTORE_NONE));
|
||||
common_arg.need_create_inner_tablet_ = need_create_inner_tablets_(arg);
|
||||
|
||||
if (OB_FAIL(create_ls_(common_arg, mig_arg))) {
|
||||
LOG_WARN("create ls failed", K(ret), K(arg));
|
||||
}
|
||||
}
|
||||
FLOG_INFO("create_ls finish", K(ret), K(arg), K(abs_timeout_ts));
|
||||
FLOG_INFO("create_ls finish", K(ret), K(arg));
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -1099,24 +1010,18 @@ void ObLSService::remove_ls_(ObLS *ls, const bool remove_from_disk)
|
||||
} while (OB_FAIL(ret));
|
||||
}
|
||||
|
||||
int ObLSService::create_ls_for_ha(
|
||||
const share::ObTaskId task_id,
|
||||
const ObMigrationOpArg &arg)
|
||||
int ObLSService::create_ls_(const ObCreateLSCommonArg &arg,
|
||||
const ObMigrationOpArg &mig_arg)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
int64_t abs_timeout_ts = INT64_MAX;
|
||||
ObLSCreateState state = ObLSCreateState::CREATE_STATE_INIT;
|
||||
ObLS *ls = NULL;
|
||||
ObLSMeta ls_meta;
|
||||
ObStorageLogger *slogger = MTL(ObStorageLogger*);
|
||||
bool is_commit = true;
|
||||
bool need_retry = true;
|
||||
ObMigrationStatus migration_status;
|
||||
ObLSRestoreStatus restore_status = ObLSRestoreStatus(ObLSRestoreStatus::RESTORE_NONE);
|
||||
bool ls_exist = false;
|
||||
bool waiting_destroy = false;
|
||||
int64_t create_type = ObLSCreateType::MIGRATE;
|
||||
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
@ -1124,12 +1029,6 @@ int ObLSService::create_ls_for_ha(
|
||||
} else if (OB_UNLIKELY(!ObServerCheckpointSlogHandler::get_instance().is_started())) {
|
||||
ret = OB_NOT_RUNNING;
|
||||
LOG_WARN("ls service does not service before slog replay finished", K(ret));
|
||||
} else if (task_id.is_invalid() || !arg.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("create ls for ha get invalid argument", K(ret), K(task_id), K(arg));
|
||||
} else if (ObMigrationOpType::MIGRATE_LS_OP != arg.type_ && ObMigrationOpType::ADD_LS_OP != arg.type_) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("create ls for migration get unexpected op type", K(ret), K(task_id), K(arg));
|
||||
} else if (OB_FAIL(ObShareUtil::get_abs_timeout(DEFAULT_LOCK_TIMEOUT /* default timeout */,
|
||||
abs_timeout_ts))) {
|
||||
LOG_WARN("get timeout ts failed", KR(ret));
|
||||
@ -1142,35 +1041,28 @@ int ObLSService::create_ls_for_ha(
|
||||
LOG_WARN("lock failed, try again later", K(ret));
|
||||
ret = OB_EAGAIN;
|
||||
} else if (OB_FAIL(check_ls_exist(arg.ls_id_, ls_exist))) {
|
||||
LOG_WARN("check ls exist failed", K(ret), K(arg));
|
||||
LOG_WARN("check ls exist failed", K(ret), K(arg.ls_id_));
|
||||
} else if (ls_exist) {
|
||||
ret = OB_ENTRY_EXIST;
|
||||
LOG_WARN("ls exist, cannot create ls now", K(ret), K(arg));
|
||||
LOG_WARN("ls exist, cannot create ls now", K(ret), K(arg.ls_id_));
|
||||
} else if (OB_FAIL(check_ls_waiting_safe_destroy(arg.ls_id_,
|
||||
waiting_destroy))) {
|
||||
LOG_WARN("check ls waiting safe destroy failed", K(ret), K(arg));
|
||||
LOG_WARN("check ls waiting safe destroy failed", K(ret), K(arg.ls_id_));
|
||||
} else if (waiting_destroy) {
|
||||
ret = OB_LS_WAITING_SAFE_DESTROY;
|
||||
LOG_WARN("ls waiting for destroy, need retry later", K(ret), K(arg));
|
||||
LOG_WARN("ls waiting for destroy, need retry later", K(ret), K(arg.ls_id_));
|
||||
} else if (OB_FAIL(check_tenant_ls_num_())) {
|
||||
LOG_WARN("too many ls", K(ret));
|
||||
} else if (OB_FAIL(ObMigrationStatusHelper::trans_migration_op(arg.type_, migration_status))) {
|
||||
LOG_WARN("failed to trans migration op", K(ret), K(arg), K(task_id));
|
||||
} else if (OB_FAIL(get_restore_status_(restore_status))) {
|
||||
LOG_WARN("failed to get restore status", K(ret), K(arg), K(task_id));
|
||||
} else if (OB_FAIL(inner_create_ls_(arg.ls_id_,
|
||||
migration_status,
|
||||
restore_status,
|
||||
ObScnRange::MIN_SCN, /* create scn */
|
||||
arg.migration_status_,
|
||||
arg.restore_status_,
|
||||
arg.create_scn_,
|
||||
ls))) {
|
||||
LOG_WARN("create ls failed", K(ret), K(arg), K(task_id));
|
||||
LOG_WARN("create ls failed", K(ret), K(arg.ls_id_));
|
||||
} else {
|
||||
state = ObLSCreateState::CREATE_STATE_INNER_CREATED;
|
||||
palf::PalfBaseInfo palf_base_info;
|
||||
palf_base_info.generate_by_default();
|
||||
ObLSLockGuard lock_ls(ls);
|
||||
const bool allow_log_sync = false;
|
||||
ls_meta = ls->get_ls_meta();
|
||||
const ObLSMeta &ls_meta = ls->get_ls_meta();
|
||||
if (OB_FAIL(add_ls_to_map_(ls))) {
|
||||
LOG_WARN("add log stream to map failed.", K(ret));
|
||||
} else if (FALSE_IT(state = ObLSCreateState::CREATE_STATE_ADDED_TO_MAP)) {
|
||||
@ -1178,21 +1070,27 @@ int ObLSService::create_ls_for_ha(
|
||||
} else if (OB_FAIL(write_prepare_create_ls_slog_(ls_meta))) {
|
||||
LOG_WARN("fail to write create log stream slog", K(ls_meta));
|
||||
} else if (FALSE_IT(state = ObLSCreateState::CREATE_STATE_WRITE_PREPARE_SLOG)) {
|
||||
} else if (OB_FAIL(ls->create_ls(share::RESTORE_TENANT_ROLE,
|
||||
palf_base_info,
|
||||
arg.dst_.get_replica_type(),
|
||||
allow_log_sync))) {
|
||||
} else if (OB_FAIL(ls->create_ls(arg.tenant_role_,
|
||||
arg.palf_base_info_,
|
||||
arg.replica_type_))) {
|
||||
LOG_WARN("enable ls palf failed", K(ret), K(ls_meta));
|
||||
} else if (FALSE_IT(state = ObLSCreateState::CREATE_STATE_PALF_ENABLED)) {
|
||||
} else if (OB_FAIL(write_commit_create_ls_slog_(ls->get_ls_id(), create_type))) {
|
||||
} else if (arg.need_create_inner_tablet_ &&
|
||||
OB_FAIL(ls->create_ls_inner_tablet(arg.compat_mode_,
|
||||
arg.create_scn_))) {
|
||||
LOG_WARN("create ls inner tablet failed", K(ret), K(ls_meta));
|
||||
} else if (OB_FAIL(write_commit_create_ls_slog_(ls->get_ls_id(),
|
||||
arg.create_type_))) {
|
||||
LOG_WARN("fail to write create log stream commit slog", K(ret), K(ls_meta));
|
||||
} else if (FALSE_IT(state = ObLSCreateState::CREATE_STATE_FINISH)) {
|
||||
} else if (OB_FAIL(ls->finish_create_ls())) {
|
||||
LOG_WARN("finish create ls failed", KR(ret));
|
||||
} else if (OB_FAIL(post_create_ls_(create_type, ls))) {
|
||||
LOG_WARN("post create ls failed", K(ret), K(arg));
|
||||
} else if (FALSE_IT(state = ObLSCreateState::CREATE_STATE_FINISH)) {
|
||||
} else if (OB_FAIL(ls->get_ls_migration_handler()->add_ls_migration_task(task_id, arg))) {
|
||||
LOG_WARN("failed to add ls migration task", K(ret), K(arg));
|
||||
} else if (OB_FAIL(post_create_ls_(arg.create_type_, ls))) {
|
||||
LOG_WARN("post create ls failed", K(ret), K(ls_meta));
|
||||
} else if (ObLSCreateType::MIGRATE == arg.create_type_ &&
|
||||
OB_FAIL(ls->get_ls_migration_handler()->add_ls_migration_task(arg.task_id_,
|
||||
mig_arg))) {
|
||||
LOG_WARN("failed to add ls migration task", K(ret), K(mig_arg));
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
do {
|
||||
@ -1223,7 +1121,49 @@ int ObLSService::create_ls_for_ha(
|
||||
del_ls_after_create_ls_failed_(state, ls);
|
||||
}
|
||||
}
|
||||
FLOG_INFO("create_ls for ha finish", K(ret), K(ls_meta), K(abs_timeout_ts));
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLSService::create_ls_for_ha(
|
||||
const share::ObTaskId task_id,
|
||||
const ObMigrationOpArg &arg)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObMigrationStatus migration_status;
|
||||
ObLSRestoreStatus restore_status = ObLSRestoreStatus(ObLSRestoreStatus::RESTORE_NONE);
|
||||
|
||||
if (task_id.is_invalid() || !arg.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("create ls for ha get invalid argument", K(ret), K(task_id), K(arg));
|
||||
} else if (ObMigrationOpType::MIGRATE_LS_OP != arg.type_ && ObMigrationOpType::ADD_LS_OP != arg.type_) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("create ls for migration get unexpected op type", K(ret), K(task_id), K(arg));
|
||||
} else if (OB_FAIL(ObMigrationStatusHelper::trans_migration_op(arg.type_, migration_status))) {
|
||||
LOG_WARN("failed to trans migration op", K(ret), K(arg), K(task_id));
|
||||
} else if (OB_FAIL(get_restore_status_(restore_status))) {
|
||||
LOG_WARN("failed to get restore status", K(ret), K(arg), K(task_id));
|
||||
} else {
|
||||
palf::PalfBaseInfo palf_base_info;
|
||||
palf_base_info.generate_by_default();
|
||||
|
||||
ObCreateLSCommonArg common_arg;
|
||||
common_arg.ls_id_ = arg.ls_id_;
|
||||
common_arg.create_scn_ = ObScnRange::MIN_SCN;
|
||||
common_arg.palf_base_info_ = palf_base_info;
|
||||
common_arg.tenant_role_ = share::RESTORE_TENANT_ROLE;
|
||||
common_arg.replica_type_ = arg.dst_.get_replica_type();
|
||||
common_arg.compat_mode_ = Worker::CompatMode::INVALID;
|
||||
common_arg.create_type_ = ObLSCreateType::MIGRATE;
|
||||
common_arg.migration_status_ = migration_status;
|
||||
common_arg.restore_status_ = restore_status;
|
||||
common_arg.task_id_ = task_id;
|
||||
common_arg.need_create_inner_tablet_ = false;
|
||||
|
||||
if (OB_FAIL(create_ls_(common_arg, arg))) {
|
||||
LOG_WARN("failed to create ls", K(ret), K(arg));
|
||||
}
|
||||
}
|
||||
FLOG_INFO("create_ls for ha finish", K(ret), K(task_id), K(arg));
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -157,6 +157,22 @@ private:
|
||||
CREATE_STATE_PALF_ENABLED = 4, // enable_palf succ
|
||||
CREATE_STATE_FINISH
|
||||
};
|
||||
struct ObCreateLSCommonArg {
|
||||
share::ObLSID ls_id_;
|
||||
share::SCN create_scn_;
|
||||
palf::PalfBaseInfo palf_base_info_;
|
||||
ObTenantRole tenant_role_;
|
||||
ObReplicaType replica_type_;
|
||||
lib::Worker::CompatMode compat_mode_;
|
||||
int64_t create_type_;
|
||||
ObMigrationStatus migration_status_;
|
||||
ObLSRestoreStatus restore_status_;
|
||||
share::ObTaskId task_id_;
|
||||
bool need_create_inner_tablet_;
|
||||
};
|
||||
|
||||
int create_ls_(const ObCreateLSCommonArg &arg,
|
||||
const ObMigrationOpArg &mig_arg);
|
||||
// the tenant smaller than 5G can only create 8 ls.
|
||||
// other tenant can create 100 ls.
|
||||
int check_tenant_ls_num_();
|
||||
|
Loading…
x
Reference in New Issue
Block a user