/**
 * Copyright (c) 2022 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 * http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */
#define USING_LOG_PREFIX STORAGE
#include "share/ob_rs_mgr.h"
#include "share/ob_rpc_struct.h"
#include "ob_storage_ha_utils.h"
#include "logservice/ob_log_service.h"
#include "storage/tx/ob_multi_data_source.h"
#include "storage/tx_storage/ob_ls_service.h"
#include "storage/tablet/ob_tablet.h"
#include "share/transfer/ob_transfer_task_operator.h"
#include "storage/high_availability/ob_finish_transfer.h"
#include "storage/high_availability/ob_transfer_service.h"
#include "storage/high_availability/ob_transfer_lock_utils.h"
// NOTE(review): "storage/tablet/ob_tablet.h" is included twice in this file
// (see above); harmless, but the duplicate could be removed.
#include "storage/tablet/ob_tablet.h"
#include "observer/ob_server_event_history_table_operator.h"

using namespace oceanbase::common;
using namespace oceanbase::share;
using namespace oceanbase::observer;
using namespace oceanbase::transaction;

namespace oceanbase {
namespace storage {

//errsim def
// Error-simulation points: in ERRSIM builds these inject failures at specific
// steps of the transfer DOING state machine exercised below.
ERRSIM_POINT_DEF(EN_DOING_UNLOCK_TRANSFER_MEMBER_LIST_FAILED);
ERRSIM_POINT_DEF(EN_DOING_LOCK_TRANSFER_TASK_FAILED);
ERRSIM_POINT_DEF(EN_DOING_LOCK_MEMBER_LIST_FAILED);
ERRSIM_POINT_DEF(EN_FINISH_TRANSFER_IN_FAILED);
ERRSIM_POINT_DEF(EN_DOING_WAIT_ALL_DEST_TABLET_NORAML);
ERRSIM_POINT_DEF(EN_FINISH_TRANSFER_OUT_FAILED);
ERRSIM_POINT_DEF(EN_DOING_UPDATE_TRANSFER_TASK_FAILED);
ERRSIM_POINT_DEF(EN_DOING_COMMIT_TRANS_FAILED);

// Default-construct in an uninitialized state; init() must be called before use.
ObTxFinishTransfer::ObTxFinishTransfer()
    : is_inited_(false),
      task_id_(),
      tenant_id_(OB_INVALID_ID),
      src_ls_id_(),
      dest_ls_id_(),
      mutex_(),
      cond_(),
      sql_proxy_(NULL)
{}

ObTxFinishTransfer::~ObTxFinishTransfer()
{}

// Bind this worker to one transfer task.
// @param task_id    id of the transfer task in __all_transfer_task
// @param tenant_id  tenant the task belongs to
// @param src_ls_id  log stream the tablets move out of
// @param dest_ls_id log stream the tablets move into
// @param sql_proxy  proxy used for inner-table access; pointer is retained
// @return OB_INIT_TWICE if already initialized, OB_INVALID_ARGUMENT on bad
//         input, otherwise the result of initializing the wait condition.
int
ObTxFinishTransfer::init(const ObTransferTaskID &task_id, const uint64_t tenant_id, const share::ObLSID &src_ls_id,
    const share::ObLSID &dest_ls_id, common::ObMySQLProxy &sql_proxy)
{
  int ret = OB_SUCCESS;
  if (IS_INIT) {
    ret = OB_INIT_TWICE;
    // NOTE(review): message reads "do not init" on the init-twice path; the
    // wording looks inverted but is left unchanged here.
    LOG_WARN("finish transfer do not init", K(ret));
  } else if (!task_id.is_valid() || OB_INVALID_ID == tenant_id || !src_ls_id.is_valid() || !dest_ls_id.is_valid()) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("get invalid args", K(ret), K(task_id), K(tenant_id), K(src_ls_id), K(dest_ls_id));
  } else if (OB_FAIL(cond_.init(ObWaitEventIds::STORAGE_HA_FINISH_TRANSFER))) {
    LOG_WARN("failed to init condition", K(ret));
  } else {
    task_id_ = task_id;
    tenant_id_ = tenant_id;
    src_ls_id_ = src_ls_id;
    dest_ls_id_ = dest_ls_id;
    sql_proxy_ = &sql_proxy;
    is_inited_ = true;
  }
  return ret;
}

// Entry point: run one round of the DOING phase for the bound task.
// @param[in,out] round  retry-round counter owned by the caller; reset to 0
//                       inside do_tx_transfer_doing_ once the transaction commits.
int ObTxFinishTransfer::process(int64_t &round)
{
  int ret = OB_SUCCESS;
  if (IS_NOT_INIT) {
    ret = OB_NOT_INIT;
    LOG_WARN("tx finish transfer do not init", K(ret));
  } else if (OB_FAIL(do_tx_transfer_doing_(task_id_, tenant_id_, src_ls_id_, dest_ls_id_, round))) {
    LOG_WARN("failed to do tx transfer doing", K(ret), K_(task_id), K_(tenant_id), K_(src_ls_id), K_(dest_ls_id),
        K(round));
  } else {
    LOG_INFO("process tx finish transfer", K_(task_id), K_(tenant_id), K_(src_ls_id), K_(dest_ls_id));
  }
  return ret;
}

// Drive the DOING phase of the transfer: unlock START-phase member-list locks,
// verify the dest tablets are backfilled on a majority, then inside one inner
// transaction register FINISH_TRANSFER_IN / FINISH_TRANSFER_OUT multi-source
// logs, update the task row, release locks and commit.  The numbered comments
// in the body mark the protocol steps.
int ObTxFinishTransfer::do_tx_transfer_doing_(const ObTransferTaskID &task_id, const uint64_t tenant_id,
    const share::ObLSID &src_ls_id, const share::ObLSID &dest_ls_id, int64_t &round)
{
  int ret = OB_SUCCESS;
  int tmp_ret = OB_SUCCESS;
  ObMySQLTransaction trans;
  // NOTE(review): template argument lists appear to have been stripped from
  // this file's container declarations (here and below, e.g. ObArray<...>);
  // presumably ObArray<share::ObTransferTabletInfo> -- confirm against the
  // upstream source before building.
  ObArray tablet_list;
  ObMemberList member_list;
  int64_t quorum = 0;
  observer::ObInnerSQLConnection *conn = NULL;
  SCN start_scn;
  SCN finish_scn;
  bool is_majority_passed = false;
  share::ObRsMgr *rs_mgr = GCTX.rs_mgr_;
  obrpc::ObSrvRpcProxy *svr_rpc_proxy = GCTX.srv_rpc_proxy_;
  int64_t result = OB_SUCCESS;
  common::ObArray member_addr_list;
  // NOTE(review): majority_backfilled, rs_mgr and ls_location are never read
  // in the visible body (rs_mgr only appears in a KP() log argument).
  bool majority_backfilled = false;
  ObLSLocation ls_location;
  bool is_leader =
      false;
  bool is_ready = false;
  ObTransferService *transfer_service = NULL;
  const int64_t CONFIG_CHANGE_TIMEOUT = 10 * 1000 * 1000L;  // 10s, used for member-list unlock
  ObLSHandle ls_handle;
  ObDisplayTabletList table_lock_tablet_list;
  transaction::tablelock::ObTableLockOwnerID lock_owner_id;
  // NOTE(review): this function-scope timeout_ctx is shadowed by a second
  // ObTimeoutCtx declared in the inner else-block below; only the inner one is
  // ever passed to callees -- confirm the outer declaration is intentional.
  ObTimeoutCtx timeout_ctx;
  const int64_t tmp_round = round;  // remember the incoming round for event recording
  if (!task_id.is_valid() || OB_INVALID_ID == tenant_id || !src_ls_id.is_valid() || !dest_ls_id.is_valid()) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("get invalid args", K(ret), K(task_id), K(tenant_id), K(src_ls_id), K(dest_ls_id));
  } else if (OB_ISNULL(transfer_service = MTL_WITH_CHECK_TENANT(ObTransferService *, tenant_id))) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("transfer service is NULL", K(ret), K(tenant_id));
  } else if (OB_FAIL(check_self_ls_leader_(dest_ls_id, is_leader))) {
    LOG_WARN("failed to check self ls leader", K(ret), K(dest_ls_id));
  } else if (!is_leader) {
    // only the dest-ls leader may drive the DOING phase
    ret = OB_IS_CHANGING_LEADER;
    LOG_WARN("self is not leader of dest ls", K(ret), K(dest_ls_id));
  } else if (OB_FAIL(get_ls_member_list_(tenant_id, dest_ls_id, member_list))) {
    LOG_WARN("failed to get ls member list", K(ret), K(tenant_id), K(dest_ls_id));
  }
  // TODO(yangyi.yyy): get member list and check self is leader together
  // 1. Release the locking relationship of the member list of dest_ls and src_ls
  // (the interface for binding and releasing the locking relationship needs to have the ability to re-entrant)
  else if (OB_FAIL(unlock_src_and_dest_ls_member_list_(tenant_id, src_ls_id, dest_ls_id, member_list))) {
    LOG_WARN("failed to unlock src and dest ls member list", K(ret), K(tenant_id), K(src_ls_id), K(dest_ls_id));
  }
  // get tablet info list from inner table
  else if (OB_FAIL(get_transfer_tablet_info_from_inner_table_(
      task_id, tenant_id, tablet_list, start_scn, table_lock_tablet_list, lock_owner_id))) {
    LOG_WARN("failed to get transfer tablet list", K(ret), K(task_id), K(tenant_id));
  } else if (OB_FAIL(get_transfer_quorum_(member_list, quorum))) {
    LOG_WARN("failed to get transfer quorum", K(ret), K(member_list));
  }
  // precheck if majority ls logical table replaced
  else if (OB_FAIL(check_ls_logical_table_replaced(
      tenant_id, dest_ls_id, member_list, tablet_list, quorum, is_ready))) {
    LOG_WARN("wait all ls replica logical table replaced failed", K(ret), K(tenant_id), K(dest_ls_id),
        K(member_list), K(quorum));
  } else if (!is_ready) {
    // not an error: re-schedule ourselves and retry on the next wakeup
    LOG_INFO("transfer in tablet not ready", K(ret), K(tenant_id), K(dest_ls_id));
    transfer_service->wakeup();
  } else if (OB_FAIL(get_ls_handle_(tenant_id, dest_ls_id_, ls_handle))) {
    // NOTE(review): uses the member dest_ls_id_ rather than the dest_ls_id
    // parameter used everywhere else in this function; they should be equal
    // after init() -- confirm this asymmetry is intentional.
    LOG_WARN("failed to get ls handle", K(ret));
  } else {
    const ObTransferLockStatus lock_status(ObTransferLockStatus::DOING);
#ifdef ERRSIM
    SERVER_EVENT_SYNC_ADD("TRANSFER", "BEFORE_TRANSFER_DOING_START_TRANS");
#endif
    DEBUG_SYNC(SWITCH_LEADER_BEFORE_TRANSFER_DOING_START_TRANS);
    // shadows the function-scope timeout_ctx (see NOTE above); this is the
    // context actually used by start_trans_ and the waits below
    ObTimeoutCtx timeout_ctx;
    // 2. The leader of dest_ls starts a transaction TRANS_TRANSFER_FINISH
    if (FAILEDx(start_trans_(tenant_id, trans, timeout_ctx))) {
      LOG_WARN("failed to start trans", K(ret), K(tenant_id));
    } else {
#ifdef ERRSIM
      SERVER_EVENT_SYNC_ADD("TRANSFER", "AFTER_TRANSFER_DOING_START_TRANS");
#endif
      DEBUG_SYNC(SWITCH_LEADER_AFTER_TRANSFER_DOING_START_TRANS);
      // lock the task row so concurrent schedulers serialize on it
      if (FAILEDx(select_transfer_task_for_update_(task_id, trans))) {
        LOG_WARN("failed to select for update", K(ret), K(task_id));
      }
      // 3. The dest_ls leader checks whether the transfer tablet corresponding to the src_ls copy corresponding to the
      // majority replica has been backfilled. Here, try to ensure that it is all completed, and then start the second
      // step. Only if the threshold is exceeded will there be a majority.
      // a) This step needs to first lock the member list of dest_ls
      // b) The dest_ls leader checks whether the majority has completed backfilling
      // c) Unlock the member list of dest_ls This step and the migration still need to be mutually exclusive,
      // because there may be dest_leader checking that the majority meets
      // the conditions. After unlocking the member list, a copy is migrated in. This copy replaces one of the
      // majority, resulting in a majority If the dispatch does not meet the conditions, the following chapters on
      // transfer and migration will discuss
      else if (OB_FAIL(lock_ls_member_list_(tenant_id, dest_ls_id, member_list, lock_status))) {
        LOG_WARN("failed to lock ls member list", K(ret), K(tenant_id), K(dest_ls_id), K(member_list));
      } else if (OB_FAIL(check_ls_logical_table_replaced(
          tenant_id, dest_ls_id, member_list, tablet_list, quorum, is_ready))) {
        // re-check under the member-list lock: the precheck above may have been
        // invalidated by a replica migration in the meantime
        LOG_WARN("wait all ls replica logical table replaced failed", K(ret), K(tenant_id), K(dest_ls_id),
            K(member_list), K(quorum));
      } else if (!is_ready) {
        LOG_INFO("transfer in tablet not ready", K(ret), K(tenant_id), K(dest_ls_id));
        transfer_service->wakeup();
      } else {
        // 4. The leader node of dest_ls registers the multi-source transaction
        // ObInnerSQLConnection->register_multi_source_data, and the type is TX_FINISH_TRANSFER_IN (two-way barrier)
        // The content of the log is src_ls_id, dest_ls_id, tablet_list. This step requires forcibly flushing the redo
        // log. The purpose of flushing redo here is to obtain finish_scn and check whether the dest_ls log playback is
        // new enough for src_ls.
        // a) When the leader of dest_ls receives the resgister_succ, it checks that its replay scn is greater than
        // start_scn, and has completed the replacement of the logical table.
        // b) When the leader and follower of dest_ls receive the on_redo stage,
        // they record the scn returned by on_redo as finish_scn. Check that your replay scn is greater than
        // start_scn, and the replacement of the logical table has been completed. If it has been completed,
        // change the ObTabletStatus of transfer_tablets to NORMAL. If it is not completed, the playback of
        // ob_redo needs to be stuck.
        // NOTE(review): the dynamic_cast target type was lost in extraction;
        // presumably observer::ObInnerSQLConnection * -- confirm upstream.
        if (OB_ISNULL(conn = dynamic_cast(trans.get_connection()))) {
          ret = OB_ERR_UNEXPECTED;
          LOG_WARN("conn_ is NULL", K(ret));
        } else if (OB_FAIL(do_tx_finish_transfer_in_(
            task_id, tenant_id, src_ls_id, dest_ls_id, start_scn, tablet_list, conn))) {
          LOG_WARN("failed to do tx finish transfer in", K(ret), K(task_id), K(tenant_id), K(src_ls_id),
              K(dest_ls_id), K(tablet_list));
        }
#ifdef ERRSIM
        SERVER_EVENT_SYNC_ADD("TRANSFER", "BETWEEN_REGISTER_FINISH_TRANSFER_IN_AND_OUT");
#endif
        DEBUG_SYNC(SWITCH_LEADER_BETWEEN_FINISH_TRANSFER_IN_AND_OUT);
        // 5. The leader of dest ls checks that majority ls replia satisfies the finish_scn of the playback
        // (the step of non-multi-source transaction, only the leader of dest ls does).
        // Abort is required if no majority replays to finish_scn beyond the threshold.
        // Here, the leader of dest_ls checks the destination of the majority to be more secure, and if the
        // FINISH_TRANSFER step fails, it will not affect the reading and writing of dest_ls, but the main switch will
        // have an impact, so here we will also guarantee all within a certain threshold. Copy, after a certain threshold
        // is exceeded, the majority is guaranteed; here, the transfer seq at the log stream level also needs to be
        // incremented (in TX_START_TRASNFER_IN)
        if (FAILEDx(wait_transfer_tablet_status_normal_(tenant_id, dest_ls_id, tablet_list, start_scn,
            timeout_ctx, finish_scn))) {
          LOG_WARN("failed to wait tablet status normal", K(ret), K(tenant_id), K(dest_ls_id), K(tablet_list));
        } else if (OB_FAIL(member_list.get_addr_array(member_addr_list))) {
          LOG_WARN("failed to get addr array", K(ret), K(member_list));
        } else if (OB_FAIL(wait_all_ls_replica_replay_scn_(task_id, tenant_id, dest_ls_id, member_addr_list,
            finish_scn, quorum, timeout_ctx, is_majority_passed))) {
          LOG_WARN("failed to check ls replica replay scn", K(ret), K(tenant_id), K(dest_ls_id),
              K(member_addr_list), K(finish_scn));
        } else if (!is_majority_passed) {
          ret = OB_TIMEOUT;
          LOG_WARN("majority replay scn not passed", K(ret));
        }
        // 6. The leader of dest_ls registers a multi-source transaction,
        // and the type is TX_FINISH_TRANSFER_OUT. The contents of the log are src_ls_id, dest_ls_id, finish_scn, and
        // tablet_id_list.
        // a) When the leader and follower of src_ls receive on_redo, change the ObTabletStatus of transfer_tablets to
        // TRANFER_OUT_DELETED (the status may share DELETED)
        if (FAILEDx(
            do_tx_finish_transfer_out_(task_id, tenant_id, src_ls_id, dest_ls_id, finish_scn, tablet_list, conn))) {
          LOG_WARN("failed to do tx finish transfer out", K(ret), K(task_id), K(tenant_id), K(src_ls_id),
              K(dest_ls_id), K(finish_scn));
        }
        // 7. Update __all_transfer_task according to the result of transaction execution.
        // If successful, push __all_transfer_task to FINISH state,
        // otherwise keep the status of __all_transfer_task as DOING
        else if (OB_FAIL(update_transfer_task_result_(task_id, tenant_id, finish_scn, OB_SUCCESS, trans))) {
          LOG_WARN("failed to update transfer status", K(ret), K(task_id), K(tenant_id), K(finish_scn));
        }
        // 8. unlock table lock on src ls for tablet (must be successful)
        else if (OB_FAIL(ObTransferLockUtil::unlock_tablet_on_src_ls_for_table_lock(
            trans, tenant_id, src_ls_id, lock_owner_id, table_lock_tablet_list))) {
          LOG_WARN("failed to unlock tablet on src ls for table lock", KR(ret), K(tenant_id), K(src_ls_id),
              K(lock_owner_id), K(table_lock_tablet_list));
        }
        // 9. unlock member list
        else if (OB_FAIL(
            unlock_ls_member_list_(tenant_id, dest_ls_id, member_list, lock_status, CONFIG_CHANGE_TIMEOUT))) {
          LOG_WARN("failed to unlock ls member list", K(ret), K(tenant_id), K(dest_ls_id), K(member_list));
        }
#ifdef ERRSIM
        if (OB_SUCC(ret)) {
          // GNU ?: with omitted middle operand: keep the errsim code if set
          ret = EN_DOING_COMMIT_TRANS_FAILED ? : OB_SUCCESS;
          if (OB_FAIL(ret)) {
            STORAGE_LOG(ERROR, "fake EN_DOING_COMMIT_TRANS_FAILED", K(ret));
          }
        }
#endif
        DEBUG_SYNC(BEFORE_DOING_TRANSFER_COMMIT);
        // 10. LOG_COMMIT_FINISH
        // commit when every step above succeeded, otherwise roll back; a
        // successful commit also resets the caller's retry round counter
        bool is_commit = OB_SUCCESS == ret;
        if (OB_TMP_FAIL(commit_trans_(is_commit, trans))) {
          if (OB_SUCCESS == ret) {
            ret = tmp_ret;
          }
        } else if (is_commit) {
          round = 0;
        }
        // 11. After the dest_ls leader succeeds,
        // it will report the corresponding results to RS.
        // This step does not guarantee success.
        // best-effort report to RS; ret is deliberately left untouched
        // NOTE(review): `result` is still OB_SUCCESS here regardless of `ret`
        // -- confirm the real outcome is meant to be reported elsewhere.
        if (OB_TMP_FAIL(report_result_(task_id, result, svr_rpc_proxy))) {
          LOG_WARN("failed to report rpc result", K(ret), K(task_id), KP(rs_mgr), KP(svr_rpc_proxy));
        }
      }
    }
  }
  // always record a server event for diagnosis, whatever ret is
  if (OB_TMP_FAIL(record_server_event_(ret, is_ready, tmp_round, start_scn))) {
    LOG_WARN("failed to record server event", K(tmp_ret), K(ret), K(is_ready));
  }
  return ret;
}

