Fix migrating ls can't gc while tenant drop

This commit is contained in:
JunkoPF 2025-01-01 12:15:43 +00:00 committed by ob-robot
parent 69cfd4405e
commit cd4ab9e1d5
7 changed files with 199 additions and 28 deletions

View File

@ -682,6 +682,12 @@ class ObString;
ACT(BEFORE_MIGRATION_CREATE_TABLE_STORE,)\
ACT(BEFORE_FILL_AUTO_SPLIT_PARAMS,)\
ACT(BEFORE_UPDATE_TABLET_HA_STATUS,)\
ACT(BEFORE_MIGRATION_DO_INIT_STATUS,)\
ACT(BEFORE_MIGRATION_DO_PREPARE_LS_STATUS,)\
ACT(BEFORE_MIGRATION_DO_BUILD_LS_STATUS,)\
ACT(BEFORE_ADD_PREPARE_LS_MIGRATION_DAG_NET,)\
ACT(BEFORE_ADD_BUILD_LS_MIGRATION_DAG_NET,)\
ACT(BEFORE_ADD_COMPLETE_LS_MIGRATION_DAG_NET,)\
ACT(MAX_DEBUG_SYNC_POINT,)
DECLARE_ENUM(ObDebugSyncPoint, debug_sync_point, OB_DEBUG_SYNC_POINT_DEF);

View File

