diff --git a/deps/oblib/src/lib/wait_event/ob_wait_event.h b/deps/oblib/src/lib/wait_event/ob_wait_event.h index d39b9cad3..26dbe8311 100644 --- a/deps/oblib/src/lib/wait_event/ob_wait_event.h +++ b/deps/oblib/src/lib/wait_event/ob_wait_event.h @@ -131,6 +131,7 @@ WAIT_EVENT_DEF(LOG_EXTERNAL_STORAGE_IO_TASK_WAIT, 20007, "latch: log external st WAIT_EVENT_DEF(LOG_EXTERNAL_STORAGE_HANDLER_RW_WAIT, 20008, "latch: log external storage handler rw wait", "", "", "", CONCURRENCY, true, false) WAIT_EVENT_DEF(LOG_EXTERNAL_STORAGE_HANDLER_WAIT, 20009, "latch: log external storage handler spin wait", "", "", "", CONCURRENCY, true, false) WAIT_EVENT_DEF(DH_LOCAL_SYNC_COND_WAIT, 20010, "datahub local sync conditional wait", "address", "", "", CONCURRENCY, true, true) +WAIT_EVENT_DEF(TRANSFER_HANDLER_COND_WAIT, 20011, "transfer handler condition wait", "address", "", "", CONCURRENCY, true, true) // share storage 21001-21999 WAIT_EVENT_DEF(ZONE_STORAGE_MANAGER_LOCK_WAIT, 21001, "latch: zone storage manager maintaince lock wait", "address", "number", "tries", CONCURRENCY, true, false) diff --git a/src/share/ob_debug_sync_point.h b/src/share/ob_debug_sync_point.h index 2f72aa69f..f61000c75 100755 --- a/src/share/ob_debug_sync_point.h +++ b/src/share/ob_debug_sync_point.h @@ -661,6 +661,7 @@ class ObString; ACT(BEFORE_PREFETCH_BACKUP_INFO_TASK,)\ ACT(RS_CHANGE_TURN_DEBUG_SYNC,)\ ACT(AFTER_MIGRATION_CREATE_ALL_TABLET,)\ + ACT(BEFORE_PARALLEL_BUILD_TABLET_INFO_TABLET,)\ ACT(MAX_DEBUG_SYNC_POINT,) DECLARE_ENUM(ObDebugSyncPoint, debug_sync_point, OB_DEBUG_SYNC_POINT_DEF); diff --git a/src/share/scheduler/ob_dag_scheduler_config.h b/src/share/scheduler/ob_dag_scheduler_config.h index 9dd9b5a64..a5019797e 100644 --- a/src/share/scheduler/ob_dag_scheduler_config.h +++ b/src/share/scheduler/ob_dag_scheduler_config.h @@ -196,6 +196,8 @@ DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_TRANSFER_BACKFILL_TX, ObDagPrio::DAG_PRIO_HA true, 3, {"tenant_id", "src_ls_id", "start_scn"}) 
DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_TRANSFER_REPLACE_TABLE, ObDagPrio::DAG_PRIO_HA_HIGH, ObSysTaskType::TRANSFER_TASK, "TRANSFER_REPLACE_TABLE", "TRANSFER", true, 2, {"tenant_id", "desc_ls_id"}) +DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_TRANSFER_BUILD_TABLET_INFO, ObDagPrio::DAG_PRIO_HA_HIGH, ObSysTaskType::TRANSFER_TASK, "TRANSFER_BUILD_TABLET_INFO", "TRANSFER", + false, 0, {}) // DAG_TYPE_TRANSFER END DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_TTL, ObDagPrio::DAG_PRIO_TTL, ObSysTaskType::TABLE_API_TTL_TASK, "TTL_DELTE_DAG", "TTL", false, 4, {"tenant_id", "ls_id", "table_id", "tablet_id"}) diff --git a/src/share/scheduler/ob_tenant_dag_scheduler.h b/src/share/scheduler/ob_tenant_dag_scheduler.h index ab21f40db..cdba4a45c 100644 --- a/src/share/scheduler/ob_tenant_dag_scheduler.h +++ b/src/share/scheduler/ob_tenant_dag_scheduler.h @@ -234,6 +234,7 @@ public: TASK_TYPE_CHECK_CONVERT_TABLET = 79, TASK_TYPE_VECTOR_INDEX_MEMDATA_SYNC = 80, TASK_TYPE_DELETE_LOB_META_ROW = 81, + TASK_TYPE_TRANSFER_BUILD_TABLET_INFO = 82, TASK_TYPE_MAX, }; diff --git a/src/storage/CMakeLists.txt b/src/storage/CMakeLists.txt index 26fca9cc7..5d9b36526 100644 --- a/src/storage/CMakeLists.txt +++ b/src/storage/CMakeLists.txt @@ -270,6 +270,7 @@ ob_set_subtarget(ob_storage high_availability high_availability/ob_storage_ha_diagnose_mgr.cpp high_availability/ob_storage_ha_diagnose_service.cpp high_availability/ob_cs_replica_migration.cpp + high_availability/ob_transfer_parallel_build_tablet_info.cpp ) ob_set_subtarget(ob_storage restore diff --git a/src/storage/high_availability/ob_storage_ha_struct.cpp b/src/storage/high_availability/ob_storage_ha_struct.cpp index ce9cdf7f7..bb9e67792 100644 --- a/src/storage/high_availability/ob_storage_ha_struct.cpp +++ b/src/storage/high_availability/ob_storage_ha_struct.cpp @@ -2644,4 +2644,4 @@ void ObMacroBlockReuseMgr::free_reuse_value_(ReuseMajorTableValue *&reuse_value) } } -} \ No newline at end of file +} diff --git 
a/src/storage/high_availability/ob_storage_ha_struct.h b/src/storage/high_availability/ob_storage_ha_struct.h index db5914a9e..e64b0f0a5 100644 --- a/src/storage/high_availability/ob_storage_ha_struct.h +++ b/src/storage/high_availability/ob_storage_ha_struct.h @@ -701,6 +701,7 @@ public: private: DISALLOW_COPY_AND_ASSIGN(ObMigrationChooseSrcHelperInitParam); }; + } } #endif diff --git a/src/storage/high_availability/ob_transfer_handler.cpp b/src/storage/high_availability/ob_transfer_handler.cpp index e7a506acb..a1a6f6c37 100644 --- a/src/storage/high_availability/ob_transfer_handler.cpp +++ b/src/storage/high_availability/ob_transfer_handler.cpp @@ -32,6 +32,7 @@ #include "storage/high_availability/ob_storage_ha_diagnose_mgr.h" #include "storage/tx/wrs/ob_weak_read_util.h" #include "rootserver/mview/ob_collect_mv_merge_info_task.h" +#include "storage/high_availability/ob_transfer_parallel_build_tablet_info.h" using namespace oceanbase::transaction; using namespace oceanbase::share; @@ -78,7 +79,9 @@ ObTransferHandler::ObTransferHandler() task_info_(), diagnose_result_msg_(share::ObStorageHACostItemName::MAX_NAME), transfer_handler_lock_(), - transfer_handler_enabled_(true) + transfer_handler_enabled_(true), + ctx_(), + cond_() { } @@ -105,7 +108,9 @@ int ObTransferHandler::init( } else if (OB_FAIL(transfer_worker_mgr_.init(ls))) { LOG_WARN("failed to init transfer worker manager", K(ret), KP(ls)); } else if (OB_FAIL(related_info_.init())) { - LOG_WARN("failed to init related_info"); + LOG_WARN("failed to init related_info", K(ret)); + } else if (OB_FAIL(cond_.init(ObWaitEventIds::TRANSFER_HANDLER_COND_WAIT))) { + LOG_WARN("failed to init thread cond", K(ret)); } else { ls_ = ls; bandwidth_throttle_ = bandwidth_throttle; @@ -469,6 +474,9 @@ int ObTransferHandler::do_with_start_status_(const share::ObTransferTaskInfo &ta } else if (!task_info.is_valid()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("do with start status get invalid argument", K(ret), K(task_info)); + } 
else if (OB_FAIL(ctx_.build_transfer_tablet_info(task_info.dest_ls_id_, task_info.tablet_list_, + task_info.trace_id_, task_info.data_version_))) { + LOG_WARN("failed to build transfer tablet info", K(ret), K(task_info)); } else if (OB_FAIL(ObTransferUtils::get_gts(task_info.tenant_id_, gts_seq_))) { LOG_WARN("failed to get gts seq", K(ret), K(task_info)); } else if (OB_FAIL(start_trans_(timeout_ctx, trans))) { @@ -642,6 +650,9 @@ int ObTransferHandler::do_with_start_status_(const share::ObTransferTaskInfo &ta process_perf_diagnose_info_(ObStorageHACostItemName::TRANSFER_START_END, ObStorageHADiagTaskType::TRANSFER_START, start_ts, tmp_round, true/*is_report*/); } + + finish_parallel_tablet_info_dag_(task_info); + LOG_INFO("[TRANSFER] finish do with start status", K(ret), K(task_info), "cost_ts", ObTimeUtil::current_time() - start_ts); return ret; } @@ -1077,7 +1088,6 @@ int ObTransferHandler::do_trans_transfer_start_( LOG_INFO("[TRANSFER] start do trans transfer start", K(task_info)); int ret = OB_SUCCESS; SCN start_scn; - ObArray tablet_meta_list; const share::ObTransferStatus next_status(ObTransferStatus::DOING); const int64_t start_ts = ObTimeUtil::current_time(); @@ -1100,10 +1110,10 @@ int ObTransferHandler::do_trans_transfer_start_( LOG_WARN("failed to unblock tx", K(ret), K(task_info)); } else if (OB_FAIL(wait_src_ls_replay_to_start_scn_(task_info, start_scn, timeout_ctx))) { LOG_WARN("failed to wait src ls replay to start scn", K(ret), K(task_info)); - } else if (OB_FAIL(get_transfer_tablets_meta_(task_info, tablet_meta_list))) { - LOG_WARN("failed to get transfer tablets meta", K(ret), K(task_info)); - } else if (OB_FAIL(do_tx_start_transfer_in_(task_info, start_scn, tablet_meta_list, timeout_ctx, trans))) { - LOG_WARN("failed to do tx start transfer in", K(ret), K(task_info), K(start_scn), K(tablet_meta_list)); + } else if (OB_FAIL(parallel_get_transfer_tablets_meta_(task_info, timeout_ctx))) { + LOG_WARN("failed to do parallel get transfer tablets 
meta", K(ret), K(task_info)); + } else if (OB_FAIL(do_tx_start_transfer_in_(task_info, start_scn, timeout_ctx, trans))) { + LOG_WARN("failed to do tx start transfer in", K(ret), K(task_info), K(start_scn)); } else if (OB_FAIL(update_transfer_status_(task_info, next_status, start_scn, OB_SUCCESS, trans))) { LOG_WARN("failed to update transfer status", K(ret), K(task_info)); } @@ -1227,7 +1237,6 @@ int ObTransferHandler::do_trans_transfer_start_v2_( LOG_INFO("[TRANSFER] start do trans transfer start v2", K(task_info)); int ret = OB_SUCCESS; SCN start_scn; - ObArray tablet_meta_list; const share::ObTransferStatus next_status(ObTransferStatus::DOING); ObAddr src_ls_leader; ObStorageHASrcInfo src_info; @@ -1298,8 +1307,8 @@ int ObTransferHandler::do_trans_transfer_start_v2_( } else if (OB_FAIL(wait_src_ls_replay_to_start_scn_(task_info, start_scn, timeout_ctx))) { LOG_WARN("failed to wait src ls replay to start scn", K(ret), K(task_info)); } else if (STEP_COST_AND_CHECK_TIMEOUT(wait_src_replay_cost)) { - } else if (OB_FAIL(get_transfer_tablets_meta_(task_info, tablet_meta_list))) { - LOG_WARN("failed to get transfer tablets meta", K(ret), K(task_info)); + } else if (OB_FAIL(parallel_get_transfer_tablets_meta_(task_info, timeout_ctx))) { + LOG_WARN("failed to do parallel get transfer tablets meta", K(ret), K(task_info)); } else if (STEP_COST_AND_CHECK_TIMEOUT(get_tablets_meta_cost)) { // move tx } else if (OB_FAIL(do_move_tx_to_dest_ls_(task_info, timeout_ctx, trans, @@ -1307,8 +1316,8 @@ int ObTransferHandler::do_trans_transfer_start_v2_( LOG_WARN("failed to do move tx to dest_ls", K(ret), K(task_info)); } else if (STEP_COST_AND_CHECK_TIMEOUT(move_tx_cost)) { // transfer in - } else if (OB_FAIL(do_tx_start_transfer_in_(task_info, start_scn, tablet_meta_list, timeout_ctx, trans))) { - LOG_WARN("failed to do tx start transfer in", K(ret), K(task_info), K(start_scn), K(tablet_meta_list)); + } else if (OB_FAIL(do_tx_start_transfer_in_(task_info, start_scn, timeout_ctx, 
trans))) { + LOG_WARN("failed to do tx start transfer in", K(ret), K(task_info), K(start_scn)); } else if (STEP_COST_AND_CHECK_TIMEOUT(transfer_in_cost)) { } else if (OB_FAIL(update_transfer_status_(task_info, next_status, start_scn, OB_SUCCESS, trans))) { LOG_WARN("failed to update transfer status", K(ret), K(task_info)); @@ -1810,58 +1819,6 @@ int ObTransferHandler::wait_ls_replay_event_( return ret; } -int ObTransferHandler::get_transfer_tablets_meta_( - const share::ObTransferTaskInfo &task_info, - common::ObIArray &tablet_meta_list) -{ - int ret = OB_SUCCESS; - tablet_meta_list.reset(); - obrpc::ObCopyTabletInfo tablet_info; - const int64_t start_ts = ObTimeUtil::current_time(); - DEBUG_SYNC(BEFORE_START_TRANSFER_GET_TABLET_META); - - if (!is_inited_) { - ret = OB_NOT_INIT; - LOG_WARN("transfer handler do not init", K(ret)); - } else if (!task_info.is_valid()) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("get transfer tablets meta get invalid argument", K(ret), K(task_info)); - } else { - for (int64_t i = 0; OB_SUCC(ret) && i < task_info.tablet_list_.count(); ++i) { - const ObTransferTabletInfo &transfer_tablet_info = task_info.tablet_list_.at(i); - ObTabletHandle tablet_handle; - tablet_info.reset(); - if (OB_FAIL(ls_->ha_get_tablet(transfer_tablet_info.tablet_id_, tablet_handle))) { - LOG_WARN("failed to get tablet", K(ret), K(transfer_tablet_info), K(tablet_handle)); - } else if (OB_FAIL(get_next_tablet_info_(task_info, transfer_tablet_info, tablet_handle, tablet_info))) { - LOG_WARN("failed to get next tablet info ", K(ret), K(transfer_tablet_info), K(tablet_handle)); - } else if (OB_FAIL(tablet_meta_list.push_back(tablet_info.param_))) { - LOG_WARN("failed to push tablet info into array", K(ret), K(tablet_info)); - } - } - - LOG_INFO("[TRANSFER_BLOCK_TX] get transfer tablets meta", K(ret), "cost", ObTimeUtil::current_time() - start_ts); - -#ifdef ERRSIM - if (OB_SUCC(ret)) { - ret = EN_GET_TRANSFER_TABLET_META_FAILED ? 
: OB_SUCCESS; - if (OB_FAIL(ret)) { - STORAGE_LOG(ERROR, "fake EN_GET_TRANSFER_TABLET_META_FAILED", K(ret)); - } - } -#endif - - } - DEBUG_SYNC(AFTER_START_TRANSFER_GET_TABLET_META); - if (OB_SUCC(ret)) { - process_perf_diagnose_info_(ObStorageHACostItemName::SRC_LS_GET_TABLET_META, - ObStorageHADiagTaskType::TRANSFER_START, 0/*start_ts*/, round_, false/*is_report*/); - } else { - diagnose_result_msg_ = share::ObStorageHACostItemName::SRC_LS_GET_TABLET_META; - } - return ret; -} - int ObTransferHandler::get_next_tablet_info_( const share::ObTransferTaskInfo &task_info, const ObTransferTabletInfo &transfer_tablet_info, @@ -1908,25 +1865,28 @@ int ObTransferHandler::get_next_tablet_info_( int ObTransferHandler::do_tx_start_transfer_in_( const share::ObTransferTaskInfo &task_info, const SCN &start_scn, - const common::ObIArray &tablet_meta_list, ObTimeoutCtx &timeout_ctx, common::ObMySQLTransaction &trans) { int ret = OB_SUCCESS; const int64_t MAX_BUF_LEN = 1.5 * 1024 * 1024; // 1.5M const int64_t start_ts = ObTimeUtil::current_time(); + int64_t tablet_count = 0; if (!is_inited_) { ret = OB_NOT_INIT; LOG_WARN("transfer handler do not init", K(ret)); - } else if (!task_info.is_valid() || tablet_meta_list.empty() || !start_scn.is_valid()) { + } else if (!task_info.is_valid() || !start_scn.is_valid()) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("do tx start transfer in get invalid argument", K(ret), K(task_info), - K(tablet_meta_list), K(start_scn)); + LOG_WARN("do tx start transfer in get invalid argument", K(ret), K(task_info), K(start_scn)); + } else if (FALSE_IT(tablet_count = ctx_.get_tablet_info_num())) { + } else if (tablet_count != task_info.tablet_list_.count()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("transfer tablet meta info not match, unexpected", K(ret), K(tablet_count), K(task_info)); } else { int64_t index = 0; ObTXStartTransferInInfo start_transfer_in_info; - while (OB_SUCC(ret) && index < tablet_meta_list.count()) { + while (OB_SUCC(ret) && index < 
tablet_count) { start_transfer_in_info.reset(); start_transfer_in_info.src_ls_id_ = task_info.src_ls_id_; start_transfer_in_info.dest_ls_id_ = task_info.dest_ls_id_; @@ -1939,17 +1899,19 @@ int ObTransferHandler::do_tx_start_transfer_in_( LOG_WARN("already timeout", K(ret), K(task_info)); } - for (int64_t i = index; i < tablet_meta_list.count() && OB_SUCC(ret); ++i, ++index) { - const ObMigrationTabletParam &tablet_meta = tablet_meta_list.at(i); - if (start_transfer_in_info.get_serialize_size() + tablet_meta.get_serialize_size() > MAX_BUF_LEN) { + for (int64_t i = index; i < tablet_count && OB_SUCC(ret); ++i, ++index) { + const ObMigrationTabletParam *tablet_meta = nullptr; + if (OB_FAIL(ctx_.get_tablet_info(index, tablet_meta))) { + LOG_WARN("failed to get tablet info", K(ret), K(index)); + } else if (start_transfer_in_info.get_serialize_size() + tablet_meta->get_serialize_size() > MAX_BUF_LEN) { if (OB_FAIL(inner_tx_start_transfer_in_(task_info, start_transfer_in_info, trans))) { LOG_WARN("failed to do inner tx start transfer in", K(ret), K(task_info), K(start_transfer_in_info)); } else { start_transfer_in_info.reset(); break; } - } else if (OB_FAIL(start_transfer_in_info.tablet_meta_list_.push_back(tablet_meta))) { - LOG_WARN("failed to push tablet meta into list", K(ret), K(tablet_meta)); + } else if (OB_FAIL(start_transfer_in_info.tablet_meta_list_.push_back(*tablet_meta))) { + LOG_WARN("failed to push tablet meta into list", K(ret), KPC(tablet_meta)); } } @@ -3301,5 +3263,280 @@ int ObTransferHandler::do_clean_diagnose_info_() return ret; } +int ObTransferHandler::parallel_get_transfer_tablets_meta_( + const share::ObTransferTaskInfo &task_info, + ObTimeoutCtx &timeout_ctx) +{ + int ret = OB_SUCCESS; + int tmp_ret = OB_SUCCESS; + const int64_t start_ts = ObTimeUtil::current_time(); + DEBUG_SYNC(BEFORE_START_TRANSFER_GET_TABLET_META); + + if (!is_inited_) { + ret = OB_NOT_INIT; + LOG_WARN("transfer handler do not init", K(ret)); + } else if 
(OB_FAIL(generate_parallel_tablet_info_dag_(task_info))) { + LOG_WARN("failed to generate parallel tablet info dag", K(ret), K(task_info)); + } else if (OB_FAIL(do_build_tablet_info_(task_info, timeout_ctx))) { + LOG_WARN("failed to do build tablet info", K(ret), K(task_info)); + } + + if (OB_SUCCESS != (tmp_ret = wait_parallel_tablet_info_ready_(task_info, timeout_ctx, ret))) { + LOG_WARN("failed to wait parallel tablet info dag finish", K(tmp_ret), K(task_info)); + } + +#ifdef ERRSIM + if (OB_SUCC(ret)) { + ret = EN_GET_TRANSFER_TABLET_META_FAILED ? : OB_SUCCESS; + if (OB_FAIL(ret)) { + STORAGE_LOG(ERROR, "fake EN_GET_TRANSFER_TABLET_META_FAILED", K(ret)); + } + } +#endif + + DEBUG_SYNC(AFTER_START_TRANSFER_GET_TABLET_META); + if (OB_SUCC(ret)) { + process_perf_diagnose_info_(ObStorageHACostItemName::SRC_LS_GET_TABLET_META, + ObStorageHADiagTaskType::TRANSFER_START, 0/*start_ts*/, round_, false/*is_report*/); + } else { + diagnose_result_msg_ = share::ObStorageHACostItemName::SRC_LS_GET_TABLET_META; + } + + LOG_INFO("finish parallel get transfer tablets meta", K(ret), "cost_ts", ObTimeUtil::current_time() - start_ts); + return ret; +} + +int ObTransferHandler::generate_parallel_tablet_info_dag_( + const share::ObTransferTaskInfo &task_info) +{ + int ret = OB_SUCCESS; + ObTenantDagScheduler *scheduler = nullptr; + const ObDagPrio::ObDagPrioEnum prio = ObDagPrio::DAG_PRIO_HA_HIGH; + ObTransferParallelBuildTabletDag *build_tablet_dag = nullptr; + const bool emergency = true; + ObTransferParallelBuildTabletDag fake_dag; + bool exist = false; + + if (!is_inited_) { + ret = OB_NOT_INIT; + LOG_WARN("transfer handler do not init", K(ret)); + } else if (!task_info.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("generate parallel tablet info dag get invalid argument", K(ret), K(task_info)); + } else if (OB_FAIL(fake_dag.init(task_info.src_ls_id_, &ctx_))) { + LOG_WARN("failed to init build tablet dag", K(ret), K(task_info)); + } else if (OB_ISNULL(scheduler = 
MTL(ObTenantDagScheduler*))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("failed to get ObTenantDagScheduler from MTL", K(ret)); + } else if (OB_FAIL(scheduler->check_dag_exist(&fake_dag, exist))) { + LOG_WARN("failed to check dag exist", K(ret), K(task_info)); + } else if (exist) { + ret = OB_EAGAIN; + LOG_WARN("parallel tabelt info dag is already exist, need wait it finish", K(ret), K(task_info)); + } else { + if (OB_FAIL(scheduler->alloc_dag_with_priority(prio, build_tablet_dag))) { + LOG_WARN("failed to alloc tablet group migration dag ", K(ret)); + } else if (OB_FAIL(build_tablet_dag->init(task_info.src_ls_id_, &ctx_))) { + LOG_WARN("failed to init transfer parallel build tablet dag", K(ret), K(task_info)); + } else if (OB_FAIL(build_tablet_dag->create_first_task())) { + LOG_WARN("failed to create first task", K(ret), K(task_info)); + } else if (OB_FAIL(scheduler->add_dag(build_tablet_dag, emergency))) { + LOG_WARN("failed to add sys tablets migration dag", K(ret), K(*build_tablet_dag)); + if (OB_SIZE_OVERFLOW != ret && OB_EAGAIN != ret) { + LOG_WARN("Fail to add task", K(ret)); + ret = OB_EAGAIN; + } + } else { + build_tablet_dag = nullptr; + } + + if (OB_NOT_NULL(build_tablet_dag) && OB_NOT_NULL(scheduler)) { + scheduler->free_dag(*build_tablet_dag); + build_tablet_dag = nullptr; + } + + if (OB_FAIL(ret)) { + //overwrite ret + //parallel thread is for speed up get_tablet_info + //if it is failed, the main thread will get whole tablet meta info + LOG_INFO("overwrite parallel tablet info result", K(ret)); + ret = OB_SUCCESS; + } + } + return ret; +} + +int ObTransferHandler::do_build_tablet_info_( + const share::ObTransferTaskInfo &task_info, + ObTimeoutCtx &timeout_ctx) +{ + int ret = OB_SUCCESS; + obrpc::ObCopyTabletInfo tablet_info; + ObTransferTabletInfo transfer_tablet_info; + ObTabletHandle tablet_handle; + int64_t tablet_count = 0; + const int64_t start_ts = ObTimeUtil::current_time(); + + if (!is_inited_) { + ret = OB_NOT_INIT; + LOG_WARN("transfer handler 
do not init", K(ret)); + } else if (!task_info.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("do build tablet info get invalid argument", K(ret), K(task_info)); + } else { + while (OB_SUCC(ret)) { + transfer_tablet_info.reset(); + tablet_handle.reset(); + if (timeout_ctx.is_timeouted()) { + LOG_WARN("transfer trans already timeout, cannot get tablet info", K(ret), K(task_info)); + ret = OB_TIMEOUT; + } else if (OB_FAIL(ctx_.get_next_tablet_info(transfer_tablet_info))) { + if (OB_ITER_END == ret) { + ret = OB_SUCCESS; + break; + } else { + LOG_WARN("failed to get next tablet info", K(ret), K(task_info)); + } + } else if (OB_FAIL(ls_->ha_get_tablet(transfer_tablet_info.tablet_id_, tablet_handle))) { + LOG_WARN("failed to get tablet", K(ret), K(transfer_tablet_info), K(tablet_handle)); + } else if (OB_FAIL(get_next_tablet_info_(task_info, transfer_tablet_info, tablet_handle, tablet_info))) { + LOG_WARN("failed to get next tablet info ", K(ret), K(transfer_tablet_info), K(tablet_handle)); + } else if (OB_FAIL(ctx_.add_tablet_info(tablet_info.param_))) { + LOG_WARN("failed to add tablet info", K(ret), K(task_info), K(tablet_info), K(transfer_tablet_info)); + } else { + ++tablet_count; + } + } + + if (OB_SUCC(ret)) { + LOG_INFO("finish do build tablet infos", K(ret), K(tablet_count), "cost_ts", ObTimeUtil::current_time() - start_ts); + } + } + return ret; +} + + +int ObTransferHandler::wait_parallel_tablet_info_ready_( + const share::ObTransferTaskInfo &task_info, + ObTimeoutCtx &timeout_ctx, + int32_t &result) +{ + int ret = OB_SUCCESS; + int tmp_ret = OB_SUCCESS; + const int64_t WAIT_FINISH_INTERVAL = 1; //1ms + const int64_t start_ts = ObTimeUtil::current_time(); + int32_t tmp_result = OB_SUCCESS; + + if (!is_inited_) { + ret = OB_NOT_INIT; + LOG_WARN("transfer handler do not init", K(ret)); + } else if (!task_info.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("wait parallel tablet info dag finish get invalid argument", K(ret), K(task_info)); + } else 
if (OB_SUCCESS != result) { + //do nothing + } else { + while (OB_SUCC(ret)) { + if (timeout_ctx.is_timeouted()) { + ret = OB_TIMEOUT; + LOG_WARN("wait parallel tablet info dag finish timeout", K(ret)); + break; + } else if (ctx_.is_build_tablet_finish()) { + break; + } else if (FALSE_IT(tmp_result = ctx_.get_result())) { + } else if (OB_SUCCESS != tmp_result) { + result = tmp_result; + FLOG_INFO("set parallel build tablet info result", K(ret), K(result)); + break; + } + common::ObThreadCondGuard guard(cond_); + if (OB_SUCCESS != (tmp_ret = cond_.wait(WAIT_FINISH_INTERVAL))) { + if (OB_TIMEOUT != tmp_ret) { + LOG_WARN("failed to idle", K(tmp_ret)); + } + } + } + } + + LOG_INFO("wait parallel tablet info dag finish", K(ret), "cost_ts", ObTimeUtil::current_time() - start_ts); + return ret; +} + +int ObTransferHandler::wait_parallel_tablet_info_dag_finish_( + const share::ObTransferTaskInfo &task_info) +{ + int ret = OB_SUCCESS; + int tmp_ret = OB_SUCCESS; + ObTenantDagScheduler *scheduler = nullptr; + const bool force_cancel = true; + ObTransferParallelBuildTabletDag fake_dag; + int64_t child_task_num = 0; + const int64_t WAIT_FINISH_INTERVAL = 1 * 1000; //1s + const int64_t start_ts = ObTimeUtil::current_time(); + bool is_exist = true; + + if (!is_inited_) { + ret = OB_NOT_INIT; + LOG_WARN("transfer handler do not init", K(ret)); + } else if (!task_info.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("wait parallel tablet info dag finish get invalid argument", K(ret), K(task_info)); + } else if (OB_ISNULL(scheduler = MTL(ObTenantDagScheduler*))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("failed to get ObTenantDagScheduler from MTL", K(ret)); + } else if (OB_FAIL(fake_dag.init(task_info.dest_ls_id_, &ctx_))) { + LOG_WARN("failed to create fake dag", K(ret), K(task_info)); + } else { + while (true) { + if (OB_FAIL(scheduler->cancel_dag(&fake_dag, force_cancel))) { + LOG_WARN("failed to cancel dag", K(ret), K(task_info)); + } + //overwrite ret + if 
(OB_FAIL(scheduler->check_dag_exist(&fake_dag, is_exist))) { + LOG_WARN("failed to check dag exist", K(ret), K(fake_dag)); + } + + if (FALSE_IT(child_task_num = ctx_.get_child_task_num())) { + } else if (0 == child_task_num && !is_exist) { + break; + } + + common::ObThreadCondGuard guard(cond_); + if (OB_SUCCESS != (tmp_ret = cond_.wait(WAIT_FINISH_INTERVAL))) { + if (OB_TIMEOUT != tmp_ret) { + LOG_WARN("failed to idle", K(tmp_ret)); + } + } + } + } + + LOG_INFO("wait parallel tablet info dag finish", K(ret), "cost_ts", ObTimeUtil::current_time() - start_ts); + return ret; +} + +void ObTransferHandler::wakeup_thread_cond() +{ + common::ObThreadCondGuard guard(cond_); + cond_.broadcast(); +} + +void ObTransferHandler::finish_parallel_tablet_info_dag_( + const share::ObTransferTaskInfo &task_info) +{ + int ret = OB_SUCCESS; + + if (!is_inited_) { + ret = OB_NOT_INIT; + LOG_WARN("transfer handler do not init", K(ret)); + } else if (!task_info.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("finsih parallel tablet info dag get invalid argument", K(ret), K(task_info)); + } else if (OB_FAIL(wait_parallel_tablet_info_dag_finish_(task_info))) { + LOG_WARN("failed to wait parallel tablet info dag finish", K(ret), K(task_info)); + } + + ctx_.reuse(); +} + } } diff --git a/src/storage/high_availability/ob_transfer_handler.h b/src/storage/high_availability/ob_transfer_handler.h index 8602bd50a..027e27543 100644 --- a/src/storage/high_availability/ob_transfer_handler.h +++ b/src/storage/high_availability/ob_transfer_handler.h @@ -103,6 +103,7 @@ public: const uint64_t timestamp, const int64_t start_ts, const bool is_report); + void wakeup_thread_cond(); private: int get_transfer_task_(share::ObTransferTaskInfo &task_info); int get_transfer_task_from_inner_table_( @@ -242,13 +243,9 @@ private: const share::ObTransferTaskInfo &task_info, const share::SCN &start_scn, ObTimeoutCtx &timeout_ctx); - int get_transfer_tablets_meta_( - const share::ObTransferTaskInfo &task_info, - 
common::ObIArray ¶ms); int do_tx_start_transfer_in_( const share::ObTransferTaskInfo &task_info, const share::SCN &start_scn, - const common::ObIArray ¶ms, ObTimeoutCtx &timeout_ctx, common::ObMySQLTransaction &trans); int inner_tx_start_transfer_in_( @@ -337,6 +334,23 @@ private: ObMySQLTransaction &trans, CollectTxCtxInfo &collect_batch, int64_t &batch_len); + int parallel_get_transfer_tablets_meta_( + const share::ObTransferTaskInfo &task_info, + ObTimeoutCtx &timeout_ctx); + int generate_parallel_tablet_info_dag_( + const share::ObTransferTaskInfo &task_info); + int do_build_tablet_info_( + const share::ObTransferTaskInfo &task_info, + ObTimeoutCtx &timeout_ctx); + int wait_parallel_tablet_info_ready_( + const share::ObTransferTaskInfo &task_info, + ObTimeoutCtx &timeout_ctx, + int32_t &result); + int wait_parallel_tablet_info_dag_finish_( + const share::ObTransferTaskInfo &task_info); + void finish_parallel_tablet_info_dag_( + const share::ObTransferTaskInfo &task_info); + private: static const int64_t INTERVAL_US = 1 * 1000 * 1000; //1s static const int64_t KILL_TX_MAX_RETRY_TIMES = 3; @@ -358,6 +372,9 @@ private: share::ObStorageHACostItemName diagnose_result_msg_; common::SpinRWLock transfer_handler_lock_; bool transfer_handler_enabled_; + ObTransferBuildTabletInfoCtx ctx_; + common::ObThreadCond cond_; + DISALLOW_COPY_AND_ASSIGN(ObTransferHandler); }; diff --git a/src/storage/high_availability/ob_transfer_parallel_build_tablet_info.cpp b/src/storage/high_availability/ob_transfer_parallel_build_tablet_info.cpp new file mode 100644 index 000000000..f5d418b4a --- /dev/null +++ b/src/storage/high_availability/ob_transfer_parallel_build_tablet_info.cpp @@ -0,0 +1,355 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. 
+ * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#define USING_LOG_PREFIX STORAGE +#include "ob_transfer_parallel_build_tablet_info.h" +#include "observer/ob_server.h" +#include "ob_physical_copy_task.h" +#include "share/rc/ob_tenant_base.h" +#include "share/scheduler/ob_dag_warning_history_mgr.h" +#include "storage/tablet/ob_tablet_common.h" +#include "storage/tx_storage/ob_ls_service.h" +#include "logservice/ob_log_service.h" +#include "lib/hash/ob_hashset.h" +#include "lib/time/ob_time_utility.h" +#include "observer/ob_server_event_history_table_operator.h" +#include "ob_storage_ha_src_provider.h" +#include "storage/tablet/ob_tablet_iterator.h" +#include "ob_storage_ha_utils.h" +#include "storage/tablet/ob_tablet.h" +#include "share/ls/ob_ls_table_operator.h" +#include "ob_rebuild_service.h" +#include "share/ob_cluster_version.h" +#include "ob_storage_ha_utils.h" + +namespace oceanbase +{ +using namespace share; +namespace storage +{ +/******************ObTransferParallelBuildTabletDag*********************/ +ObTransferParallelBuildTabletDag::ObTransferParallelBuildTabletDag() + : ObIDag(ObDagType::DAG_TYPE_TRANSFER_BUILD_TABLET_INFO), + is_inited_(false), + ls_id_(), + ls_handle_(), + ctx_(nullptr) +{ +} + +ObTransferParallelBuildTabletDag::~ObTransferParallelBuildTabletDag() +{ +} + +bool ObTransferParallelBuildTabletDag::operator == (const ObIDag &other) const +{ + bool is_same = true; + if (this == &other) { + // same + } else if (get_type() != other.get_type()) { + is_same = false; + } else { + const ObTransferParallelBuildTabletDag &other_dag = static_cast(other); + if (ls_id_ != other_dag.ls_id_) { + is_same = false; + } + } + return is_same; +} + 
+// Hash value of the dag: murmurhash over ls_id_ only, seeded with 0.
+int64_t ObTransferParallelBuildTabletDag::hash() const
+{
+  int64_t hash_value = 0;
+  hash_value = common::murmurhash(&ls_id_, sizeof(ls_id_), hash_value);
+  return hash_value;
+}
+
+// Prints "ObTransferParallelBuildTabletDag: ls_id = <ls id>" into buf.
+// Returns OB_NOT_INIT before a successful init(), otherwise whatever databuff_printf reports.
+int ObTransferParallelBuildTabletDag::fill_dag_key(char *buf, const int64_t buf_len) const
+{
+  int ret = OB_SUCCESS;
+
+  if (!is_inited_) {
+    ret = OB_NOT_INIT;
+    LOG_WARN("transfer parallel build tablet dag do not init", K(ret));
+  } else {
+    int64_t pos = 0;
+    if (OB_FAIL(databuff_printf(buf, buf_len, pos,
+        "ObTransferParallelBuildTabletDag: ls_id = %s", to_cstring(ls_id_)))) {
+      LOG_WARN("failed to fill dag key", K(ret), KPC(this));
+    }
+  }
+  return ret;
+}
+
+// Binds the dag to ls_id and ctx: the ctx's task id becomes the dag id and the ls handle is fetched.
+// Only the ctx pointer is stored, so ctx has to stay alive while the dag and its tasks run.
+// Returns OB_INIT_TWICE on a repeated call and OB_INVALID_ARGUMENT for an invalid ls_id or null ctx.
+int ObTransferParallelBuildTabletDag::init(
+    const share::ObLSID &ls_id,
+    ObTransferBuildTabletInfoCtx *ctx)
+{
+  int ret = OB_SUCCESS;
+
+  if (is_inited_) {
+    ret = OB_INIT_TWICE;
+    LOG_WARN("transfer parallel build tablet init twice", K(ret));
+  } else if (!ls_id.is_valid() || OB_ISNULL(ctx)) {
+    ret = OB_INVALID_ARGUMENT;
+    LOG_WARN("transfer parallel build tablet dag get invalid argument", K(ret), K(ls_id), KP(ctx));
+  } else if (OB_FAIL(set_dag_id(ctx->get_task_id()))) {
+    LOG_WARN("failed to set dag id", K(ret), K(ls_id), KPC(ctx));
+  } else if (OB_FAIL(ObStorageHADagUtils::get_ls(ls_id, ls_handle_))) {
+    LOG_WARN("failed to get ls", K(ret), K(ls_id));
+  } else {
+    ls_id_ = ls_id;
+    ctx_ = ctx;
+    is_inited_ = true;
+  }
+  return ret;
+}
+
+// Allocates and registers the first build task. It is initialized with a default-constructed
+// tablet info, so (assuming a default-constructed info is not valid - TODO confirm) it has no
+// dedicated first tablet and pulls all of its work from the ctx.
+int ObTransferParallelBuildTabletDag::create_first_task()
+{
+  int ret = OB_SUCCESS;
+  ObTransferParallelBuildTabletTask *task = nullptr;
+  share::ObTransferTabletInfo tablet_info;
+
+  if (!is_inited_) {
+    ret = OB_NOT_INIT;
+    LOG_WARN("transfer parallel build tablet dag do not init", K(ret));
+  } else if (OB_FAIL(alloc_task(task))) {
+    LOG_WARN("Fail to alloc task", K(ret));
+  } else if (OB_FAIL(task->init(tablet_info, ctx_))) {
+    LOG_WARN("failed to init transfer parallel build tablet task", K(ret), KPC(this));
+  } else if (OB_FAIL(add_task(*task))) {
+    LOG_WARN("Fail to add task", K(ret));
+  } else {
+    LOG_DEBUG("success to create first task", K(ret), KPC(this));
+  }
+  return ret;
+}
+
+// Fills the dag warning info param with the tenant id, the ls id and the current trace id.
+int ObTransferParallelBuildTabletDag::fill_info_param(compaction::ObIBasicInfoParam *&out_param, ObIAllocator &allocator) const
+{
+  int ret = OB_SUCCESS;
+  const uint64_t tenant_id = MTL_ID();
+
+  if (!is_inited_) {
+    ret = OB_NOT_INIT;
+    LOG_WARN("transfer parallel build tablet dag do not init", K(ret));
+  } else if (OB_FAIL(ADD_DAG_WARN_INFO_PARAM(out_param, allocator, get_type(),
+                                             static_cast<int64_t>(tenant_id), ls_id_.id(),
+                                             "dag_id", to_cstring(ObCurTraceId::get_trace_id())))) {
+    LOG_WARN("failed to fill info param", K(ret));
+  }
+  return ret;
+}
+
+// Hands out the ls held by ls_handle_; ls is reset to nullptr and OB_NOT_INIT returned before init().
+int ObTransferParallelBuildTabletDag::get_ls(ObLS *&ls)
+{
+  int ret = OB_SUCCESS;
+  ls = nullptr;
+  if (!is_inited_) {
+    ret = OB_NOT_INIT;
+    LOG_WARN("transfer parallel build tablet dag is not init", K(ret));
+  } else {
+    ls = ls_handle_.get_ls();
+  }
+  return ret;
+}
+
+// Delegates to ObIDag::to_string and returns the number of bytes it reports.
+// ret is used for logging only: an uninitialized dag prints nothing and returns 0.
+int64_t ObTransferParallelBuildTabletDag::to_string(char* buf, const int64_t buf_len) const
+{
+  int ret = OB_SUCCESS;
+  int64_t pos = 0;
+
+  if (!is_inited_) {
+    ret = OB_NOT_INIT;
+    LOG_WARN("transfer parallel build tablet dag do not init", K(ret));
+  } else if (FALSE_IT(pos = ObIDag::to_string(buf, buf_len))) {
+  } else if (pos >= buf_len) {
+    ret = OB_ERR_UNEXPECTED;
+    LOG_ERROR("dag to string buffer length is over limit", K(ret), K(pos), K(buf_len));
+  }
+  return pos;
+}
+
+/******************ObTransferParallelBuildTabletTask*********************/
+ObTransferParallelBuildTabletTask::ObTransferParallelBuildTabletTask()
+  : ObITask(TASK_TYPE_TRANSFER_BUILD_TABLET_INFO),
+    is_inited_(false),
+    first_tablet_info_(),
+    ctx_(nullptr),
+    ls_(nullptr)
+{
+}
+
+// A successfully initialized task counted itself in the ctx (see init()); undo that here and wake
+// the transfer handler, which presumably waits for the child task count to drop - TODO confirm.
+ObTransferParallelBuildTabletTask::~ObTransferParallelBuildTabletTask()
+{
+  if (is_inited_) {
+    ctx_->dec_child_task_num();
+    ls_->get_transfer_handler()->wakeup_thread_cond();
+  }
+}
+
+// Prepares the task: borrows the ls from the owning dag, remembers first_tablet_info and ctx and
+// increments the ctx's child task counter (decremented again in the destructor).
+// Must be called after the task has been allocated by its dag, because get_dag() is consulted.
+int ObTransferParallelBuildTabletTask::init(
+    const share::ObTransferTabletInfo &first_tablet_info,
+    ObTransferBuildTabletInfoCtx *ctx)
+{
+  int ret = OB_SUCCESS;
+  ObTransferParallelBuildTabletDag *dag = nullptr;
+
+  if (is_inited_) {
+    ret = OB_INIT_TWICE;
+    LOG_WARN("transfer parallel build tablet task init twice", K(ret));
+  } else if (OB_ISNULL(ctx)) {
+    ret = OB_INVALID_ARGUMENT;
+    LOG_WARN("transfer parallel build tablet task init get invalid argument", K(ret), KP(ctx));
+  } else if (OB_ISNULL(dag = static_cast<ObTransferParallelBuildTabletDag *>(this->get_dag()))) {
+    ret = OB_ERR_UNEXPECTED;
+    LOG_WARN("transfer parallel build tablet dag should not be NULL", K(ret), KP(dag));
+  } else if (OB_FAIL(dag->get_ls(ls_))) {
+    LOG_WARN("failed to get ls", K(ret), KPC(dag), K(first_tablet_info));
+  } else if (OB_ISNULL(ls_)) {
+    // the destructor and do_build_tablet_info_() dereference ls_ without further checks
+    ret = OB_ERR_UNEXPECTED;
+    LOG_WARN("ls should not be NULL", K(ret), KPC(dag), K(first_tablet_info));
+  } else {
+    first_tablet_info_ = first_tablet_info;
+    ctx_ = ctx;
+    ctx_->inc_child_task_num();
+    is_inited_ = true;
+  }
+  return ret;
+}
+
+// Runs the task. Nothing is built when the ctx already carries a failure; any failure of this task
+// is recorded in the ctx so that the other tasks and the waiter can observe it.
+int ObTransferParallelBuildTabletTask::process()
+{
+  int ret = OB_SUCCESS;
+  LOG_INFO("start do transfer parallel build tablet task", K(first_tablet_info_));
+  DEBUG_SYNC(BEFORE_PARALLEL_BUILD_TABLET_INFO_TABLET);
+  if (!is_inited_) {
+    ret = OB_NOT_INIT;
+    LOG_WARN("transfer parallel build tablet task do not init", K(ret), KPC(ctx_));
+  } else if (ctx_->is_failed()) {
+    //do nothing
+  } else if (OB_FAIL(do_build_tablet_infos_())) {
+    LOG_WARN("failed to do build tablet infos", K(ret), KPC(ctx_));
+  }
+
+  if (OB_FAIL(ret) && OB_NOT_NULL(ctx_)) {
+    ctx_->set_result(ret);
+  }
+  return ret;
+}
+
+// Builds the param for first_tablet_info_ (when it is valid) and then keeps taking tablets from the
+// ctx until it reports OB_ITER_END, which is treated as normal completion. Stops at the first error.
+int ObTransferParallelBuildTabletTask::do_build_tablet_infos_()
+{
+  int ret = OB_SUCCESS;
+  share::ObTransferTabletInfo tablet_info;
+  int64_t build_tablet_info_num = 0;
+  const int64_t start_ts = ObTimeUtil::current_time();
+
+  if (!is_inited_) {
+    ret = OB_NOT_INIT;
+    LOG_WARN("transfer parallel build tablet task do not init", K(ret), KPC(ctx_));
+  } else {
+    if (first_tablet_info_.is_valid()) {
+      if (OB_FAIL(do_build_tablet_info_(first_tablet_info_))) {
+        LOG_WARN("failed to do build tablet info", K(ret), K(first_tablet_info_));
+      } else {
+        ++build_tablet_info_num;
+      }
+    }
+
+    while (OB_SUCC(ret)) {
+      if (OB_FAIL(ctx_->get_next_tablet_info(tablet_info))) {
+        if (OB_ITER_END == ret) {
+          // the shared work list is drained
+          ret = OB_SUCCESS;
+          break;
+        } else {
+          LOG_WARN("failed to get next tablet id", K(ret), KPC(ctx_));
+        }
+      } else if (OB_FAIL(do_build_tablet_info_(tablet_info))) {
+        LOG_WARN("failed to do build tablet info", K(ret), K(tablet_info));
+      } else {
+        ++build_tablet_info_num;
+      }
+    }
+
+    LOG_INFO("finish do build tablet infos", K(ret), K(build_tablet_info_num), "cost_ts", ObTimeUtil::current_time() - start_ts);
+  }
+  return ret;
+}
+
+// Builds the migration param of one source tablet and stores it in the ctx. The tablet must have
+// latest status TRANSFER_OUT that is not committed yet (OB_STATE_NOT_MATCH otherwise) and its
+// transfer seq must equal the one in tablet_info (OB_TABLET_TRANSFER_SEQ_NOT_MATCH otherwise).
+int ObTransferParallelBuildTabletTask::do_build_tablet_info_(const share::ObTransferTabletInfo &tablet_info)
+{
+  int ret = OB_SUCCESS;
+  ObTabletHandle tablet_handle;
+  ObTabletCreateDeleteMdsUserData user_data;
+  ObTablet *tablet = nullptr;
+  bool committed_flag = false;
+  ObMigrationTabletParam param;
+
+  if (!is_inited_) {
+    ret = OB_NOT_INIT;
+    LOG_WARN("transfer parallel build tablet task do not init", K(ret), KPC(ctx_));
+  } else if (!tablet_info.is_valid()) {
+    ret = OB_INVALID_ARGUMENT;
+    LOG_WARN("do build tablet info get invalid argument", K(ret), K(tablet_info));
+  } else if (OB_FAIL(ls_->ha_get_tablet(tablet_info.tablet_id_, tablet_handle))) {
+    LOG_WARN("failed to get tablet", K(ret), K(tablet_info), K(tablet_handle));
+  } else if (OB_ISNULL(tablet = tablet_handle.get_obj())) {
+    ret = OB_ERR_UNEXPECTED;
+    LOG_WARN("tablet should not be NULL", K(ret), KP(tablet), K(tablet_info));
+  } else if (OB_FAIL(tablet->ObITabletMdsInterface::get_latest_tablet_status(user_data, committed_flag))) {
+    LOG_WARN("failed to get latest tablet status", K(ret), KPC(tablet), K(tablet_info));
+  } else if (ObTabletStatus::TRANSFER_OUT != user_data.tablet_status_) {
+    ret = OB_STATE_NOT_MATCH;
+    LOG_WARN("tablet status is not match", K(ret), KPC(tablet), K(tablet_info), K(user_data));
+  } else if (committed_flag) {
+    ret = OB_STATE_NOT_MATCH;
+    LOG_WARN("transfer src tablet status is transfer out but is already committed, not match",
+        K(ret), KPC(tablet), K(tablet_info), K(user_data));
+  } else if (tablet_info.transfer_seq_ != tablet->get_tablet_meta().transfer_info_.transfer_seq_) {
+    ret = OB_TABLET_TRANSFER_SEQ_NOT_MATCH;
+    LOG_WARN("tablet transfer seq is not match", K(ret), KPC(tablet), K(tablet_info));
+  } else if (OB_FAIL(tablet->build_transfer_tablet_param(ctx_->get_data_version(), ctx_->get_dest_ls_id(), param))) {
+    LOG_WARN("failed to build transfer tablet param", K(ret), K(tablet_info));
+  } else if (OB_FAIL(ctx_->add_tablet_info(param))) {
+    LOG_WARN("failed to add tablet info", K(ret), K(param));
+  }
+  return ret;
+}
+
+// Creates a sibling task for the next tablet still queued in the ctx; that tablet becomes the new
+// task's first_tablet_info. Returns OB_ITER_END when no tablet is left.
+// The tablet has already been taken out of the ctx when task creation fails, so such a failure is
+// recorded in the ctx, the same way process() does, to keep it visible to whoever waits on the ctx.
+int ObTransferParallelBuildTabletTask::generate_next_task(share::ObITask *&next_task)
+{
+  int ret = OB_SUCCESS;
+  ObTransferParallelBuildTabletTask *tmp_next_task = nullptr;
+  share::ObTransferTabletInfo tablet_info;
+  if (!is_inited_) {
+    ret = OB_NOT_INIT;
+    LOG_ERROR("transfer parallel build tablet task do not init", K(ret));
+  } else if (OB_FAIL(ctx_->get_next_tablet_info(tablet_info))) {
+    if (OB_ITER_END == ret) {
+      //do nothing
+    } else {
+      LOG_WARN("failed to get next tablet info", K(ret), KPC(ctx_));
+    }
+  } else if (OB_FAIL(dag_->alloc_task(tmp_next_task))) {
+    LOG_WARN("failed to alloc task", K(ret));
+  } else if (OB_FAIL(tmp_next_task->init(tablet_info, ctx_))) {
+    LOG_WARN("failed to init next task", K(ret), K(tablet_info));
+  } else {
+    next_task = tmp_next_task;
+  }
+
+  if (OB_FAIL(ret) && OB_ITER_END != ret && OB_NOT_NULL(ctx_)) {
+    ctx_->set_result(ret);
+  }
+  return ret;
+}
+
+}
+}
diff --git a/src/storage/high_availability/ob_transfer_parallel_build_tablet_info.h b/src/storage/high_availability/ob_transfer_parallel_build_tablet_info.h
new file mode 100644
index 000000000..4e8fa1dc0
--- /dev/null
+++ b/src/storage/high_availability/ob_transfer_parallel_build_tablet_info.h
@@ -0,0 +1,87 @@
+/**
+ * Copyright (c) 2021 OceanBase
+ * OceanBase CE is licensed under Mulan PubL v2.
+ * You can use this software according to the terms and conditions of the Mulan PubL v2.
+ * You may obtain a copy of Mulan PubL v2 at:
+ *          http://license.coscl.org.cn/MulanPubL-2.0
+ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
+ * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+ * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
+ * See the Mulan PubL v2 for more details.
+ */
+
+#ifndef OCEANBASE_STORAGE_TRANSFER_PARALLEL_BUILD_TABLET_INFO_
+#define OCEANBASE_STORAGE_TRANSFER_PARALLEL_BUILD_TABLET_INFO_
+
+#include "share/ob_define.h"
+#include "lib/thread/ob_work_queue.h"
+#include "lib/thread/ob_dynamic_thread_pool.h"
+#include "share/ob_common_rpc_proxy.h" // ObCommonRpcProxy
+#include "share/ob_srv_rpc_proxy.h" // ObPartitionServiceRpcProxy
+#include "share/scheduler/ob_sys_task_stat.h"
+#include "observer/ob_rpc_processor_simple.h"
+#include "share/scheduler/ob_tenant_dag_scheduler.h"
+#include "share/scn.h"
+#include "ob_storage_ha_struct.h"
+
+namespace oceanbase
+{
+namespace storage
+{
+
+// Dag bound to one log stream; its tasks turn the transfer tablets queued in the shared
+// ObTransferBuildTabletInfoCtx into migration tablet params.
+class ObTransferParallelBuildTabletDag : public share::ObIDag
+{
+public:
+  ObTransferParallelBuildTabletDag();
+  virtual ~ObTransferParallelBuildTabletDag();
+  virtual bool operator == (const share::ObIDag &other) const override;
+  virtual int64_t hash() const override;
+  virtual int fill_dag_key(char *buf, const int64_t buf_len) const override;
+  virtual int create_first_task() override;
+  virtual int fill_info_param(compaction::ObIBasicInfoParam *&out_param, ObIAllocator &allocator) const override;
+  virtual lib::Worker::CompatMode get_compat_mode() const override
+  { return lib::Worker::CompatMode::MYSQL; }
+  virtual uint64_t get_consumer_group_id() const override
+  { return consumer_group_id_; }
+  virtual int64_t to_string(char* buf, const int64_t buf_len) const override;
+
+  // Only the ctx pointer is stored, so ctx has to stay alive while the dag and its tasks run.
+  int init(const share::ObLSID &ls_id, ObTransferBuildTabletInfoCtx *ctx);
+  int get_ls(ObLS *&ls); // ls of the handle fetched in init(); OB_NOT_INIT before init()
+
+protected:
+  bool is_inited_;
+  share::ObLSID ls_id_;
+  ObLSHandle ls_handle_; // fetched in init(); get_ls() hands out its ls
+  ObTransferBuildTabletInfoCtx *ctx_; // not owned
+  DISALLOW_COPY_AND_ASSIGN(ObTransferParallelBuildTabletDag);
+};
+
+// Worker task: builds the param for first_tablet_info (when valid) and then keeps pulling
+// tablets from the ctx until it is drained.
+class ObTransferParallelBuildTabletTask : public share::ObITask
+{
+public:
+  ObTransferParallelBuildTabletTask();
+  virtual ~ObTransferParallelBuildTabletTask();
+  int init(const share::ObTransferTabletInfo &first_tablet_info, ObTransferBuildTabletInfoCtx *ctx);
+  virtual int process() override;
+  virtual int generate_next_task(share::ObITask *&next_task) override;
+  VIRTUAL_TO_STRING_KV(K("ObTransferParallelBuildTabletTask"), KP(this), KPC(ctx_));
+private:
+  int do_build_tablet_infos_();
+  int do_build_tablet_info_(const share::ObTransferTabletInfo &tablet_info);
+
+private:
+  bool is_inited_;
+  share::ObTransferTabletInfo first_tablet_info_;
+  ObTransferBuildTabletInfoCtx *ctx_; // not owned
+  ObLS *ls_; // borrowed from the owning dag's ls handle
+  DISALLOW_COPY_AND_ASSIGN(ObTransferParallelBuildTabletTask);
+};
+
+} // namespace storage
+} // namespace oceanbase
+#endif
diff --git a/src/storage/high_availability/ob_transfer_struct.cpp b/src/storage/high_availability/ob_transfer_struct.cpp
index d723f8178..1edbcdb38 100644
--- a/src/storage/high_availability/ob_transfer_struct.cpp
+++ b/src/storage/high_availability/ob_transfer_struct.cpp
@@ -1040,3 +1040,241 @@ int ObTransferRelatedInfo::get_related_info_task_id(share::ObTransferTaskID &tas
   }
   return ret;
 }