// Release the START-phase member-list locks taken on both the src and dest
// log streams.  Both unlock calls are re-entrant, so this is safe to retry.
int ObTxFinishTransfer::unlock_src_and_dest_ls_member_list_(const uint64_t tenant_id, const share::ObLSID &src_ls_id,
    const share::ObLSID &dest_ls_id, common::ObMemberList &member_list)
{
  int ret = OB_SUCCESS;
  // NOTE(review): is_same and same_member_list are never used in this body.
  bool is_same = false;
  const int64_t CONFIG_CHANGE_TIMEOUT = 10 * 1000 * 1000L;
  const int64_t lock_timeout = CONFIG_CHANGE_TIMEOUT;
  bool same_member_list = true;
  const ObTransferLockStatus status(ObTransferLockStatus::START);
  if (!src_ls_id.is_valid() || !dest_ls_id.is_valid()) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("get invalid args", K(ret), K(src_ls_id), K(dest_ls_id));
  } else if (OB_FAIL(unlock_ls_member_list_(tenant_id, src_ls_id, member_list, status, lock_timeout))) {
    LOG_WARN("failed to unlock ls member list", K(ret), K(tenant_id), K(src_ls_id), K(dest_ls_id));
  } else if (OB_FAIL(unlock_ls_member_list_(tenant_id, dest_ls_id, member_list, status, lock_timeout))) {
    LOG_WARN("failed to unlock ls member list", K(ret), K(tenant_id), K(src_ls_id), K(dest_ls_id));
  } else {
    LOG_INFO(
        "[TRANSFER] unlock src and dest ls member list", K(tenant_id), K(src_ls_id), K(dest_ls_id), K(member_list));
#ifdef ERRSIM
    if (OB_SUCC(ret)) {
      ret = EN_DOING_UNLOCK_TRANSFER_MEMBER_LIST_FAILED ? : OB_SUCCESS;
      if (OB_FAIL(ret)) {
        STORAGE_LOG(ERROR, "fake EN_DOING_UNLOCK_TRANSFER_MEMBER_LIST_FAILED", K(ret));
      }
    }
#endif
  }
  UNUSEDx(member_list);
  return ret;
}

