diff --git a/src/share/ob_debug_sync_point.h b/src/share/ob_debug_sync_point.h index d3279c3dc..44fc5ed8f 100755 --- a/src/share/ob_debug_sync_point.h +++ b/src/share/ob_debug_sync_point.h @@ -682,6 +682,12 @@ class ObString; ACT(BEFORE_MIGRATION_CREATE_TABLE_STORE,)\ ACT(BEFORE_FILL_AUTO_SPLIT_PARAMS,)\ ACT(BEFORE_UPDATE_TABLET_HA_STATUS,)\ + ACT(BEFORE_MIGRATION_DO_INIT_STATUS,)\ + ACT(BEFORE_MIGRATION_DO_PREPARE_LS_STATUS,)\ + ACT(BEFORE_MIGRATION_DO_BUILD_LS_STATUS,)\ + ACT(BEFORE_ADD_PREPARE_LS_MIGRATION_DAG_NET,)\ + ACT(BEFORE_ADD_BUILD_LS_MIGRATION_DAG_NET,)\ + ACT(BEFORE_ADD_COMPLETE_LS_MIGRATION_DAG_NET,)\ ACT(MAX_DEBUG_SYNC_POINT,) DECLARE_ENUM(ObDebugSyncPoint, debug_sync_point, OB_DEBUG_SYNC_POINT_DEF); diff --git a/src/share/scheduler/ob_tenant_dag_scheduler.cpp b/src/share/scheduler/ob_tenant_dag_scheduler.cpp index 6a88df5a3..47bfacb63 100644 --- a/src/share/scheduler/ob_tenant_dag_scheduler.cpp +++ b/src/share/scheduler/ob_tenant_dag_scheduler.cpp @@ -65,6 +65,7 @@ namespace lib namespace share { +ERRSIM_POINT_DEF(EN_SKIP_LOOP_BLOCKING_DAG); #define DEFINE_TASK_ADD_KV(n) \ template \ @@ -3813,6 +3814,11 @@ int ObDagNetScheduler::loop_blocking_dag_net_list() if (OB_ISNULL(scheduler_)) { ret = OB_ERR_UNEXPECTED; COMMON_LOG(WARN, "scheduler is null", KP(scheduler_)); +#ifdef ERRSIM + } else if (EN_SKIP_LOOP_BLOCKING_DAG != OB_SUCCESS) { + ret = EN_SKIP_LOOP_BLOCKING_DAG; + COMMON_LOG(WARN, "[ERRSIM] skip loop blocking dag net list", K(ret)); +#endif } else { ObMutexGuard guard(dag_net_map_lock_); ObIDagNet *head = dag_net_list_[BLOCKING_DAG_NET_LIST].get_header(); @@ -3823,16 +3829,13 @@ int ObDagNetScheduler::loop_blocking_dag_net_list() LOG_DEBUG("loop blocking dag net list", K(ret), KPC(cur), K(rest_cnt)); tmp = cur; cur = cur->get_next(); - if (tmp->is_cancel()) { - (void) finish_dag_net_without_lock(*tmp); - (void) erase_dag_net_list_or_abort(BLOCKING_DAG_NET_LIST, tmp); - (void) scheduler_->free_dag_net(tmp); // set tmp nullptr - } else if (OB_TMP_FAIL(tmp->start_running())) { - // If start running failed, need call clear_dag_net_ctx() to release some resources. + if (tmp->is_cancel() || OB_TMP_FAIL(tmp->start_running())) { + // If dag net has been cancelled or failed to start running, move dag net to finished list. + // Function clear_dag_net_ctx() will be called to release some resources after being scheduled at finish list. // Move this dag net from blocking to finished list to avoid dead lock. (void) erase_dag_net_list_or_abort(BLOCKING_DAG_NET_LIST, tmp); (void) add_dag_net_list_or_abort(FINISHED_DAG_NET_LIST, tmp); - COMMON_LOG(WARN, "failed to start running, move to finished list", K(tmp_ret), KPC(tmp)); + COMMON_LOG(WARN, "dag net has been cancelled or failed to start running, move to finished list", K(tmp_ret), KPC(tmp)); } else { tmp->set_start_time(); --rest_cnt; diff --git a/src/storage/high_availability/ob_ls_complete_migration.cpp b/src/storage/high_availability/ob_ls_complete_migration.cpp index e9eae27e4..5c8da2938 100644 --- a/src/storage/high_availability/ob_ls_complete_migration.cpp +++ b/src/storage/high_availability/ob_ls_complete_migration.cpp @@ -2238,6 +2238,12 @@ int ObWaitDataReadyTask::check_ls_and_task_status_( } else if (ctx_->is_failed()) { ret = OB_CANCELED; STORAGE_LOG(WARN, "ls migration task is failed", K(ret), KPC(ctx_)); +#ifdef ERRSIM + SERVER_EVENT_SYNC_ADD("storage_ha", "wait_data_ready_task_cancel", + "tenant_id", ctx_->tenant_id_, + "ls_id", ctx_->arg_.ls_id_.id(), + "ret", ret); +#endif } else if (ls->is_stopped()) { ret = OB_NOT_RUNNING; LOG_WARN("ls is not running, stop migration dag net", K(ret), KPC(ctx_)); diff --git a/src/storage/high_availability/ob_ls_migration.cpp b/src/storage/high_availability/ob_ls_migration.cpp index 88fea9de5..39d99f63d 100644 --- a/src/storage/high_availability/ob_ls_migration.cpp +++ b/src/storage/high_availability/ob_ls_migration.cpp @@ -716,7 +716,9 @@ int ObInitialMigrationTask::process() int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; #ifdef ERRSIM - SERVER_EVENT_SYNC_ADD("storage_ha", "before_prepare_migration_task"); + SERVER_EVENT_SYNC_ADD("storage_ha", "before_prepare_migration_task", + "tenant_id", ctx_->tenant_id_, + "ls_id", ctx_->arg_.ls_id_.id()); DEBUG_SYNC(BEFORE_PREPARE_MIGRATION_TASK); #endif @@ -790,6 +792,12 @@ int ObInitialMigrationTask::generate_migration_dags_() LOG_WARN("failed to init migration finish dag", K(ret)); } else if (OB_FAIL(this->get_dag()->add_child(*start_migration_dag))) { LOG_WARN("failed to add start migration dag", K(ret), KPC(start_migration_dag)); +#ifdef ERRSIM + SERVER_EVENT_SYNC_ADD("storage_ha", "initial_migration_task_add_child_failed", + "tenant_id", ctx_->tenant_id_, + "ls_id", ctx_->arg_.ls_id_.id(), + "ret", ret); +#endif } else if (OB_FAIL(start_migration_dag->create_first_task())) { LOG_WARN("failed to create first task", K(ret)); } else if (OB_FAIL(start_migration_dag->add_child(*migration_finish_dag))) { diff --git a/src/storage/high_availability/ob_ls_migration_handler.cpp b/src/storage/high_availability/ob_ls_migration_handler.cpp index 2ace13211..e99502e71 100644 --- a/src/storage/high_availability/ob_ls_migration_handler.cpp +++ b/src/storage/high_availability/ob_ls_migration_handler.cpp @@ -251,6 +251,14 @@ int ObLSMigrationHandler::change_status_(const ObLSMigrationHandlerStatus &new_s ret = OB_ERR_UNEXPECTED; LOG_WARN("can not change ls migration handler status", K(ret), K(status_), K(new_status)); } else { +#ifdef ERRSIM + SERVER_EVENT_ADD("storage_ha", "migration_handler_change_status", + "tenant_id", ls_->get_tenant_id(), + "ls_id", ls_->get_ls_id().id(), + "current_status", status_, + "next_status", new_status, + "result", result_); +#endif status_ = new_status; } } @@ -382,6 +390,75 @@ int ObLSMigrationHandler::check_task_exist_( return ret; } +int ObLSMigrationHandler::handle_failed_task_(const ObLSMigrationHandlerStatus &status, bool &need_generate_dag_net) +{ + int ret = OB_SUCCESS; + need_generate_dag_net = false; + int32_t result = OB_SUCCESS; + bool is_migration_failed = false; + bool is_exist = false; + ObLSMigrationTask ls_migration_task; + ObTenantDagScheduler *scheduler = nullptr; + + if (!is_inited_) { + ret = OB_NOT_INIT; + LOG_WARN("ls migration handler do not init", K(ret)); + } else if (OB_FAIL(get_result_(result))) { + LOG_WARN("failed to get result", K(ret), K(status)); + } else if (FALSE_IT(is_migration_failed = (OB_SUCCESS != result))) { + } else if (OB_FAIL(check_task_exist_(status, is_exist))) { + LOG_WARN("failed to check task exist", K(ret), K(status)); + } else if (is_exist) { + if (!is_migration_failed) { + // if dag net is exist and migration is not failed, do nothing + need_generate_dag_net = false; + } else { + // if dag net is exist and migration is failed, cancel the migration task + if (OB_FAIL(cancel_current_task_())) { + LOG_WARN("failed to cancel current task", K(ret), KPC(ls_)); + } else { + need_generate_dag_net = false; + } + } + } else { + if (!is_migration_failed) { + // if dag net is not exist and migration is not failed, need to generate dag net + need_generate_dag_net = true; + } else { + // if dag net is not exist and migration is already failed, need to switch stage manually and no need to generate dag net + if (OB_FAIL(switch_next_stage(result))) { + LOG_WARN("failed to switch next stage", K(ret), K(result), KPC(ls_)); + } else { + need_generate_dag_net = false; + } + } + } + return ret; +} + +int ObLSMigrationHandler::cancel_current_task_() { + int ret = OB_SUCCESS; + ObLSMigrationTask ls_migration_task; + ObTenantDagScheduler *scheduler = nullptr; + + if (OB_ISNULL(scheduler = MTL(ObTenantDagScheduler*))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("failed to get ObTenantDagScheduler from MTL", K(ret), KPC(ls_)); + } else if (OB_FAIL(get_ls_migration_task_(ls_migration_task))) { + LOG_WARN("failed to get ls migration task", K(ret), KPC(ls_)); + } else if (!ls_migration_task.is_valid()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("ls migration task should not be invalid", K(ret), K(ls_migration_task)); + } else if (OB_FAIL(scheduler->cancel_dag_net(ls_migration_task.task_id_))) { + LOG_WARN("failed to cancel dag net", K(ret), K(this), K(ls_migration_task)); + } else { + common::SpinWLockGuard guard(lock_); + is_cancel_ = true; + } + + return ret; +} + int ObLSMigrationHandler::add_ls_migration_task( const share::ObTaskId &task_id, const ObMigrationOpArg &arg) { @@ -568,14 +645,21 @@ int ObLSMigrationHandler::do_init_status_() int tmp_ret = OB_SUCCESS; ObMigrationStatus migration_status = ObMigrationStatus::OB_MIGRATION_STATUS_MAX; bool is_empty = false; - bool can_switch_next_stage = true; ObLSMigrationHandlerStatus new_status = ObLSMigrationHandlerStatus::MAX_STATUS; + DEBUG_SYNC(BEFORE_MIGRATION_DO_INIT_STATUS); + if (!is_inited_) { ret = OB_NOT_INIT; LOG_WARN("ls migration handler do not init", K(ret)); } else if (is_migration_failed_()) { - //do nothing + int32_t result = OB_SUCCESS; + if (OB_FAIL(get_result_(result))) { + LOG_WARN("failed to get result", K(ret)); + } else { + ret = result; + LOG_INFO("migration is already failed, skip init status", K(result), KPC(ls_)); + } } else if (is_cancel()) { ret = OB_CANCELED; LOG_WARN("task is canceled", K(ret)); @@ -622,9 +706,7 @@ int ObLSMigrationHandler::do_init_status_() } if (OB_FAIL(ret)) { - if (!can_switch_next_stage) { - //do nothing - } else if (OB_SUCCESS != (tmp_ret = switch_next_stage(ret))) { + if (OB_TMP_FAIL(switch_next_stage(ret))) { LOG_WARN("failed to report result", K(ret), K(tmp_ret)); } } @@ -635,24 +717,24 @@ int ObLSMigrationHandler::do_prepare_ls_status_() { int ret = OB_SUCCESS; const ObLSMigrationHandlerStatus status = ObLSMigrationHandlerStatus::PREPARE_LS; - bool is_exist = false; + bool need_generate_dag_net = false; bool can_skip_prepare = false; + int32_t result = OB_SUCCESS; + + DEBUG_SYNC(BEFORE_MIGRATION_DO_PREPARE_LS_STATUS); if (!is_inited_) { ret = OB_NOT_INIT; LOG_WARN("ls migration handler do not init", K(ret)); - } else if (is_migration_failed_()) { - //do nothing } else if (OB_FAIL(check_can_skip_prepare_status_(can_skip_prepare))) { LOG_WARN("failed to check can skip prepare status", K(ret)); } else if (can_skip_prepare) { if (OB_FAIL(switch_next_stage(OB_SUCCESS))) { LOG_WARN("failed to report result", K(ret), KPC(ls_), K(status)); } - } else if (OB_FAIL(check_task_exist_(status, is_exist))) { + } else if (OB_FAIL(handle_failed_task_(status, need_generate_dag_net))) { LOG_WARN("failed to check task exist", K(ret), K(status), KPC(ls_)); - } else if (is_exist) { - //do nothing + } else if (!need_generate_dag_net) { } else if (OB_FAIL(generate_prepare_ls_dag_net_())) { LOG_WARN("failed to generate prepare ls dag net", K(ret), K(status), KPC(ls_)); } @@ -663,16 +745,16 @@ int ObLSMigrationHandler::do_build_ls_status_() { int ret = OB_SUCCESS; const ObLSMigrationHandlerStatus status = ObLSMigrationHandlerStatus::BUILD_LS; - bool is_exist = false; + bool need_generate_dag_net = false; + + DEBUG_SYNC(BEFORE_MIGRATION_DO_BUILD_LS_STATUS); if (!is_inited_) { ret = OB_NOT_INIT; LOG_WARN("ls migration handler do not init", K(ret)); - } else if (is_migration_failed_()) { - //do nothing - } else if (OB_FAIL(check_task_exist_(status, is_exist))) { + } else if (OB_FAIL(handle_failed_task_(status, need_generate_dag_net))) { LOG_WARN("failed to check task exist", K(ret), K(status), KPC(ls_)); - } else if (is_exist) { + } else if (!need_generate_dag_net) { //do nothing } else if (OB_FAIL(generate_build_ls_dag_net_())) { LOG_WARN("failed to generate build ls dag net", K(ret), K(status), KPC(ls_)); @@ -692,7 +774,11 @@ int ObLSMigrationHandler::do_complete_ls_status_() } else if (OB_FAIL(check_task_exist_(status, is_exist))) { LOG_WARN("failed to check task exist", K(ret), K(status), KPC(ls_)); } else if (is_exist) { - //do nothing + if (is_migration_failed_()) { + if (OB_FAIL(cancel_current_task_())) { + LOG_WARN("failed to cancel current task", K(ret), K(status), KPC(ls_)); + } + } } else if (OB_FAIL(generate_complete_ls_dag_net_())) { LOG_WARN("failed to generate complete ls dag net", K(ret), K(status), KPC(ls_)); } @@ -788,7 +874,16 @@ int ObLSMigrationHandler::schedule_build_ls_dag_net_( param.storage_rpc_ = storage_rpc_; param.svr_rpc_proxy_ = svr_rpc_proxy_; param.sql_proxy_ = sql_proxy_; - +#ifdef ERRSIM + const int64_t errsim_migration_ls_id = GCONF.errsim_migration_ls_id; + const ObLSID errsim_ls_id(errsim_migration_ls_id); + if (ls_->get_ls_id() == errsim_ls_id) { + SERVER_EVENT_SYNC_ADD("storage_ha", "before_add_build_ls_dag_net", + "tenant_id", ls_->get_tenant_id(), + "ls_id", ls_->get_ls_id().id()); + DEBUG_SYNC(BEFORE_ADD_BUILD_LS_MIGRATION_DAG_NET); + } +#endif if (OB_ISNULL(scheduler = MTL(ObTenantDagScheduler*))) { ret = OB_ERR_UNEXPECTED; LOG_WARN("failed to get ObTenantDagScheduler from MTL", K(ret), KP(scheduler)); @@ -843,7 +938,16 @@ int ObLSMigrationHandler::schedule_prepare_ls_dag_net_( ObLSPrepareMigrationParam param; param.arg_ = task.arg_; param.task_id_ = task.task_id_; - +#ifdef ERRSIM + const int64_t errsim_migration_ls_id = GCONF.errsim_migration_ls_id; + const ObLSID errsim_ls_id(errsim_migration_ls_id); + if (ls_->get_ls_id() == errsim_ls_id) { + SERVER_EVENT_SYNC_ADD("storage_ha", "before_add_prepare_ls_dag_net", + "tenant_id", ls_->get_tenant_id(), + "ls_id", ls_->get_ls_id().id()); + DEBUG_SYNC(BEFORE_ADD_PREPARE_LS_MIGRATION_DAG_NET); + } +#endif if (OB_ISNULL(scheduler = MTL(ObTenantDagScheduler*))) { ret = OB_ERR_UNEXPECTED; LOG_WARN("failed to get ObTenantDagScheduler from MTL", K(ret), KP(scheduler)); @@ -896,6 +1000,16 @@ int ObLSMigrationHandler::schedule_complete_ls_dag_net_( param.rebuild_seq_ = ls_->get_rebuild_seq(); param.svr_rpc_proxy_ = svr_rpc_proxy_; param.storage_rpc_ = storage_rpc_; +#ifdef ERRSIM + const int64_t errsim_migration_ls_id = GCONF.errsim_migration_ls_id; + const ObLSID errsim_ls_id(errsim_migration_ls_id); + if (ls_->get_ls_id() == errsim_ls_id) { + SERVER_EVENT_SYNC_ADD("storage_ha", "before_add_complete_ls_dag_net", + "tenant_id", ls_->get_tenant_id(), + "ls_id", ls_->get_ls_id().id()); + DEBUG_SYNC(BEFORE_ADD_COMPLETE_LS_MIGRATION_DAG_NET); + } +#endif if (OB_FAIL(get_ha_src_info_(param.chosen_src_))) { LOG_WARN("failed to get src info", K(ret), KPC(ls_), K(task)); } else if (OB_FAIL(get_result_(result))) { @@ -1069,10 +1183,11 @@ int ObLSMigrationHandler::check_can_skip_prepare_status_(bool &can_skip) can_skip = false; } else { can_skip = true; - LOG_INFO("skip ls migration prepare status", K(status)); + LOG_INFO("rebuild and migration status is not none, skip ls migration prepare status", K(status), K(task)); } } else { can_skip = true; + LOG_INFO("not rebuild ls, skip ls migration prepare status", K(task)); } return ret; } @@ -1212,6 +1327,14 @@ void ObLSMigrationHandler::stop() LOG_ERROR("failed to cancel dag net", K(ret), K(task), KPC(ls_)); } } + +#ifdef ERRSIM + SERVER_EVENT_ADD("storage_ha", "migration_handler_stop", + "tenant_id", ls_->get_tenant_id(), + "ls_id", ls_->get_ls_id().id(), + "is_failed", ret, + "result", result_); +#endif } void ObLSMigrationHandler::wait(bool &wait_finished) @@ -1330,6 +1453,14 @@ int ObLSMigrationHandler::switch_next_stage_with_nolock_(const int32_t result) ret = OB_ERR_UNEXPECTED; LOG_WARN("can not change ls migration handler status", K(ret), K(status_), K(next_status)); } else { +#ifdef ERRSIM + SERVER_EVENT_ADD("storage_ha", "migration_handler_change_status", + "tenant_id", ls_->get_tenant_id(), + "ls_id", ls_->get_ls_id().id(), + "current_status", status_, + "next_status", next_status, + "result", new_result); +#endif FLOG_INFO("report result", K(result), K(new_result), K(result_), K(status_), K(next_status)); result_ = new_result; status_ = next_status; diff --git a/src/storage/high_availability/ob_ls_migration_handler.h b/src/storage/high_availability/ob_ls_migration_handler.h index fde74ba1b..05bdfc396 100644 --- a/src/storage/high_availability/ob_ls_migration_handler.h +++ b/src/storage/high_availability/ob_ls_migration_handler.h @@ -101,6 +101,11 @@ private: int check_task_exist_( const ObLSMigrationHandlerStatus &status, bool &is_exist); + int handle_failed_task_( + const ObLSMigrationHandlerStatus &status, + bool &need_generate_dag_net); + // only use this function when task exist + int cancel_current_task_(); int do_init_status_(); int do_prepare_ls_status_(); diff --git a/src/storage/high_availability/ob_ls_prepare_migration.cpp b/src/storage/high_availability/ob_ls_prepare_migration.cpp index bd05bc5ea..3cc0c8db6 100644 --- a/src/storage/high_availability/ob_ls_prepare_migration.cpp +++ b/src/storage/high_availability/ob_ls_prepare_migration.cpp @@ -872,6 +872,12 @@ int ObStartPrepareMigrationTask::wait_transfer_tablets_ready_() } else if (OB_FAIL(ls->build_tablet_iter(tablet_iterator))) { LOG_WARN("failed to build ls tablet iter", K(ret), KPC(ctx_)); } else { +#ifdef ERRSIM + SERVER_EVENT_SYNC_ADD("storage_ha", "before_wait_transfer_out_tablet_ready", + "tenant_id", ctx_->tenant_id_, + "ls_id", ctx_->arg_.ls_id_.id(), + "ret", ret); +#endif DEBUG_SYNC(BEFORE_WAIT_TRANSFER_OUT_TABLET_READY); ObIDagNet *dag_net = nullptr; while (OB_SUCC(ret)) { @@ -887,6 +893,12 @@ int ObStartPrepareMigrationTask::wait_transfer_tablets_ready_() } else if (dag_net->is_cancel()) { ret = OB_CANCELED; LOG_WARN("task is cancelled", K(ret), K(*this)); +#ifdef ERRSIM + SERVER_EVENT_SYNC_ADD("storage_ha", "start_prepare_migration_task_cancel", + "tenant_id", ctx_->tenant_id_, + "ls_id", ctx_->arg_.ls_id_.id(), + "ret", ret); +#endif } else if (OB_FAIL(tablet_iterator.get_next_tablet(tablet_handle))) { if (OB_ITER_END == ret) { ret = OB_SUCCESS;