transfer parallel get tablet info.

This commit is contained in:
godyangfight 2024-10-29 03:51:41 +00:00 committed by ob-robot
parent 1efda3b84b
commit c26dbd1a04
13 changed files with 1068 additions and 79 deletions

View File

@ -131,6 +131,7 @@ WAIT_EVENT_DEF(LOG_EXTERNAL_STORAGE_IO_TASK_WAIT, 20007, "latch: log external st
WAIT_EVENT_DEF(LOG_EXTERNAL_STORAGE_HANDLER_RW_WAIT, 20008, "latch: log external storage handler rw wait", "", "", "", CONCURRENCY, true, false)
WAIT_EVENT_DEF(LOG_EXTERNAL_STORAGE_HANDLER_WAIT, 20009, "latch: log external storage handler spin wait", "", "", "", CONCURRENCY, true, false)
WAIT_EVENT_DEF(DH_LOCAL_SYNC_COND_WAIT, 20010, "datahub local sync conditional wait", "address", "", "", CONCURRENCY, true, true)
WAIT_EVENT_DEF(TRANSFER_HANDLER_COND_WAIT, 20011, "transfer handler condition wait", "address", "", "", CONCURRENCY, true, true)
// share storage 21001-21999
WAIT_EVENT_DEF(ZONE_STORAGE_MANAGER_LOCK_WAIT, 21001, "latch: zone storage manager maintaince lock wait", "address", "number", "tries", CONCURRENCY, true, false)

View File

@ -661,6 +661,7 @@ class ObString;
ACT(BEFORE_PREFETCH_BACKUP_INFO_TASK,)\
ACT(RS_CHANGE_TURN_DEBUG_SYNC,)\
ACT(AFTER_MIGRATION_CREATE_ALL_TABLET,)\
ACT(BEFORE_PARALLEL_BUILD_TABLET_INFO_TABLET,)\
ACT(MAX_DEBUG_SYNC_POINT,)
DECLARE_ENUM(ObDebugSyncPoint, debug_sync_point, OB_DEBUG_SYNC_POINT_DEF);

View File

@ -196,6 +196,8 @@ DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_TRANSFER_BACKFILL_TX, ObDagPrio::DAG_PRIO_HA
true, 3, {"tenant_id", "src_ls_id", "start_scn"})
DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_TRANSFER_REPLACE_TABLE, ObDagPrio::DAG_PRIO_HA_HIGH, ObSysTaskType::TRANSFER_TASK, "TRANSFER_REPLACE_TABLE", "TRANSFER",
true, 2, {"tenant_id", "desc_ls_id"})
DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_TRANSFER_BUILD_TABLET_INFO, ObDagPrio::DAG_PRIO_HA_HIGH, ObSysTaskType::TRANSFER_TASK, "TRANSFER_BUILD_TABLET_INFO", "TRANSFER",
false, 0, {})
// DAG_TYPE_TRANSFER END
DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_TTL, ObDagPrio::DAG_PRIO_TTL, ObSysTaskType::TABLE_API_TTL_TASK, "TTL_DELTE_DAG", "TTL",
false, 4, {"tenant_id", "ls_id", "table_id", "tablet_id"})

View File

@ -234,6 +234,7 @@ public:
TASK_TYPE_CHECK_CONVERT_TABLET = 79,
TASK_TYPE_VECTOR_INDEX_MEMDATA_SYNC = 80,
TASK_TYPE_DELETE_LOB_META_ROW = 81,
TASK_TYPE_TRANSFER_BUILD_TABLET_INFO = 82,
TASK_TYPE_MAX,
};

View File

@ -270,6 +270,7 @@ ob_set_subtarget(ob_storage high_availability
high_availability/ob_storage_ha_diagnose_mgr.cpp
high_availability/ob_storage_ha_diagnose_service.cpp
high_availability/ob_cs_replica_migration.cpp
high_availability/ob_transfer_parallel_build_tablet_info.cpp
)
ob_set_subtarget(ob_storage restore

View File

@ -2644,4 +2644,4 @@ void ObMacroBlockReuseMgr::free_reuse_value_(ReuseMajorTableValue *&reuse_value)
}
}
}
}

View File

@ -701,6 +701,7 @@ public:
private:
DISALLOW_COPY_AND_ASSIGN(ObMigrationChooseSrcHelperInitParam);
};
}
}
#endif

View File