// Load the transfer task row and extract the pieces the DOING phase needs:
// the tablet list, the start scn, the table-lock tablet list and the lock
// owner id.  Reads without FOR UPDATE (for_update = false).
int ObTxFinishTransfer::get_transfer_tablet_info_from_inner_table_(
    const ObTransferTaskID &task_id, const uint64_t tenant_id, common::ObArray &tablet_list, SCN &start_scn,
    ObDisplayTabletList &table_lock_tablet_list, transaction::tablelock::ObTableLockOwnerID &lock_owner_id)
{
  int ret = OB_SUCCESS;
  tablet_list.reset();
  table_lock_tablet_list.reset();
  lock_owner_id.reset();
  const bool for_update = false;
  ObTransferTask transfer_task;
  if (!task_id.is_valid()) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("get invalid args", K(ret), K(task_id));
  } else if (OB_FAIL(ObTransferTaskOperator::get(*sql_proxy_, tenant_id, task_id, for_update, transfer_task,
      share::OBCG_STORAGE))) {
    LOG_WARN("failed to get transfer task", K(ret), K(tenant_id), K(task_id));
  } else if (OB_FAIL(tablet_list.assign(transfer_task.get_tablet_list()))) {
    LOG_WARN("failed to assign tablet_list", KR(ret), K(transfer_task));
  } else if (OB_FAIL(table_lock_tablet_list.assign(transfer_task.get_table_lock_tablet_list()))) {
    LOG_WARN("failed to assign table_lock_tablet_list", KR(ret), K(transfer_task));
  } else {
    start_scn = transfer_task.get_start_scn();
    lock_owner_id = transfer_task.get_table_lock_owner_id();
    LOG_INFO("get transfer info from inner table", K(task_id), K(tenant_id), K(transfer_task), K(tablet_list),
        K(table_lock_tablet_list), K(lock_owner_id));
  }
  return ret;
}

