BUGFIX: add lock timeout for ls service

This commit is contained in:
obdev 2023-07-04 02:18:32 +00:00 committed by ob-robot
parent 95be350cd2
commit fa8ee1586e
5 changed files with 295 additions and 226 deletions

View File

@ -62,6 +62,47 @@ inline ObLockGuard<LockT>::~ObLockGuard()
}
}
template <typename LockT>
class ObLockGuardWithTimeout
{
public:
[[nodiscard]] explicit ObLockGuardWithTimeout(LockT &lock, const int64_t abs_timeout_us = INT64_MAX);
~ObLockGuardWithTimeout();
inline int get_ret() const { return ret_; }
private:
// disallow copy
ObLockGuardWithTimeout(const ObLockGuardWithTimeout &other);
ObLockGuardWithTimeout &operator=(const ObLockGuardWithTimeout &other);
// disallow new
void *operator new(std::size_t size);
void *operator new(std::size_t size, const std::nothrow_t &nothrow_constant) throw();
void *operator new(std::size_t size, void *ptr) throw();
private:
// data members
LockT &lock_;
int ret_;
};
template <typename LockT>
inline ObLockGuardWithTimeout<LockT>::ObLockGuardWithTimeout(LockT &lock, const int64_t abs_timeout_us)
: lock_(lock),
ret_(common::OB_SUCCESS)
{
if (OB_UNLIKELY(common::OB_SUCCESS != (ret_ = lock_.lock(abs_timeout_us)))) {
COMMON_LOG_RET(ERROR, ret_, "Fail to lock, ", K_(ret));
}
}
template <typename LockT>
inline ObLockGuardWithTimeout<LockT>::~ObLockGuardWithTimeout()
{
if (OB_LIKELY(common::OB_SUCCESS == ret_)) {
if (OB_UNLIKELY(common::OB_SUCCESS != (ret_ = lock_.unlock()))) {
COMMON_LOG_RET(ERROR, ret_, "Fail to unlock, ", K_(ret));
}
}
}
} // end of namespace lib
} // end of namespace oceanbase

View File

@ -29,7 +29,7 @@ public:
{
}
~ObMutex() { }
inline int lock() { return latch_.lock(latch_id_); }
inline int lock(const int64_t abs_timeout_us = INT64_MAX) { return latch_.lock(latch_id_, abs_timeout_us); }
inline int trylock() { return latch_.try_lock(latch_id_); }
inline int unlock() { return latch_.unlock(); }
void enable_record_stat(bool enable) { latch_.enable_record_stat(enable); }
@ -42,6 +42,7 @@ private:
};
typedef ObLockGuard<ObMutex> ObMutexGuard;
typedef ObLockGuardWithTimeout<ObMutex> ObMutexGuardWithTimeout;
} // end of namespace lib
} // end of namespace oceanbase

View File