@ -32,6 +32,7 @@
#include "storage/high_availability/ob_storage_ha_diagnose_mgr.h"
#include "storage/tx/wrs/ob_weak_read_util.h"
#include "rootserver/mview/ob_collect_mv_merge_info_task.h"
#include "storage/high_availability/ob_transfer_parallel_build_tablet_info.h"
using namespace oceanbase::transaction;
using namespace oceanbase::share;
@ -78,7 +79,9 @@ ObTransferHandler::ObTransferHandler()
task_info_(),
diagnose_result_msg_(share::ObStorageHACostItemName::MAX_NAME),
transfer_handler_lock_(),
transfer_handler_enabled_(true)
transfer_handler_enabled_(true),
ctx_(),
cond_()
{
}
@ -105,7 +108,9 @@ int ObTransferHandler::init(
} else if (OB_FAIL(transfer_worker_mgr_.init(ls))) {
LOG_WARN("failed to init transfer worker manager", K(ret), KP(ls));
} else if (OB_FAIL(related_info_.init())) {
LOG_WARN("failed to init related_info");
LOG_WARN("failed to init related_info", K(ret));
} else if (OB_FAIL(cond_.init(ObWaitEventIds::TRANSFER_HANDLER_COND_WAIT))) {
LOG_WARN("failed to init thread cond", K(ret));
} else {
ls_ = ls;
bandwidth_throttle_ = bandwidth_throttle;
@ -469,6 +474,9 @@ int ObTransferHandler::do_with_start_status_(const share::ObTransferTaskInfo &ta
} else if (!task_info.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("do with start status get invalid argument", K(ret), K(task_info));
} else if (OB_FAIL(ctx_.build_transfer_tablet_info(task_info.dest_ls_id_, task_info.tablet_list_,
task_info.trace_id_, task_info.data_version_))) {
LOG_WARN("failed to build transfer tablet info", K(ret), K(task_info));
} else if (OB_FAIL(ObTransferUtils::get_gts(task_info.tenant_id_, gts_seq_))) {
LOG_WARN("failed to get gts seq", K(ret), K(task_info));
} else if (OB_FAIL(start_trans_(timeout_ctx, trans))) {
@ -642,6 +650,9 @@ int ObTransferHandler::do_with_start_status_(const share::ObTransferTaskInfo &ta
process_perf_diagnose_info_(ObStorageHACostItemName::TRANSFER_START_END,
ObStorageHADiagTaskType::TRANSFER_START, start_ts, tmp_round, true/*is_report*/);
}
finish_parallel_tablet_info_dag_(task_info);
LOG_INFO("[TRANSFER] finish do with start status", K(ret), K(task_info), "cost_ts", ObTimeUtil::current_time() - start_ts);
return ret;
}
@ -1077,7 +1088,6 @@ int ObTransferHandler::do_trans_transfer_start_(
LOG_INFO("[TRANSFER] start do trans transfer start", K(task_info));
int ret = OB_SUCCESS;
SCN start_scn;
ObArray<ObMigrationTabletParam> tablet_meta_list;
const share::ObTransferStatus next_status(ObTransferStatus::DOING);
const int64_t start_ts = ObTimeUtil::current_time();
@ -1100,10 +1110,10 @@ int ObTransferHandler::do_trans_transfer_start_(
LOG_WARN("failed to unblock tx", K(ret), K(task_info));
} else if (OB_FAIL(wait_src_ls_replay_to_start_scn_(task_info, start_scn, timeout_ctx))) {
LOG_WARN("failed to wait src ls replay to start scn", K(ret), K(task_info));
} else if (OB_FAIL(get_transfer_tablets_meta_(task_info, tablet_meta_list))) {
LOG_WARN("failed to get transfer tablets meta", K(ret), K(task_info));
} else if (OB_FAIL(do_tx_start_transfer_in_(task_info, start_scn, tablet_meta_list, timeout_ctx, trans))) {
LOG_WARN("failed to do tx start transfer in", K(ret), K(task_info), K(start_scn), K(tablet_meta_list));
} else if (OB_FAIL(parallel_get_transfer_tablets_meta_(task_info, timeout_ctx))) {
LOG_WARN("failed to do parallel get transfer tablets meta", K(ret), K(task_info));
} else if (OB_FAIL(do_tx_start_transfer_in_(task_info, start_scn, timeout_ctx, trans))) {
LOG_WARN("failed to do tx start transfer in", K(ret), K(task_info), K(start_scn));
} else if (OB_FAIL(update_transfer_status_(task_info, next_status, start_scn, OB_SUCCESS, trans))) {
LOG_WARN("failed to update transfer status", K(ret), K(task_info));
}
@ -1227,7 +1237,6 @@ int ObTransferHandler::do_trans_transfer_start_v2_(
LOG_INFO("[TRANSFER] start do trans transfer start v2", K(task_info));
int ret = OB_SUCCESS;
SCN start_scn;
ObArray<ObMigrationTabletParam> tablet_meta_list;
const share::ObTransferStatus next_status(ObTransferStatus::DOING);
ObAddr src_ls_leader;
ObStorageHASrcInfo src_info;
@ -1298,8 +1307,8 @@ int ObTransferHandler::do_trans_transfer_start_v2_(
} else if (OB_FAIL(wait_src_ls_replay_to_start_scn_(task_info, start_scn, timeout_ctx))) {
LOG_WARN("failed to wait src ls replay to start scn", K(ret), K(task_info));
} else if (STEP_COST_AND_CHECK_TIMEOUT(wait_src_replay_cost)) {
} else if (OB_FAIL(get_transfer_tablets_meta_(task_info, tablet_meta_list))) {
LOG_WARN("failed to get transfer tablets meta", K(ret), K(task_info));
} else if (OB_FAIL(parallel_get_transfer_tablets_meta_(task_info, timeout_ctx))) {
LOG_WARN("failed to do parallel get transfer tablets meta", K(ret), K(task_info));
} else if (STEP_COST_AND_CHECK_TIMEOUT(get_tablets_meta_cost)) {
// move tx
} else if (OB_FAIL(do_move_tx_to_dest_ls_(task_info, timeout_ctx, trans,
@ -1307,8 +1316,8 @@ int ObTransferHandler::do_trans_transfer_start_v2_(
LOG_WARN("failed to do move tx to dest_ls", K(ret), K(task_info));
} else if (STEP_COST_AND_CHECK_TIMEOUT(move_tx_cost)) {
// transfer in
} else if (OB_FAIL(do_tx_start_transfer_in_(task_info, start_scn, tablet_meta_list, timeout_ctx, trans))) {
LOG_WARN("failed to do tx start transfer in", K(ret), K(task_info), K(start_scn), K(tablet_meta_list));
} else if (OB_FAIL(do_tx_start_transfer_in_(task_info, start_scn, timeout_ctx, trans))) {
LOG_WARN("failed to do tx start transfer in", K(ret), K(task_info), K(start_scn));
} else if (STEP_COST_AND_CHECK_TIMEOUT(transfer_in_cost)) {
} else if (OB_FAIL(update_transfer_status_(task_info, next_status, start_scn, OB_SUCCESS, trans))) {
LOG_WARN("failed to update transfer status", K(ret), K(task_info));
@ -1810,58 +1819,6 @@ int ObTransferHandler::wait_ls_replay_event_(
return ret;
}
int ObTransferHandler::get_transfer_tablets_meta_(
const share::ObTransferTaskInfo &task_info,
common::ObIArray<ObMigrationTabletParam> &tablet_meta_list)
{
int ret = OB_SUCCESS;
tablet_meta_list.reset();
obrpc::ObCopyTabletInfo tablet_info;
const int64_t start_ts = ObTimeUtil::current_time();
DEBUG_SYNC(BEFORE_START_TRANSFER_GET_TABLET_META);
if (!is_inited_) {
ret = OB_NOT_INIT;
LOG_WARN("transfer handler do not init", K(ret));
} else if (!task_info.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get transfer tablets meta get invalid argument", K(ret), K(task_info));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < task_info.tablet_list_.count(); ++i) {
const ObTransferTabletInfo &transfer_tablet_info = task_info.tablet_list_.at(i);
ObTabletHandle tablet_handle;
tablet_info.reset();
if (OB_FAIL(ls_->ha_get_tablet(transfer_tablet_info.tablet_id_, tablet_handle))) {
LOG_WARN("failed to get tablet", K(ret), K(transfer_tablet_info), K(tablet_handle));
} else if (OB_FAIL(get_next_tablet_info_(task_info, transfer_tablet_info, tablet_handle, tablet_info))) {
LOG_WARN("failed to get next tablet info ", K(ret), K(transfer_tablet_info), K(tablet_handle));
} else if (OB_FAIL(tablet_meta_list.push_back(tablet_info.param_))) {
LOG_WARN("failed to push tablet info into array", K(ret), K(tablet_info));
}
}
LOG_INFO("[TRANSFER_BLOCK_TX] get transfer tablets meta", K(ret), "cost", ObTimeUtil::current_time() - start_ts);
#ifdef ERRSIM
if (OB_SUCC(ret)) {
ret = EN_GET_TRANSFER_TABLET_META_FAILED ? : OB_SUCCESS;
if (OB_FAIL(ret)) {
STORAGE_LOG(ERROR, "fake EN_GET_TRANSFER_TABLET_META_FAILED", K(ret));
}
}
#endif
}
DEBUG_SYNC(AFTER_START_TRANSFER_GET_TABLET_META);
if (OB_SUCC(ret)) {
process_perf_diagnose_info_(ObStorageHACostItemName::SRC_LS_GET_TABLET_META,
ObStorageHADiagTaskType::TRANSFER_START, 0/*start_ts*/, round_, false/*is_report*/);
} else {
diagnose_result_msg_ = share::ObStorageHACostItemName::SRC_LS_GET_TABLET_META;
}
return ret;
}
int ObTransferHandler::get_next_tablet_info_(
const share::ObTransferTaskInfo &task_info,
const ObTransferTabletInfo &transfer_tablet_info,
@ -1908,25 +1865,28 @@ int ObTransferHandler::get_next_tablet_info_(
int ObTransferHandler::do_tx_start_transfer_in_(
const share::ObTransferTaskInfo &task_info,
const SCN &start_scn,
const common::ObIArray<ObMigrationTabletParam> &tablet_meta_list,
ObTimeoutCtx &timeout_ctx,
common::ObMySQLTransaction &trans)
{
int ret = OB_SUCCESS;
const int64_t MAX_BUF_LEN = 1.5 * 1024 * 1024; // 1.5M
const int64_t start_ts = ObTimeUtil::current_time();
int64_t tablet_count = 0;
if (!is_inited_) {
ret = OB_NOT_INIT;
LOG_WARN("transfer handler do not init", K(ret));
} else if (!task_info.is_valid() || tablet_meta_list.empty() || !start_scn.is_valid()) {
} else if (!task_info.is_valid() || !start_scn.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("do tx start transfer in get invalid argument", K(ret), K(task_info),
K(tablet_meta_list), K(start_scn));
LOG_WARN("do tx start transfer in get invalid argument", K(ret), K(task_info), K(start_scn));
} else if (FALSE_IT(tablet_count = ctx_.get_tablet_info_num())) {
} else if (tablet_count != task_info.tablet_list_.count()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("transfer tablet meta info not match, unexpected", K(ret), K(tablet_count), K(task_info));
} else {
int64_t index = 0;
ObTXStartTransferInInfo start_transfer_in_info;
while (OB_SUCC(ret) && index < tablet_meta_list.count()) {
while (OB_SUCC(ret) && index < tablet_count) {
start_transfer_in_info.reset();
start_transfer_in_info.src_ls_id_ = task_info.src_ls_id_;
start_transfer_in_info.dest_ls_id_ = task_info.dest_ls_id_;
@ -1939,17 +1899,19 @@ int ObTransferHandler::do_tx_start_transfer_in_(
LOG_WARN("already timeout", K(ret), K(task_info));
}
for (int64_t i = index; i < tablet_meta_list.count() && OB_SUCC(ret); ++i, ++index) {
const ObMigrationTabletParam &tablet_meta = tablet_meta_list.at(i);
if (start_transfer_in_info.get_serialize_size() + tablet_meta.get_serialize_size() > MAX_BUF_LEN) {
for (int64_t i = index; i < tablet_count && OB_SUCC(ret); ++i, ++index) {
const ObMigrationTabletParam *tablet_meta = nullptr;
if (OB_FAIL(ctx_.get_tablet_info(index, tablet_meta))) {
LOG_WARN("failed to get tablet info", K(ret), K(index));
} else if (start_transfer_in_info.get_serialize_size() + tablet_meta->get_serialize_size() > MAX_BUF_LEN) {
if (OB_FAIL(inner_tx_start_transfer_in_(task_info, start_transfer_in_info, trans))) {
LOG_WARN("failed to do inner tx start transfer in", K(ret), K(task_info), K(start_transfer_in_info));
} else {
start_transfer_in_info.reset();
break;
}
} else if (OB_FAIL(start_transfer_in_info.tablet_meta_list_.push_back(tablet_meta))) {
LOG_WARN("failed to push tablet meta into list", K(ret), K(tablet_meta));
} else if (OB_FAIL(start_transfer_in_info.tablet_meta_list_.push_back(*tablet_meta))) {
LOG_WARN("failed to push tablet meta into list", K(ret), KPC(tablet_meta));
}
}
@ -3301,5 +3263,280 @@ int ObTransferHandler::do_clean_diagnose_info_()
return ret;
}
int ObTransferHandler::parallel_get_transfer_tablets_meta_(
const share::ObTransferTaskInfo &task_info,
ObTimeoutCtx &timeout_ctx)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
const int64_t start_ts = ObTimeUtil::current_time();
DEBUG_SYNC(BEFORE_START_TRANSFER_GET_TABLET_META);
if (!is_inited_) {
ret = OB_NOT_INIT;
LOG_WARN("transfer handler do not init", K(ret));
} else if (OB_FAIL(generate_parallel_tablet_info_dag_(task_info))) {
LOG_WARN("failed to generate parallel tablet info dag", K(ret), K(task_info));
} else if (OB_FAIL(do_build_tablet_info_(task_info, timeout_ctx))) {
LOG_WARN("failed to do build tablet info", K(ret), K(task_info));
}
if (OB_SUCCESS != (tmp_ret = wait_parallel_tablet_info_ready_(task_info, timeout_ctx, ret))) {
LOG_WARN("failed to wait parallel tablet info dag finish", K(tmp_ret), K(task_info));
}
#ifdef ERRSIM
if (OB_SUCC(ret)) {
ret = EN_GET_TRANSFER_TABLET_META_FAILED ? : OB_SUCCESS;
if (OB_FAIL(ret)) {
STORAGE_LOG(ERROR, "fake EN_GET_TRANSFER_TABLET_META_FAILED", K(ret));
}
}
#endif
DEBUG_SYNC(AFTER_START_TRANSFER_GET_TABLET_META);
if (OB_SUCC(ret)) {
process_perf_diagnose_info_(ObStorageHACostItemName::SRC_LS_GET_TABLET_META,
ObStorageHADiagTaskType::TRANSFER_START, 0/*start_ts*/, round_, false/*is_report*/);
} else {
diagnose_result_msg_ = share::ObStorageHACostItemName::SRC_LS_GET_TABLET_META;
}
LOG_INFO("finish parallel get transfer tablets meta", K(ret), "cost_ts", ObTimeUtil::current_time() - start_ts);
return ret;
}
int ObTransferHandler::generate_parallel_tablet_info_dag_(
const share::ObTransferTaskInfo &task_info)
{
int ret = OB_SUCCESS;
ObTenantDagScheduler *scheduler = nullptr;
const ObDagPrio::ObDagPrioEnum prio = ObDagPrio::DAG_PRIO_HA_HIGH;
ObTransferParallelBuildTabletDag *build_tablet_dag = nullptr;
const bool emergency = true;
ObTransferParallelBuildTabletDag fake_dag;
bool exist = false;
if (!is_inited_) {
ret = OB_NOT_INIT;
LOG_WARN("transfer handler do not init", K(ret));
} else if (!task_info.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("generate parallel tablet info dag get invalid argument", K(ret), K(task_info));
} else if (OB_FAIL(fake_dag.init(task_info.src_ls_id_, &ctx_))) {
LOG_WARN("failed to init build tablet dag", K(ret), K(task_info));
} else if (OB_ISNULL(scheduler = MTL(ObTenantDagScheduler*))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get ObTenantDagScheduler from MTL", K(ret));
} else if (OB_FAIL(scheduler->check_dag_exist(&fake_dag, exist))) {
LOG_WARN("failed to check dag exist", K(ret), K(task_info));
} else if (exist) {
ret = OB_EAGAIN;
LOG_WARN("parallel tabelt info dag is already exist, need wait it finish", K(ret), K(task_info));
} else {
if (OB_FAIL(scheduler->alloc_dag_with_priority(prio, build_tablet_dag))) {
LOG_WARN("failed to alloc tablet group migration dag ", K(ret));
} else if (OB_FAIL(build_tablet_dag->init(task_info.src_ls_id_, &ctx_))) {
LOG_WARN("failed to init transfer parallel build tablet dag", K(ret), K(task_info));
} else if (OB_FAIL(build_tablet_dag->create_first_task())) {
LOG_WARN("failed to create first task", K(ret), K(task_info));
} else if (OB_FAIL(scheduler->add_dag(build_tablet_dag, emergency))) {
LOG_WARN("failed to add sys tablets migration dag", K(ret), K(*build_tablet_dag));
if (OB_SIZE_OVERFLOW != ret && OB_EAGAIN != ret) {
LOG_WARN("Fail to add task", K(ret));
ret = OB_EAGAIN;
}
} else {
build_tablet_dag = nullptr;
}
if (OB_NOT_NULL(build_tablet_dag) && OB_NOT_NULL(scheduler)) {
scheduler->free_dag(*build_tablet_dag);
build_tablet_dag = nullptr;
}
if (OB_FAIL(ret)) {
//overwrite ret
//parallel thread is for speed up get_tablet_info
//if it is failed, the main thread will get whole tablet meta info
LOG_INFO("overwrite parallel tablet info result", K(ret));
ret = OB_SUCCESS;
}
}
return ret;
}
int ObTransferHandler::do_build_tablet_info_(
const share::ObTransferTaskInfo &task_info,
ObTimeoutCtx &timeout_ctx)
{
int ret = OB_SUCCESS;
obrpc::ObCopyTabletInfo tablet_info;
ObTransferTabletInfo transfer_tablet_info;
ObTabletHandle tablet_handle;
int64_t tablet_count = 0;
const int64_t start_ts = ObTimeUtil::current_time();
if (!is_inited_) {
ret = OB_NOT_INIT;
LOG_WARN("transfer handler do not init", K(ret));
} else if (!task_info.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("do build tablet info get invalid argument", K(ret), K(task_info));
} else {
while (OB_SUCC(ret)) {
transfer_tablet_info.reset();
tablet_handle.reset();
if (timeout_ctx.is_timeouted()) {
LOG_WARN("transfer trans already timeout, cannot get tablet info", K(ret), K(task_info));
ret = OB_TIMEOUT;
} else if (OB_FAIL(ctx_.get_next_tablet_info(transfer_tablet_info))) {
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
break;
} else {
LOG_WARN("failed to get next tablet info", K(ret), K(task_info));
}
} else if (OB_FAIL(ls_->ha_get_tablet(transfer_tablet_info.tablet_id_, tablet_handle))) {
LOG_WARN("failed to get tablet", K(ret), K(transfer_tablet_info), K(tablet_handle));
} else if (OB_FAIL(get_next_tablet_info_(task_info, transfer_tablet_info, tablet_handle, tablet_info))) {
LOG_WARN("failed to get next tablet info ", K(ret), K(transfer_tablet_info), K(tablet_handle));
} else if (OB_FAIL(ctx_.add_tablet_info(tablet_info.param_))) {
LOG_WARN("failed to add tablet info", K(ret), K(task_info), K(tablet_info), K(transfer_tablet_info));
} else {
++tablet_count;
}
}
if (OB_SUCC(ret)) {
LOG_INFO("finish do build tablet infos", K(ret), K(tablet_count), "cost_ts", ObTimeUtil::current_time() - start_ts);
}
}
return ret;
}
int ObTransferHandler::wait_parallel_tablet_info_ready_(
const share::ObTransferTaskInfo &task_info,
ObTimeoutCtx &timeout_ctx,
int32_t &result)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
const int64_t WAIT_FINISH_INTERVAL = 1; //1ms
const int64_t start_ts = ObTimeUtil::current_time();
int32_t tmp_result = OB_SUCCESS;
if (!is_inited_) {
ret = OB_NOT_INIT;
LOG_WARN("transfer handler do not init", K(ret));
} else if (!task_info.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("wait parallel tablet info dag finish get invalid argument", K(ret), K(task_info));
} else if (OB_SUCCESS != result) {
//do nothing
} else {
while (OB_SUCC(ret)) {
if (timeout_ctx.is_timeouted()) {
ret = OB_TIMEOUT;
LOG_WARN("wait parallel tablet info dag finish timeout", K(ret));
break;
} else if (ctx_.is_build_tablet_finish()) {
break;
} else if (FALSE_IT(tmp_result = ctx_.get_result())) {
} else if (OB_SUCCESS != tmp_result) {
result = tmp_result;
FLOG_INFO("set parallel build tablet info result", K(ret), K(result));
break;
}
common::ObThreadCondGuard guard(cond_);
if (OB_SUCCESS != (tmp_ret = cond_.wait(WAIT_FINISH_INTERVAL))) {
if (OB_TIMEOUT != tmp_ret) {
LOG_WARN("failed to idle", K(tmp_ret));
}
}
}
}
LOG_INFO("wait parallel tablet info dag finish", K(ret), "cost_ts", ObTimeUtil::current_time() - start_ts);
return ret;
}
int ObTransferHandler::wait_parallel_tablet_info_dag_finish_(
const share::ObTransferTaskInfo &task_info)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
ObTenantDagScheduler *scheduler = nullptr;
const bool force_cancel = true;
ObTransferParallelBuildTabletDag fake_dag;
int64_t child_task_num = 0;
const int64_t WAIT_FINISH_INTERVAL = 1 * 1000; //1s
const int64_t start_ts = ObTimeUtil::current_time();
bool is_exist = true;
if (!is_inited_) {
ret = OB_NOT_INIT;
LOG_WARN("transfer handler do not init", K(ret));
} else if (!task_info.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("wait parallel tablet info dag finish get invalid argument", K(ret), K(task_info));
} else if (OB_ISNULL(scheduler = MTL(ObTenantDagScheduler*))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get ObTenantDagScheduler from MTL", K(ret));
} else if (OB_FAIL(fake_dag.init(task_info.dest_ls_id_, &ctx_))) {
LOG_WARN("failed to create fake dag", K(ret), K(task_info));
} else {
while (true) {
if (OB_FAIL(scheduler->cancel_dag(&fake_dag, force_cancel))) {
LOG_WARN("failed to cancel dag", K(ret), K(task_info));
}
//overwrite ret
if (OB_FAIL(scheduler->check_dag_exist(&fake_dag, is_exist))) {
LOG_WARN("failed to check dag exist", K(ret), K(fake_dag));
}
if (FALSE_IT(child_task_num = ctx_.get_child_task_num())) {
} else if (0 == child_task_num && !is_exist) {
break;
}
common::ObThreadCondGuard guard(cond_);
if (OB_SUCCESS != (tmp_ret = cond_.wait(WAIT_FINISH_INTERVAL))) {
if (OB_TIMEOUT != tmp_ret) {
LOG_WARN("failed to idle", K(tmp_ret));
}
}
}
}
LOG_INFO("wait parallel tablet info dag finish", K(ret), "cost_ts", ObTimeUtil::current_time() - start_ts);
return ret;
}
void ObTransferHandler::wakeup_thread_cond()
{
common::ObThreadCondGuard guard(cond_);
cond_.broadcast();
}
void ObTransferHandler::finish_parallel_tablet_info_dag_(
const share::ObTransferTaskInfo &task_info)
{
int ret = OB_SUCCESS;
if (!is_inited_) {
ret = OB_NOT_INIT;
LOG_WARN("transfer handler do not init", K(ret));
} else if (!task_info.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("finsih parallel tablet info dag get invalid argument", K(ret), K(task_info));
} else if (OB_FAIL(wait_parallel_tablet_info_dag_finish_(task_info))) {
LOG_WARN("failed to wait parallel tablet info dag finish", K(ret), K(task_info));
}
ctx_.reuse();
}
}
}

View File

@ -103,6 +103,7 @@ public:
const uint64_t timestamp,
const int64_t start_ts,
const bool is_report);
void wakeup_thread_cond();
private:
int get_transfer_task_(share::ObTransferTaskInfo &task_info);
int get_transfer_task_from_inner_table_(
@ -242,13 +243,9 @@ private:
const share::ObTransferTaskInfo &task_info,
const share::SCN &start_scn,
ObTimeoutCtx &timeout_ctx);
int get_transfer_tablets_meta_(
const share::ObTransferTaskInfo &task_info,
common::ObIArray<ObMigrationTabletParam> &params);
int do_tx_start_transfer_in_(
const share::ObTransferTaskInfo &task_info,
const share::SCN &start_scn,
const common::ObIArray<ObMigrationTabletParam> &params,
ObTimeoutCtx &timeout_ctx,
common::ObMySQLTransaction &trans);
int inner_tx_start_transfer_in_(
@ -337,6 +334,23 @@ private:
ObMySQLTransaction &trans,
CollectTxCtxInfo &collect_batch,
int64_t &batch_len);
int parallel_get_transfer_tablets_meta_(
const share::ObTransferTaskInfo &task_info,
ObTimeoutCtx &timeout_ctx);
int generate_parallel_tablet_info_dag_(
const share::ObTransferTaskInfo &task_info);
int do_build_tablet_info_(
const share::ObTransferTaskInfo &task_info,
ObTimeoutCtx &timeout_ctx);
int wait_parallel_tablet_info_ready_(
const share::ObTransferTaskInfo &task_info,
ObTimeoutCtx &timeout_ctx,
int32_t &result);
int wait_parallel_tablet_info_dag_finish_(
const share::ObTransferTaskInfo &task_info);
void finish_parallel_tablet_info_dag_(
const share::ObTransferTaskInfo &task_info);
private:
static const int64_t INTERVAL_US = 1 * 1000 * 1000; //1s
static const int64_t KILL_TX_MAX_RETRY_TIMES = 3;
@ -358,6 +372,9 @@ private:
share::ObStorageHACostItemName diagnose_result_msg_;
common::SpinRWLock transfer_handler_lock_;
bool transfer_handler_enabled_;
ObTransferBuildTabletInfoCtx ctx_;
common::ObThreadCond cond_;
DISALLOW_COPY_AND_ASSIGN(ObTransferHandler);
};

View File

@ -0,0 +1,355 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX STORAGE
#include "ob_transfer_parallel_build_tablet_info.h"
#include "observer/ob_server.h"
#include "ob_physical_copy_task.h"
#include "share/rc/ob_tenant_base.h"
#include "share/scheduler/ob_dag_warning_history_mgr.h"
#include "storage/tablet/ob_tablet_common.h"
#include "storage/tx_storage/ob_ls_service.h"
#include "logservice/ob_log_service.h"
#include "lib/hash/ob_hashset.h"
#include "lib/time/ob_time_utility.h"
#include "observer/ob_server_event_history_table_operator.h"
#include "ob_storage_ha_src_provider.h"
#include "storage/tablet/ob_tablet_iterator.h"
#include "ob_storage_ha_utils.h"
#include "storage/tablet/ob_tablet.h"
#include "share/ls/ob_ls_table_operator.h"
#include "ob_rebuild_service.h"
#include "share/ob_cluster_version.h"
#include "ob_storage_ha_utils.h"
namespace oceanbase
{
using namespace share;
namespace storage
{
/******************ObTransferParallelBuildTabletDag*********************/
ObTransferParallelBuildTabletDag::ObTransferParallelBuildTabletDag()
: ObIDag(ObDagType::DAG_TYPE_TRANSFER_BUILD_TABLET_INFO),
is_inited_(false),
ls_id_(),
ls_handle_(),
ctx_(nullptr)
{
}
ObTransferParallelBuildTabletDag::~ObTransferParallelBuildTabletDag()
{
}
bool ObTransferParallelBuildTabletDag::operator == (const ObIDag &other) const
{
bool is_same = true;
if (this == &other) {
// same
} else if (get_type() != other.get_type()) {
is_same = false;
} else {
const ObTransferParallelBuildTabletDag &other_dag = static_cast<const ObTransferParallelBuildTabletDag&>(other);
if (ls_id_ != other_dag.ls_id_) {
is_same = false;
}
}
return is_same;
}
int64_t ObTransferParallelBuildTabletDag::hash() const
{
int64_t hash_value = 0;
hash_value = common::murmurhash(
&ls_id_, sizeof(ls_id_), hash_value);
return hash_value;
}
int ObTransferParallelBuildTabletDag::fill_dag_key(char *buf, const int64_t buf_len) const
{
int ret = OB_SUCCESS;
if (!is_inited_) {
ret = OB_NOT_INIT;
LOG_WARN("tablet rebuild major dag do not init", K(ret));
} else {
int64_t pos = 0;
ret = databuff_printf(buf, buf_len, pos, "ObTransferParallelBuildTabletDag: ls_id = %s", to_cstring(ls_id_));;
if (OB_FAIL(ret)) {
LOG_WARN("failed to fill comment", K(ret), KPC(this));
}
}
return ret;
}
int ObTransferParallelBuildTabletDag::init(
const share::ObLSID &ls_id,
ObTransferBuildTabletInfoCtx *ctx)
{
int ret = OB_SUCCESS;
if (is_inited_) {
ret = OB_INIT_TWICE;
LOG_WARN("transfer parallel build tablet init twice", K(ret));
} else if (!ls_id.is_valid() || OB_ISNULL(ctx)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("transfer parallel build tablet dag get invalid argument", K(ret), K(ls_id), KP(ctx));
} else if (OB_FAIL(set_dag_id(ctx->get_task_id()))) {
LOG_WARN("failed to set dag id", K(ret), K(ls_id), KPC(ctx));
} else if (OB_FAIL(ObStorageHADagUtils::get_ls(ls_id, ls_handle_))) {
LOG_WARN("failed to get ls", K(ret), K(ls_id));
} else {
ls_id_ = ls_id;
ctx_ = ctx;
is_inited_ = true;
}
return ret;
}
int ObTransferParallelBuildTabletDag::create_first_task()
{
int ret = OB_SUCCESS;
ObTransferParallelBuildTabletTask *task = NULL;
share::ObTransferTabletInfo tablet_info;
if (!is_inited_) {
ret = OB_NOT_INIT;
LOG_WARN("transfer parallel build tablet dag do not init", K(ret));
} else if (OB_FAIL(alloc_task(task))) {
LOG_WARN("Fail to alloc task", K(ret));
} else if (OB_FAIL(task->init(tablet_info, ctx_))) {
LOG_WARN("failed to init tablet rebuild major task", K(ret), KPC(this));
} else if (OB_FAIL(add_task(*task))) {
LOG_WARN("Fail to add task", K(ret));
} else {
LOG_DEBUG("success to create first task", K(ret), KPC(this));
}
return ret;
}
int ObTransferParallelBuildTabletDag::fill_info_param(compaction::ObIBasicInfoParam *&out_param, ObIAllocator &allocator) const
{
int ret = OB_SUCCESS;
const uint64_t tenant_id = MTL_ID();
if (!is_inited_) {
ret = OB_NOT_INIT;
LOG_WARN("transfer parallele build tablet dag do not init", K(ret));
} else if (OB_FAIL(ADD_DAG_WARN_INFO_PARAM(out_param, allocator, get_type(),
static_cast<int64_t>(tenant_id), ls_id_.id(),
"dag_id", to_cstring(ObCurTraceId::get_trace_id())))) {
LOG_WARN("failed to fill info param", K(ret));
}
return ret;
}
int ObTransferParallelBuildTabletDag::get_ls(ObLS *&ls)
{
int ret = OB_SUCCESS;
ls = nullptr;
if (!is_inited_) {
ret = OB_NOT_INIT;
LOG_WARN("transfer parallel build tablet dag is not init", K(ret));
} else {
ls = ls_handle_.get_ls();
}
return ret;
}
int64_t ObTransferParallelBuildTabletDag::to_string(char* buf, const int64_t buf_len) const
{
int ret = OB_SUCCESS;
int64_t pos = 0;
if (!is_inited_) {
ret = OB_NOT_INIT;
LOG_WARN("ls remove member dag do not init", K(ret));
} else if (FALSE_IT(pos = ObIDag::to_string(buf, buf_len))) {
} else if (pos >= buf_len) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("dag to string buffer length is over limit", K(ret), K(pos), K(buf_len));
}
return pos;
}
/******************ObTransferParallelBuildTabletTask*********************/
ObTransferParallelBuildTabletTask::ObTransferParallelBuildTabletTask()
: ObITask(TASK_TYPE_TRANSFER_BUILD_TABLET_INFO),
is_inited_(false),
first_tablet_info_(),
ctx_(nullptr),
ls_(nullptr)
{
}
ObTransferParallelBuildTabletTask::~ObTransferParallelBuildTabletTask()
{
if (is_inited_) {
ctx_->dec_child_task_num();
ls_->get_transfer_handler()->wakeup_thread_cond();
}
}
int ObTransferParallelBuildTabletTask::init(
const share::ObTransferTabletInfo &first_tablet_info,
ObTransferBuildTabletInfoCtx *ctx)
{
int ret = OB_SUCCESS;
ObTransferParallelBuildTabletDag *dag = nullptr;
if (is_inited_) {
ret = OB_INIT_TWICE;
LOG_WARN("transfer parallel build tablet task init twice", K(ret));
} else if (OB_ISNULL(ctx)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("transfer parallel build tablet task init get invalid argument", K(ret), KP(ctx));
} else if (OB_ISNULL(dag = static_cast<ObTransferParallelBuildTabletDag *>(this->get_dag()))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("transfer parallel build tablet dag should not be NULL", K(ret), KP(dag));
} else if (OB_FAIL(dag->get_ls(ls_))) {
LOG_WARN("failed to get ls", K(ret), KPC(dag), K(first_tablet_info));
} else {
first_tablet_info_ = first_tablet_info;
ctx_ = ctx;
ctx_->inc_child_task_num();
is_inited_ = true;
}
return ret;
}
int ObTransferParallelBuildTabletTask::process()
{
int ret = OB_SUCCESS;
LOG_INFO("start do transfer parallel build tablet task", K(first_tablet_info_));
DEBUG_SYNC(BEFORE_PARALLEL_BUILD_TABLET_INFO_TABLET);
if (!is_inited_) {
ret = OB_NOT_INIT;
LOG_WARN("transfer parallel build tablet task do not init", K(ret), KPC(ctx_));
} else if (ctx_->is_failed()) {
//do nothing
} else if (OB_FAIL(do_build_tablet_infos_())) {
LOG_WARN("failed to do build tablet infos", K(ret), KPC(ctx_));
}
if (OB_FAIL(ret) && OB_NOT_NULL(ctx_)) {
ctx_->set_result(ret);
}
return ret;
}
int ObTransferParallelBuildTabletTask::do_build_tablet_infos_()
{
int ret = OB_SUCCESS;
share::ObTransferTabletInfo tablet_info;
int64_t build_tablet_info_num = 0;
const int64_t start_ts = ObTimeUtil::current_time();
if (!is_inited_) {
ret = OB_NOT_INIT;
LOG_WARN("transfer parallel build tablet task do not init", K(ret), KPC(ctx_));
} else {
if (first_tablet_info_.is_valid()) {
if (OB_FAIL(do_build_tablet_info_(first_tablet_info_))) {
LOG_WARN("failed to do build tablet info", K(ret), K(first_tablet_info_));
} else {
++build_tablet_info_num;
}
}
while (OB_SUCC(ret)) {
if (OB_FAIL(ctx_->get_next_tablet_info(tablet_info))) {
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
break;
} else {
LOG_WARN("failed to get next tablet id", K(ret), KPC(ctx_));
}
} else if (OB_FAIL(do_build_tablet_info_(tablet_info))) {
LOG_WARN("failed to do build tablet info", K(ret), K(tablet_info));
} else {
++build_tablet_info_num;
}
}
LOG_INFO("finish do build tablet infos", K(ret), K(build_tablet_info_num), "cost_ts", ObTimeUtil::current_time() - start_ts);
}
return ret;
}
int ObTransferParallelBuildTabletTask::do_build_tablet_info_(const share::ObTransferTabletInfo &tablet_info)
{
int ret = OB_SUCCESS;
ObTabletHandle tablet_handle;
ObTabletCreateDeleteMdsUserData user_data;
ObTablet *tablet = nullptr;
bool committed_flag = false;
ObMigrationTabletParam param;
if (!is_inited_) {
ret = OB_NOT_INIT;
LOG_WARN("transfer parallel build tablet task do not init", K(ret), KPC(ctx_));
} else if (!tablet_info.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("do build tablet info get invalid argument", K(ret), K(tablet_info));
} else if (OB_FAIL(ls_->ha_get_tablet(tablet_info.tablet_id_, tablet_handle))) {
LOG_WARN("failed to get tablet", K(ret), K(tablet_info), K(tablet_handle));
} else if (OB_ISNULL(tablet = tablet_handle.get_obj())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("tablet should not be NULL", K(ret), KP(tablet), K(tablet_info));
} else if (OB_FAIL(tablet->ObITabletMdsInterface::get_latest_tablet_status(user_data, committed_flag))) {
LOG_WARN("failed to get tx data", K(ret), KPC(tablet), K(tablet_info));
} else if (ObTabletStatus::TRANSFER_OUT != user_data.tablet_status_) {
ret = OB_STATE_NOT_MATCH;
LOG_WARN("tablet status is not match", K(ret), KPC(tablet), K(tablet_info), K(user_data));
} else if (committed_flag) {
ret = OB_STATE_NOT_MATCH;
LOG_WARN("transfer src tablet status is transfer out but is already committed, not match",
K(ret), KPC(tablet), K(tablet_info), K(user_data));
} else if (tablet_info.transfer_seq_ != tablet->get_tablet_meta().transfer_info_.transfer_seq_) {
ret = OB_TABLET_TRANSFER_SEQ_NOT_MATCH;
LOG_WARN("tablet transfer seq is not match", K(ret), KPC(tablet), K(tablet_info));
} else if (OB_FAIL(tablet->build_transfer_tablet_param(ctx_->get_data_version(), ctx_->get_dest_ls_id(), param))) {
LOG_WARN("failed to build transfer tablet param", K(ret), K(tablet_info));
} else if (OB_FAIL(ctx_->add_tablet_info(param))) {
LOG_WARN("failed to add tablet info", K(ret), K(param));
}
return ret;
}
int ObTransferParallelBuildTabletTask::generate_next_task(share::ObITask *&next_task)
{
int ret = OB_SUCCESS;
ObTransferParallelBuildTabletTask *tmp_next_task = nullptr;
bool is_iter_end = false;
int64_t index = 0;
share::ObTransferTabletInfo tablet_info;
if (!is_inited_) {
ret = OB_NOT_INIT;
LOG_ERROR("parallel create tablet task do not init", K(ret));
} else if (OB_FAIL(ctx_->get_next_tablet_info(tablet_info))) {
if (OB_ITER_END == ret) {
//do nothing
} else {
LOG_WARN("failed to get next copy tablet info index", K(ret), KPC(ctx_));
}
} else if (OB_FAIL(dag_->alloc_task(tmp_next_task))) {
LOG_WARN("failed to alloc task", K(ret));
} else if (OB_FAIL(tmp_next_task->init(tablet_info, ctx_))) {
LOG_WARN("failed to init next task", K(ret), K(tablet_info), K(index));
} else {
next_task = tmp_next_task;
}
return ret;
}
}
}

View File

@ -0,0 +1,87 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEABASE_STORAGE_TRANSFER_PARALLEL_BUILD_TABLET_INFO_
#define OCEABASE_STORAGE_TRANSFER_PARALLEL_BUILD_TABLET_INFO_
#include "share/ob_define.h"
#include "lib/thread/ob_work_queue.h"
#include "lib/thread/ob_dynamic_thread_pool.h"
#include "share/ob_common_rpc_proxy.h" // ObCommonRpcProxy
#include "share/ob_srv_rpc_proxy.h" // ObPartitionServiceRpcProxy
#include "share/scheduler/ob_sys_task_stat.h"
#include "observer/ob_rpc_processor_simple.h"
#include "share/scheduler/ob_tenant_dag_scheduler.h"
#include "share/scn.h"
#include "ob_storage_ha_struct.h"
namespace oceanbase
{
namespace storage
{
class ObTransferParallelBuildTabletDag : public share::ObIDag
{
public:
ObTransferParallelBuildTabletDag();
virtual ~ObTransferParallelBuildTabletDag();
virtual bool operator == (const share::ObIDag &other) const override;
virtual int64_t hash() const override;
virtual int fill_dag_key(char *buf, const int64_t buf_len) const override;
virtual int create_first_task() override;
virtual int fill_info_param(compaction::ObIBasicInfoParam *&out_param, ObIAllocator &allocator) const override;
virtual lib::Worker::CompatMode get_compat_mode() const override
{ return lib::Worker::CompatMode::MYSQL; }
virtual uint64_t get_consumer_group_id() const override
{ return consumer_group_id_; }
virtual int64_t to_string(char* buf, const int64_t buf_len) const override;
int init(
const share::ObLSID &ls_id,
ObTransferBuildTabletInfoCtx *ctx);
int get_ls(ObLS *&ls);
protected:
bool is_inited_;
share::ObLSID ls_id_;
ObLSHandle ls_handle_;
ObTransferBuildTabletInfoCtx *ctx_;
DISALLOW_COPY_AND_ASSIGN(ObTransferParallelBuildTabletDag);
};
class ObTransferParallelBuildTabletTask : public share::ObITask
{
public:
ObTransferParallelBuildTabletTask();
virtual ~ObTransferParallelBuildTabletTask();
int init(
const share::ObTransferTabletInfo &first_tablet_info,
ObTransferBuildTabletInfoCtx *ctx);
virtual int process() override;
virtual int generate_next_task(share::ObITask *&next_task) override;
VIRTUAL_TO_STRING_KV(K("ObTransferParallelBuildTabletTask"), KP(this), KPC(ctx_));
private:
int do_build_tablet_infos_();
int do_build_tablet_info_(const share::ObTransferTabletInfo &tablet_info);
private:
bool is_inited_;
share::ObTransferTabletInfo first_tablet_info_;
ObTransferBuildTabletInfoCtx *ctx_;
ObLS *ls_;
DISALLOW_COPY_AND_ASSIGN(ObTransferParallelBuildTabletTask);
};
}
}
#endif

View File

@ -1040,3 +1040,224 @@ int ObTransferRelatedInfo::get_related_info_task_id(share::ObTransferTaskID &tas
}
return ret;
}
/******************ObTransferTabletInfoMgr*********************/
ObTransferBuildTabletInfoCtx::ObTransferTabletInfoMgr::ObTransferTabletInfoMgr()
: lock_(),
tablet_info_array_()
{
}
ObTransferBuildTabletInfoCtx::ObTransferTabletInfoMgr::~ObTransferTabletInfoMgr()
{
common::SpinWLockGuard guard(lock_);
tablet_info_array_.reset();
}
int ObTransferBuildTabletInfoCtx::ObTransferTabletInfoMgr::add_tablet_info(
const ObMigrationTabletParam &param)
{
int ret = OB_SUCCESS;
if (!param.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("add tablet info get invalid argument", K(ret), K(param));
} else {
common::SpinWLockGuard guard(lock_);
if (OB_FAIL(tablet_info_array_.push_back(param))) {
LOG_WARN("failed to add tablet info", K(ret), K(param));
}
}
return ret;
}
int64_t ObTransferBuildTabletInfoCtx::ObTransferTabletInfoMgr::get_tablet_info_num() const
{
common::SpinRLockGuard guard(lock_);
return tablet_info_array_.count();
}
int ObTransferBuildTabletInfoCtx::ObTransferTabletInfoMgr::get_tablet_info(
const int64_t index, const ObMigrationTabletParam *&param)
{
int ret = OB_SUCCESS;
param = nullptr;
common::SpinRLockGuard guard(lock_);
if (index < 0 || index >= tablet_info_array_.count()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get tablet info get invalid argument", K(ret), K(index));
} else {
param = &tablet_info_array_.at(index);
}
return ret;
}
void ObTransferBuildTabletInfoCtx::ObTransferTabletInfoMgr::reuse()
{
common::SpinWLockGuard guard(lock_);
tablet_info_array_.reset();
}
/******************ObTransferBuildTabletInfoCtx*********************/
ObTransferBuildTabletInfoCtx::ObTransferBuildTabletInfoCtx()
: lock_(),
dest_ls_id_(),
index_(0),
tablet_info_array_(),
child_task_num_(0),
total_tablet_count_(0),
result_(OB_SUCCESS),
data_version_(0),
task_id_(),
mgr_()
{
}
ObTransferBuildTabletInfoCtx::~ObTransferBuildTabletInfoCtx()
{
}
void ObTransferBuildTabletInfoCtx::reuse()
{
common::SpinWLockGuard guard(lock_);
total_tablet_count_ = 0;
dest_ls_id_.reset();
index_ = 0;
tablet_info_array_.reset();
child_task_num_ = 0;
result_ = OB_SUCCESS;
data_version_ = 0;
task_id_.reset();
mgr_.reuse();
}
int ObTransferBuildTabletInfoCtx::build_transfer_tablet_info(
const share::ObLSID &dest_ls_id,
const common::ObIArray<share::ObTransferTabletInfo> &tablet_info_array,
const common::ObCurTraceId::TraceId &task_id,
const uint64_t data_version)
{
int ret = OB_SUCCESS;
common::SpinWLockGuard guard(lock_);
if (0 != index_ || !tablet_info_array_.empty()) {
ret = OB_INIT_TWICE;
LOG_WARN("build transfer tablet info init twice", K(ret), K(index_), K(tablet_info_array_));
} else if (!dest_ls_id.is_valid() || !task_id.is_valid() || 0 == data_version) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("build transfer tablet info get invalid argument", K(ret), K(task_id), K(data_version));
} else if (OB_FAIL(tablet_info_array_.assign(tablet_info_array))) {
LOG_WARN("failed to assign tablet info array", K(ret), K(tablet_info_array));
} else {
dest_ls_id_ = dest_ls_id;
total_tablet_count_ = tablet_info_array_.count();
task_id_ = task_id;
data_version_ = data_version;
}
return ret;
}
bool ObTransferBuildTabletInfoCtx::is_valid() const
{
common::SpinRLockGuard guard(lock_);
return is_valid_();
}
bool ObTransferBuildTabletInfoCtx::is_valid_() const
{
return index_ >= 0 && tablet_info_array_.count() >= 0 && index_ <= tablet_info_array_.count() && data_version_ > 0 && dest_ls_id_.is_valid();
}
int ObTransferBuildTabletInfoCtx::get_next_tablet_info(share::ObTransferTabletInfo &tablet_info)
{
int ret = OB_SUCCESS;
tablet_info.reset();
common::SpinWLockGuard guard(lock_);
if (!is_valid_()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("transfer build tablet info ctx is invalid, unexpected", K(ret), KPC(this));
} else if (index_ == tablet_info_array_.count()) {
ret = OB_ITER_END;
} else {
tablet_info = tablet_info_array_.at(index_);
++index_;
}
return ret;
}
void ObTransferBuildTabletInfoCtx::inc_child_task_num()
{
ATOMIC_INC(&child_task_num_);
}
void ObTransferBuildTabletInfoCtx::dec_child_task_num()
{
ATOMIC_DEC(&child_task_num_);
}
int64_t ObTransferBuildTabletInfoCtx::get_child_task_num()
{
int64_t child_task_num = ATOMIC_LOAD(&child_task_num_);
return child_task_num;
}
int64_t ObTransferBuildTabletInfoCtx::get_total_tablet_count()
{
common::SpinRLockGuard guard(lock_);
return total_tablet_count_;
}
bool ObTransferBuildTabletInfoCtx::is_build_tablet_finish() const
{
return total_tablet_count_ == mgr_.get_tablet_info_num();
}
bool ObTransferBuildTabletInfoCtx::is_failed() const
{
common::SpinRLockGuard guard(lock_);
return OB_SUCCESS != result_;
}
void ObTransferBuildTabletInfoCtx::set_result(const int32_t result)
{
common::SpinWLockGuard guard(lock_);
if (OB_SUCCESS == result_) {
result_ = result;
}
}
int32_t ObTransferBuildTabletInfoCtx::get_result()
{
common::SpinRLockGuard guard(lock_);
return result_;
}
int ObTransferBuildTabletInfoCtx::add_tablet_info(
const ObMigrationTabletParam &param)
{
int ret = OB_SUCCESS;
if (!param.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("add tablet info get invalid argument", K(ret), K(param));
} else if (OB_FAIL(mgr_.add_tablet_info(param))) {
LOG_WARN("failed to add tablet info", K(ret), K(param));
}
return ret;
}
int ObTransferBuildTabletInfoCtx::get_tablet_info(
const int64_t index, const ObMigrationTabletParam *&param)
{
int ret = OB_SUCCESS;
param = nullptr;
if (index < 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get tablet info get invalid argument", K(ret), K(index));
} else if (OB_FAIL(mgr_.get_tablet_info(index, param))) {
LOG_WARN("failed to get tablet info", K(ret), K(index));
}
return ret;
}
int64_t ObTransferBuildTabletInfoCtx::get_tablet_info_num() const
{
return mgr_.get_tablet_info_num();
}

View File

@ -316,6 +316,71 @@ private:
DISALLOW_COPY_AND_ASSIGN(ObTransferRelatedInfo);
};
struct ObTransferBuildTabletInfoCtx final
{
public:
ObTransferBuildTabletInfoCtx();
~ObTransferBuildTabletInfoCtx();
int build_transfer_tablet_info(
const share::ObLSID &dest_ls_id,
const common::ObIArray<share::ObTransferTabletInfo> &tablet_info_array,
const common::ObCurTraceId::TraceId &task_id,
const uint64_t data_version);
void reuse();
int get_next_tablet_info(share::ObTransferTabletInfo &tablet_info);
bool is_valid() const;
void inc_child_task_num();
void dec_child_task_num();
int64_t get_child_task_num();
int64_t get_total_tablet_count();
bool is_build_tablet_finish() const;
common::ObCurTraceId::TraceId &get_task_id() { return task_id_; }
bool is_failed() const;
void set_result(const int32_t result);
share::ObLSID &get_dest_ls_id() { return dest_ls_id_; }
uint64_t get_data_version() { return data_version_; }
int32_t get_result();
int add_tablet_info(const ObMigrationTabletParam &param);
int get_tablet_info(const int64_t index, const ObMigrationTabletParam *&param);
int64_t get_tablet_info_num() const;
TO_STRING_KV(K_(index), K_(tablet_info_array), K_(child_task_num), K_(total_tablet_count),
K_(result), K_(data_version), K_(task_id));
private:
bool is_valid_() const;
private:
struct ObTransferTabletInfoMgr final
{
public:
ObTransferTabletInfoMgr();
~ObTransferTabletInfoMgr();
int add_tablet_info(const ObMigrationTabletParam &param);
int64_t get_tablet_info_num() const;
int get_tablet_info(const int64_t index, const ObMigrationTabletParam *&param);
void reuse();
TO_STRING_KV(K_(tablet_info_array));
private:
common::SpinRWLock lock_;
common::ObArray<ObMigrationTabletParam> tablet_info_array_;
DISALLOW_COPY_AND_ASSIGN(ObTransferTabletInfoMgr);
};
private:
common::SpinRWLock lock_;
share::ObLSID dest_ls_id_;
int64_t index_;
common::ObArray<share::ObTransferTabletInfo> tablet_info_array_;
int64_t child_task_num_;
int64_t total_tablet_count_;
int32_t result_;
uint64_t data_version_;
common::ObCurTraceId::TraceId task_id_;
ObTransferTabletInfoMgr mgr_;
DISALLOW_COPY_AND_ASSIGN(ObTransferBuildTabletInfoCtx);
};
}
}
#endif