// Poll until every transfer tablet on @ls_id reaches NORMAL status (or the
// timeout ctx expires), sleeping CHECK_TABLET_STATUS_INTERVAL between probes.
// On success @finish_scn carries the common transfer scn of the tablets.
int ObTxFinishTransfer::wait_transfer_tablet_status_normal_(
    const uint64_t tenant_id, const share::ObLSID &ls_id, const common::ObArray &tablet_list,
    const share::SCN &start_scn, ObTimeoutCtx &timeout_ctx, share::SCN &finish_scn)
{
  int ret = OB_SUCCESS;
  // NOTE(review): begin_us is recorded but never read in the visible body.
  int64_t begin_us = ObTimeUtility::current_time();
  const int64_t CHECK_TABLET_STATUS_INTERVAL = 100_ms;
  if (OB_INVALID_ID == tenant_id || !ls_id.is_valid() || tablet_list.empty() || !start_scn.is_valid()) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("wait transfer tablet status normal get invalid argument", K(ret), K(tenant_id), K(ls_id),
        K(tablet_list), K(start_scn));
  } else {
    // loop until ready, timeout, or an unexpected tablet state
    while (OB_SUCC(ret)) {
      bool is_ready = false;
      if (timeout_ctx.is_timeouted()) {
        ret = OB_TIMEOUT;
        LOG_WARN("already timeout", K(ret), K(tenant_id), K(ls_id));
      } else if (OB_FAIL(check_transfer_tablet_status_normal_(tenant_id, ls_id, tablet_list, start_scn, is_ready,
          finish_scn))) {
        LOG_WARN("failed to check transfer tablet status normal", K(ret), K(tenant_id), K(tablet_list));
      } else if (is_ready) {
        LOG_INFO("tablet is ready", K(tenant_id), K(ls_id), K(tablet_list));
        break;
      } else {
        ob_usleep(CHECK_TABLET_STATUS_INTERVAL);
      }
    }
  }
  return ret;
}

// Single probe for the wait above: read every transfer tablet's mds user data
// (uncommitted allowed) and decide whether all of them have reached NORMAL
// with a transfer scn beyond @start_scn.  Sets @is_ready accordingly and, on
// the first advanced tablet, captures the shared finish scn into @finish_scn;
// any disagreement between tablets is treated as a fatal inconsistency.
int ObTxFinishTransfer::check_transfer_tablet_status_normal_(
    const uint64_t tenant_id, const share::ObLSID &ls_id, const common::ObArray &tablet_list,
    const share::SCN &start_scn, bool &is_ready, share::SCN &finish_scn)
{
  int ret = OB_SUCCESS;
  is_ready = true;
  storage::ObLS *ls = NULL;
  storage::ObLSHandle ls_handle;
  finish_scn.reset();
  if (OB_FAIL(get_ls_handle_(tenant_id, ls_id, ls_handle))) {
    LOG_WARN("failed to get ls handle", K(ret), K(tenant_id), K(ls_id));
  } else if (OB_ISNULL(ls = ls_handle.get_ls())) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("log stream not exist", K(ret), K(ls_id));
  } else {
    for (int64_t i = 0; OB_SUCC(ret) && i < tablet_list.count(); ++i) {
      ObTabletHandle tablet_handle;
      // NOTE(review): `tablet` is declared but never used in this loop.
      ObTablet *tablet = NULL;
      ObTabletCreateDeleteMdsUserData user_data;
      const ObTransferTabletInfo &tablet_info = tablet_list.at(i);
      const common::ObTabletID &tablet_id = tablet_info.tablet_id_;
      if (OB_FAIL(ls->get_tablet(tablet_id, tablet_handle, 0, ObMDSGetTabletMode::READ_WITHOUT_CHECK))) {
        LOG_WARN("failed to get tablet", K(ret), K(tablet_id));
      } else if (OB_FAIL(ObTXTransferUtils::get_tablet_status(false/*get_commit*/, tablet_handle, user_data))) {
        LOG_WARN("failed to get tablet status", K(ret), K(tablet_id), K(tablet_handle));
      } else if (ObTabletStatus::NORMAL != user_data.tablet_status_) {
        // only TRANSFER_IN is a legal intermediate state; anything else is a bug
        if (ObTabletStatus::TRANSFER_IN != user_data.tablet_status_) {
          ret = OB_ERR_UNEXPECTED;
          LOG_WARN("tablet status is not expected", K(ret), K(tablet_info), K(user_data));
        } else {
          is_ready = false;
          LOG_INFO("tablet is not ready, need retry", K(tablet_info));
          break;
        }
      } else if (user_data.transfer_scn_ < start_scn) {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("finish scn should not smaller than start scn", K(ret), K(tablet_id), K(start_scn), K(user_data));
      } else if (user_data.transfer_scn_ > start_scn) {
        // the tablet has been advanced by FINISH_TRANSFER_IN; all tablets must
        // agree on the same finish scn
        if (0 == i) {
          finish_scn = user_data.transfer_scn_;
        } else if (finish_scn != user_data.transfer_scn_) {
          ret = OB_ERR_UNEXPECTED;
          LOG_WARN("tablet finish scn is not same, unexpected", K(ret), K(tablet_id), K(start_scn), K(finish_scn),
              K(user_data));
        }
      } else {
        // transfer scn still equals start scn: on_redo not replayed yet
        is_ready = false;
        LOG_INFO("tablet is not ready, need retry", K(tablet_info));
        break;
      }
    }
  }
  return ret;
}

// Verify (from the dest-ls leader) that every member in @member_list has
// finished replacing the logical tables for @tablet_list, by fanning out
// check-backfill RPCs; result comes back in @all_backfilled.
int ObTxFinishTransfer::check_ls_logical_table_replaced(const uint64_t tenant_id, const share::ObLSID &dest_ls_id,
    const common::ObMemberList &member_list, const common::ObArray &tablet_list, const int64_t quorum,
    bool &all_backfilled)
{
  int ret = OB_SUCCESS;
  // NOTE(review): tmp_ret, cluster_id and ls_location are unused in this body.
  int tmp_ret = OB_SUCCESS;
  bool is_leader = false;
  const int64_t cluster_id = GCONF.cluster_id;
  ObLSLocation ls_location;
  ObArray addr_array;
  if (OB_FAIL(check_self_ls_leader_(dest_ls_id, is_leader))) {
    LOG_WARN("failed to check self ls leader", K(ret), K(dest_ls_id));
  } else if (!is_leader) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("self is not leader", K(ret), K(dest_ls_id));
  } else if (OB_FAIL(member_list.get_addr_array(addr_array))) {
    LOG_WARN("failed to get addr array", K(ret), K(member_list));
  } else if (OB_FAIL(inner_check_ls_logical_table_replaced_(
      tenant_id, dest_ls_id, addr_array, tablet_list, quorum, all_backfilled))) {
    LOG_WARN("failed to inner check majority backfilled", K(ret), K(tenant_id), K(dest_ls_id), K(addr_array));
  } else {
    LOG_INFO("check ls logical table replace", K(tenant_id), K(dest_ls_id), K(addr_array), K(tablet_list),
        K(quorum),
        K(all_backfilled));
  }
  return ret;
}