@ -389,7 +389,7 @@ int ObLSCreator::create_ls_(const ObILSAddr &addrs,
K(create_scn));
} else {
ObTimeoutCtx ctx;
if (OB_FAIL(ObShareUtil::set_default_timeout_ctx(ctx, GCONF.rpc_timeout))) {
if (OB_FAIL(ObShareUtil::set_default_timeout_ctx(ctx, GCONF.internal_sql_execute_timeout))) {
LOG_WARN("fail to set timeout ctx", KR(ret));
} else {
obrpc::ObCreateLSArg arg;

View File

@ -36,6 +36,8 @@
namespace oceanbase
{
using namespace share;
using namespace palf;
using namespace lib;
namespace storage
{
@ -403,6 +405,7 @@ int ObLSService::create_ls(const obrpc::ObCreateLSArg &arg)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
int64_t abs_timeout_ts = INT64_MAX;
ObLS *ls = NULL;
ObStorageLogger *slogger = MTL(ObStorageLogger*);
ObLSCreateState state = ObLSCreateState::CREATE_STATE_INIT;
@ -413,7 +416,6 @@ int ObLSService::create_ls(const obrpc::ObCreateLSArg &arg)
const SCN create_scn = arg.get_create_scn();
palf::PalfBaseInfo palf_base_info;
const ObMigrationStatus migration_status = ObMigrationStatus::OB_MIGRATION_STATUS_NONE;
lib::ObMutexGuard change_guard(change_lock_);
LOG_INFO("create_ls begin", K(arg));
DEBUG_SYNC(BEFORE_CREATE_USER_LS);
@ -429,105 +431,113 @@ int ObLSService::create_ls(const obrpc::ObCreateLSArg &arg)
} else if (OB_UNLIKELY(!arg.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(arg));
} else if (OB_FAIL(check_ls_exist(arg.get_ls_id(),
ls_exist))) {
LOG_WARN("check ls exist failed", K(ret), K(arg));
} else if (ls_exist) {
LOG_WARN("ls exist, does not need create again, just return success", K(arg));
} else if (OB_FAIL(check_ls_waiting_safe_destroy(arg.get_ls_id(),
waiting_destroy))) {
LOG_WARN("check ls waiting safe destroy failed", K(ret), K(arg));
} else if (waiting_destroy) {
ret = OB_EAGAIN;
LOG_WARN("ls waiting for destroy, need retry later", K(ret), K(arg));
} else if (OB_FAIL(inner_create_ls_(arg.get_ls_id(),
migration_status,
(is_ls_to_restore_(arg) ?
ObLSRestoreStatus(ObLSRestoreStatus::RESTORE_START) :
ObLSRestoreStatus(ObLSRestoreStatus::RESTORE_NONE)),
create_scn,
ls))) {
LOG_WARN("inner create log stream failed.", K(ret), K(arg), K(migration_status));
} else if (OB_FAIL(ObShareUtil::get_abs_timeout(DEFAULT_LOCK_TIMEOUT /* default timeout */,
abs_timeout_ts))) {
LOG_WARN("get timeout ts failed", KR(ret));
} else {
state = ObLSCreateState::CREATE_STATE_INNER_CREATED;
// TODO by yunlong: to remove allow_log_sync flag
const bool unused_allow_log_sync = true;
prepare_palf_base_info(arg, palf_base_info);
ObLSLockGuard lock_ls(ls);
const ObLSMeta &ls_meta = ls->get_ls_meta();
if (OB_FAIL(add_ls_to_map_(ls))) {
LOG_WARN("add log stream to map failed.", K(ret));
} else if (FALSE_IT(state = ObLSCreateState::CREATE_STATE_ADDED_TO_MAP)) {
// do nothing
} else if (OB_FAIL(write_prepare_create_ls_slog_(ls_meta))) {
LOG_WARN("fail to write create log stream slog", K(ls_meta));
} else if (FALSE_IT(state = ObLSCreateState::CREATE_STATE_WRITE_PREPARE_SLOG)) {
} else if (OB_FAIL(ls->create_ls(arg.get_tenant_info().get_tenant_role(),
palf_base_info,
arg.get_replica_type(),
unused_allow_log_sync))) {
LOG_WARN("enable ls palf failed", K(ret), K(arg), K(palf_base_info));
// only restore ls does not need enable replay
} else if (FALSE_IT(state = ObLSCreateState::CREATE_STATE_PALF_ENABLED)) {
// inner tablet reverted by inner_del_ls_ if fail to create
// only restore ls with base will not need create inner tablet
} else if (need_create_inner_tablets_(arg) &&
OB_FAIL(ls->create_ls_inner_tablet(arg.get_compat_mode(),
arg.get_create_scn()))) {
LOG_WARN("create ls inner tablet failed", K(ret), K(arg));
} else if (OB_FAIL(write_commit_create_ls_slog_(ls->get_ls_id()))) {
LOG_WARN("fail to write create log stream commit slog", K(ret), K(ls_meta));
ObMutexGuardWithTimeout change_guard(change_lock_, abs_timeout_ts);
if (OB_FAIL(change_guard.get_ret())) {
LOG_WARN("lock failed, try again later", K(ret));
ret = OB_EAGAIN;
} else if (OB_FAIL(check_ls_exist(arg.get_ls_id(), ls_exist))) {
LOG_WARN("check ls exist failed", K(ret), K(arg));
} else if (ls_exist) {
LOG_WARN("ls exist, does not need create again, just return success", K(arg));
} else if (OB_FAIL(check_ls_waiting_safe_destroy(arg.get_ls_id(),
waiting_destroy))) {
LOG_WARN("check ls waiting safe destroy failed", K(ret), K(arg));
} else if (waiting_destroy) {
ret = OB_EAGAIN;
LOG_WARN("ls waiting for destroy, need retry later", K(ret), K(arg));
} else if (OB_FAIL(inner_create_ls_(arg.get_ls_id(),
migration_status,
(is_ls_to_restore_(arg) ?
ObLSRestoreStatus(ObLSRestoreStatus::RESTORE_START) :
ObLSRestoreStatus(ObLSRestoreStatus::RESTORE_NONE)),
create_scn,
ls))) {
LOG_WARN("inner create log stream failed.", K(ret), K(arg), K(migration_status));
} else {
state = ObLSCreateState::CREATE_STATE_FINISH;
ls->finish_create(is_commit);
if (OB_FAIL(ls->start())) {
LOG_ERROR("ls start failed", K(ret), K(arg));
} else if (!is_ls_to_restore_(arg) &&
OB_FAIL(ls->enable_replay_without_lock())) {
LOG_WARN("enable ls replay failed", K(ret), K(arg));
} else if (is_ls_to_restore_(arg)) {
if (OB_FAIL(ls->offline_without_lock())) {
LOG_WARN("failed to offline", K(ret), K(arg));
} else if (OB_FAIL(ls->get_log_handler()->enable_sync())) {
LOG_WARN("failed to enable sync", K(ret), K(arg));
} else if (OB_FAIL(ls->get_ls_restore_handler()->online())) {
LOG_WARN("failed to online restore handler", K(ret), K(arg));
state = ObLSCreateState::CREATE_STATE_INNER_CREATED;
// TODO by yunlong: to remove allow_log_sync flag
const bool unused_allow_log_sync = true;
prepare_palf_base_info(arg, palf_base_info);
ObLSLockGuard lock_ls(ls);
const ObLSMeta &ls_meta = ls->get_ls_meta();
if (OB_FAIL(add_ls_to_map_(ls))) {
LOG_WARN("add log stream to map failed.", K(ret));
} else if (FALSE_IT(state = ObLSCreateState::CREATE_STATE_ADDED_TO_MAP)) {
// do nothing
} else if (OB_FAIL(write_prepare_create_ls_slog_(ls_meta))) {
LOG_WARN("fail to write create log stream slog", K(ls_meta));
} else if (FALSE_IT(state = ObLSCreateState::CREATE_STATE_WRITE_PREPARE_SLOG)) {
} else if (OB_FAIL(ls->create_ls(arg.get_tenant_info().get_tenant_role(),
palf_base_info,
arg.get_replica_type(),
unused_allow_log_sync))) {
LOG_WARN("enable ls palf failed", K(ret), K(arg), K(palf_base_info));
// only restore ls does not need enable replay
} else if (FALSE_IT(state = ObLSCreateState::CREATE_STATE_PALF_ENABLED)) {
// inner tablet reverted by inner_del_ls_ if fail to create
// only restore ls with base will not need create inner tablet
} else if (need_create_inner_tablets_(arg) &&
OB_FAIL(ls->create_ls_inner_tablet(arg.get_compat_mode(),
arg.get_create_scn()))) {
LOG_WARN("create ls inner tablet failed", K(ret), K(arg));
} else if (OB_FAIL(write_commit_create_ls_slog_(ls->get_ls_id()))) {
LOG_WARN("fail to write create log stream commit slog", K(ret), K(ls_meta));
} else {
state = ObLSCreateState::CREATE_STATE_FINISH;
ls->finish_create(is_commit);
if (OB_FAIL(ls->start())) {
LOG_ERROR("ls start failed", K(ret), K(arg));
} else if (!is_ls_to_restore_(arg) &&
OB_FAIL(ls->enable_replay_without_lock())) {
LOG_WARN("enable ls replay failed", K(ret), K(arg));
} else if (is_ls_to_restore_(arg)) {
if (OB_FAIL(ls->offline_without_lock())) {
LOG_WARN("failed to offline", K(ret), K(arg));
} else if (OB_FAIL(ls->get_log_handler()->enable_sync())) {
LOG_WARN("failed to enable sync", K(ret), K(arg));
} else if (OB_FAIL(ls->get_ls_restore_handler()->online())) {
LOG_WARN("failed to online restore handler", K(ret), K(arg));
}
}
FLOG_INFO("add ls to ls service succ", K(ls->get_ls_id()), K(arg));
if (OB_SUCCESS != (tmp_ret = ls->report_replica_info())) {
LOG_WARN("fail to report ls", KR(tmp_ret), K(arg));
}
}
FLOG_INFO("add ls to ls service succ", K(ls->get_ls_id()), K(arg));
if (OB_SUCCESS != (tmp_ret = ls->report_replica_info())) {
LOG_WARN("fail to report ls", KR(tmp_ret), K(arg));
if (OB_FAIL(ret)) {
do {
need_retry = false;
if (state >= ObLSCreateState::CREATE_STATE_WRITE_PREPARE_SLOG) {
is_commit = false;
ls->finish_create(is_commit);
if (OB_SUCCESS != (tmp_ret = write_abort_create_ls_slog_(ls->get_ls_id()))) {
need_retry = true;
LOG_ERROR("fail to write create log stream abort slog", K(tmp_ret), K(ls_meta));
}
}
} while (need_retry);
do {
// TODO: yanyuan.cxf every remove disable or stop function need be re-entrant
need_retry = false;
if (state >= ObLSCreateState::CREATE_STATE_PALF_ENABLED) {
if (OB_SUCCESS != (tmp_ret = ls->remove_ls())) {
need_retry = true;
LOG_WARN("ls disable palf failed", K(tmp_ret));
}
}
} while (need_retry);
}
}
if (OB_FAIL(ret)) {
do {
need_retry = false;
if (state >= ObLSCreateState::CREATE_STATE_WRITE_PREPARE_SLOG) {
is_commit = false;
ls->finish_create(is_commit);
if (OB_SUCCESS != (tmp_ret = write_abort_create_ls_slog_(ls->get_ls_id()))) {
need_retry = true;
LOG_ERROR("fail to write create log stream abort slog", K(tmp_ret), K(ls_meta));
}
}
} while (need_retry);
do {
// TODO: yanyuan.cxf every remove disable or stop function need be re-entrant
need_retry = false;
if (state >= ObLSCreateState::CREATE_STATE_PALF_ENABLED) {
if (OB_SUCCESS != (tmp_ret = ls->remove_ls())) {
need_retry = true;
LOG_WARN("ls disable palf failed", K(tmp_ret));
}
}
} while (need_retry);
del_ls_after_create_ls_failed_(state, ls);
}
}
if (OB_FAIL(ret)) {
del_ls_after_create_ls_failed_(state, ls);
}
FLOG_INFO("create_ls finish", K(ret), K(arg));
FLOG_INFO("create_ls finish", K(ret), K(arg), K(abs_timeout_ts));
return ret;
}
@ -876,10 +886,9 @@ int ObLSService::remove_ls(
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
int64_t abs_timeout_ts = INT64_MAX;
ObLSHandle handle;
ObLS *ls = NULL;
lib::ObMutexGuard change_guard(change_lock_);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
@ -892,68 +901,77 @@ int ObLSService::remove_ls(
} else if (OB_UNLIKELY(!ls_id.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(ls_id));
} else if (OB_FAIL(get_ls(ls_id, handle, ObLSGetMod::TXSTORAGE_MOD))) {
if (ret == OB_LS_NOT_EXIST) {
ret = OB_SUCCESS;
} else {
LOG_WARN("get log stream failed", K(ret), K(ls_id));
}
} else if (OB_ISNULL(ls = handle.get_ls())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("log stream is null, unexpected error", K(ls_id));
// ls leader gc must has block tx start, gracefully kill tx and write offline log before here.
} else if (OB_FAIL(ls->offline())) {
LOG_WARN("ls offline failed", K(ret), K(ls_id), KP(ls));
} else if (OB_FAIL(ls->stop())) {
LOG_WARN("stop ls failed", K(ret), KP(ls), K(ls_id));
} else if (FALSE_IT(ls->wait())) {
} else if (OB_FAIL(ObShareUtil::get_abs_timeout(DEFAULT_LOCK_TIMEOUT /* default timeout */,
abs_timeout_ts))) {
LOG_WARN("get timeout ts failed", KR(ret));
} else {
ObLSSafeDestroyTask *task = nullptr;
static const int64_t SLEEP_TS = 100_ms;
ObLSLockGuard lock_ls(ls);
if (OB_ISNULL(task = (ObLSSafeDestroyTask*)ob_malloc(sizeof(ObLSSafeDestroyTask),
"LSSafeDestroy"))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("alloc memory failed", K(ret));
} else if (FALSE_IT(task = new(task) ObLSSafeDestroyTask())) {
} else if (FALSE_IT(ls->set_create_state(ObInnerLSStatus::REMOVED))) {
// set ls to remove state and prevent slog write
} else if(!is_replay &&
OB_FAIL(write_remove_ls_slog_(ls_id))) {
LOG_WARN("fail to write remove ls slog", K(ret));
} else if (OB_FAIL(task->init(MTL_ID(),
handle,
this))) {
LOG_WARN("init safe destroy task failed", K(ret));
ObMutexGuardWithTimeout change_guard(change_lock_, abs_timeout_ts);
if (OB_FAIL(change_guard.get_ret())) {
LOG_WARN("lock failed, try again later", K(ret));
ret = OB_EAGAIN;
} else if (OB_FAIL(get_ls(ls_id, handle, ObLSGetMod::TXSTORAGE_MOD))) {
if (ret == OB_LS_NOT_EXIST) {
ret = OB_SUCCESS;
} else {
LOG_WARN("get log stream failed", K(ret), K(ls_id));
}
} else if (OB_ISNULL(ls = handle.get_ls())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("log stream is null, unexpected error", K(ls_id));
// ls leader gc must has block tx start, gracefully kill tx and write offline log before here.
} else if (OB_FAIL(ls->offline())) {
LOG_WARN("ls offline failed", K(ret), K(ls_id), KP(ls));
} else if (OB_FAIL(ls->stop())) {
LOG_WARN("stop ls failed", K(ret), KP(ls), K(ls_id));
} else if (FALSE_IT(ls->wait())) {
} else {
remove_ls_(ls);
// try until success.
while (OB_FAIL(SAFE_DESTROY_INSTANCE.push(*task))) {
if (REACH_TIME_INTERVAL(1_min)) { // every minute
LOG_WARN("add safe destroy task failed, retry", K(ret), KPC(task));
ObLSSafeDestroyTask *task = nullptr;
static const int64_t SLEEP_TS = 100_ms;
ObLSLockGuard lock_ls(ls);
if (OB_ISNULL(task = (ObLSSafeDestroyTask*)ob_malloc(sizeof(ObLSSafeDestroyTask),
"LSSafeDestroy"))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("alloc memory failed", K(ret));
} else if (FALSE_IT(task = new(task) ObLSSafeDestroyTask())) {
} else if (FALSE_IT(ls->set_create_state(ObInnerLSStatus::REMOVED))) {
// set ls to remove state and prevent slog write
} else if(!is_replay &&
OB_FAIL(write_remove_ls_slog_(ls_id))) {
LOG_WARN("fail to write remove ls slog", K(ret));
} else if (OB_FAIL(task->init(MTL_ID(),
handle,
this))) {
LOG_WARN("init safe destroy task failed", K(ret));
} else {
remove_ls_(ls);
// try until success.
while (OB_FAIL(SAFE_DESTROY_INSTANCE.push(*task))) {
if (REACH_TIME_INTERVAL(1_min)) { // every minute
LOG_WARN("add safe destroy task failed, retry", K(ret), KPC(task));
}
ob_usleep(SLEEP_TS);
}
ob_usleep(SLEEP_TS);
}
if (OB_FAIL(ret) && OB_NOT_NULL(task)) {
task->~ObLSSafeDestroyTask();
ob_free(task);
}
}
if (OB_FAIL(ret) && OB_NOT_NULL(task)) {
task->~ObLSSafeDestroyTask();
ob_free(task);
}
}
// report after remove
if (OB_SUCC(ret)) {
int tmp_ret = OB_SUCCESS;
if (OB_ISNULL(rs_reporter_)) {
tmp_ret = OB_ERR_UNEXPECTED;
LOG_WARN("rs_reporter_ is null", KR(tmp_ret));
} else if (OB_SUCCESS != (tmp_ret = rs_reporter_->submit_ls_update_task(tenant_id_, ls_id))) {
LOG_WARN("fail to submit_ls_update_task", KR(tmp_ret), K_(tenant_id), K(ls_id));
} else {
LOG_INFO("submit ls update task after remove_ls success", K(ls_id), K_(tenant_id));
// report after remove
if (OB_SUCC(ret)) {
int tmp_ret = OB_SUCCESS;
if (OB_ISNULL(rs_reporter_)) {
tmp_ret = OB_ERR_UNEXPECTED;
LOG_WARN("rs_reporter_ is null", KR(tmp_ret));
} else if (OB_SUCCESS != (tmp_ret = rs_reporter_->submit_ls_update_task(tenant_id_, ls_id))) {
LOG_WARN("fail to submit_ls_update_task", KR(tmp_ret), K_(tenant_id), K(ls_id));
} else {
LOG_INFO("submit ls update task after remove_ls success", K(ls_id), K_(tenant_id));
}
}
}
FLOG_INFO("remove_ls finish", K(ret), K(ls_id), KPC(ls), K(is_replay));
FLOG_INFO("remove_ls finish", K(ret), K(ls_id), KPC(ls), K(is_replay), K(abs_timeout_ts));
return ret;
}
@ -985,6 +1003,7 @@ int ObLSService::create_ls_for_ha(
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
int64_t abs_timeout_ts = INT64_MAX;
ObLSCreateState state = ObLSCreateState::CREATE_STATE_INIT;
ObLS *ls = NULL;
ObLSMeta ls_meta;
@ -995,7 +1014,6 @@ int ObLSService::create_ls_for_ha(
ObLSRestoreStatus restore_status = ObLSRestoreStatus(ObLSRestoreStatus::RESTORE_NONE);
bool ls_exist = false;
bool waiting_destroy = false;
lib::ObMutexGuard change_guard(change_lock_);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
@ -1012,90 +1030,98 @@ int ObLSService::create_ls_for_ha(
} else if (ObMigrationOpType::MIGRATE_LS_OP != arg.type_ && ObMigrationOpType::ADD_LS_OP != arg.type_) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("create ls for migration get unexpected op type", K(ret), K(task_id), K(arg));
} else if (OB_FAIL(check_ls_exist(arg.ls_id_,
ls_exist))) {
LOG_WARN("check ls exist failed", K(ret), K(arg));
} else if (ls_exist) {
ret = OB_ENTRY_EXIST;
LOG_WARN("ls exist, cannot create ls now", K(ret), K(arg));
} else if (OB_FAIL(check_ls_waiting_safe_destroy(arg.ls_id_,
waiting_destroy))) {
LOG_WARN("check ls waiting safe destroy failed", K(ret), K(arg));
} else if (waiting_destroy) {
ret = OB_EAGAIN;
LOG_WARN("ls waiting for destroy, need retry later", K(ret), K(arg));
} else if (OB_FAIL(ObMigrationStatusHelper::trans_migration_op(arg.type_, migration_status))) {
LOG_WARN("failed to trans migration op", K(ret), K(arg), K(task_id));
} else if (OB_FAIL(get_restore_status_(restore_status))) {
LOG_WARN("failed to get restore status", K(ret), K(arg), K(task_id));
} else if (OB_FAIL(inner_create_ls_(arg.ls_id_,
migration_status,
restore_status,
ObScnRange::MIN_SCN, /* create scn */
ls))) {
LOG_WARN("create ls failed", K(ret), K(arg), K(task_id));
} else if (OB_FAIL(ObShareUtil::get_abs_timeout(DEFAULT_LOCK_TIMEOUT /* default timeout */,
abs_timeout_ts))) {
LOG_WARN("get timeout ts failed", KR(ret));
} else {
state = ObLSCreateState::CREATE_STATE_INNER_CREATED;
palf::PalfBaseInfo palf_base_info;
palf_base_info.generate_by_default();
ObLSLockGuard lock_ls(ls);
// TODO by yunlong: to remove allow_log_sync flag
const bool allow_log_sync = false;
ls_meta = ls->get_ls_meta();
if (OB_FAIL(add_ls_to_map_(ls))) {
LOG_WARN("add log stream to map failed.", K(ret));
} else if (FALSE_IT(state = ObLSCreateState::CREATE_STATE_ADDED_TO_MAP)) {
// do nothing
} else if (OB_FAIL(write_prepare_create_ls_slog_(ls_meta))) {
LOG_WARN("fail to write create log stream slog", K(ls_meta));
} else if (FALSE_IT(state = ObLSCreateState::CREATE_STATE_WRITE_PREPARE_SLOG)) {
} else if (OB_FAIL(ls->create_ls(share::RESTORE_TENANT_ROLE,
palf_base_info,
arg.dst_.get_replica_type(),
allow_log_sync))) {
LOG_WARN("enable ls palf failed", K(ret), K(ls_meta));
} else if (FALSE_IT(state = ObLSCreateState::CREATE_STATE_PALF_ENABLED)) {
} else if (OB_FAIL(write_commit_create_ls_slog_(ls->get_ls_id()))) {
LOG_WARN("fail to write create log stream commit slog", K(ret), K(ls_meta));
ObMutexGuardWithTimeout change_guard(change_lock_, abs_timeout_ts);
if (OB_FAIL(change_guard.get_ret())) {
LOG_WARN("lock failed, try again later", K(ret));
ret = OB_EAGAIN;
} else if (OB_FAIL(check_ls_exist(arg.ls_id_, ls_exist))) {
LOG_WARN("check ls exist failed", K(ret), K(arg));
} else if (ls_exist) {
ret = OB_ENTRY_EXIST;
LOG_WARN("ls exist, cannot create ls now", K(ret), K(arg));
} else if (OB_FAIL(check_ls_waiting_safe_destroy(arg.ls_id_,
waiting_destroy))) {
LOG_WARN("check ls waiting safe destroy failed", K(ret), K(arg));
} else if (waiting_destroy) {
ret = OB_EAGAIN;
LOG_WARN("ls waiting for destroy, need retry later", K(ret), K(arg));
} else if (OB_FAIL(ObMigrationStatusHelper::trans_migration_op(arg.type_, migration_status))) {
LOG_WARN("failed to trans migration op", K(ret), K(arg), K(task_id));
} else if (OB_FAIL(get_restore_status_(restore_status))) {
LOG_WARN("failed to get restore status", K(ret), K(arg), K(task_id));
} else if (OB_FAIL(inner_create_ls_(arg.ls_id_,
migration_status,
restore_status,
ObScnRange::MIN_SCN, /* create scn */
ls))) {
LOG_WARN("create ls failed", K(ret), K(arg), K(task_id));
} else {
state = ObLSCreateState::CREATE_STATE_FINISH;
ls->finish_create(is_commit);
if (OB_FAIL(ls->start())) {
LOG_ERROR("ls start failed", K(tmp_ret), K(ls_meta));
} else if (OB_FAIL(ls->get_ls_migration_handler()->add_ls_migration_task(task_id, arg))) {
LOG_WARN("failed to add ls migration task", K(ret), K(arg));
state = ObLSCreateState::CREATE_STATE_INNER_CREATED;
palf::PalfBaseInfo palf_base_info;
palf_base_info.generate_by_default();
ObLSLockGuard lock_ls(ls);
// TODO by yunlong: to remove allow_log_sync flag
const bool allow_log_sync = false;
ls_meta = ls->get_ls_meta();
if (OB_FAIL(add_ls_to_map_(ls))) {
LOG_WARN("add log stream to map failed.", K(ret));
} else if (FALSE_IT(state = ObLSCreateState::CREATE_STATE_ADDED_TO_MAP)) {
// do nothing
} else if (OB_FAIL(write_prepare_create_ls_slog_(ls_meta))) {
LOG_WARN("fail to write create log stream slog", K(ls_meta));
} else if (FALSE_IT(state = ObLSCreateState::CREATE_STATE_WRITE_PREPARE_SLOG)) {
} else if (OB_FAIL(ls->create_ls(share::RESTORE_TENANT_ROLE,
palf_base_info,
arg.dst_.get_replica_type(),
allow_log_sync))) {
LOG_WARN("enable ls palf failed", K(ret), K(ls_meta));
} else if (FALSE_IT(state = ObLSCreateState::CREATE_STATE_PALF_ENABLED)) {
} else if (OB_FAIL(write_commit_create_ls_slog_(ls->get_ls_id()))) {
LOG_WARN("fail to write create log stream commit slog", K(ret), K(ls_meta));
} else {
FLOG_INFO("add ls to ls service succ", K(ls->get_ls_id()), K(ls_meta));
state = ObLSCreateState::CREATE_STATE_FINISH;
ls->finish_create(is_commit);
if (OB_FAIL(ls->start())) {
LOG_ERROR("ls start failed", K(tmp_ret), K(ls_meta));
} else if (OB_FAIL(ls->get_ls_migration_handler()->add_ls_migration_task(task_id, arg))) {
LOG_WARN("failed to add ls migration task", K(ret), K(arg));
} else {
FLOG_INFO("add ls to ls service succ", K(ls->get_ls_id()), K(ls_meta));
}
}
if (OB_FAIL(ret)) {
do {
need_retry = false;
if (state >= ObLSCreateState::CREATE_STATE_WRITE_PREPARE_SLOG) {
is_commit = false;
ls->finish_create(is_commit);
if (OB_SUCCESS != (tmp_ret = write_abort_create_ls_slog_(ls->get_ls_id()))) {
need_retry = true;
LOG_ERROR("fail to write create log stream abort slog", K(tmp_ret), K(ls_meta));
}
}
} while (need_retry);
do {
need_retry = false;
// TODO: yanyuan.cxf every remove disable or stop function need be re-entrant
if (state >= ObLSCreateState::CREATE_STATE_PALF_ENABLED) {
if (OB_SUCCESS != (tmp_ret = ls->remove_ls())) {
need_retry = true;
LOG_WARN("ls disable palf failed", K(tmp_ret));
}
}
} while (need_retry);
}
}
if (OB_FAIL(ret)) {
do {
need_retry = false;
if (state >= ObLSCreateState::CREATE_STATE_WRITE_PREPARE_SLOG) {
is_commit = false;
ls->finish_create(is_commit);
if (OB_SUCCESS != (tmp_ret = write_abort_create_ls_slog_(ls->get_ls_id()))) {
need_retry = true;
LOG_ERROR("fail to write create log stream abort slog", K(tmp_ret), K(ls_meta));
}
}
} while (need_retry);
do {
need_retry = false;
// TODO: yanyuan.cxf every remove disable or stop function need be re-entrant
if (state >= ObLSCreateState::CREATE_STATE_PALF_ENABLED) {
if (OB_SUCCESS != (tmp_ret = ls->remove_ls())) {
need_retry = true;
LOG_WARN("ls disable palf failed", K(tmp_ret));
}
}
} while (need_retry);
del_ls_after_create_ls_failed_(state, ls);
}
}
if (OB_FAIL(ret)) {
del_ls_after_create_ls_failed_(state, ls);
}
FLOG_INFO("create_ls for ha finish", K(ret), K(ls_meta));
FLOG_INFO("create_ls for ha finish", K(ret), K(ls_meta), K(abs_timeout_ts));
return ret;
}

View File

@ -46,6 +46,7 @@ struct ObLSMeta;
// Support log stream meta persistent and checkpoint
class ObLSService
{
static const int64_t DEFAULT_LOCK_TIMEOUT = 60_s;
public:
ObLSService();
~ObLSService();