some fixes for tenant cloning

This commit is contained in:
obdev 2023-12-24 14:43:05 +00:00 committed by ob-robot
parent bd53d340c0
commit f582ac5878
8 changed files with 69 additions and 86 deletions

View File

@ -620,6 +620,7 @@ int ObCloneScheduler::clone_create_tenant(const share::ObCloneJob &job)
LOG_WARN("fail to update clone tenant id", KR(ret), K(max_id), K(job)); LOG_WARN("fail to update clone tenant id", KR(ret), K(max_id), K(job));
} else { } else {
clone_tenant_id = max_id; clone_tenant_id = max_id;
FLOG_INFO("fetch clone_tenant_id success", K(clone_tenant_id), K(job));
} }
} }
@ -642,6 +643,8 @@ ERRSIM_POINT_DEF(ERRSIM_CLONE_WAIT_CREATE_TENANT_ERROR);
int ObCloneScheduler::clone_wait_tenant_restore_finish(const ObCloneJob &job) int ObCloneScheduler::clone_wait_tenant_restore_finish(const ObCloneJob &job)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
bool user_finished = false;
ObTenantCloneTableOperator clone_op; ObTenantCloneTableOperator clone_op;
ObCloneJob user_job_history; ObCloneJob user_job_history;
const uint64_t clone_tenant_id = job.get_clone_tenant_id(); const uint64_t clone_tenant_id = job.get_clone_tenant_id();
@ -690,11 +693,22 @@ int ObCloneScheduler::clone_wait_tenant_restore_finish(const ObCloneJob &job)
LOG_WARN("tenant status not match", KR(ret), K(tenant_schema)); LOG_WARN("tenant status not match", KR(ret), K(tenant_schema));
} }
} else { } else {
user_finished = true;
int user_ret_code = user_job_history.get_ret_code(); int user_ret_code = user_job_history.get_ret_code();
ret = OB_SUCCESS == user_ret_code ? OB_ERR_CLONE_TENANT : user_ret_code; if (OB_SUCCESS == user_ret_code) {
ret = OB_ERR_CLONE_TENANT;
LOG_WARN("user job is not in success status, but it's ret_code is OB_SUCCESS",
KR(ret), K(user_job_history));
} else {
ret = user_ret_code;
}
LOG_WARN("user_job_history status is not SUCCESS", KR(ret), K(user_job_history)); LOG_WARN("user_job_history status is not SUCCESS", KR(ret), K(user_job_history));
} }
if (OB_FAIL(ret) && !user_finished) {
need_wait = true;
}
if (!need_wait) { if (!need_wait) {
int tmp_ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(try_update_job_status_(ret, job))) { if (OB_TMP_FAIL(try_update_job_status_(ret, job))) {
@ -1481,48 +1495,6 @@ int ObCloneScheduler::check_meta_tenant_(const uint64_t tenant_id)
return ret; return ret;
} }
int ObCloneScheduler::wait_source_relative_task_finished_(
const uint64_t source_tenant_id)
{
int ret = OB_SUCCESS;
ObMySQLTransaction trans;
const int64_t timeout = DEFAULT_TIMEOUT;
observer::ObInnerSQLConnection *conn = NULL;
if (!is_user_tenant(source_tenant_id)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(source_tenant_id));
} else if (OB_FAIL(trans.start(sql_proxy_, source_tenant_id))) {
LOG_WARN("failed to start trans in user tenant", KR(ret), K(source_tenant_id));
} else if (OB_ISNULL(conn = static_cast<observer::ObInnerSQLConnection *>(trans.get_connection()))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("conn_ is NULL", KR(ret));
//TODO: __all_unit is not in white list
} else if (OB_FAIL(ObInnerConnectionLockUtil::lock_table(source_tenant_id,
OB_ALL_UNIT_TID,
EXCLUSIVE,
timeout,
conn))) {
LOG_WARN("lock dest table failed", KR(ret), K(source_tenant_id));
}
/*If we successfully lock, we can assure that there are currently
no related tasks being executed, so we can roll back the lock.
Afterwards, new related tasks will not be allowed to be executed
because we previously locked the record GLOBAL_STATE (snapshot_id = 0)
in the internal table __all_tenant_snapshot */
if (trans.is_started()) {
int tmp_ret = OB_SUCCESS;
//rollback
if (OB_TMP_FAIL(trans.end(false))) {
LOG_WARN("trans abort failed", KR(tmp_ret));
ret = (OB_SUCC(ret)) ? tmp_ret : ret;
}
}
return ret;
}
int ObCloneScheduler::fill_clone_resource_pool_arg_( int ObCloneScheduler::fill_clone_resource_pool_arg_(
const share::ObCloneJob &job, const share::ObCloneJob &job,
const uint64_t resource_pool_id, const uint64_t resource_pool_id,

View File

@ -968,25 +968,6 @@ int ObTenantSnapshotUtil::check_and_get_data_version(const uint64_t tenant_id,
return ret; return ret;
} }
// Reads the SYS_LS replica simple items recorded for one tenant snapshot.
//
// @param sql_client          SQL client handed to the snapshot table operator.
// @param tenant_id           must satisfy is_user_tenant().
// @param tenant_snapshot_id  must be valid (is_valid()).
// @param simple_items        output array; reset on entry, before any check.
// @return OB_SUCCESS on success, OB_INVALID_ARGUMENT when an argument check
//         fails, otherwise the error from the table operator's init() or
//         get_tenant_snap_ls_replica_simple_items().
int ObTenantSnapshotUtil::get_sys_ls_info(common::ObISQLClient &sql_client,
                                          const uint64_t tenant_id,
                                          const ObTenantSnapshotID &tenant_snapshot_id,
                                          ObArray<ObTenantSnapLSReplicaSimpleItem> &simple_items)
{
  int ret = OB_SUCCESS;
  simple_items.reset();
  ObTenantSnapshotTableOperator snap_op;
  if (OB_UNLIKELY(!(is_user_tenant(tenant_id) && tenant_snapshot_id.is_valid()))) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(tenant_snapshot_id));
  } else {
    // Arguments are fine: bind the operator to the tenant, then query.
    if (OB_FAIL(snap_op.init(tenant_id, &sql_client))) {
      LOG_WARN("failed to init table op", KR(ret), K(tenant_id));
    } else {
      if (OB_FAIL(snap_op.get_tenant_snap_ls_replica_simple_items(tenant_snapshot_id, SYS_LS, simple_items))) {
        LOG_WARN("failed to get sys ls replica simple items", KR(ret), K(tenant_snapshot_id));
      }
    }
  }
  return ret;
}
int ObTenantSnapshotUtil::check_tenant_has_snapshot(common::ObISQLClient &sql_client, int ObTenantSnapshotUtil::check_tenant_has_snapshot(common::ObISQLClient &sql_client,
const uint64_t tenant_id, const uint64_t tenant_id,
bool &has_snapshot) bool &has_snapshot)