// Fan out check_transfer_tablet_backfill_completed RPCs to every address in
// @member_addr_list via an async batch proxy, then gather the responses.
// @all_backfilled becomes false as soon as one replica reports unfinished
// backfill; any RPC failure fails the whole check.
int ObTxFinishTransfer::inner_check_ls_logical_table_replaced_(const uint64_t tenant_id,
    const share::ObLSID &dest_ls_id, const common::ObArray &member_addr_list, const common::ObArray &tablet_list,
    const int64_t quorum, bool &all_backfilled)
{
  int ret = OB_SUCCESS;
  all_backfilled = true;
  storage::ObCheckTransferTabletBackfillProxy batch_proxy(
      *(GCTX.storage_rpc_proxy_), &obrpc::ObStorageRpcProxy::check_transfer_tablet_backfill_completed);
  // send one async request per member
  FOREACH_X(location, member_addr_list, OB_SUCC(ret)) {
    if (OB_ISNULL(location)) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("location should not be null", K(ret));
    } else {
      const common::ObAddr &server = *location;
      const int64_t timeout = GCONF.rpc_timeout;
      const int64_t cluster_id = GCONF.cluster_id;
      const uint64_t group_id = share::OBCG_STORAGE;
      ObCheckTransferTabletBackfillArg arg;
      arg.tenant_id_ = tenant_id;
      arg.ls_id_ = dest_ls_id;
      if (OB_FAIL(arg.tablet_list_.assign(tablet_list))) {
        LOG_WARN("failed to assign tablet array", K(ret), K(tablet_list));
      } else if (OB_FAIL(batch_proxy.call(server, timeout, cluster_id, tenant_id, group_id, arg))) {
        LOG_WARN("failed to send check transfer tablet backfill request", K(ret), K(server), K(tenant_id));
      } else {
        LOG_INFO("check_transfer_tablet_backfill_completed", K(arg), K(server));
      }
    }
  }
  // wait_all must run even if some call() failed, so the proxy is drained
  ObArray return_code_array;
  int tmp_ret = OB_SUCCESS;
  if (OB_TMP_FAIL(batch_proxy.wait_all(return_code_array))) {
    LOG_WARN("fail to wait all batch result", KR(ret), KR(tmp_ret));
    ret = OB_SUCC(ret) ? tmp_ret : ret;
  }
  if (OB_FAIL(ret)) {
  } else if (return_code_array.count() != member_addr_list.count()
      || return_code_array.count() != batch_proxy.get_results().count()) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("cnt not match", K(ret), "return_cnt", return_code_array.count(), "result_cnt",
        batch_proxy.get_results().count(), "server_cnt", member_addr_list.count());
  } else {
    ARRAY_FOREACH_X(batch_proxy.get_results(), idx, cnt, OB_SUCC(ret)) {
      const ObCheckTransferTabletBackfillRes *response = batch_proxy.get_results().at(idx);
      const int res_ret = return_code_array.at(idx);
      if (OB_SUCCESS != res_ret) {
        ret = res_ret;
        LOG_WARN("rpc execute failed", KR(ret), K(idx));
      } else if (OB_ISNULL(response)) {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("response is null", K(ret));
      } else if (!response->backfill_finished_) {
        // one replica still backfilling: answer is known, stop scanning
        all_backfilled = false;
        break;
      }
    }
  }
  return ret;
}

// Register the FINISH_TRANSFER_IN multi-source log on the dest ls through the
// inner connection of the open transaction.  The redo is force-flushed
// (need_flush_redo_instantly_ = true) so that a finish scn is produced.
int ObTxFinishTransfer::do_tx_finish_transfer_in_(const ObTransferTaskID &task_id, const uint64_t tenant_id,
    const share::ObLSID &src_ls_id, const share::ObLSID &dest_ls_id, const SCN &start_scn,
    const common::ObArray &tablet_list, observer::ObInnerSQLConnection *conn)
{
  int ret = OB_SUCCESS;
  // NOTE(review): force_flush_redo and is_leader are unused in this body.
  bool force_flush_redo = true;
  bool is_leader = false;
  char *buf = NULL;
  int64_t buf_len = 0;
  const transaction::ObTxDataSourceType type = transaction::ObTxDataSourceType::FINISH_TRANSFER_IN;
  ObTXFinishTransferInInfo finish_transfer_in_info;
  ObArenaAllocator allocator;
  ObRegisterMdsFlag flag;
  flag.need_flush_redo_instantly_ = true;
  if (OB_ISNULL(conn)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("conn should not be null", K(ret));
  } else if (OB_FAIL(build_tx_finish_transfer_in_info_(
      src_ls_id, dest_ls_id, start_scn, tablet_list, finish_transfer_in_info))) {
    LOG_WARN("failed to build tx finish transfer in info", K(ret), K(src_ls_id), K(dest_ls_id), K(start_scn),
        K(tablet_list));
  } else if (OB_FAIL(construct_multi_data_source_buf_(finish_transfer_in_info, allocator, buf, buf_len))) {
    LOG_WARN("failed to construct multi data source buf", K(ret),
        K(finish_transfer_in_info));
  } else if (OB_FAIL(conn->register_multi_data_source(tenant_id, dest_ls_id, type, buf, buf_len, flag))) {
    LOG_WARN("failed to register multi data source", K(ret), K(tenant_id), K(dest_ls_id), K(type));
  } else {
#ifdef ERRSIM
    ObTransferEventRecorder::record_transfer_task_event(task_id, "TX_FINISH_TRANSFER_IN", src_ls_id, dest_ls_id);
#endif
    LOG_INFO("register multi data source for finish transfer in", K(task_id), K(src_ls_id), K(dest_ls_id));
#ifdef ERRSIM
    if (OB_SUCC(ret)) {
      // GNU ?: with omitted middle operand: keep the errsim code if set
      ret = EN_FINISH_TRANSFER_IN_FAILED ? : OB_SUCCESS;
      if (OB_FAIL(ret)) {
        STORAGE_LOG(ERROR, "fake EN_FINISH_TRANSFER_IN_FAILED", K(ret));
      }
    }
#endif
  }
  return ret;
}

// Register the FINISH_TRANSFER_OUT multi-source log on the src ls.  Unlike
// the transfer-in log the redo is not force-flushed; mds_base_scn_ is set to
// @finish_scn so the log is anchored to the finish point.
int ObTxFinishTransfer::do_tx_finish_transfer_out_(const ObTransferTaskID &task_id, const uint64_t tenant_id,
    const share::ObLSID &src_ls_id, const share::ObLSID &dest_ls_id, const share::SCN &finish_scn,
    const common::ObArray &tablet_list, observer::ObInnerSQLConnection *conn)
{
  int ret = OB_SUCCESS;
  // NOTE(review): force_flush_redo and is_leader are unused in this body.
  bool force_flush_redo = true;
  bool is_leader = false;
  char *buf = NULL;
  int64_t buf_len = 0;
  const transaction::ObTxDataSourceType type = transaction::ObTxDataSourceType::FINISH_TRANSFER_OUT;
  ObTXFinishTransferOutInfo finish_transfer_out_info;
  ObArenaAllocator allocator;
  ObRegisterMdsFlag flag;
  flag.need_flush_redo_instantly_ = false;
  flag.mds_base_scn_ = finish_scn;
  if (OB_ISNULL(conn)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("conn should not be null", K(ret));
  } else if (OB_FAIL(build_tx_finish_transfer_out_info_(
      src_ls_id, dest_ls_id, finish_scn, tablet_list, finish_transfer_out_info))) {
    LOG_WARN("failed to build tx finish transfer out info", K(ret), K(src_ls_id), K(dest_ls_id), K(finish_scn),
        K(tablet_list));
  } else if (OB_FAIL(construct_multi_data_source_buf_(finish_transfer_out_info, allocator, buf, buf_len))) {
    LOG_WARN("failed to construct multi data source buf", K(ret), K(finish_transfer_out_info));
  } else if (OB_FAIL(conn->register_multi_data_source(tenant_id, src_ls_id, type, buf, buf_len, flag))) {
    LOG_WARN("failed to register multi data source", K(ret), K(tenant_id));
  } else {
#ifdef ERRSIM
    ObTransferEventRecorder::record_transfer_task_event(task_id, "TX_FINISH_TRANSFER_OUT", src_ls_id, dest_ls_id);
#endif
    LOG_INFO("[TRANSFER] register multi data source for finish transfer out", K(task_id), K(src_ls_id),
        K(dest_ls_id));
#ifdef ERRSIM
    if (OB_SUCC(ret)) {
      ret = EN_FINISH_TRANSFER_OUT_FAILED ? : OB_SUCCESS;
      if (OB_FAIL(ret)) {
        STORAGE_LOG(ERROR, "fake EN_FINISH_TRANSFER_OUT_FAILED", K(ret));
      }
    }
#endif
    DEBUG_SYNC(AFTER_FINISH_TRANSFER_OUT);
  }
  return ret;
}