+
+/******************ObTransferTabletInfoMgr*********************/
+ObTransferBuildTabletInfoCtx::ObTransferTabletInfoMgr::ObTransferTabletInfoMgr()
+  : lock_(),
+    tablet_info_array_()
+{
+}
+
+// Clears the collected params under the write lock.
+ObTransferBuildTabletInfoCtx::ObTransferTabletInfoMgr::~ObTransferTabletInfoMgr()
+{
+  common::SpinWLockGuard guard(lock_);
+  tablet_info_array_.reset();
+}
+
+// Appends param under the write lock; an invalid param is rejected with OB_INVALID_ARGUMENT.
+int ObTransferBuildTabletInfoCtx::ObTransferTabletInfoMgr::add_tablet_info(
+    const ObMigrationTabletParam &param)
+{
+  int ret = OB_SUCCESS;
+  if (!param.is_valid()) {
+    ret = OB_INVALID_ARGUMENT;
+    LOG_WARN("add tablet info get invalid argument", K(ret), K(param));
+  } else {
+    common::SpinWLockGuard guard(lock_);
+    if (OB_FAIL(tablet_info_array_.push_back(param))) {
+      LOG_WARN("failed to add tablet info", K(ret), K(param));
+    }
+  }
+  return ret;
+}
+
+// Number of params collected so far, read under the read lock.
+int64_t ObTransferBuildTabletInfoCtx::ObTransferTabletInfoMgr::get_tablet_info_num() const
+{
+  common::SpinRLockGuard guard(lock_);
+  return tablet_info_array_.count();
+}
+
+// Hands out a pointer to the index-th stored param; OB_INVALID_ARGUMENT when index is out of range.
+// NOTE(review): the pointer refers to an element of tablet_info_array_ and outlives the read lock;
+// presumably callers only read after all build tasks have finished adding - confirm.
+int ObTransferBuildTabletInfoCtx::ObTransferTabletInfoMgr::get_tablet_info(
+    const int64_t index, const ObMigrationTabletParam *&param)
+{
+  int ret = OB_SUCCESS;
+  param = nullptr;
+  common::SpinRLockGuard guard(lock_);
+  if (index < 0 || index >= tablet_info_array_.count()) {
+    ret = OB_INVALID_ARGUMENT;
+    LOG_WARN("get tablet info get invalid argument", K(ret), K(index));
+  } else {
+    param = &tablet_info_array_.at(index);
+  }
+  return ret;
+}
+
+// Drops every collected param under the write lock.
+void ObTransferBuildTabletInfoCtx::ObTransferTabletInfoMgr::reuse()
+{
+  common::SpinWLockGuard guard(lock_);
+  tablet_info_array_.reset();
+}
+
+/******************ObTransferBuildTabletInfoCtx*********************/
+ObTransferBuildTabletInfoCtx::ObTransferBuildTabletInfoCtx()
+  : lock_(),
+    dest_ls_id_(),
+    index_(0),
+    tablet_info_array_(),
+    child_task_num_(0),
+    total_tablet_count_(0),
+    result_(OB_SUCCESS),
+    data_version_(0),
+    task_id_(),
+    mgr_()
+{
+}
+
+ObTransferBuildTabletInfoCtx::~ObTransferBuildTabletInfoCtx()
+{
+}
+
+// Puts every field back to its constructed value so build_transfer_tablet_info() can be called again.
+void ObTransferBuildTabletInfoCtx::reuse()
+{
+  common::SpinWLockGuard guard(lock_);
+  total_tablet_count_ = 0;
+  dest_ls_id_.reset();
+  index_ = 0;
+  tablet_info_array_.reset();
+  child_task_num_ = 0;
+  result_ = OB_SUCCESS;
+  data_version_ = 0;
+  task_id_.reset();
+  mgr_.reuse();
+}
+
+// Loads the tablets to process together with the destination ls, the task id and the data version.
+// Returns OB_INIT_TWICE while a previously loaded list is still present (cursor moved or list not
+// empty) and OB_INVALID_ARGUMENT for an invalid dest_ls_id / task_id or a zero data_version.
+int ObTransferBuildTabletInfoCtx::build_transfer_tablet_info(
+    const share::ObLSID &dest_ls_id,
+    const common::ObIArray<share::ObTransferTabletInfo> &tablet_info_array,
+    const common::ObCurTraceId::TraceId &task_id,
+    const uint64_t data_version)
+{
+  int ret = OB_SUCCESS;
+  common::SpinWLockGuard guard(lock_);
+  if (0 != index_ || !tablet_info_array_.empty()) {
+    ret = OB_INIT_TWICE;
+    LOG_WARN("build transfer tablet info init twice", K(ret), K(index_), K(tablet_info_array_));
+  } else if (!dest_ls_id.is_valid() || !task_id.is_valid() || 0 == data_version) {
+    ret = OB_INVALID_ARGUMENT;
+    LOG_WARN("build transfer tablet info get invalid argument", K(ret), K(dest_ls_id), K(task_id), K(data_version));
+  } else if (OB_FAIL(tablet_info_array_.assign(tablet_info_array))) {
+    LOG_WARN("failed to assign tablet info array", K(ret), K(tablet_info_array));
+  } else {
+    dest_ls_id_ = dest_ls_id;
+    total_tablet_count_ = tablet_info_array_.count();
+    task_id_ = task_id;
+    data_version_ = data_version;
+  }
+  return ret;
+}
+
+bool ObTransferBuildTabletInfoCtx::is_valid() const
+{
+  common::SpinRLockGuard guard(lock_);
+  return is_valid_();
+}
+
+// Unlocked core of is_valid(); both callers in this file hold lock_ when calling it.
+bool ObTransferBuildTabletInfoCtx::is_valid_() const
+{
+  return index_ >= 0 && tablet_info_array_.count() >= 0 && index_ <= tablet_info_array_.count() && data_version_ > 0 && dest_ls_id_.is_valid();
+}
+
+// Copies the tablet at the cursor into tablet_info and advances the cursor; OB_ITER_END when drained.
+int ObTransferBuildTabletInfoCtx::get_next_tablet_info(share::ObTransferTabletInfo &tablet_info)
+{
+  int ret = OB_SUCCESS;
+  tablet_info.reset();
+  common::SpinWLockGuard guard(lock_);
+  if (!is_valid_()) {
+    ret = OB_ERR_UNEXPECTED;
+    LOG_WARN("transfer build tablet info ctx is invalid, unexpected", K(ret), KPC(this));
+  } else if (index_ == tablet_info_array_.count()) {
+    ret = OB_ITER_END;
+  } else {
+    tablet_info = tablet_info_array_.at(index_);
+    ++index_;
+  }
+  return ret;
+}
+
+void ObTransferBuildTabletInfoCtx::inc_child_task_num()
+{
+  ATOMIC_INC(&child_task_num_);
+}
+
+void ObTransferBuildTabletInfoCtx::dec_child_task_num()
+{
+  ATOMIC_DEC(&child_task_num_);
+}
+
+int64_t ObTransferBuildTabletInfoCtx::get_child_task_num()
+{
+  return ATOMIC_LOAD(&child_task_num_);
+}
+
+int64_t ObTransferBuildTabletInfoCtx::get_total_tablet_count()
+{
+  common::SpinRLockGuard guard(lock_);
+  return total_tablet_count_;
+}
+
+// total_tablet_count_ is read under lock_, like in get_total_tablet_count(); order is lock_, then mgr_'s lock.
+bool ObTransferBuildTabletInfoCtx::is_build_tablet_finish() const
+{
+  common::SpinRLockGuard guard(lock_);
+  return total_tablet_count_ == mgr_.get_tablet_info_num();
+}
+
+bool ObTransferBuildTabletInfoCtx::is_failed() const
+{
+  common::SpinRLockGuard guard(lock_);
+  return OB_SUCCESS != result_;
+}
+
+// Stores result only while result_ is still OB_SUCCESS, i.e. the first recorded failure is kept.
+void ObTransferBuildTabletInfoCtx::set_result(const int32_t result)
+{
+  common::SpinWLockGuard guard(lock_);
+  if (OB_SUCCESS == result_) {
+    result_ = result;
+  }
+}
+
+int32_t ObTransferBuildTabletInfoCtx::get_result()
+{
+  common::SpinRLockGuard guard(lock_);
+  return result_;
+}
+
+// Validates param and stores it in mgr_.
+int ObTransferBuildTabletInfoCtx::add_tablet_info(
+    const ObMigrationTabletParam &param)
+{
+  int ret = OB_SUCCESS;
+  if (!param.is_valid()) {
+    ret = OB_INVALID_ARGUMENT;
+    LOG_WARN("add tablet info get invalid argument", K(ret), K(param));
+  } else if (OB_FAIL(mgr_.add_tablet_info(param))) {
+    LOG_WARN("failed to add tablet info", K(ret), K(param));
+  }
+  return ret;
+}
+
+// Looks up the index-th built param in mgr_; param is nullptr on failure.
+int ObTransferBuildTabletInfoCtx::get_tablet_info(
+    const int64_t index, const ObMigrationTabletParam *&param)
+{
+  int ret = OB_SUCCESS;
+  param = nullptr;
+  if (index < 0) {
+    ret = OB_INVALID_ARGUMENT;
+    LOG_WARN("get tablet info get invalid argument", K(ret), K(index));
+  } else if (OB_FAIL(mgr_.get_tablet_info(index, param))) {
+    LOG_WARN("failed to get tablet info", K(ret), K(index));
+  }
+  return ret;
+}
+
+int64_t ObTransferBuildTabletInfoCtx::get_tablet_info_num() const
+{
+  return mgr_.get_tablet_info_num();
+}
diff --git a/src/storage/high_availability/ob_transfer_struct.h b/src/storage/high_availability/ob_transfer_struct.h
index d35df2881..16900d1ca 100644
--- a/src/storage/high_availability/ob_transfer_struct.h
+++ b/src/storage/high_availability/ob_transfer_struct.h
@@ -316,6 +316,77 @@ private:
   DISALLOW_COPY_AND_ASSIGN(ObTransferRelatedInfo);
 };
 