View File

@ -149,10 +149,6 @@ public:
share::ObTenantSnapshotID &tenant_snapshot_id); share::ObTenantSnapshotID &tenant_snapshot_id);
static int check_and_get_data_version(const uint64_t tenant_id, static int check_and_get_data_version(const uint64_t tenant_id,
uint64_t &data_version); uint64_t &data_version);
static int get_sys_ls_info(common::ObISQLClient &sql_client,
const uint64_t tenant_id,
const share::ObTenantSnapshotID &tenant_snapshot_id,
ObArray<share::ObTenantSnapLSReplicaSimpleItem> &simple_items);
static int check_tenant_has_snapshot(common::ObISQLClient &sql_client, static int check_tenant_has_snapshot(common::ObISQLClient &sql_client,
const uint64_t tenant_id, const uint64_t tenant_id,
bool &has_snapshot); bool &has_snapshot);

View File

@ -104,7 +104,7 @@ public:
meta_handler_(nullptr), meta_handler_(nullptr),
build_ctx_mutex_() {}; build_ctx_mutex_() {};
~ObLSSnapshot() {}; ~ObLSSnapshot() { reset(); };
int init(const share::ObTenantSnapshotID& tenant_snapshot_id, int init(const share::ObTenantSnapshotID& tenant_snapshot_id,
const ObLSID& ls_id, const ObLSID& ls_id,
@ -114,13 +114,15 @@ public:
void reset() void reset()
{ {
tenant_snapshot_id_.reset(); if (IS_INIT) {
ls_id_.reset(); tenant_snapshot_id_.reset();
meta_existed_ = false; ls_id_.reset();
try_free_build_ctx_(); meta_existed_ = false;
build_ctx_allocator_ = nullptr; try_free_build_ctx_();
meta_handler_ = nullptr; build_ctx_allocator_ = nullptr;
is_inited_ = false; meta_handler_ = nullptr;
is_inited_ = false;
}
} }
ObLSSnapshot &operator=(const ObLSSnapshot &other) = delete; ObLSSnapshot &operator=(const ObLSSnapshot &other) = delete;

View File

@ -69,12 +69,32 @@ void ObTenantCloneService::stop()
void ObTenantCloneService::wait() void ObTenantCloneService::wait()
{ {
startup_accel_handler_.wait(); int ret = OB_SUCCESS;
while(OB_FAIL(wait_())) {
usleep(100000);
}
}
// Performs one attempt at waiting for the startup accel handler.
//
// @return OB_SUCCESS after startup_accel_handler_.wait() has been called
//         (service not marked as started); OB_ERR_UNEXPECTED when the service
//         is still marked as started, in which case stop() is invoked and the
//         handler is NOT waited on during this attempt.
int ObTenantCloneService::wait_()
{
  int ret = OB_SUCCESS;
  if (!is_started_) {
    // Expected path: the service is not marked as started.
    startup_accel_handler_.wait();
  } else {
    // Still flagged as running: report it and request a stop.
    ret = OB_ERR_UNEXPECTED;
    LOG_ERROR("ObTenantCloneService is running when wait function is called", KR(ret), KPC(this));
    stop();
  }
  return ret;
}
void ObTenantCloneService::destroy() void ObTenantCloneService::destroy()
{ {
startup_accel_handler_.destroy(); if (IS_INIT) {
startup_accel_handler_.destroy();
is_inited_ = false;
}
} }
int ObTenantCloneService::get_clone_job_(ObArray<ObCloneJob>& clone_jobs) int ObTenantCloneService::get_clone_job_(ObArray<ObCloneJob>& clone_jobs)

View File

@ -51,7 +51,10 @@ public:
void run(); void run();
bool is_started() { return is_started_; } bool is_started() { return is_started_; }
TO_STRING_KV(K(is_inited_), K(is_started_), KP(meta_handler_));
private: private:
int wait_();
int get_clone_job_(ObArray<ObCloneJob>& clone_jobs); int get_clone_job_(ObArray<ObCloneJob>& clone_jobs);
int try_clone_(const ObCloneJob& job); int try_clone_(const ObCloneJob& job);
void try_clone_one_ls_(const ObCloneJob& job, ObLS* ls); void try_clone_one_ls_(const ObCloneJob& job, ObLS* ls);

View File

@ -46,7 +46,7 @@ public:
meta_handler_(nullptr), meta_handler_(nullptr),
mutex_() {} mutex_() {}
~ObTenantSnapshot() {} ~ObTenantSnapshot() { reset(); }
int destroy(); int destroy();
int init(const share::ObTenantSnapshotID& tenant_snapshot_id, int init(const share::ObTenantSnapshotID& tenant_snapshot_id,
@ -55,15 +55,17 @@ public:
void reset() void reset()
{ {
is_inited_ = false; if (IS_INIT) {
is_running_ = false; is_running_ = false;
tenant_snapshot_id_.reset(); tenant_snapshot_id_.reset();
has_unfinished_create_dag_ = false; has_unfinished_create_dag_ = false;
has_unfinished_gc_dag_ = false; has_unfinished_gc_dag_ = false;
clone_ref_ = 0; clone_ref_ = 0;
meta_existed_ = false; meta_existed_ = false;
ls_snapshot_mgr_ = nullptr; ls_snapshot_mgr_ = nullptr;
meta_handler_ = nullptr; meta_handler_ = nullptr;
is_inited_ = false;
}
} }
public: public:

View File

@ -95,8 +95,14 @@ void ObTenantSnapshotService::destroy()
} }
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
TG_DESTROY(tg_id_); TG_DESTROY(tg_id_);
tg_id_ = INT32_MAX;
clone_service_.destroy();
tenant_snapshot_mgr_.destroy(); tenant_snapshot_mgr_.destroy();
ls_snapshot_mgr_.destroy(); ls_snapshot_mgr_.destroy();
meta_loaded_ = false;
unit_is_deleting_ = false;
cond_.destroy();
running_mode_ = RUNNING_MODE::INVALID;
is_inited_ = false; is_inited_ = false;
} }
LOG_INFO("ObTenantSnapshotService::destroy end", KR(ret)); LOG_INFO("ObTenantSnapshotService::destroy end", KR(ret));
@ -770,6 +776,7 @@ void ObTenantSnapshotService::run1()
if (OB_SUCC(ret) && NORMAL == running_mode_) { if (OB_SUCC(ret) && NORMAL == running_mode_) {
if (clone_service_.is_started()) { if (clone_service_.is_started()) {
clone_service_.stop(); clone_service_.stop();
clone_service_.wait();
} }
run_in_normal_mode_(); run_in_normal_mode_();