// Poll until every member in @member_addr_list has replayed up to
// @finish_scn, or the timeout ctx expires.  @check_passed is true only when
// ALL members (not just a quorum) have caught up; already-finished members
// accumulate in finished_addr_list so they are not re-polled.
int ObTxFinishTransfer::wait_all_ls_replica_replay_scn_(const ObTransferTaskID &task_id, const uint64_t tenant_id,
    const share::ObLSID &ls_id, const common::ObArray &member_addr_list, const share::SCN &finish_scn,
    const int64_t quorum, ObTimeoutCtx &timeout_ctx, bool &check_passed)
{
  int ret = OB_SUCCESS;
  common::ObArray finished_addr_list;
  while (OB_SUCC(ret)) {
    check_passed = false;
    if (timeout_ctx.is_timeouted()) {
      ret = OB_TIMEOUT;
      LOG_WARN("some ls replay not finished", K(ret), K(tenant_id), K(ls_id));
    } else if (OB_FAIL(check_all_ls_replica_replay_scn_(
        task_id, tenant_id, ls_id, member_addr_list, finish_scn, timeout_ctx, finished_addr_list))) {
      LOG_WARN("failed to check all ls replica replay scn", K(ret), K(tenant_id), K(member_addr_list), K(ls_id),
          K(quorum));
    } else if (finished_addr_list.count() == member_addr_list.count()) {
      check_passed = true;
      LOG_INFO("all ls has passed ls replica replay scn", K(tenant_id), K(ls_id));
      break;
    } else {
      ob_usleep(DEFAULT_WAIT_INTERVAL_US);
    }
  }
#ifdef ERRSIM
  if (OB_SUCC(ret)) {
    ret = EN_DOING_WAIT_ALL_DEST_TABLET_NORAML ?
        : OB_SUCCESS;
    if (OB_FAIL(ret)) {
      STORAGE_LOG(ERROR, "fake EN_DOING_WAIT_ALL_DEST_TABLET_NORAML", K(ret));
    }
  }
#endif
  DEBUG_SYNC(AFTER_DOING_TRANSFER_WAIT_REPLAY_SCN);
  return ret;
}

// One round of the replay-scn wait: compute the members not yet confirmed
// (total minus finished) and ask each whether its replay scn has reached
// @finish_scn; newly confirmed members are appended to @finished_addr_list.
int ObTxFinishTransfer::check_all_ls_replica_replay_scn_(const ObTransferTaskID &task_id, const uint64_t tenant_id,
    const share::ObLSID &ls_id, const common::ObIArray &total_addr_list, const share::SCN &finish_scn,
    ObTimeoutCtx &timeout_ctx, common::ObIArray &finished_addr_list)
{
  int ret = OB_SUCCESS;
  // NOTE(review): tmp_ret and cur_quorum are unused in this body.
  int tmp_ret = OB_SUCCESS;
  int64_t cur_quorum = 0;
  common::ObArray member_addr_list;
  const int32_t group_id = share::OBCG_STORAGE;
  if (OB_FAIL(ObTransferUtils::get_need_check_member(total_addr_list, finished_addr_list, member_addr_list))) {
    LOG_WARN("failed to get need check member", K(ret), K(task_id), K(tenant_id), K(ls_id));
  } else if (OB_FAIL(ObTransferUtils::check_ls_replay_scn(
      tenant_id, ls_id, finish_scn, group_id, member_addr_list, timeout_ctx, finished_addr_list))) {
    LOG_WARN("failed to check ls replay scn", K(ret), K(total_addr_list), K(finish_scn));
  }
  return ret;
}

// Resolve @ls_id into an ObLSHandle through the tenant's ObLSService.
int ObTxFinishTransfer::get_ls_handle_(
    const uint64_t tenant_id, const share::ObLSID &ls_id, storage::ObLSHandle &ls_handle)
{
  int ret = OB_SUCCESS;
  ls_handle.reset();
  ObLSService *ls_service = NULL;
  if (OB_INVALID_ID == tenant_id || !ls_id.is_valid()) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("get invalid args", K(ret), K(tenant_id), K(ls_id));
  } else if (OB_ISNULL(ls_service = MTL_WITH_CHECK_TENANT(ObLSService *, tenant_id))) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("log stream service is NULL", K(ret), K(tenant_id));
  } else if (OB_FAIL(ls_service->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD))) {
    LOG_WARN("failed to get log stream", K(ret), K(tenant_id), K(ls_id));
  }
  return ret;
}

// Read the paxos member list of @ls_id.  Leadership is checked both before
// and after reading the list so that a concurrent leader switch cannot hand
// back a list this node is no longer entitled to act on.
int ObTxFinishTransfer::get_ls_member_list_(
    const uint64_t tenant_id, const share::ObLSID &ls_id, common::ObMemberList &member_list)
{
  int ret = OB_SUCCESS;
  storage::ObLS *ls = NULL;
  storage::ObLSHandle ls_handle;
  int64_t quorum = 0;
  bool is_leader = false;
  if (OB_FAIL(get_ls_handle_(tenant_id, ls_id, ls_handle))) {
    LOG_WARN("failed to get ls", K(ret), K(tenant_id), K(ls_id));
  } else if (OB_ISNULL(ls = ls_handle.get_ls())) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("log stream not exist", K(ret), K(ls_id));
  } else if (OB_FAIL(check_self_ls_leader_(ls_id, is_leader))) {
    LOG_WARN("failed to check self ls leader", K(ret), K(ls_id));
  } else if (!is_leader) {
    ret = OB_IS_CHANGING_LEADER;
    LOG_WARN("self is not leader", K(ret), K(ls_id));
  } else if (OB_FAIL(ls->get_paxos_member_list(member_list, quorum))) {
    LOG_WARN("failed to get paxos member list", K(ret));
  } else if (OB_FAIL(check_self_ls_leader_(ls_id, is_leader))) {
    // re-check: leadership may have moved while the member list was fetched
    LOG_WARN("failed to check self ls leader", K(ret), K(ls_id));
  } else if (!is_leader) {
    ret = OB_IS_CHANGING_LEADER;
    LOG_WARN("self is not leader", K(ret), K(ls_id));
  } else {
    LOG_INFO("get ls member list", K(tenant_id), K(ls_id), K(member_list));
  }
  return ret;
}

// TODO(yangyi.yyy): impl later
// extract common function later
// Placeholder: comparing src/dest member lists is not implemented yet; the
// function succeeds unconditionally and ignores all arguments.
int ObTxFinishTransfer::check_same_member_list_(
    const uint64_t tenant_id, const share::ObLSID &src_ls_id, const share::ObLSID &dest_ls_id,
    bool &same_member_list)
{
  int ret = OB_SUCCESS;
  UNUSEDx(tenant_id, src_ls_id, dest_ls_id, same_member_list);
  return ret;
}

