BUGFIX: fix compatibility issues of ls.
This commit is contained in:
@ -772,6 +772,11 @@ void ObLS::destroy()
|
|||||||
}
|
}
|
||||||
transaction::ObTransService *txs_svr = MTL(transaction::ObTransService *);
|
transaction::ObTransService *txs_svr = MTL(transaction::ObTransService *);
|
||||||
FLOG_INFO("ObLS destroy", K(this), K(*this), K(lbt()));
|
FLOG_INFO("ObLS destroy", K(this), K(*this), K(lbt()));
|
||||||
|
if (running_state_.is_running()) {
|
||||||
|
if (OB_TMP_FAIL(offline_(start_ts))) {
|
||||||
|
LOG_WARN("offline a running ls failed", K(tmp_ret), K(ls_meta_.ls_id_));
|
||||||
|
}
|
||||||
|
}
|
||||||
if (OB_TMP_FAIL(stop_())) {
|
if (OB_TMP_FAIL(stop_())) {
|
||||||
LOG_WARN("ls stop failed.", K(tmp_ret), K(ls_meta_.ls_id_));
|
LOG_WARN("ls stop failed.", K(tmp_ret), K(ls_meta_.ls_id_));
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
@ -322,7 +322,7 @@ public:
|
|||||||
int create_ls(const share::ObTenantRole tenant_role,
|
int create_ls(const share::ObTenantRole tenant_role,
|
||||||
const palf::PalfBaseInfo &palf_base_info,
|
const palf::PalfBaseInfo &palf_base_info,
|
||||||
const common::ObReplicaType &replica_type,
|
const common::ObReplicaType &replica_type,
|
||||||
const bool allow_log_sync);
|
const bool allow_log_sync = false);
|
||||||
// load ls info from disk
|
// load ls info from disk
|
||||||
// @param[in] tenant_role, role of tenant, which determains palf access mode
|
// @param[in] tenant_role, role of tenant, which determains palf access mode
|
||||||
// @param[in] palf_base_info, all the info that palf needed
|
// @param[in] palf_base_info, all the info that palf needed
|
||||||
|
|||||||
@ -96,32 +96,14 @@ int ObLSMeta::set_start_work_state()
|
|||||||
|
|
||||||
int ObLSMeta::set_start_ha_state()
|
int ObLSMeta::set_start_ha_state()
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
|
||||||
uint64_t tenant_data_version = 0;
|
|
||||||
ObReentrantWLockGuard guard(lock_);
|
ObReentrantWLockGuard guard(lock_);
|
||||||
if (OB_FAIL(GET_MIN_DATA_VERSION(MTL_ID(), tenant_data_version))) {
|
return ls_persistent_state_.start_ha(ls_id_);
|
||||||
LOG_WARN("get min data version failed", K(ret), K(MTL_ID()));
|
|
||||||
} else if (tenant_data_version > DATA_VERSION_4_2_1_0) {
|
|
||||||
ret = ls_persistent_state_.start_ha(ls_id_);
|
|
||||||
} else {
|
|
||||||
ret = ls_persistent_state_.start_work(ls_id_);
|
|
||||||
}
|
|
||||||
return ret;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObLSMeta::set_finish_ha_state()
|
int ObLSMeta::set_finish_ha_state()
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
|
||||||
uint64_t tenant_data_version = 0;
|
|
||||||
ObReentrantWLockGuard guard(lock_);
|
ObReentrantWLockGuard guard(lock_);
|
||||||
if (OB_FAIL(GET_MIN_DATA_VERSION(MTL_ID(), tenant_data_version))) {
|
return ls_persistent_state_.finish_ha(ls_id_);
|
||||||
LOG_WARN("get min data version failed", K(ret), K(MTL_ID()));
|
|
||||||
} else if (tenant_data_version > DATA_VERSION_4_2_1_0) {
|
|
||||||
ret = ls_persistent_state_.finish_ha(ls_id_);
|
|
||||||
} else {
|
|
||||||
ret = ls_persistent_state_.start_work(ls_id_);
|
|
||||||
}
|
|
||||||
return ret;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObLSMeta::set_remove_state()
|
int ObLSMeta::set_remove_state()
|
||||||
@ -298,7 +280,6 @@ int ObLSMeta::set_migration_status(const ObMigrationStatus &migration_status,
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
bool can_change = false;
|
bool can_change = false;
|
||||||
uint64_t tenant_data_version = 0;
|
|
||||||
ObReentrantWLockGuard guard(lock_);
|
ObReentrantWLockGuard guard(lock_);
|
||||||
if (OB_FAIL(check_can_update_())) {
|
if (OB_FAIL(check_can_update_())) {
|
||||||
LOG_WARN("ls meta cannot update", K(ret), K(*this));
|
LOG_WARN("ls meta cannot update", K(ret), K(*this));
|
||||||
@ -316,16 +297,12 @@ int ObLSMeta::set_migration_status(const ObMigrationStatus &migration_status,
|
|||||||
ret = OB_OP_NOT_ALLOW;
|
ret = OB_OP_NOT_ALLOW;
|
||||||
LOG_WARN("ls can not change to migrate status", K(ret), K(migration_status_),
|
LOG_WARN("ls can not change to migrate status", K(ret), K(migration_status_),
|
||||||
K(migration_status));
|
K(migration_status));
|
||||||
} else if (OB_FAIL(GET_MIN_DATA_VERSION(MTL_ID(), tenant_data_version))) {
|
|
||||||
LOG_WARN("get min data version failed", K(ret), K(MTL_ID()));
|
|
||||||
} else {
|
} else {
|
||||||
ObLSMeta tmp(*this);
|
ObLSMeta tmp(*this);
|
||||||
tmp.migration_status_ = migration_status;
|
tmp.migration_status_ = migration_status;
|
||||||
tmp.ls_persistent_state_ = (OB_MIGRATION_STATUS_NONE == migration_status &&
|
tmp.ls_persistent_state_ = (OB_MIGRATION_STATUS_NONE == migration_status &&
|
||||||
ObLSRestoreStatus::RESTORE_NONE == restore_status_ ?
|
ObLSRestoreStatus::RESTORE_NONE == restore_status_ ?
|
||||||
ObLSPersistentState::State::LS_NORMAL : ls_persistent_state_);
|
ObLSPersistentState::State::LS_NORMAL : ls_persistent_state_);
|
||||||
tmp.ls_persistent_state_ = (tenant_data_version > DATA_VERSION_4_2_1_0 ?
|
|
||||||
tmp.ls_persistent_state_ : ObLSPersistentState::State::LS_NORMAL);
|
|
||||||
|
|
||||||
if (write_slog && OB_FAIL(write_slog_(tmp))) {
|
if (write_slog && OB_FAIL(write_slog_(tmp))) {
|
||||||
LOG_WARN("migration_status write slog failed", K(ret));
|
LOG_WARN("migration_status write slog failed", K(ret));
|
||||||
@ -406,7 +383,6 @@ int ObLSMeta::get_offline_scn(SCN &offline_scn)
|
|||||||
int ObLSMeta::set_restore_status(const ObLSRestoreStatus &restore_status)
|
int ObLSMeta::set_restore_status(const ObLSRestoreStatus &restore_status)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
uint64_t tenant_data_version = 0;
|
|
||||||
ObReentrantWLockGuard guard(lock_);
|
ObReentrantWLockGuard guard(lock_);
|
||||||
if (OB_FAIL(check_can_update_())) {
|
if (OB_FAIL(check_can_update_())) {
|
||||||
LOG_WARN("ls meta cannot update", K(ret), K(*this));
|
LOG_WARN("ls meta cannot update", K(ret), K(*this));
|
||||||
@ -415,16 +391,12 @@ int ObLSMeta::set_restore_status(const ObLSRestoreStatus &restore_status)
|
|||||||
LOG_WARN("invalid restore status", K(ret), K(restore_status_), K(restore_status));
|
LOG_WARN("invalid restore status", K(ret), K(restore_status_), K(restore_status));
|
||||||
} else if (restore_status_ == restore_status) {
|
} else if (restore_status_ == restore_status) {
|
||||||
//do nothing
|
//do nothing
|
||||||
} else if (OB_FAIL(GET_MIN_DATA_VERSION(MTL_ID(), tenant_data_version))) {
|
|
||||||
LOG_WARN("get min data version failed", K(ret), K(MTL_ID()));
|
|
||||||
} else {
|
} else {
|
||||||
ObLSMeta tmp(*this);
|
ObLSMeta tmp(*this);
|
||||||
tmp.restore_status_ = restore_status;
|
tmp.restore_status_ = restore_status;
|
||||||
tmp.ls_persistent_state_ = (ObLSRestoreStatus::RESTORE_NONE == restore_status &&
|
tmp.ls_persistent_state_ = (ObLSRestoreStatus::RESTORE_NONE == restore_status &&
|
||||||
OB_MIGRATION_STATUS_NONE == migration_status_ ?
|
OB_MIGRATION_STATUS_NONE == migration_status_ ?
|
||||||
ObLSPersistentState::State::LS_NORMAL : ls_persistent_state_);
|
ObLSPersistentState::State::LS_NORMAL : ls_persistent_state_);
|
||||||
tmp.ls_persistent_state_ = (tenant_data_version > DATA_VERSION_4_2_1_0 ?
|
|
||||||
tmp.ls_persistent_state_ : ObLSPersistentState::State::LS_NORMAL);
|
|
||||||
if (OB_FAIL(write_slog_(tmp))) {
|
if (OB_FAIL(write_slog_(tmp))) {
|
||||||
LOG_WARN("restore_status write slog failed", K(ret));
|
LOG_WARN("restore_status write slog failed", K(ret));
|
||||||
} else if ((ObLSRestoreStatus::RESTORE_NONE == restore_status && OB_MIGRATION_STATUS_NONE == migration_status_)
|
} else if ((ObLSRestoreStatus::RESTORE_NONE == restore_status && OB_MIGRATION_STATUS_NONE == migration_status_)
|
||||||
@ -551,7 +523,6 @@ int ObLSMeta::set_ls_rebuild()
|
|||||||
const ObMigrationStatus change_status = ObMigrationStatus::OB_MIGRATION_STATUS_REBUILD;
|
const ObMigrationStatus change_status = ObMigrationStatus::OB_MIGRATION_STATUS_REBUILD;
|
||||||
const ObLSPersistentState persistent_state = ObLSPersistentState::State::LS_HA;
|
const ObLSPersistentState persistent_state = ObLSPersistentState::State::LS_HA;
|
||||||
bool can_change = false;
|
bool can_change = false;
|
||||||
uint64_t tenant_data_version = 0;
|
|
||||||
|
|
||||||
ObReentrantWLockGuard guard(lock_);
|
ObReentrantWLockGuard guard(lock_);
|
||||||
if (OB_FAIL(check_can_update_())) {
|
if (OB_FAIL(check_can_update_())) {
|
||||||
@ -565,14 +536,10 @@ int ObLSMeta::set_ls_rebuild()
|
|||||||
} else if (!can_change) {
|
} else if (!can_change) {
|
||||||
ret = OB_OP_NOT_ALLOW;
|
ret = OB_OP_NOT_ALLOW;
|
||||||
LOG_WARN("ls can not change to rebuild status", K(ret), K(tmp), K(change_status));
|
LOG_WARN("ls can not change to rebuild status", K(ret), K(tmp), K(change_status));
|
||||||
} else if (OB_FAIL(GET_MIN_DATA_VERSION(MTL_ID(), tenant_data_version))) {
|
|
||||||
LOG_WARN("get min data version failed", K(ret), K(MTL_ID()));
|
|
||||||
} else {
|
} else {
|
||||||
tmp.migration_status_ = change_status;
|
tmp.migration_status_ = change_status;
|
||||||
tmp.rebuild_seq_++;
|
tmp.rebuild_seq_++;
|
||||||
tmp.ls_persistent_state_ = persistent_state;
|
tmp.ls_persistent_state_ = persistent_state;
|
||||||
tmp.ls_persistent_state_ = (tenant_data_version > DATA_VERSION_4_2_1_0 ?
|
|
||||||
tmp.ls_persistent_state_ : ObLSPersistentState::State::LS_NORMAL);
|
|
||||||
if (OB_FAIL(write_slog_(tmp))) {
|
if (OB_FAIL(write_slog_(tmp))) {
|
||||||
LOG_WARN("clog_checkpoint write slog failed", K(ret));
|
LOG_WARN("clog_checkpoint write slog failed", K(ret));
|
||||||
} else if (OB_FAIL(set_start_ha_state())) {
|
} else if (OB_FAIL(set_start_ha_state())) {
|
||||||
|
|||||||
@ -33,6 +33,10 @@ public:
|
|||||||
State::LS_OFFLINED == state_ ||
|
State::LS_OFFLINED == state_ ||
|
||||||
State::LS_STOPPED == state_);
|
State::LS_STOPPED == state_);
|
||||||
}
|
}
|
||||||
|
bool is_running() const
|
||||||
|
{
|
||||||
|
return State::LS_RUNNING == state_;
|
||||||
|
}
|
||||||
bool is_stopped() const
|
bool is_stopped() const
|
||||||
{
|
{
|
||||||
return (State::LS_INIT == state_ ||
|
return (State::LS_INIT == state_ ||
|
||||||
|
|||||||
@ -439,125 +439,36 @@ int ObLSService::create_ls(const obrpc::ObCreateLSArg &arg)
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
int tmp_ret = OB_SUCCESS;
|
int tmp_ret = OB_SUCCESS;
|
||||||
int64_t abs_timeout_ts = INT64_MAX;
|
|
||||||
ObLS *ls = NULL;
|
|
||||||
ObStorageLogger *slogger = MTL(ObStorageLogger*);
|
|
||||||
ObLSCreateState state = ObLSCreateState::CREATE_STATE_INIT;
|
|
||||||
int64_t create_type = ObLSCreateType::NORMAL;
|
|
||||||
bool is_commit = true;
|
|
||||||
bool need_retry = true;
|
|
||||||
bool ls_exist = false;
|
|
||||||
bool waiting_destroy = false;
|
|
||||||
const SCN create_scn = arg.get_create_scn();
|
|
||||||
palf::PalfBaseInfo palf_base_info;
|
|
||||||
const ObMigrationStatus migration_status = ObMigrationStatus::OB_MIGRATION_STATUS_NONE;
|
|
||||||
LOG_INFO("create_ls begin", K(arg));
|
LOG_INFO("create_ls begin", K(arg));
|
||||||
DEBUG_SYNC(BEFORE_CREATE_USER_LS);
|
DEBUG_SYNC(BEFORE_CREATE_USER_LS);
|
||||||
|
|
||||||
if (IS_NOT_INIT) {
|
if (OB_UNLIKELY(!arg.is_valid())) {
|
||||||
ret = OB_NOT_INIT;
|
|
||||||
LOG_WARN("the ls service has not been inited", K(ret));
|
|
||||||
} else if (OB_UNLIKELY(!ObServerCheckpointSlogHandler::get_instance().is_started())) {
|
|
||||||
ret = OB_NOT_RUNNING;
|
|
||||||
LOG_WARN("ls service does not service before slog replay finished", K(ret));
|
|
||||||
} else if (OB_UNLIKELY(!arg.is_valid())) {
|
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("invalid argument", K(ret), K(arg));
|
LOG_WARN("invalid argument", K(ret), K(arg));
|
||||||
} else if (OB_FAIL(ObShareUtil::get_abs_timeout(DEFAULT_LOCK_TIMEOUT /* default timeout */,
|
|
||||||
abs_timeout_ts))) {
|
|
||||||
LOG_WARN("get timeout ts failed", KR(ret));
|
|
||||||
} else {
|
} else {
|
||||||
ObMutexGuardWithTimeout change_guard(change_lock_, abs_timeout_ts);
|
palf::PalfBaseInfo palf_base_info;
|
||||||
if (OB_UNLIKELY(!is_running_)) {
|
prepare_palf_base_info(arg, palf_base_info);
|
||||||
ret = OB_NOT_RUNNING;
|
|
||||||
LOG_WARN("ls service is not running.", K(ret));
|
ObMigrationOpArg mig_arg;
|
||||||
} else if (OB_FAIL(change_guard.get_ret())) {
|
ObCreateLSCommonArg common_arg;
|
||||||
LOG_WARN("lock failed, try again later", K(ret));
|
common_arg.ls_id_ = arg.get_ls_id();
|
||||||
ret = OB_EAGAIN;
|
common_arg.create_scn_ = arg.get_create_scn();
|
||||||
} else if (OB_FAIL(check_ls_exist(arg.get_ls_id(), ls_exist))) {
|
common_arg.palf_base_info_ = palf_base_info;
|
||||||
LOG_WARN("check ls exist failed", K(ret), K(arg));
|
common_arg.tenant_role_ = arg.get_tenant_info().get_tenant_role();
|
||||||
} else if (ls_exist) {
|
common_arg.replica_type_ = arg.get_replica_type();
|
||||||
LOG_WARN("ls exist, does not need create again, just return success", K(arg));
|
common_arg.compat_mode_ = arg.get_compat_mode();
|
||||||
} else if (OB_FAIL(check_ls_waiting_safe_destroy(arg.get_ls_id(),
|
common_arg.create_type_ = is_ls_to_restore_(arg) ? ObLSCreateType::RESTORE : ObLSCreateType::NORMAL;
|
||||||
waiting_destroy))) {
|
common_arg.migration_status_ = ObMigrationStatus::OB_MIGRATION_STATUS_NONE;
|
||||||
LOG_WARN("check ls waiting safe destroy failed", K(ret), K(arg));
|
common_arg.restore_status_ = (is_ls_to_restore_(arg) ?
|
||||||
} else if (waiting_destroy) {
|
ObLSRestoreStatus(ObLSRestoreStatus::RESTORE_START) :
|
||||||
ret = OB_LS_WAITING_SAFE_DESTROY;
|
ObLSRestoreStatus(ObLSRestoreStatus::RESTORE_NONE));
|
||||||
LOG_WARN("ls waiting for destroy, need retry later", K(ret), K(arg));
|
common_arg.need_create_inner_tablet_ = need_create_inner_tablets_(arg);
|
||||||
} else if (OB_FAIL(check_tenant_ls_num_())) {
|
|
||||||
LOG_WARN("too many ls", K(ret));
|
if (OB_FAIL(create_ls_(common_arg, mig_arg))) {
|
||||||
} else if (OB_FAIL(inner_create_ls_(arg.get_ls_id(),
|
LOG_WARN("create ls failed", K(ret), K(arg));
|
||||||
migration_status,
|
|
||||||
(is_ls_to_restore_(arg) ?
|
|
||||||
ObLSRestoreStatus(ObLSRestoreStatus::RESTORE_START) :
|
|
||||||
ObLSRestoreStatus(ObLSRestoreStatus::RESTORE_NONE)),
|
|
||||||
create_scn,
|
|
||||||
ls))) {
|
|
||||||
LOG_WARN("inner create log stream failed.", K(ret), K(arg), K(migration_status));
|
|
||||||
} else {
|
|
||||||
create_type = is_ls_to_restore_(arg) ? ObLSCreateType::RESTORE : ObLSCreateType::NORMAL;
|
|
||||||
state = ObLSCreateState::CREATE_STATE_INNER_CREATED;
|
|
||||||
// disable sync at default
|
|
||||||
const bool allow_log_sync = false;
|
|
||||||
prepare_palf_base_info(arg, palf_base_info);
|
|
||||||
ObLSLockGuard lock_ls(ls);
|
|
||||||
const ObLSMeta &ls_meta = ls->get_ls_meta();
|
|
||||||
if (OB_FAIL(add_ls_to_map_(ls))) {
|
|
||||||
LOG_WARN("add log stream to map failed.", K(ret));
|
|
||||||
} else if (FALSE_IT(state = ObLSCreateState::CREATE_STATE_ADDED_TO_MAP)) {
|
|
||||||
// do nothing
|
|
||||||
} else if (OB_FAIL(write_prepare_create_ls_slog_(ls_meta))) {
|
|
||||||
LOG_WARN("fail to write create log stream slog", K(ls_meta));
|
|
||||||
} else if (FALSE_IT(state = ObLSCreateState::CREATE_STATE_WRITE_PREPARE_SLOG)) {
|
|
||||||
} else if (OB_FAIL(ls->create_ls(arg.get_tenant_info().get_tenant_role(),
|
|
||||||
palf_base_info,
|
|
||||||
arg.get_replica_type(),
|
|
||||||
allow_log_sync))) {
|
|
||||||
LOG_WARN("enable ls palf failed", K(ret), K(arg), K(palf_base_info));
|
|
||||||
} else if (FALSE_IT(state = ObLSCreateState::CREATE_STATE_PALF_ENABLED)) {
|
|
||||||
// only restore ls with base will not need create inner tablet
|
|
||||||
} else if (need_create_inner_tablets_(arg) &&
|
|
||||||
OB_FAIL(ls->create_ls_inner_tablet(arg.get_compat_mode(),
|
|
||||||
arg.get_create_scn()))) {
|
|
||||||
LOG_WARN("create ls inner tablet failed", K(ret), K(arg));
|
|
||||||
} else if (OB_FAIL(write_commit_create_ls_slog_(ls->get_ls_id(), create_type))) {
|
|
||||||
LOG_WARN("fail to write create log stream commit slog", K(ret), K(ls_meta));
|
|
||||||
} else if (FALSE_IT(state = ObLSCreateState::CREATE_STATE_FINISH)) {
|
|
||||||
} else if (OB_FAIL(ls->finish_create_ls())) {
|
|
||||||
LOG_WARN("finish create ls failed", KR(ret));
|
|
||||||
} else if (OB_FAIL(post_create_ls_(create_type, ls))) {
|
|
||||||
LOG_WARN("post create ls failed", K(ret), K(arg));
|
|
||||||
}
|
|
||||||
if (OB_FAIL(ret)) {
|
|
||||||
do {
|
|
||||||
need_retry = false;
|
|
||||||
if (state >= ObLSCreateState::CREATE_STATE_WRITE_PREPARE_SLOG) {
|
|
||||||
if (OB_TMP_FAIL(ls->set_remove_state())) {
|
|
||||||
need_retry = true;
|
|
||||||
LOG_ERROR("ls set remove state failed", KR(tmp_ret), K(ls_meta));
|
|
||||||
} else if (OB_TMP_FAIL(write_abort_create_ls_slog_(ls->get_ls_id()))) {
|
|
||||||
need_retry = true;
|
|
||||||
LOG_ERROR("fail to write create log stream abort slog", K(tmp_ret), K(ls_meta));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} while (need_retry);
|
|
||||||
do {
|
|
||||||
// TODO: yanyuan.cxf every remove disable or stop function need be re-entrant
|
|
||||||
need_retry = false;
|
|
||||||
if (state >= ObLSCreateState::CREATE_STATE_PALF_ENABLED) {
|
|
||||||
if (OB_SUCCESS != (tmp_ret = ls->remove_ls())) {
|
|
||||||
need_retry = true;
|
|
||||||
LOG_WARN("ls disable palf failed", K(tmp_ret));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} while (need_retry);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (OB_FAIL(ret)) {
|
|
||||||
del_ls_after_create_ls_failed_(state, ls);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
FLOG_INFO("create_ls finish", K(ret), K(arg), K(abs_timeout_ts));
|
FLOG_INFO("create_ls finish", K(ret), K(arg));
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1099,24 +1010,18 @@ void ObLSService::remove_ls_(ObLS *ls, const bool remove_from_disk)
|
|||||||
} while (OB_FAIL(ret));
|
} while (OB_FAIL(ret));
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObLSService::create_ls_for_ha(
|
int ObLSService::create_ls_(const ObCreateLSCommonArg &arg,
|
||||||
const share::ObTaskId task_id,
|
const ObMigrationOpArg &mig_arg)
|
||||||
const ObMigrationOpArg &arg)
|
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
int tmp_ret = OB_SUCCESS;
|
int tmp_ret = OB_SUCCESS;
|
||||||
int64_t abs_timeout_ts = INT64_MAX;
|
int64_t abs_timeout_ts = INT64_MAX;
|
||||||
ObLSCreateState state = ObLSCreateState::CREATE_STATE_INIT;
|
ObLSCreateState state = ObLSCreateState::CREATE_STATE_INIT;
|
||||||
ObLS *ls = NULL;
|
ObLS *ls = NULL;
|
||||||
ObLSMeta ls_meta;
|
|
||||||
ObStorageLogger *slogger = MTL(ObStorageLogger*);
|
ObStorageLogger *slogger = MTL(ObStorageLogger*);
|
||||||
bool is_commit = true;
|
|
||||||
bool need_retry = true;
|
bool need_retry = true;
|
||||||
ObMigrationStatus migration_status;
|
|
||||||
ObLSRestoreStatus restore_status = ObLSRestoreStatus(ObLSRestoreStatus::RESTORE_NONE);
|
|
||||||
bool ls_exist = false;
|
bool ls_exist = false;
|
||||||
bool waiting_destroy = false;
|
bool waiting_destroy = false;
|
||||||
int64_t create_type = ObLSCreateType::MIGRATE;
|
|
||||||
|
|
||||||
if (IS_NOT_INIT) {
|
if (IS_NOT_INIT) {
|
||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
@ -1124,12 +1029,6 @@ int ObLSService::create_ls_for_ha(
|
|||||||
} else if (OB_UNLIKELY(!ObServerCheckpointSlogHandler::get_instance().is_started())) {
|
} else if (OB_UNLIKELY(!ObServerCheckpointSlogHandler::get_instance().is_started())) {
|
||||||
ret = OB_NOT_RUNNING;
|
ret = OB_NOT_RUNNING;
|
||||||
LOG_WARN("ls service does not service before slog replay finished", K(ret));
|
LOG_WARN("ls service does not service before slog replay finished", K(ret));
|
||||||
} else if (task_id.is_invalid() || !arg.is_valid()) {
|
|
||||||
ret = OB_INVALID_ARGUMENT;
|
|
||||||
LOG_WARN("create ls for ha get invalid argument", K(ret), K(task_id), K(arg));
|
|
||||||
} else if (ObMigrationOpType::MIGRATE_LS_OP != arg.type_ && ObMigrationOpType::ADD_LS_OP != arg.type_) {
|
|
||||||
ret = OB_ERR_UNEXPECTED;
|
|
||||||
LOG_WARN("create ls for migration get unexpected op type", K(ret), K(task_id), K(arg));
|
|
||||||
} else if (OB_FAIL(ObShareUtil::get_abs_timeout(DEFAULT_LOCK_TIMEOUT /* default timeout */,
|
} else if (OB_FAIL(ObShareUtil::get_abs_timeout(DEFAULT_LOCK_TIMEOUT /* default timeout */,
|
||||||
abs_timeout_ts))) {
|
abs_timeout_ts))) {
|
||||||
LOG_WARN("get timeout ts failed", KR(ret));
|
LOG_WARN("get timeout ts failed", KR(ret));
|
||||||
@ -1142,35 +1041,28 @@ int ObLSService::create_ls_for_ha(
|
|||||||
LOG_WARN("lock failed, try again later", K(ret));
|
LOG_WARN("lock failed, try again later", K(ret));
|
||||||
ret = OB_EAGAIN;
|
ret = OB_EAGAIN;
|
||||||
} else if (OB_FAIL(check_ls_exist(arg.ls_id_, ls_exist))) {
|
} else if (OB_FAIL(check_ls_exist(arg.ls_id_, ls_exist))) {
|
||||||
LOG_WARN("check ls exist failed", K(ret), K(arg));
|
LOG_WARN("check ls exist failed", K(ret), K(arg.ls_id_));
|
||||||
} else if (ls_exist) {
|
} else if (ls_exist) {
|
||||||
ret = OB_ENTRY_EXIST;
|
ret = OB_ENTRY_EXIST;
|
||||||
LOG_WARN("ls exist, cannot create ls now", K(ret), K(arg));
|
LOG_WARN("ls exist, cannot create ls now", K(ret), K(arg.ls_id_));
|
||||||
} else if (OB_FAIL(check_ls_waiting_safe_destroy(arg.ls_id_,
|
} else if (OB_FAIL(check_ls_waiting_safe_destroy(arg.ls_id_,
|
||||||
waiting_destroy))) {
|
waiting_destroy))) {
|
||||||
LOG_WARN("check ls waiting safe destroy failed", K(ret), K(arg));
|
LOG_WARN("check ls waiting safe destroy failed", K(ret), K(arg.ls_id_));
|
||||||
} else if (waiting_destroy) {
|
} else if (waiting_destroy) {
|
||||||
ret = OB_LS_WAITING_SAFE_DESTROY;
|
ret = OB_LS_WAITING_SAFE_DESTROY;
|
||||||
LOG_WARN("ls waiting for destroy, need retry later", K(ret), K(arg));
|
LOG_WARN("ls waiting for destroy, need retry later", K(ret), K(arg.ls_id_));
|
||||||
} else if (OB_FAIL(check_tenant_ls_num_())) {
|
} else if (OB_FAIL(check_tenant_ls_num_())) {
|
||||||
LOG_WARN("too many ls", K(ret));
|
LOG_WARN("too many ls", K(ret));
|
||||||
} else if (OB_FAIL(ObMigrationStatusHelper::trans_migration_op(arg.type_, migration_status))) {
|
|
||||||
LOG_WARN("failed to trans migration op", K(ret), K(arg), K(task_id));
|
|
||||||
} else if (OB_FAIL(get_restore_status_(restore_status))) {
|
|
||||||
LOG_WARN("failed to get restore status", K(ret), K(arg), K(task_id));
|
|
||||||
} else if (OB_FAIL(inner_create_ls_(arg.ls_id_,
|
} else if (OB_FAIL(inner_create_ls_(arg.ls_id_,
|
||||||
migration_status,
|
arg.migration_status_,
|
||||||
restore_status,
|
arg.restore_status_,
|
||||||
ObScnRange::MIN_SCN, /* create scn */
|
arg.create_scn_,
|
||||||
ls))) {
|
ls))) {
|
||||||
LOG_WARN("create ls failed", K(ret), K(arg), K(task_id));
|
LOG_WARN("create ls failed", K(ret), K(arg.ls_id_));
|
||||||
} else {
|
} else {
|
||||||
state = ObLSCreateState::CREATE_STATE_INNER_CREATED;
|
state = ObLSCreateState::CREATE_STATE_INNER_CREATED;
|
||||||
palf::PalfBaseInfo palf_base_info;
|
|
||||||
palf_base_info.generate_by_default();
|
|
||||||
ObLSLockGuard lock_ls(ls);
|
ObLSLockGuard lock_ls(ls);
|
||||||
const bool allow_log_sync = false;
|
const ObLSMeta &ls_meta = ls->get_ls_meta();
|
||||||
ls_meta = ls->get_ls_meta();
|
|
||||||
if (OB_FAIL(add_ls_to_map_(ls))) {
|
if (OB_FAIL(add_ls_to_map_(ls))) {
|
||||||
LOG_WARN("add log stream to map failed.", K(ret));
|
LOG_WARN("add log stream to map failed.", K(ret));
|
||||||
} else if (FALSE_IT(state = ObLSCreateState::CREATE_STATE_ADDED_TO_MAP)) {
|
} else if (FALSE_IT(state = ObLSCreateState::CREATE_STATE_ADDED_TO_MAP)) {
|
||||||
@ -1178,21 +1070,27 @@ int ObLSService::create_ls_for_ha(
|
|||||||
} else if (OB_FAIL(write_prepare_create_ls_slog_(ls_meta))) {
|
} else if (OB_FAIL(write_prepare_create_ls_slog_(ls_meta))) {
|
||||||
LOG_WARN("fail to write create log stream slog", K(ls_meta));
|
LOG_WARN("fail to write create log stream slog", K(ls_meta));
|
||||||
} else if (FALSE_IT(state = ObLSCreateState::CREATE_STATE_WRITE_PREPARE_SLOG)) {
|
} else if (FALSE_IT(state = ObLSCreateState::CREATE_STATE_WRITE_PREPARE_SLOG)) {
|
||||||
} else if (OB_FAIL(ls->create_ls(share::RESTORE_TENANT_ROLE,
|
} else if (OB_FAIL(ls->create_ls(arg.tenant_role_,
|
||||||
palf_base_info,
|
arg.palf_base_info_,
|
||||||
arg.dst_.get_replica_type(),
|
arg.replica_type_))) {
|
||||||
allow_log_sync))) {
|
|
||||||
LOG_WARN("enable ls palf failed", K(ret), K(ls_meta));
|
LOG_WARN("enable ls palf failed", K(ret), K(ls_meta));
|
||||||
} else if (FALSE_IT(state = ObLSCreateState::CREATE_STATE_PALF_ENABLED)) {
|
} else if (FALSE_IT(state = ObLSCreateState::CREATE_STATE_PALF_ENABLED)) {
|
||||||
} else if (OB_FAIL(write_commit_create_ls_slog_(ls->get_ls_id(), create_type))) {
|
} else if (arg.need_create_inner_tablet_ &&
|
||||||
|
OB_FAIL(ls->create_ls_inner_tablet(arg.compat_mode_,
|
||||||
|
arg.create_scn_))) {
|
||||||
|
LOG_WARN("create ls inner tablet failed", K(ret), K(ls_meta));
|
||||||
|
} else if (OB_FAIL(write_commit_create_ls_slog_(ls->get_ls_id(),
|
||||||
|
arg.create_type_))) {
|
||||||
LOG_WARN("fail to write create log stream commit slog", K(ret), K(ls_meta));
|
LOG_WARN("fail to write create log stream commit slog", K(ret), K(ls_meta));
|
||||||
|
} else if (FALSE_IT(state = ObLSCreateState::CREATE_STATE_FINISH)) {
|
||||||
} else if (OB_FAIL(ls->finish_create_ls())) {
|
} else if (OB_FAIL(ls->finish_create_ls())) {
|
||||||
LOG_WARN("finish create ls failed", KR(ret));
|
LOG_WARN("finish create ls failed", KR(ret));
|
||||||
} else if (OB_FAIL(post_create_ls_(create_type, ls))) {
|
} else if (OB_FAIL(post_create_ls_(arg.create_type_, ls))) {
|
||||||
LOG_WARN("post create ls failed", K(ret), K(arg));
|
LOG_WARN("post create ls failed", K(ret), K(ls_meta));
|
||||||
} else if (FALSE_IT(state = ObLSCreateState::CREATE_STATE_FINISH)) {
|
} else if (ObLSCreateType::MIGRATE == arg.create_type_ &&
|
||||||
} else if (OB_FAIL(ls->get_ls_migration_handler()->add_ls_migration_task(task_id, arg))) {
|
OB_FAIL(ls->get_ls_migration_handler()->add_ls_migration_task(arg.task_id_,
|
||||||
LOG_WARN("failed to add ls migration task", K(ret), K(arg));
|
mig_arg))) {
|
||||||
|
LOG_WARN("failed to add ls migration task", K(ret), K(mig_arg));
|
||||||
}
|
}
|
||||||
if (OB_FAIL(ret)) {
|
if (OB_FAIL(ret)) {
|
||||||
do {
|
do {
|
||||||
@ -1223,7 +1121,49 @@ int ObLSService::create_ls_for_ha(
|
|||||||
del_ls_after_create_ls_failed_(state, ls);
|
del_ls_after_create_ls_failed_(state, ls);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
FLOG_INFO("create_ls for ha finish", K(ret), K(ls_meta), K(abs_timeout_ts));
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int ObLSService::create_ls_for_ha(
|
||||||
|
const share::ObTaskId task_id,
|
||||||
|
const ObMigrationOpArg &arg)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
ObMigrationStatus migration_status;
|
||||||
|
ObLSRestoreStatus restore_status = ObLSRestoreStatus(ObLSRestoreStatus::RESTORE_NONE);
|
||||||
|
|
||||||
|
if (task_id.is_invalid() || !arg.is_valid()) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
LOG_WARN("create ls for ha get invalid argument", K(ret), K(task_id), K(arg));
|
||||||
|
} else if (ObMigrationOpType::MIGRATE_LS_OP != arg.type_ && ObMigrationOpType::ADD_LS_OP != arg.type_) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("create ls for migration get unexpected op type", K(ret), K(task_id), K(arg));
|
||||||
|
} else if (OB_FAIL(ObMigrationStatusHelper::trans_migration_op(arg.type_, migration_status))) {
|
||||||
|
LOG_WARN("failed to trans migration op", K(ret), K(arg), K(task_id));
|
||||||
|
} else if (OB_FAIL(get_restore_status_(restore_status))) {
|
||||||
|
LOG_WARN("failed to get restore status", K(ret), K(arg), K(task_id));
|
||||||
|
} else {
|
||||||
|
palf::PalfBaseInfo palf_base_info;
|
||||||
|
palf_base_info.generate_by_default();
|
||||||
|
|
||||||
|
ObCreateLSCommonArg common_arg;
|
||||||
|
common_arg.ls_id_ = arg.ls_id_;
|
||||||
|
common_arg.create_scn_ = ObScnRange::MIN_SCN;
|
||||||
|
common_arg.palf_base_info_ = palf_base_info;
|
||||||
|
common_arg.tenant_role_ = share::RESTORE_TENANT_ROLE;
|
||||||
|
common_arg.replica_type_ = arg.dst_.get_replica_type();
|
||||||
|
common_arg.compat_mode_ = Worker::CompatMode::INVALID;
|
||||||
|
common_arg.create_type_ = ObLSCreateType::MIGRATE;
|
||||||
|
common_arg.migration_status_ = migration_status;
|
||||||
|
common_arg.restore_status_ = restore_status;
|
||||||
|
common_arg.task_id_ = task_id;
|
||||||
|
common_arg.need_create_inner_tablet_ = false;
|
||||||
|
|
||||||
|
if (OB_FAIL(create_ls_(common_arg, arg))) {
|
||||||
|
LOG_WARN("failed to create ls", K(ret), K(arg));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
FLOG_INFO("create_ls for ha finish", K(ret), K(task_id), K(arg));
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -157,6 +157,22 @@ private:
|
|||||||
CREATE_STATE_PALF_ENABLED = 4, // enable_palf succ
|
CREATE_STATE_PALF_ENABLED = 4, // enable_palf succ
|
||||||
CREATE_STATE_FINISH
|
CREATE_STATE_FINISH
|
||||||
};
|
};
|
||||||
|
struct ObCreateLSCommonArg {
|
||||||
|
share::ObLSID ls_id_;
|
||||||
|
share::SCN create_scn_;
|
||||||
|
palf::PalfBaseInfo palf_base_info_;
|
||||||
|
ObTenantRole tenant_role_;
|
||||||
|
ObReplicaType replica_type_;
|
||||||
|
lib::Worker::CompatMode compat_mode_;
|
||||||
|
int64_t create_type_;
|
||||||
|
ObMigrationStatus migration_status_;
|
||||||
|
ObLSRestoreStatus restore_status_;
|
||||||
|
share::ObTaskId task_id_;
|
||||||
|
bool need_create_inner_tablet_;
|
||||||
|
};
|
||||||
|
|
||||||
|
int create_ls_(const ObCreateLSCommonArg &arg,
|
||||||
|
const ObMigrationOpArg &mig_arg);
|
||||||
// the tenant smaller than 5G can only create 8 ls.
|
// the tenant smaller than 5G can only create 8 ls.
|
||||||
// other tenant can create 100 ls.
|
// other tenant can create 100 ls.
|
||||||
int check_tenant_ls_num_();
|
int check_tenant_ls_num_();
|
||||||
|
|||||||
Reference in New Issue
Block a user