@ -65,6 +65,7 @@ namespace lib
namespace share
{
ERRSIM_POINT_DEF(EN_SKIP_LOOP_BLOCKING_DAG);
#define DEFINE_TASK_ADD_KV(n) \
template <LOG_TYPENAME_TN##n> \
@ -3813,6 +3814,11 @@ int ObDagNetScheduler::loop_blocking_dag_net_list()
if (OB_ISNULL(scheduler_)) {
ret = OB_ERR_UNEXPECTED;
COMMON_LOG(WARN, "scheduler is null", KP(scheduler_));
#ifdef ERRSIM
} else if (EN_SKIP_LOOP_BLOCKING_DAG != OB_SUCCESS) {
ret = EN_SKIP_LOOP_BLOCKING_DAG;
COMMON_LOG(WARN, "[ERRSIM] skip loop blocking dag net list", K(ret));
#endif
} else {
ObMutexGuard guard(dag_net_map_lock_);
ObIDagNet *head = dag_net_list_[BLOCKING_DAG_NET_LIST].get_header();
@ -3823,16 +3829,13 @@ int ObDagNetScheduler::loop_blocking_dag_net_list()
LOG_DEBUG("loop blocking dag net list", K(ret), KPC(cur), K(rest_cnt));
tmp = cur;
cur = cur->get_next();
if (tmp->is_cancel()) {
(void) finish_dag_net_without_lock(*tmp);
(void) erase_dag_net_list_or_abort(BLOCKING_DAG_NET_LIST, tmp);
(void) scheduler_->free_dag_net(tmp); // set tmp nullptr
} else if (OB_TMP_FAIL(tmp->start_running())) {
// If start running failed, need call clear_dag_net_ctx() to release some resources.
if (tmp->is_cancel() || OB_TMP_FAIL(tmp->start_running())) {
// If dag net has been cancelled or failed to start running, move dag net to finished list.
// Function clear_dag_net_ctx() will be called to release some resources after being scheduled at finish list.
// Move this dag net from blocking to finished list to avoid dead lock.
(void) erase_dag_net_list_or_abort(BLOCKING_DAG_NET_LIST, tmp);
(void) add_dag_net_list_or_abort(FINISHED_DAG_NET_LIST, tmp);
COMMON_LOG(WARN, "failed to start running, move to finished list", K(tmp_ret), KPC(tmp));
COMMON_LOG(WARN, "dag net has been cancelled or failed to start running, move to finished list", K(tmp_ret), KPC(tmp));
} else {
tmp->set_start_time();
--rest_cnt;

View File

@ -2238,6 +2238,12 @@ int ObWaitDataReadyTask::check_ls_and_task_status_(
} else if (ctx_->is_failed()) {
ret = OB_CANCELED;
STORAGE_LOG(WARN, "ls migration task is failed", K(ret), KPC(ctx_));
#ifdef ERRSIM
SERVER_EVENT_SYNC_ADD("storage_ha", "wait_data_ready_task_cancel",
"tenant_id", ctx_->tenant_id_,
"ls_id", ctx_->arg_.ls_id_.id(),
"ret", ret);
#endif
} else if (ls->is_stopped()) {
ret = OB_NOT_RUNNING;
LOG_WARN("ls is not running, stop migration dag net", K(ret), KPC(ctx_));

View File

@ -716,7 +716,9 @@ int ObInitialMigrationTask::process()
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
#ifdef ERRSIM
SERVER_EVENT_SYNC_ADD("storage_ha", "before_prepare_migration_task");
SERVER_EVENT_SYNC_ADD("storage_ha", "before_prepare_migration_task",
"tenant_id", ctx_->tenant_id_,
"ls_id", ctx_->arg_.ls_id_.id());
DEBUG_SYNC(BEFORE_PREPARE_MIGRATION_TASK);
#endif
@ -790,6 +792,12 @@ int ObInitialMigrationTask::generate_migration_dags_()
LOG_WARN("failed to init migration finish dag", K(ret));
} else if (OB_FAIL(this->get_dag()->add_child(*start_migration_dag))) {
LOG_WARN("failed to add start migration dag", K(ret), KPC(start_migration_dag));
#ifdef ERRSIM
SERVER_EVENT_SYNC_ADD("storage_ha", "initial_migration_task_add_child_failed",
"tenant_id", ctx_->tenant_id_,
"ls_id", ctx_->arg_.ls_id_.id(),
"ret", ret);
#endif
} else if (OB_FAIL(start_migration_dag->create_first_task())) {
LOG_WARN("failed to create first task", K(ret));
} else if (OB_FAIL(start_migration_dag->add_child(*migration_finish_dag))) {

View File

@ -251,6 +251,14 @@ int ObLSMigrationHandler::change_status_(const ObLSMigrationHandlerStatus &new_s
ret = OB_ERR_UNEXPECTED;
LOG_WARN("can not change ls migration handler status", K(ret), K(status_), K(new_status));
} else {
#ifdef ERRSIM
SERVER_EVENT_ADD("storage_ha", "migration_handler_change_status",
"tenant_id", ls_->get_tenant_id(),
"ls_id", ls_->get_ls_id().id(),
"current_status", status_,
"next_status", new_status,
"result", result_);
#endif
status_ = new_status;
}
}
@ -382,6 +390,75 @@ int ObLSMigrationHandler::check_task_exist_(
return ret;
}
int ObLSMigrationHandler::handle_failed_task_(const ObLSMigrationHandlerStatus &status, bool &need_generate_dag_net)
{
int ret = OB_SUCCESS;
need_generate_dag_net = false;
int32_t result = OB_SUCCESS;
bool is_migration_failed = false;
bool is_exist = false;
ObLSMigrationTask ls_migration_task;
ObTenantDagScheduler *scheduler = nullptr;
if (!is_inited_) {
ret = OB_NOT_INIT;
LOG_WARN("ls migration handler do not init", K(ret));
} else if (OB_FAIL(get_result_(result))) {
LOG_WARN("failed to get result", K(ret), K(status));
} else if (FALSE_IT(is_migration_failed = (OB_SUCCESS != result))) {
} else if (OB_FAIL(check_task_exist_(status, is_exist))) {
LOG_WARN("failed to check task exist", K(ret), K(status));
} else if (is_exist) {
if (!is_migration_failed) {
// if dag net is exist and migration is not failed, do nothing
need_generate_dag_net = false;
} else {
// if dag net is exist and migration is failed, cancel the migration task
if (OB_FAIL(cancel_current_task_())) {
LOG_WARN("failed to cancel current task", K(ret), KPC(ls_));
} else {
need_generate_dag_net = false;
}
}
} else {
if (!is_migration_failed) {
// if dag net is not exist and migration is not failed, need to generate dag net
need_generate_dag_net = true;
} else {
// if dag net is not exist and migration is already failed, need to switch stage manually and no need to generate dag net
if (OB_FAIL(switch_next_stage(result))) {
LOG_WARN("failed to switch next stage", K(ret), K(result), KPC(ls_));
} else {
need_generate_dag_net = false;
}
}
}
return ret;
}
int ObLSMigrationHandler::cancel_current_task_() {
int ret = OB_SUCCESS;
ObLSMigrationTask ls_migration_task;
ObTenantDagScheduler *scheduler = nullptr;
if (OB_ISNULL(scheduler = MTL(ObTenantDagScheduler*))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get ObTenantDagScheduler from MTL", K(ret), KPC(ls_));
} else if (OB_FAIL(get_ls_migration_task_(ls_migration_task))) {
LOG_WARN("failed to get ls migration task", K(ret), KPC(ls_));
} else if (!ls_migration_task.is_valid()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ls migration task should not be invalid", K(ret), K(ls_migration_task));
} else if (OB_FAIL(scheduler->cancel_dag_net(ls_migration_task.task_id_))) {
LOG_WARN("failed to cancel dag net", K(ret), K(this), K(ls_migration_task));
} else {
common::SpinWLockGuard guard(lock_);
is_cancel_ = true;
}
return ret;
}
int ObLSMigrationHandler::add_ls_migration_task(
const share::ObTaskId &task_id, const ObMigrationOpArg &arg)
{
@ -568,14 +645,21 @@ int ObLSMigrationHandler::do_init_status_()
int tmp_ret = OB_SUCCESS;
ObMigrationStatus migration_status = ObMigrationStatus::OB_MIGRATION_STATUS_MAX;
bool is_empty = false;
bool can_switch_next_stage = true;
ObLSMigrationHandlerStatus new_status = ObLSMigrationHandlerStatus::MAX_STATUS;
DEBUG_SYNC(BEFORE_MIGRATION_DO_INIT_STATUS);
if (!is_inited_) {
ret = OB_NOT_INIT;
LOG_WARN("ls migration handler do not init", K(ret));
} else if (is_migration_failed_()) {
//do nothing
int32_t result = OB_SUCCESS;
if (OB_FAIL(get_result_(result))) {
LOG_WARN("failed to get result", K(ret));
} else {
ret = result;
LOG_INFO("migration is already failed, skip init status", K(result), KPC(ls_));
}
} else if (is_cancel()) {
ret = OB_CANCELED;
LOG_WARN("task is canceled", K(ret));
@ -622,9 +706,7 @@ int ObLSMigrationHandler::do_init_status_()
}
if (OB_FAIL(ret)) {
if (!can_switch_next_stage) {
//do nothing
} else if (OB_SUCCESS != (tmp_ret = switch_next_stage(ret))) {
if (OB_TMP_FAIL(switch_next_stage(ret))) {
LOG_WARN("failed to report result", K(ret), K(tmp_ret));
}
}
@ -635,24 +717,24 @@ int ObLSMigrationHandler::do_prepare_ls_status_()
{
int ret = OB_SUCCESS;
const ObLSMigrationHandlerStatus status = ObLSMigrationHandlerStatus::PREPARE_LS;
bool is_exist = false;
bool need_generate_dag_net = false;
bool can_skip_prepare = false;
int32_t result = OB_SUCCESS;
DEBUG_SYNC(BEFORE_MIGRATION_DO_PREPARE_LS_STATUS);
if (!is_inited_) {
ret = OB_NOT_INIT;
LOG_WARN("ls migration handler do not init", K(ret));
} else if (is_migration_failed_()) {
//do nothing
} else if (OB_FAIL(check_can_skip_prepare_status_(can_skip_prepare))) {
LOG_WARN("failed to check can skip prepare status", K(ret));
} else if (can_skip_prepare) {
if (OB_FAIL(switch_next_stage(OB_SUCCESS))) {
LOG_WARN("failed to report result", K(ret), KPC(ls_), K(status));
}
} else if (OB_FAIL(check_task_exist_(status, is_exist))) {
} else if (OB_FAIL(handle_failed_task_(status, need_generate_dag_net))) {
LOG_WARN("failed to check task exist", K(ret), K(status), KPC(ls_));
} else if (is_exist) {
//do nothing
} else if (!need_generate_dag_net) {
} else if (OB_FAIL(generate_prepare_ls_dag_net_())) {
LOG_WARN("failed to generate prepare ls dag net", K(ret), K(status), KPC(ls_));
}
@ -663,16 +745,16 @@ int ObLSMigrationHandler::do_build_ls_status_()
{
int ret = OB_SUCCESS;
const ObLSMigrationHandlerStatus status = ObLSMigrationHandlerStatus::BUILD_LS;
bool is_exist = false;
bool need_generate_dag_net = false;
DEBUG_SYNC(BEFORE_MIGRATION_DO_BUILD_LS_STATUS);
if (!is_inited_) {
ret = OB_NOT_INIT;
LOG_WARN("ls migration handler do not init", K(ret));
} else if (is_migration_failed_()) {
//do nothing
} else if (OB_FAIL(check_task_exist_(status, is_exist))) {
} else if (OB_FAIL(handle_failed_task_(status, need_generate_dag_net))) {
LOG_WARN("failed to check task exist", K(ret), K(status), KPC(ls_));
} else if (is_exist) {
} else if (!need_generate_dag_net) {
//do nothing
} else if (OB_FAIL(generate_build_ls_dag_net_())) {
LOG_WARN("failed to generate build ls dag net", K(ret), K(status), KPC(ls_));
@ -692,7 +774,11 @@ int ObLSMigrationHandler::do_complete_ls_status_()
} else if (OB_FAIL(check_task_exist_(status, is_exist))) {
LOG_WARN("failed to check task exist", K(ret), K(status), KPC(ls_));
} else if (is_exist) {
//do nothing
if (is_migration_failed_()) {
if (OB_FAIL(cancel_current_task_())) {
LOG_WARN("failed to cancel current task", K(ret), K(status), KPC(ls_));
}
}
} else if (OB_FAIL(generate_complete_ls_dag_net_())) {
LOG_WARN("failed to generate complete ls dag net", K(ret), K(status), KPC(ls_));
}
@ -788,7 +874,16 @@ int ObLSMigrationHandler::schedule_build_ls_dag_net_(
param.storage_rpc_ = storage_rpc_;
param.svr_rpc_proxy_ = svr_rpc_proxy_;
param.sql_proxy_ = sql_proxy_;
#ifdef ERRSIM
const int64_t errsim_migration_ls_id = GCONF.errsim_migration_ls_id;
const ObLSID errsim_ls_id(errsim_migration_ls_id);
if (ls_->get_ls_id() == errsim_ls_id) {
SERVER_EVENT_SYNC_ADD("storage_ha", "before_add_build_ls_dag_net",
"tenant_id", ls_->get_tenant_id(),
"ls_id", ls_->get_ls_id().id());
DEBUG_SYNC(BEFORE_ADD_BUILD_LS_MIGRATION_DAG_NET);
}
#endif
if (OB_ISNULL(scheduler = MTL(ObTenantDagScheduler*))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get ObTenantDagScheduler from MTL", K(ret), KP(scheduler));
@ -843,7 +938,16 @@ int ObLSMigrationHandler::schedule_prepare_ls_dag_net_(
ObLSPrepareMigrationParam param;
param.arg_ = task.arg_;
param.task_id_ = task.task_id_;
#ifdef ERRSIM
const int64_t errsim_migration_ls_id = GCONF.errsim_migration_ls_id;
const ObLSID errsim_ls_id(errsim_migration_ls_id);
if (ls_->get_ls_id() == errsim_ls_id) {
SERVER_EVENT_SYNC_ADD("storage_ha", "before_add_prepare_ls_dag_net",
"tenant_id", ls_->get_tenant_id(),
"ls_id", ls_->get_ls_id().id());
DEBUG_SYNC(BEFORE_ADD_PREPARE_LS_MIGRATION_DAG_NET);
}
#endif
if (OB_ISNULL(scheduler = MTL(ObTenantDagScheduler*))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get ObTenantDagScheduler from MTL", K(ret), KP(scheduler));
@ -896,6 +1000,16 @@ int ObLSMigrationHandler::schedule_complete_ls_dag_net_(
param.rebuild_seq_ = ls_->get_rebuild_seq();
param.svr_rpc_proxy_ = svr_rpc_proxy_;
param.storage_rpc_ = storage_rpc_;
#ifdef ERRSIM
const int64_t errsim_migration_ls_id = GCONF.errsim_migration_ls_id;
const ObLSID errsim_ls_id(errsim_migration_ls_id);
if (ls_->get_ls_id() == errsim_ls_id) {
SERVER_EVENT_SYNC_ADD("storage_ha", "before_add_complete_ls_dag_net",
"tenant_id", ls_->get_tenant_id(),
"ls_id", ls_->get_ls_id().id());
DEBUG_SYNC(BEFORE_ADD_COMPLETE_LS_MIGRATION_DAG_NET);
}
#endif
if (OB_FAIL(get_ha_src_info_(param.chosen_src_))) {
LOG_WARN("failed to get src info", K(ret), KPC(ls_), K(task));
} else if (OB_FAIL(get_result_(result))) {
@ -1069,10 +1183,11 @@ int ObLSMigrationHandler::check_can_skip_prepare_status_(bool &can_skip)
can_skip = false;
} else {
can_skip = true;
LOG_INFO("skip ls migration prepare status", K(status));
LOG_INFO("rebuild and migration status is not none, skip ls migration prepare status", K(status), K(task));
}
} else {
can_skip = true;
LOG_INFO("not rebuild ls, skip ls migration prepare status", K(task));
}
return ret;
}
@ -1212,6 +1327,14 @@ void ObLSMigrationHandler::stop()
LOG_ERROR("failed to cancel dag net", K(ret), K(task), KPC(ls_));
}
}
#ifdef ERRSIM
SERVER_EVENT_ADD("storage_ha", "migration_handler_stop",
"tenant_id", ls_->get_tenant_id(),
"ls_id", ls_->get_ls_id().id(),
"is_failed", ret,
"result", result_);
#endif
}
void ObLSMigrationHandler::wait(bool &wait_finished)
@ -1330,6 +1453,14 @@ int ObLSMigrationHandler::switch_next_stage_with_nolock_(const int32_t result)
ret = OB_ERR_UNEXPECTED;
LOG_WARN("can not change ls migration handler status", K(ret), K(status_), K(next_status));
} else {
#ifdef ERRSIM
SERVER_EVENT_ADD("storage_ha", "migration_handler_change_status",
"tenant_id", ls_->get_tenant_id(),
"ls_id", ls_->get_ls_id().id(),
"current_status", status_,
"next_status", next_status,
"result", new_result);
#endif
FLOG_INFO("report result", K(result), K(new_result), K(result_), K(status_), K(next_status));
result_ = new_result;
status_ = next_status;

View File

@ -101,6 +101,11 @@ private:
int check_task_exist_(
const ObLSMigrationHandlerStatus &status,
bool &is_exist);
int handle_failed_task_(
const ObLSMigrationHandlerStatus &status,
bool &need_generate_dag_net);
// only use this function when task exist
int cancel_current_task_();
int do_init_status_();
int do_prepare_ls_status_();

View File

@ -872,6 +872,12 @@ int ObStartPrepareMigrationTask::wait_transfer_tablets_ready_()
} else if (OB_FAIL(ls->build_tablet_iter(tablet_iterator))) {
LOG_WARN("failed to build ls tablet iter", K(ret), KPC(ctx_));
} else {
#ifdef ERRSIM
SERVER_EVENT_SYNC_ADD("storage_ha", "before_wait_transfer_out_tablet_ready",
"tenant_id", ctx_->tenant_id_,
"ls_id", ctx_->arg_.ls_id_.id(),
"ret", ret);
#endif
DEBUG_SYNC(BEFORE_WAIT_TRANSFER_OUT_TABLET_READY);
ObIDagNet *dag_net = nullptr;
while (OB_SUCC(ret)) {
@ -887,6 +893,12 @@ int ObStartPrepareMigrationTask::wait_transfer_tablets_ready_()
} else if (dag_net->is_cancel()) {
ret = OB_CANCELED;
LOG_WARN("task is cancelled", K(ret), K(*this));
#ifdef ERRSIM
SERVER_EVENT_SYNC_ADD("storage_ha", "start_prepare_migration_task_cancel",
"tenant_id", ctx_->tenant_id_,
"ls_id", ctx_->arg_.ls_id_.id(),
"ret", ret);
#endif
} else if (OB_FAIL(tablet_iterator.get_next_tablet(tablet_handle))) {
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;