// Release the member-list lock of @ls_id held for this task in phase @status.
// NOTE(review): the lock_timeout parameter is accepted but not forwarded to
// ObMemberListLockUtils::unlock_ls_member_list in the visible call -- confirm.
int ObTxFinishTransfer::unlock_ls_member_list_(const uint64_t tenant_id, const share::ObLSID &ls_id,
    const common::ObMemberList &member_list, const ObTransferLockStatus &status, const int64_t lock_timeout)
{
  int ret = OB_SUCCESS;
  if (OB_FAIL(ObMemberListLockUtils::unlock_ls_member_list(
      tenant_id, ls_id, task_id_.id(), member_list, status, share::OBCG_STORAGE, *sql_proxy_))) {
    LOG_WARN("failed to unlock ls member list", K(ret), K(tenant_id), K(ls_id), K_(task_id), K(status));
  }
  return ret;
}

// Take the member-list lock of @ls_id for this task in phase @status.
int ObTxFinishTransfer::lock_ls_member_list_(const uint64_t tenant_id, const share::ObLSID &ls_id,
    const common::ObMemberList &member_list, const ObTransferLockStatus &status)
{
  int ret = OB_SUCCESS;
  if
(OB_FAIL(ObMemberListLockUtils::lock_ls_member_list( tenant_id, ls_id, task_id_.id(), member_list, status, share::OBCG_STORAGE, *sql_proxy_))) { LOG_WARN("failed to unlock ls member list", K(ret), K(ls_id), K(member_list)); } else { #ifdef ERRSIM if (OB_SUCC(ret)) { ret = EN_DOING_LOCK_MEMBER_LIST_FAILED ? : OB_SUCCESS; if (OB_FAIL(ret)) { STORAGE_LOG(ERROR, "fake EN_DOING_LOCK_MEMBER_LIST_FAILED", K(ret)); } } #endif DEBUG_SYNC(AFTER_DOING_TRANSFER_LOCK_MEMBER_LIST); } return ret; } int ObTxFinishTransfer::build_tx_finish_transfer_in_info_(const share::ObLSID &src_ls_id, const share::ObLSID &dest_ls_id, const share::SCN &start_scn, const common::ObArray &tablet_list, ObTXFinishTransferInInfo &transfer_in_info) { int ret = OB_SUCCESS; transfer_in_info.reset(); if (!src_ls_id.is_valid() || !dest_ls_id.is_valid() || !start_scn.is_valid()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("get invalid args", K(ret), K(src_ls_id), K(dest_ls_id)); } else { transfer_in_info.src_ls_id_ = src_ls_id; transfer_in_info.dest_ls_id_ = dest_ls_id; transfer_in_info.start_scn_ = start_scn; if (OB_FAIL(transfer_in_info.tablet_list_.assign(tablet_list))) { LOG_WARN("failed to assign tablet list", K(ret), K(tablet_list)); } } return ret; } int ObTxFinishTransfer::build_tx_finish_transfer_out_info_(const share::ObLSID &src_ls_id, const share::ObLSID &dest_ls_id, const share::SCN &finish_scn, const common::ObArray &tablet_list, ObTXFinishTransferOutInfo &transfer_out_info) { int ret = OB_SUCCESS; transfer_out_info.reset(); if (!src_ls_id.is_valid() || !dest_ls_id.is_valid() || !finish_scn.is_valid()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("get invalid args", K(ret), K(src_ls_id), K(dest_ls_id)); } else { transfer_out_info.src_ls_id_ = src_ls_id; transfer_out_info.dest_ls_id_ = dest_ls_id; transfer_out_info.finish_scn_ = finish_scn; if (OB_FAIL(transfer_out_info.tablet_list_.assign(tablet_list))) { LOG_WARN("failed to assign tablet list", K(ret), K(tablet_list)); } } return ret; } template int 
ObTxFinishTransfer::construct_multi_data_source_buf_(
    const TransferInfo &transfer_info, common::ObIAllocator &allocator, char *&buf, int64_t &buf_len)
{
  int ret = OB_SUCCESS;
  buf = NULL;
  int64_t pos = 0;
  // serialized size is computed up front so a single allocation suffices
  buf_len = transfer_info.get_serialize_size();
  if (!transfer_info.is_valid()) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("get invalid args", K(ret), K(transfer_info));
  // NOTE(review): the static_cast below is missing its target type
  // (presumably char *) -- angle-bracket text appears stripped in this
  // copy of the file; confirm against upstream.  The buffer is owned by
  // the caller-provided allocator.
  } else if (OB_ISNULL(buf = static_cast(allocator.alloc(buf_len)))) {
    ret = OB_ALLOCATE_MEMORY_FAILED;
    LOG_WARN("failed to alloc memory", K(ret), K(buf_len));
  } else if (OB_FAIL(transfer_info.serialize(buf, buf_len, pos))) {
    LOG_WARN("failed to serialize", K(ret), K(transfer_info));
  }
  return ret;
}

// Update the transfer task row inside trans: record finish_scn and move
// the task to COMPLETED when result == OB_SUCCESS, otherwise keep DOING.
int ObTxFinishTransfer::update_transfer_task_result_(const ObTransferTaskID &task_id,
    const uint64_t tenant_id, const share::SCN &finish_scn, const int64_t result, ObMySQLTransaction &trans)
{
  int ret = OB_SUCCESS;
  if (!task_id.is_valid() || OB_INVALID_ID == tenant_id) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("get invalid args", K(ret), K(task_id), K(tenant_id));
  } else {
    ObTransferTask transfer_task;
    const bool for_update = true;  // lock the task row while updating it
    ObTransferStatus next_status;
    next_status = OB_SUCCESS == result ?
ObTransferStatus::COMPLETED : ObTransferStatus::DOING;
    if (OB_FAIL(ObTransferTaskOperator::get(trans, tenant_id, task_id, for_update, transfer_task, share::OBCG_STORAGE))) {
      LOG_WARN("failed to get transfer task", K(ret), K(task_id), K(tenant_id));
    // finish_scn must be strictly greater than the task's start_scn
    } else if (transfer_task.get_start_scn() >= finish_scn) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("finish scn not expected", K(ret), K(transfer_task), K(finish_scn));
    } else if (OB_FAIL(ObTransferTaskOperator::update_finish_scn(
                   trans, tenant_id, task_id, transfer_task.get_status(), finish_scn, share::OBCG_STORAGE))) {
      LOG_WARN("failed to update finish scn", K(ret), K(tenant_id), K(task_id), K(finish_scn));
    } else if (OB_FAIL(ObTransferTaskOperator::finish_task(
                   trans, tenant_id, task_id, transfer_task.get_status(), next_status, result,
                   ObTransferTaskComment::EMPTY_COMMENT, share::OBCG_STORAGE))) {
      LOG_WARN("failed to finish task", K(ret), K(tenant_id), K(task_id));
    }
#ifdef ERRSIM
    // errsim-only tracing; recorded regardless of ret above
    ObTransferEventRecorder::record_advance_transfer_status_event(
        tenant_id, task_id, src_ls_id_, dest_ls_id_, next_status, result);
#endif
    LOG_INFO("update transfer task result", K(ret), K(task_id), K(tenant_id), K(result), K(finish_scn));
#ifdef ERRSIM
    if (OB_SUCC(ret)) {
      ret = EN_DOING_UPDATE_TRANSFER_TASK_FAILED ?
: OB_SUCCESS; if (OB_FAIL(ret)) { STORAGE_LOG(ERROR, "fake EN_DOING_UPDATE_TRANSFER_TASK_FAILED", K(ret)); } } #endif } return ret; } int ObTxFinishTransfer::report_result_( const ObTransferTaskID &task_id, const int64_t result, obrpc::ObSrvRpcProxy *rpc_proxy) { int ret = OB_SUCCESS; ObAddr leader_addr; int64_t retry_count = 0; const int64_t MAX_RETRY_TIMES = 3; const int64_t REPORT_RETRY_INTERVAL_MS = 100 * 1000; // 100ms ObFinishTransferTaskArg finish_task_arg; const uint64_t tenant_id = MTL_ID(); const share::ObLSID &sys_ls_id = share::SYS_LS; if (OB_FAIL(finish_task_arg.init(tenant_id, task_id))) { LOG_WARN("failed to init finish task arg", K(ret), K(tenant_id), K(task_id)); } else { while (retry_count++ < MAX_RETRY_TIMES) { if (OB_FAIL(ObStorageHAUtils::get_ls_leader(tenant_id, sys_ls_id, leader_addr))) { LOG_WARN("failed to get ls leader", K(ret), K(tenant_id)); } else if (OB_FAIL(rpc_proxy->to(leader_addr).by(tenant_id).finish_transfer_task(finish_task_arg))) { LOG_WARN("failed to report finish transfer task", K(ret), K(leader_addr), K(tenant_id), K(finish_task_arg)); } if (OB_SUCC(ret)) { break; } else { ob_usleep(REPORT_RETRY_INTERVAL_MS); } } } return ret; } int ObTxFinishTransfer::check_self_ls_leader_(const share::ObLSID &ls_id, bool &is_leader) { int ret = OB_SUCCESS; const uint64_t tenant_id = MTL_ID(); is_leader = false; if (OB_FAIL(ObStorageHAUtils::check_ls_is_leader(tenant_id, ls_id, is_leader))) { LOG_WARN("failed to check ls leader", K(ret), K(tenant_id), K(ls_id)); } return ret; } int ObTxFinishTransfer::start_trans_( const uint64_t tenant_id, ObMySQLTransaction &trans, ObTimeoutCtx &timeout_ctx) { int ret = OB_SUCCESS; int64_t finish_trans_timeout = 10_s; omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id)); if (tenant_config.is_valid()) { finish_trans_timeout = tenant_config->_transfer_finish_trans_timeout; } const int64_t stmt_timeout = finish_trans_timeout; if (OB_FAIL(timeout_ctx.set_trx_timeout_us(stmt_timeout))) { 
LOG_WARN("fail to set trx timeout", K(ret), K(stmt_timeout)); } else if (OB_FAIL(timeout_ctx.set_timeout(stmt_timeout))) { LOG_WARN("set timeout context failed", K(ret)); } else if (OB_FAIL(trans.start(sql_proxy_, tenant_id, false/*with_snapshot*/, share::OBCG_STORAGE))) { LOG_WARN("failed to start trans", K(ret), K(tenant_id)); } else { LOG_INFO("start trans", K(tenant_id)); } return ret; } int ObTxFinishTransfer::commit_trans_(const bool is_commit, ObMySQLTransaction &trans) { int ret = OB_SUCCESS; if (OB_FAIL(trans.end(is_commit))) { LOG_WARN("end transaction failed", K(ret)); } else { LOG_INFO("commit trans", K(is_commit)); } return ret; } int ObTxFinishTransfer::get_transfer_quorum_(const ObMemberList &member_list, int64_t &quorum) { int ret = OB_SUCCESS; if (!member_list.is_valid()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("get invalid args", K(ret), K(member_list)); } else { quorum = member_list.get_member_number(); LOG_INFO("get transfer quorum", K(member_list), K(quorum)); } return ret; } int ObTxFinishTransfer::select_transfer_task_for_update_(const ObTransferTaskID &task_id, ObMySQLTransaction &trans) { int ret = OB_SUCCESS; const uint64_t tenant_id = MTL_ID(); const bool for_update = true; ObTransferTask task; if (!task_id.is_valid()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("get invalid arg", K(ret), K(task_id)); } else if (OB_FAIL(ObTransferTaskOperator::get(trans, tenant_id, task_id, for_update, task, share::OBCG_STORAGE))) { LOG_WARN("failed to get transfer task", K(ret), K(tenant_id), K(task_id)); } else if (!task.get_status().is_doing_status()) { ret = OB_STATE_NOT_MATCH; LOG_WARN("transfer task status is not doing", K(ret), K(task)); } else { LOG_INFO("select for update", K(task_id), K(task)); #ifdef ERRSIM if (OB_SUCC(ret)) { ret = EN_DOING_LOCK_TRANSFER_TASK_FAILED ? 
: OB_SUCCESS;
      if (OB_FAIL(ret)) {
        STORAGE_LOG(ERROR, "fake EN_DOING_LOCK_TRANSFER_TASK_FAILED", K(ret));
      }
    }
#endif
  }
  return ret;
}

