some fixes for tenant cloning

parent 68dec0b1d6
commit bf284d0696

@@ -620,6 +620,7 @@ int ObCloneScheduler::clone_create_tenant(const share::ObCloneJob &job)
      LOG_WARN("fail to update clone tenant id", KR(ret), K(max_id), K(job));
    } else {
      clone_tenant_id = max_id;
      FLOG_INFO("fetch clone_tenant_id success", K(clone_tenant_id), K(job));
    }
  }

@@ -642,6 +643,8 @@ ERRSIM_POINT_DEF(ERRSIM_CLONE_WAIT_CREATE_TENANT_ERROR);
int ObCloneScheduler::clone_wait_tenant_restore_finish(const ObCloneJob &job)
{
  int ret = OB_SUCCESS;
  bool user_finished = false;

  ObTenantCloneTableOperator clone_op;
  ObCloneJob user_job_history;
  const uint64_t clone_tenant_id = job.get_clone_tenant_id();

@@ -690,11 +693,22 @@ int ObCloneScheduler::clone_wait_tenant_restore_finish(const ObCloneJob &job)
        LOG_WARN("tenant status not match", KR(ret), K(tenant_schema));
      }
    } else {
      user_finished = true;
      int user_ret_code = user_job_history.get_ret_code();
      ret = OB_SUCCESS == user_ret_code ? OB_ERR_CLONE_TENANT : user_ret_code;
      if (OB_SUCCESS == user_ret_code) {
        ret = OB_ERR_CLONE_TENANT;
        LOG_WARN("user job is not in success status, but it's ret_code is OB_SUCCESS",
                 KR(ret), K(user_job_history));
      } else {
        ret = user_ret_code;
      }
      LOG_WARN("user_job_history status is not SUCCESS", KR(ret), K(user_job_history));
    }

    if (OB_FAIL(ret) && !user_finished) {
      need_wait = true;
    }

    if (!need_wait) {
      int tmp_ret = OB_SUCCESS;
      if (OB_TMP_FAIL(try_update_job_status_(ret, job))) {

@@ -1481,48 +1495,6 @@ int ObCloneScheduler::check_meta_tenant_(const uint64_t tenant_id)
  return ret;
}

int ObCloneScheduler::wait_source_relative_task_finished_(
    const uint64_t source_tenant_id)
{
  int ret = OB_SUCCESS;
  ObMySQLTransaction trans;
  const int64_t timeout = DEFAULT_TIMEOUT;
  observer::ObInnerSQLConnection *conn = NULL;

  if (!is_user_tenant(source_tenant_id)) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", KR(ret), K(source_tenant_id));
  } else if (OB_FAIL(trans.start(sql_proxy_, source_tenant_id))) {
    LOG_WARN("failed to start trans in user tenant", KR(ret), K(source_tenant_id));
  } else if (OB_ISNULL(conn = static_cast<observer::ObInnerSQLConnection *>(trans.get_connection()))) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("conn_ is NULL", KR(ret));
  //TODO: __all_unit is not in white list
  } else if (OB_FAIL(ObInnerConnectionLockUtil::lock_table(source_tenant_id,
                                                           OB_ALL_UNIT_TID,
                                                           EXCLUSIVE,
                                                           timeout,
                                                           conn))) {
    LOG_WARN("lock dest table failed", KR(ret), K(source_tenant_id));
  }

  /*If we successfully lock, we can assure that there are currently
    no related tasks being executed, so we can roll back the lock.
    Afterwards, new related tasks will not be allowed to be executed
    because we previously locked the record GLOBAL_STATE (snapshot_id = 0)
    in the internal table __all_tenant_snapshot */
  if (trans.is_started()) {
    int tmp_ret = OB_SUCCESS;
    //rollback
    if (OB_TMP_FAIL(trans.end(false))) {
      LOG_WARN("trans abort failed", KR(tmp_ret));
      ret = (OB_SUCC(ret)) ? tmp_ret : ret;
    }
  }

  return ret;
}

int ObCloneScheduler::fill_clone_resource_pool_arg_(
    const share::ObCloneJob &job,
    const uint64_t resource_pool_id,
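
As an aside, the comment inside the removed wait_source_relative_task_finished_ describes a lock-as-barrier trick: acquire the exclusive lock that every running related task must hold (which proves none is in flight), then immediately roll it back, relying on a separately held lock on the GLOBAL_STATE record to keep new tasks from starting. A minimal standalone sketch of that idea, with nothing below being OceanBase API (std::timed_mutex stands in for the inner-SQL EXCLUSIVE table lock, a plain bool for the already-locked GLOBAL_STATE record):

// barrier_sketch.cpp -- illustrative only, not OceanBase code.
#include <chrono>
#include <iostream>
#include <mutex>

std::timed_mutex task_table_lock;  // stands in for the EXCLUSIVE lock on __all_unit
bool new_tasks_blocked = false;    // stands in for the previously locked GLOBAL_STATE record

// Returns true once no "relative task" can be running anymore.
bool wait_relative_tasks_finished(std::chrono::milliseconds timeout)
{
  if (!new_tasks_blocked) {
    return false;  // the gate that stops new tasks must already be closed
  }
  // Every running task holds task_table_lock, so acquiring it proves none is in flight.
  if (!task_table_lock.try_lock_for(timeout)) {
    return false;  // some task still holds the lock (or we timed out)
  }
  // "Rollback": release immediately; new tasks still cannot start.
  task_table_lock.unlock();
  return true;
}

int main()
{
  new_tasks_blocked = true;  // analogous to locking GLOBAL_STATE earlier in the clone job
  std::cout << (wait_relative_tasks_finished(std::chrono::milliseconds(100)) ? "drained" : "busy")
            << std::endl;
  return 0;
}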

@@ -968,25 +968,6 @@ int ObTenantSnapshotUtil::check_and_get_data_version(const uint64_t tenant_id,
  return ret;
}

int ObTenantSnapshotUtil::get_sys_ls_info(common::ObISQLClient &sql_client,
                                          const uint64_t tenant_id,
                                          const ObTenantSnapshotID &tenant_snapshot_id,
                                          ObArray<ObTenantSnapLSReplicaSimpleItem> &simple_items)
{
  int ret = OB_SUCCESS;
  simple_items.reset();
  ObTenantSnapshotTableOperator snap_op;
  if (OB_UNLIKELY(!is_user_tenant(tenant_id) || !tenant_snapshot_id.is_valid())) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(tenant_snapshot_id));
  } else if (OB_FAIL(snap_op.init(tenant_id, &sql_client))) {
    LOG_WARN("failed to init table op", KR(ret), K(tenant_id));
  } else if (OB_FAIL(snap_op.get_tenant_snap_ls_replica_simple_items(tenant_snapshot_id, SYS_LS, simple_items))) {
    LOG_WARN("failed to get sys ls replica simple items", KR(ret), K(tenant_snapshot_id));
  }
  return ret;
}

int ObTenantSnapshotUtil::check_tenant_has_snapshot(common::ObISQLClient &sql_client,
                                                    const uint64_t tenant_id,
                                                    bool &has_snapshot)

@@ -149,10 +149,6 @@ public:
                                      share::ObTenantSnapshotID &tenant_snapshot_id);
  static int check_and_get_data_version(const uint64_t tenant_id,
                                        uint64_t &data_version);
  static int get_sys_ls_info(common::ObISQLClient &sql_client,
                             const uint64_t tenant_id,
                             const share::ObTenantSnapshotID &tenant_snapshot_id,
                             ObArray<share::ObTenantSnapLSReplicaSimpleItem> &simple_items);
  static int check_tenant_has_snapshot(common::ObISQLClient &sql_client,
                                       const uint64_t tenant_id,
                                       bool &has_snapshot);

@@ -104,7 +104,7 @@ public:
      meta_handler_(nullptr),
      build_ctx_mutex_() {};

  ~ObLSSnapshot() {};
  ~ObLSSnapshot() { reset(); };

  int init(const share::ObTenantSnapshotID& tenant_snapshot_id,
           const ObLSID& ls_id,

@@ -114,13 +114,15 @@ public:

  void reset()
  {
    tenant_snapshot_id_.reset();
    ls_id_.reset();
    meta_existed_ = false;
    try_free_build_ctx_();
    build_ctx_allocator_ = nullptr;
    meta_handler_ = nullptr;
    is_inited_ = false;
    if (IS_INIT) {
      tenant_snapshot_id_.reset();
      ls_id_.reset();
      meta_existed_ = false;
      try_free_build_ctx_();
      build_ctx_allocator_ = nullptr;
      meta_handler_ = nullptr;
      is_inited_ = false;
    }
  }

  ObLSSnapshot &operator=(const ObLSSnapshot &other) = delete;
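
The guarded reset() above, together with the destructor that now delegates to it, follows a common init/reset idiom: teardown is skipped for objects that were never initialized, and the method stays idempotent. A minimal self-contained sketch of that pattern, using made-up names rather than the real ObLSSnapshot members:

// guarded_reset_sketch.cpp -- made-up class, not the real ObLSSnapshot.
#include <cstdio>

class SnapshotLike
{
public:
  SnapshotLike() : handle_(nullptr), ref_(0), is_inited_(false) {}
  ~SnapshotLike() { reset(); }  // destructor just delegates to reset()

  void init(void *handle)
  {
    handle_ = handle;
    ref_ = 0;
    is_inited_ = true;
  }

  void reset()
  {
    if (is_inited_) {       // skip teardown for objects that were never initialized
      ref_ = 0;
      handle_ = nullptr;
      is_inited_ = false;   // cleared last, so a second reset() is a no-op
    }
  }

private:
  void *handle_;
  int ref_;
  bool is_inited_;
};

int main()
{
  SnapshotLike never_used;      // its destructor's reset() will be a no-op
  SnapshotLike used;
  int dummy = 0;
  used.init(&dummy);
  used.reset();                 // explicit reset; the destructor then does nothing more
  std::printf("done\n");
  return 0;
}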

@@ -69,12 +69,32 @@ void ObTenantCloneService::stop()

void ObTenantCloneService::wait()
{
  startup_accel_handler_.wait();
  int ret = OB_SUCCESS;
  while (OB_FAIL(wait_())) {
    usleep(100000);
  }
}

int ObTenantCloneService::wait_()
{
  int ret = OB_SUCCESS;

  if (is_started_) {
    ret = OB_ERR_UNEXPECTED;
    LOG_ERROR("ObTenantCloneService is running when wait function is called", KR(ret), KPC(this));
    stop();
  } else {
    startup_accel_handler_.wait();
  }
  return ret;
}

void ObTenantCloneService::destroy()
{
  startup_accel_handler_.destroy();
  if (IS_INIT) {
    startup_accel_handler_.destroy();
    is_inited_ = false;
  }
}

int ObTenantCloneService::get_clone_job_(ObArray<ObCloneJob>& clone_jobs)
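
The wait()/wait_() pair above amounts to a retry-until-stopped wait: wait_() refuses to proceed while the service is still marked started, and wait() simply retries with a short sleep. A rough standalone sketch of that pattern, with illustrative names rather than the real ObTenantCloneService interface:

// wait_retry_sketch.cpp -- illustrative names only, not the real ObTenantCloneService.
#include <atomic>
#include <chrono>
#include <cstdio>
#include <thread>

class CloneServiceLike
{
public:
  void stop() { started_ = false; }

  // Public wait(): keep retrying the checked wait_() until the service is stopped.
  void wait()
  {
    while (!wait_()) {
      std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
  }

private:
  // Returns false (and forces a stop) if wait() was called while still running.
  bool wait_()
  {
    if (started_) {
      stop();        // analogous to the LOG_ERROR + stop() branch in the patch
      return false;
    }
    return true;     // safe to wait for worker resources here
  }

  std::atomic<bool> started_{true};
};

int main()
{
  CloneServiceLike svc;
  svc.stop();
  svc.wait();
  std::printf("stopped and waited\n");
  return 0;
}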

@@ -51,7 +51,10 @@ public:
  void run();

  bool is_started() { return is_started_; }

  TO_STRING_KV(K(is_inited_), K(is_started_), KP(meta_handler_));
private:
  int wait_();
  int get_clone_job_(ObArray<ObCloneJob>& clone_jobs);
  int try_clone_(const ObCloneJob& job);
  void try_clone_one_ls_(const ObCloneJob& job, ObLS* ls);

@@ -46,7 +46,7 @@ public:
      meta_handler_(nullptr),
      mutex_() {}

  ~ObTenantSnapshot() {}
  ~ObTenantSnapshot() { reset(); }
  int destroy();

  int init(const share::ObTenantSnapshotID& tenant_snapshot_id,

@@ -55,15 +55,17 @@ public:

  void reset()
  {
    is_inited_ = false;
    is_running_ = false;
    tenant_snapshot_id_.reset();
    has_unfinished_create_dag_ = false;
    has_unfinished_gc_dag_ = false;
    clone_ref_ = 0;
    meta_existed_ = false;
    ls_snapshot_mgr_ = nullptr;
    meta_handler_ = nullptr;
    if (IS_INIT) {
      is_running_ = false;
      tenant_snapshot_id_.reset();
      has_unfinished_create_dag_ = false;
      has_unfinished_gc_dag_ = false;
      clone_ref_ = 0;
      meta_existed_ = false;
      ls_snapshot_mgr_ = nullptr;
      meta_handler_ = nullptr;
      is_inited_ = false;
    }
  }

public:

@@ -95,8 +95,14 @@ void ObTenantSnapshotService::destroy()
  }
  if (OB_SUCC(ret)) {
    TG_DESTROY(tg_id_);
    tg_id_ = INT32_MAX;
    clone_service_.destroy();
    tenant_snapshot_mgr_.destroy();
    ls_snapshot_mgr_.destroy();
    meta_loaded_ = false;
    unit_is_deleting_ = false;
    cond_.destroy();
    running_mode_ = RUNNING_MODE::INVALID;
    is_inited_ = false;
  }
  LOG_INFO("ObTenantSnapshotService::destroy end", KR(ret));

@@ -770,6 +776,7 @@ void ObTenantSnapshotService::run1()
  if (OB_SUCC(ret) && NORMAL == running_mode_) {
    if (clone_service_.is_started()) {
      clone_service_.stop();
      clone_service_.wait();
    }

    run_in_normal_mode_();