+// Shared state of one "build transfer tablet info" round: the tablets to process, a cursor that the
+// parallel build tasks advance through them, the first recorded failure and the params built so far.
+struct ObTransferBuildTabletInfoCtx final
+{
+public:
+  ObTransferBuildTabletInfoCtx();
+  ~ObTransferBuildTabletInfoCtx();
+  int build_transfer_tablet_info(
+      const share::ObLSID &dest_ls_id,
+      const common::ObIArray<share::ObTransferTabletInfo> &tablet_info_array,
+      const common::ObCurTraceId::TraceId &task_id,
+      const uint64_t data_version);
+  void reuse();
+  // Takes the next tablet off the work list; OB_ITER_END once the list is exhausted.
+  int get_next_tablet_info(share::ObTransferTabletInfo &tablet_info);
+  bool is_valid() const;
+  void inc_child_task_num();
+  void dec_child_task_num();
+  int64_t get_child_task_num();
+  int64_t get_total_tablet_count();
+  // True when as many params have been collected as tablets were loaded.
+  bool is_build_tablet_finish() const;
+  common::ObCurTraceId::TraceId &get_task_id() { return task_id_; }
+  bool is_failed() const;
+  // Only the first non-success result is kept.
+  void set_result(const int32_t result);
+  share::ObLSID &get_dest_ls_id() { return dest_ls_id_; }
+  uint64_t get_data_version() { return data_version_; }
+  int32_t get_result();
+
+  int add_tablet_info(const ObMigrationTabletParam &param);
+  int get_tablet_info(const int64_t index, const ObMigrationTabletParam *&param);
+  int64_t get_tablet_info_num() const;
+
+  TO_STRING_KV(K_(dest_ls_id), K_(index), K_(tablet_info_array), K_(child_task_num), K_(total_tablet_count),
+      K_(result), K_(data_version), K_(task_id));
+private:
+  bool is_valid_() const;
+
+private:
+  // Array of built params guarded by its own lock.
+  struct ObTransferTabletInfoMgr final
+  {
+  public:
+    ObTransferTabletInfoMgr();
+    ~ObTransferTabletInfoMgr();
+    int add_tablet_info(const ObMigrationTabletParam &param);
+    int64_t get_tablet_info_num() const;
+    int get_tablet_info(const int64_t index, const ObMigrationTabletParam *&param);
+    void reuse();
+
+    TO_STRING_KV(K_(tablet_info_array));
+  private:
+    common::SpinRWLock lock_;
+    common::ObArray<ObMigrationTabletParam> tablet_info_array_;
+    DISALLOW_COPY_AND_ASSIGN(ObTransferTabletInfoMgr);
+  };
+private:
+  common::SpinRWLock lock_;
+  share::ObLSID dest_ls_id_;
+  int64_t index_; // cursor into tablet_info_array_
+  common::ObArray<share::ObTransferTabletInfo> tablet_info_array_; // tablets still to be built
+  int64_t child_task_num_; // accessed with ATOMIC_* outside of reuse()
+  int64_t total_tablet_count_;
+  int32_t result_; // first recorded failure, OB_SUCCESS otherwise
+  uint64_t data_version_;
+  common::ObCurTraceId::TraceId task_id_;
+  ObTransferTabletInfoMgr mgr_; // params built so far
+  DISALLOW_COPY_AND_ASSIGN(ObTransferBuildTabletInfoCtx);
+};
+
 }
 }
 #endif