// Record a server event describing doing-stage progress for this task.
// result/is_ready pick the message; DOING-stage events are rate-limited
// per tenant, and a task running past TASK_EXECUTE_LONG_WARNING_THRESHOLD
// since its start_scn raises an ERROR log.
int ObTxFinishTransfer::record_server_event_(
    const int32_t result, const bool is_ready, const int64_t round, const share::SCN &start_scn) const
{
  int ret = OB_SUCCESS;
  ObSqlString extra_info_str;
  const share::ObTransferStatus doing_status(ObTransferStatus::DOING);
  const share::ObTransferStatus finish_status(ObTransferStatus::COMPLETED);
  // elapsed time since the task's start_scn; 0 when start_scn is invalid
  const int64_t start_scn_ts = start_scn.is_valid() ? start_scn.convert_to_ts() : 0;
  const int64_t elapsed_us_from_start_scn = start_scn.is_valid() ? ObTimeUtility::current_time() - start_scn_ts : 0;
  if (OB_SUCCESS == result) {
    if (is_ready) {
      if (OB_FAIL(extra_info_str.append_fmt("msg:\"transfer doing success\";"))) {
        LOG_WARN("fail to printf wait info", K(ret));
      }
    } else {
      if (OB_FAIL(extra_info_str.append_fmt("msg:\"wait for ls logical table replaced\";"))) {
        LOG_WARN("fail to printf wait info", K(ret));
      }
    }
  }
  if (OB_SUCC(ret)) {
    if (OB_FAIL(extra_info_str.append_fmt("round:%ld;", round))) {
      LOG_WARN("fail to printf retry time", K(ret));
    } else if (elapsed_us_from_start_scn > 0 &&
               OB_FAIL(extra_info_str.append_fmt("elapsed_us_from_start_scn:%ld;", elapsed_us_from_start_scn))) {
      LOG_WARN("fail to printf retry time", K(ret));
    } else {
      if (OB_SUCCESS == result && is_ready) {
        // task completed: always record the COMPLETED event
        if (OB_FAIL(write_server_event_(result, extra_info_str, finish_status))) {
          LOG_WARN("fail to write server event", K(ret), K(result), K(extra_info_str));
        }
      } else {
        // still in doing: rate-limit the event (10 * 1000 * 1000,
        // presumably 10s in microseconds -- confirm macro unit)
        if (REACH_TENANT_TIME_INTERVAL(10 * 1000 * 1000)) {
          if (OB_FAIL(write_server_event_(result, extra_info_str, doing_status))) {
            LOG_WARN("fail to write server event", K(ret), K(result), K(extra_info_str));
          } else if (elapsed_us_from_start_scn > TASK_EXECUTE_LONG_WARNING_THRESHOLD) {
            LOG_ERROR("transfer task stuck at doing stage for too long, may be wait log replay or transfer backfill too slow",
                K(start_scn), K(elapsed_us_from_start_scn), K_(task_id));
          }
        }
      }
    }
  }
  return ret;
}

// Write one "storage_ha/transfer" server event row carrying this task's
// tenant id, trace id, src/dest ls, status, result and extra_info text.
int ObTxFinishTransfer::write_server_event_(
    const int32_t result, const ObSqlString &extra_info, const share::ObTransferStatus &status) const
{
  int ret = OB_SUCCESS;
  SERVER_EVENT_ADD("storage_ha", "transfer",
      "tenant_id", tenant_id_,
      "trace_id", *ObCurTraceId::get_trace_id(),
      "src_ls", src_ls_id_.id(),
      "dest_ls", dest_ls_id_.id(),
      "status", status.str(),
      "result", result,
      extra_info.ptr());
  return ret;
}
}  // namespace storage
}  // namespace oceanbase