change transfer start and abort stage execution to src ls leader

This commit is contained in:
oceanoverflow
2023-07-06 02:12:16 +00:00
committed by ob-robot
parent bec47b2450
commit a43f5a770a
12 changed files with 323 additions and 42 deletions

View File

@ -473,6 +473,7 @@ PCODE_DEF(OB_HA_UNBLOCK_TX, 0x4B5)
PCODE_DEF(OB_HA_LOCK_CONFIG_CHANGE, 0x4B6) PCODE_DEF(OB_HA_LOCK_CONFIG_CHANGE, 0x4B6)
PCODE_DEF(OB_HA_UNLOCK_CONFIG_CHANGE, 0x4B7) PCODE_DEF(OB_HA_UNLOCK_CONFIG_CHANGE, 0x4B7)
PCODE_DEF(OB_HA_GET_CONFIG_CHANGE_LOCK_STAT, 0x4B8) PCODE_DEF(OB_HA_GET_CONFIG_CHANGE_LOCK_STAT, 0x4B8)
PCODE_DEF(OB_HA_WAKEUP_TRANSFER_SERVICE, 0x4B9)
// sql, including executor // sql, including executor

View File

@ -160,6 +160,7 @@ void oceanbase::observer::init_srv_xlator_for_migration(ObSrvRpcXlator *xlator)
RPC_PROCESSOR(ObStorageLockConfigChangeP, gctx_.bandwidth_throttle_); RPC_PROCESSOR(ObStorageLockConfigChangeP, gctx_.bandwidth_throttle_);
RPC_PROCESSOR(ObStorageUnlockConfigChangeP, gctx_.bandwidth_throttle_); RPC_PROCESSOR(ObStorageUnlockConfigChangeP, gctx_.bandwidth_throttle_);
RPC_PROCESSOR(ObStorageGetLogConfigStatP, gctx_.bandwidth_throttle_); RPC_PROCESSOR(ObStorageGetLogConfigStatP, gctx_.bandwidth_throttle_);
RPC_PROCESSOR(ObStorageWakeupTransferServiceP, gctx_.bandwidth_throttle_);
} }
void oceanbase::observer::init_srv_xlator_for_others(ObSrvRpcXlator *xlator) { void oceanbase::observer::init_srv_xlator_for_others(ObSrvRpcXlator *xlator) {

View File

@ -310,7 +310,7 @@ int ObTenantTransferService::process_init_task_(const ObTransferTaskID task_id)
DEBUG_SYNC(AFTER_TRANSFER_PROCESS_INIT_TASK_AND_BEFORE_NOTIFY_STORAGE); DEBUG_SYNC(AFTER_TRANSFER_PROCESS_INIT_TASK_AND_BEFORE_NOTIFY_STORAGE);
} }
if (OB_FAIL(ret) || task.get_tablet_list().empty()) { if (OB_FAIL(ret) || task.get_tablet_list().empty()) {
} else if (OB_FAIL(notify_storage_transfer_service_(task_id, task.get_dest_ls()))) { } else if (OB_FAIL(notify_storage_transfer_service_(task_id, task.get_src_ls()))) {
LOG_WARN("notify storage transfer service failed", KR(ret), K(task_id), K(task)); LOG_WARN("notify storage transfer service failed", KR(ret), K(task_id), K(task));
} }
TTS_INFO("process init task finish", KR(ret), K(task_id), TTS_INFO("process init task finish", KR(ret), K(task_id),
@ -1424,7 +1424,7 @@ int ObTenantTransferService::unlock_table_lock_(
int ObTenantTransferService::notify_storage_transfer_service_( int ObTenantTransferService::notify_storage_transfer_service_(
const ObTransferTaskID task_id, const ObTransferTaskID task_id,
const ObLSID &dest_ls) const ObLSID &src_ls)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
obrpc::ObStartTransferTaskArg arg; obrpc::ObStartTransferTaskArg arg;
@ -1432,25 +1432,25 @@ int ObTenantTransferService::notify_storage_transfer_service_(
if (IS_NOT_INIT) { if (IS_NOT_INIT) {
ret = OB_NOT_INIT; ret = OB_NOT_INIT;
LOG_WARN("not init", KR(ret)); LOG_WARN("not init", KR(ret));
} else if (OB_UNLIKELY(! task_id.is_valid() || !dest_ls.is_valid())) { } else if (OB_UNLIKELY(! task_id.is_valid() || !src_ls.is_valid())) {
ret = OB_INVALID_ARGUMENT; ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arg", KR(ret), K(task_id), K(dest_ls)); LOG_WARN("invalid arg", KR(ret), K(task_id), K(src_ls));
} else if (OB_FAIL(arg.init(tenant_id_, task_id, dest_ls))) { } else if (OB_FAIL(arg.init(tenant_id_, task_id, src_ls))) {
LOG_WARN("init ObStartTransferTaskArg failed", KR(ret), K(task_id), K(dest_ls)); LOG_WARN("init ObStartTransferTaskArg failed", KR(ret), K(task_id), K(src_ls));
} else if (OB_ISNULL(GCTX.location_service_) || OB_ISNULL(GCTX.srv_rpc_proxy_)) { } else if (OB_ISNULL(GCTX.location_service_) || OB_ISNULL(GCTX.srv_rpc_proxy_)) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
LOG_WARN("GCTX has null ptr", KR(ret), K(task_id), K_(tenant_id)); LOG_WARN("GCTX has null ptr", KR(ret), K(task_id), K_(tenant_id));
} else if (OB_FAIL(GCTX.location_service_->get_leader_with_retry_until_timeout( } else if (OB_FAIL(GCTX.location_service_->get_leader_with_retry_until_timeout(
GCONF.cluster_id, GCONF.cluster_id,
tenant_id_, tenant_id_,
dest_ls, src_ls,
leader_addr))) { // default 1s timeout leader_addr))) { // default 1s timeout
LOG_WARN("get leader failed", KR(ret), K(task_id), LOG_WARN("get leader failed", KR(ret), K(task_id),
"cluster_id", GCONF.cluster_id.get_value(), K_(tenant_id), K(dest_ls), K(leader_addr)); "cluster_id", GCONF.cluster_id.get_value(), K_(tenant_id), K(src_ls), K(leader_addr));
} else if (OB_FAIL(GCTX.srv_rpc_proxy_->to(leader_addr).by(tenant_id_).start_transfer_task(arg))) { } else if (OB_FAIL(GCTX.srv_rpc_proxy_->to(leader_addr).by(tenant_id_).start_transfer_task(arg))) {
LOG_WARN("send rpc failed", KR(ret), K(task_id), K(dest_ls), K(leader_addr), K(arg)); LOG_WARN("send rpc failed", KR(ret), K(task_id), K(src_ls), K(leader_addr), K(arg));
} }
TTS_INFO("send rpc to storage finished", KR(ret), K(task_id), K(dest_ls), K(leader_addr), K(arg)); TTS_INFO("send rpc to storage finished", KR(ret), K(task_id), K(src_ls), K(leader_addr), K(arg));
return ret; return ret;
} }

View File

@ -161,7 +161,9 @@ private:
int unlock_and_clear_task_( int unlock_and_clear_task_(
const share::ObTransferTaskID task_id, const share::ObTransferTaskID task_id,
share::ObTransferTask &task); share::ObTransferTask &task);
int notify_storage_transfer_service_(const share::ObTransferTaskID task_id, const share::ObLSID &dest_ls); int notify_storage_transfer_service_(
const share::ObTransferTaskID task_id,
const share::ObLSID &src_ls);
int add_in_trans_lock_and_refresh_schema_( int add_in_trans_lock_and_refresh_schema_(
ObMySQLTransaction &trans, ObMySQLTransaction &trans,
const share::ObLSID &src_ls, const share::ObLSID &src_ls,

View File

@ -541,6 +541,7 @@ class ObString;
ACT(STOP_RECOVERY_LS_THREAD0,)\ ACT(STOP_RECOVERY_LS_THREAD0,)\
ACT(STOP_RECOVERY_LS_THREAD1,)\ ACT(STOP_RECOVERY_LS_THREAD1,)\
ACT(STOP_TRANSFER_LS_LOGICAL_TABLE_REPLACED,)\ ACT(STOP_TRANSFER_LS_LOGICAL_TABLE_REPLACED,)\
ACT(BEFORE_TRANSFER_DOING,)\
ACT(MAX_DEBUG_SYNC_POINT,) ACT(MAX_DEBUG_SYNC_POINT,)
DECLARE_ENUM(ObDebugSyncPoint, debug_sync_point, OB_DEBUG_SYNC_POINT_DEF); DECLARE_ENUM(ObDebugSyncPoint, debug_sync_point, OB_DEBUG_SYNC_POINT_DEF);

View File

@ -167,6 +167,23 @@ int ObTransferTaskOperator::get_all_task_status(
return ret; return ret;
} }
int ObTransferTaskOperator::get_by_src_ls(
common::ObISQLClient &sql_proxy,
const uint64_t tenant_id,
const ObLSID &src_ls,
ObTransferTask &task)
{
int ret = OB_SUCCESS;
const bool is_src_ls = true;
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || !src_ls.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(src_ls));
} else if (OB_FAIL(get_by_ls_id_(sql_proxy, tenant_id, src_ls, is_src_ls, task))) {
LOG_WARN("failed to get by ls id", K(ret), K(tenant_id), K(src_ls));
}
return ret;
}
int ObTransferTaskOperator::get_by_dest_ls( int ObTransferTaskOperator::get_by_dest_ls(
common::ObISQLClient &sql_proxy, common::ObISQLClient &sql_proxy,
const uint64_t tenant_id, const uint64_t tenant_id,
@ -174,28 +191,12 @@ int ObTransferTaskOperator::get_by_dest_ls(
ObTransferTask &task) ObTransferTask &task)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
const bool is_src_ls = false;
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || !dest_ls.is_valid())) { if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || !dest_ls.is_valid())) {
ret = OB_INVALID_ARGUMENT; ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(dest_ls)); LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(dest_ls));
} else { } else if (OB_FAIL(get_by_ls_id_(sql_proxy, tenant_id, dest_ls, is_src_ls, task))) {
ObSqlString sql; LOG_WARN("failed to get by ls id", K(ret), K(tenant_id), K(dest_ls));
SMART_VAR(ObISQLClient::ReadResult, result) {
if (OB_FAIL(sql.assign_fmt("SELECT * FROM %s WHERE dest_ls = %ld",
OB_ALL_TRANSFER_TASK_TNAME, dest_ls.id()))) {
LOG_WARN("fail to assign sql", KR(ret), K(dest_ls));
} else if (OB_FAIL(sql_proxy.read(result, tenant_id, sql.ptr()))) {
LOG_WARN("execute sql failed", KR(ret), K(tenant_id), K(sql));
} else if (OB_ISNULL(result.get_result())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get mysql result failed", KR(ret), K(tenant_id), K(sql));
} else if (OB_FAIL(construct_transfer_task_(*result.get_result(), task))) {
if (OB_ENTRY_NOT_EXIST != ret) {
LOG_WARN("construct transfer task failed", KR(ret), K(tenant_id), K(dest_ls), K(sql));
} else {
LOG_TRACE("dest ls transfer task not found", KR(ret), K(tenant_id), K(dest_ls));
}
}
}
} }
return ret; return ret;
} }
@ -681,6 +682,50 @@ int ObTransferTaskOperator::update_finish_scn(
return ret; return ret;
} }
int ObTransferTaskOperator::get_by_ls_id_(
common::ObISQLClient &sql_proxy,
const uint64_t tenant_id,
const ObLSID &ls_id,
const bool is_src_ls,
ObTransferTask &task)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || !ls_id.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(ls_id));
} else {
ObSqlString sql;
SMART_VAR(ObISQLClient::ReadResult, result) {
if (is_src_ls) {
if (OB_FAIL(sql.assign_fmt("SELECT * FROM %s WHERE src_ls = %ld",
OB_ALL_TRANSFER_TASK_TNAME, ls_id.id()))) {
LOG_WARN("fail to assign sql", KR(ret), K(ls_id));
}
} else {
if (OB_FAIL(sql.assign_fmt("SELECT * FROM %s WHERE dest_ls = %ld",
OB_ALL_TRANSFER_TASK_TNAME, ls_id.id()))) {
LOG_WARN("fail to assign sql", KR(ret), K(ls_id));
}
}
if (FAILEDx(sql_proxy.read(result, tenant_id, sql.ptr()))) {
LOG_WARN("execute sql failed", KR(ret), K(tenant_id), K(sql));
} else if (OB_ISNULL(result.get_result())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get mysql result failed", KR(ret), K(tenant_id), K(sql));
} else if (OB_FAIL(construct_transfer_task_(*result.get_result(), task))) {
if (OB_ENTRY_NOT_EXIST != ret) {
LOG_WARN("construct transfer task failed", KR(ret), K(tenant_id), K(ls_id), K(sql));
} else {
LOG_WARN("dest ls transfer task not found", KR(ret), K(tenant_id), K(ls_id));
}
}
}
}
return ret;
}
int ObTransferTaskOperator::construct_transfer_tasks_( int ObTransferTaskOperator::construct_transfer_tasks_(
common::sqlclient::ObMySQLResult &res, common::sqlclient::ObMySQLResult &res,
ObIArray<ObTransferTask> &tasks) ObIArray<ObTransferTask> &tasks)

View File

@ -116,6 +116,25 @@ public:
const uint64_t tenant_id, const uint64_t tenant_id,
common::ObIArray<ObTransferTask::TaskStatus> &task_status); common::ObIArray<ObTransferTask::TaskStatus> &task_status);
/*
* get transfer task by src ls (there is no more than 1 transfer task on a ls)
*
* @param [in] sql_proxy: sql client
* @param [in] tenant_id: target tenant_id
* @param [in] src_ls: src ls_id
* @param [out] task: transfer task
* @return
* - OB_ENTRY_NOT_EXIST: not found
* - OB_ERR_UNEXPECTED: more than 1 transfer task on a ls
* - OB_SUCCESS: successful
* - other: failed
*/
static int get_by_src_ls(
common::ObISQLClient &sql_proxy,
const uint64_t tenant_id,
const ObLSID &src_ls,
ObTransferTask &task);
/* /*
* get transfer task by dest ls (there is no more than 1 transfer task on a ls) * get transfer task by dest ls (there is no more than 1 transfer task on a ls)
* *
@ -375,6 +394,12 @@ public:
const ObTransferTaskComment &comment); const ObTransferTaskComment &comment);
private: private:
static int get_by_ls_id_(
common::ObISQLClient &sql_proxy,
const uint64_t tenant_id,
const ObLSID &ls_id,
const bool is_src_ls,
ObTransferTask &task);
static int construct_transfer_tasks_( static int construct_transfer_tasks_(
common::sqlclient::ObMySQLResult &res, common::sqlclient::ObMySQLResult &res,
common::ObIArray<ObTransferTask> &tasks); common::ObIArray<ObTransferTask> &tasks);

View File

@ -273,6 +273,7 @@ bool ObTransferUtils::is_need_retry_error(const int err)
switch (err) { switch (err) {
//Has active trans need retry //Has active trans need retry
case OB_TRANSFER_MEMBER_LIST_NOT_SAME: case OB_TRANSFER_MEMBER_LIST_NOT_SAME:
case OB_LS_LOCATION_LEADER_NOT_EXIST:
case OB_PARTITION_NOT_LEADER: case OB_PARTITION_NOT_LEADER:
case OB_TRANS_TIMEOUT: case OB_TRANS_TIMEOUT:
case OB_TIMEOUT: case OB_TIMEOUT:

View File

@ -107,6 +107,33 @@ void ObTransferHandler::wakeup_()
} }
} }
int ObTransferHandler::wakeup_dest_ls_leader_(const share::ObTransferTaskInfo &task_info)
{
int ret = OB_SUCCESS;
const uint64_t tenant_id = task_info.tenant_id_;
const share::ObLSID &dest_ls_id = task_info.dest_ls_id_;
ObLSService *ls_svr = NULL;
common::ObAddr leader_addr;
ObStorageHASrcInfo src_info;
ObStorageRpc *storage_rpc = NULL;
if (OB_ISNULL(ls_svr = (MTL(ObLSService *)))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ls service should not be NULL", K(ret), KP(ls_svr));
} else if (OB_ISNULL(storage_rpc = ls_svr->get_storage_rpc())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("storage rpc should not be NULL", K(ret), KP(storage_rpc));
} else if (OB_FAIL(ObStorageHAUtils::get_ls_leader(tenant_id, dest_ls_id, leader_addr))) {
LOG_WARN("failed to get ls leader", K(ret), K(tenant_id));
} else {
src_info.src_addr_ = leader_addr;
src_info.cluster_id_ = GCONF.cluster_id;
if (OB_FAIL(storage_rpc->wakeup_transfer_service(tenant_id, src_info))) {
LOG_WARN("failed to wakeup dest ls leader", K(ret), K(task_info), K(src_info));
}
}
return ret;
}
int ObTransferHandler::get_transfer_task_(ObTransferTaskInfo &task_info) int ObTransferHandler::get_transfer_task_(ObTransferTaskInfo &task_info)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
@ -114,7 +141,7 @@ int ObTransferHandler::get_transfer_task_(ObTransferTaskInfo &task_info)
if (!is_inited_) { if (!is_inited_) {
ret = OB_NOT_INIT; ret = OB_NOT_INIT;
LOG_WARN("transfer handler do not init", K(ret)); LOG_WARN("transfer handler do not init", K(ret));
} else if (OB_FAIL(get_transfer_task_from_inner_table_(task_info))) { } else if (OB_FAIL(fetch_transfer_task_from_inner_table_(task_info))) {
if (OB_ENTRY_NOT_EXIST != ret) { if (OB_ENTRY_NOT_EXIST != ret) {
LOG_WARN("failed to get transfer task from inner table", K(ret), KPC(ls_)); LOG_WARN("failed to get transfer task from inner table", K(ret), KPC(ls_));
} }
@ -148,24 +175,84 @@ int ObTransferHandler::get_transfer_task_from_inner_table_(
return ret; return ret;
} }
int ObTransferHandler::get_transfer_task_from_inner_table_( int ObTransferHandler::fetch_transfer_task_from_inner_table_(
share::ObTransferTaskInfo &task_info) share::ObTransferTaskInfo &task_info)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
// currently START stage is executed on src ls leader
// and DOING stage is executed on dest ls leader
// so here try to fetch task by src ls first, then dest ls later
// either one succeeded will return the task
bool src_exist = false;
bool dst_exist = false;
share::ObTransferTaskInfo src_task_info;
share::ObTransferTaskInfo dst_task_info;
if (OB_FAIL(fetch_transfer_task_from_inner_table_by_src_ls_(src_task_info, src_exist))) {
LOG_WARN("failed to fetch transfer task from inner table by src ls", K(ret));
} else if (OB_FAIL(fetch_transfer_task_from_inner_table_by_dest_ls_(dst_task_info, dst_exist))) {
LOG_WARN("failed to fetch transfer task from inner table by dst ls", K(ret));
} else if (src_exist && dst_exist) {
ret = OB_SCHEDULER_TASK_CNT_MISTACH;
LOG_WARN("src task info and dst task info transfer ls overlap", K(ret), K(src_task_info), K(dst_task_info));
} else if (src_exist && OB_FAIL(task_info.assign(src_task_info))) {
LOG_WARN("failed to assign task info", K(ret), K(src_task_info));
} else if (dst_exist && OB_FAIL(task_info.assign(dst_task_info))) {
LOG_WARN("failed to assign task info", K(ret), K(dst_task_info));
}
return ret;
}
int ObTransferHandler::fetch_transfer_task_from_inner_table_by_src_ls_(
share::ObTransferTaskInfo &task_info,
bool &task_exist)
{
int ret = OB_SUCCESS;
task_exist = false;
task_info.reset(); task_info.reset();
const uint64_t tenant_id = MTL_ID(); const uint64_t tenant_id = MTL_ID();
const bool for_update = false; const ObLSID &src_ls_id = ls_->get_ls_id();
ObTransferTask task; ObTransferTask task;
const ObLSID &dest_ls_id = ls_->get_ls_id(); if (OB_FAIL(ObTransferTaskOperator::get_by_src_ls(
*sql_proxy_, tenant_id, src_ls_id, task))) {
if (OB_FAIL(ObTransferTaskOperator::get_by_dest_ls(*sql_proxy_, tenant_id, dest_ls_id, task))) { LOG_WARN("failed to get transfer task", K(ret), K(tenant_id), K(src_ls_id));
if (OB_ENTRY_NOT_EXIST != ret) {
LOG_WARN("failed to get transfer task", K(ret), K(tenant_id), K(dest_ls_id));
}
} else if (OB_FAIL(task_info.convert_from(tenant_id, task))) { } else if (OB_FAIL(task_info.convert_from(tenant_id, task))) {
LOG_WARN("failed to convert from transfer task", K(ret), K(task)); LOG_WARN("failed to convert from transfer task", K(ret), K(task));
} else if (!task_info.status_.is_start_status()
&& !task_info.status_.is_aborted_status()) {
// task not exist
} else { } else {
LOG_INFO("get transfer task from inner table", K(task_info)); task_exist = true;
}
if (OB_ENTRY_NOT_EXIST == ret || OB_TABLE_NOT_EXIST == ret) {
task_exist = false;
ret = OB_SUCCESS;
}
return ret;
}
int ObTransferHandler::fetch_transfer_task_from_inner_table_by_dest_ls_(
share::ObTransferTaskInfo &task_info,
bool &task_exist)
{
int ret = OB_SUCCESS;
task_exist = false;
task_info.reset();
const uint64_t tenant_id = MTL_ID();
const ObLSID &dest_ls_id = ls_->get_ls_id();
ObTransferTask task;
if (OB_FAIL(ObTransferTaskOperator::get_by_dest_ls(
*sql_proxy_, tenant_id, dest_ls_id, task))) {
LOG_WARN("failed to get transfer task by dest ls", K(ret), K(tenant_id), K(dest_ls_id));
} else if (OB_FAIL(task_info.convert_from(tenant_id, task))) {
LOG_WARN("failed to convert from transfer task", K(ret), K(task));
} else if (!task_info.status_.is_doing_status()) {
// task not exist
} else {
task_exist = true;
}
if (OB_ENTRY_NOT_EXIST == ret || OB_TABLE_NOT_EXIST == ret) {
task_exist = false;
ret = OB_SUCCESS;
} }
return ret; return ret;
} }
@ -407,7 +494,15 @@ int ObTransferHandler::do_with_start_status_(const share::ObTransferTaskInfo &ta
if (OB_SUCCESS != (tmp_ret = record_server_event_(ret, round_, task_info))) { if (OB_SUCCESS != (tmp_ret = record_server_event_(ret, round_, task_info))) {
LOG_WARN("failed to record server event", K(tmp_ret), K(ret), K(retry_count_), K(task_info)); LOG_WARN("failed to record server event", K(tmp_ret), K(ret), K(retry_count_), K(task_info));
} }
wakeup_(); // if START stage execution failed, just wakeup self
// if START stage execution succeeded, try to wakeup dest ls leader to go to DOING stage
if (OB_FAIL(ret)) {
wakeup_(); // wakeup self
} else {
if (OB_TMP_FAIL(wakeup_dest_ls_leader_(task_info))) {
LOG_WARN("failed to wakeup dest ls leader", K(tmp_ret), K(task_info));
}
}
LOG_INFO("[TRANSFER] finish do with start status", K(ret), K(task_info), "cost_ts", ObTimeUtil::current_time() - start_ts); LOG_INFO("[TRANSFER] finish do with start status", K(ret), K(task_info), "cost_ts", ObTimeUtil::current_time() - start_ts);
return ret; return ret;
} }
@ -1509,6 +1604,14 @@ int ObTransferHandler::do_with_doing_status_(const share::ObTransferTaskInfo &ta
const uint64_t tenant_id = task_info.tenant_id_; const uint64_t tenant_id = task_info.tenant_id_;
const share::ObLSID &src_ls_id = task_info.src_ls_id_; const share::ObLSID &src_ls_id = task_info.src_ls_id_;
const share::ObLSID &dest_ls_id = task_info.dest_ls_id_; const share::ObLSID &dest_ls_id = task_info.dest_ls_id_;
#ifdef ERRSIM
SERVER_EVENT_SYNC_ADD("transfer_errsim", "before_transfer_doing",
"task_id", task_id,
"tenant_id", tenant_id,
"src_ls_id", src_ls_id,
"dest_ls_id", dest_ls_id);
DEBUG_SYNC(BEFORE_TRANSFER_DOING);
#endif
if (!task_info.is_valid()) { if (!task_info.is_valid()) {
ret = OB_INVALID_ARGUMENT; ret = OB_INVALID_ARGUMENT;

View File

@ -76,9 +76,16 @@ private:
const bool for_update, const bool for_update,
common::ObISQLClient &trans, common::ObISQLClient &trans,
share::ObTransferTaskInfo &task_info); share::ObTransferTaskInfo &task_info);
int get_transfer_task_from_inner_table_( int fetch_transfer_task_from_inner_table_(
share::ObTransferTaskInfo &task_info); share::ObTransferTaskInfo &task_info);
int fetch_transfer_task_from_inner_table_by_src_ls_(
share::ObTransferTaskInfo &task_info,
bool &task_exist);
int fetch_transfer_task_from_inner_table_by_dest_ls_(
share::ObTransferTaskInfo &task_info,
bool &task_exist);
void wakeup_(); void wakeup_();
int wakeup_dest_ls_leader_(const share::ObTransferTaskInfo &task_info);
int do_leader_transfer_(); int do_leader_transfer_();
int do_worker_transfer_(); int do_worker_transfer_();

View File

@ -1053,6 +1053,23 @@ void ObStorageUnBlockTxArg::reset()
OB_SERIALIZE_MEMBER(ObStorageUnBlockTxArg, tenant_id_, ls_id_, gts_); OB_SERIALIZE_MEMBER(ObStorageUnBlockTxArg, tenant_id_, ls_id_, gts_);
ObStorageWakeupTransferServiceArg::ObStorageWakeupTransferServiceArg()
: tenant_id_(OB_INVALID_ID)
{
}
bool ObStorageWakeupTransferServiceArg::is_valid() const
{
return OB_INVALID_ID != tenant_id_;
}
void ObStorageWakeupTransferServiceArg::reset()
{
tenant_id_ = OB_INVALID_ID;
}
OB_SERIALIZE_MEMBER(ObStorageWakeupTransferServiceArg, tenant_id_);
ObStorageConfigChangeOpArg::ObStorageConfigChangeOpArg() ObStorageConfigChangeOpArg::ObStorageConfigChangeOpArg()
: tenant_id_(OB_INVALID_ID), : tenant_id_(OB_INVALID_ID),
@ -3077,6 +3094,29 @@ int ObStorageGetLogConfigStatP::process()
return ret; return ret;
} }
ObStorageWakeupTransferServiceP::ObStorageWakeupTransferServiceP(
common::ObInOutBandwidthThrottle *bandwidth_throttle)
: ObStorageStreamRpcP(bandwidth_throttle)
{
}
int ObStorageWakeupTransferServiceP::process()
{
int ret = OB_SUCCESS;
const uint64_t tenant_id = arg_.tenant_id_;
MTL_SWITCH(tenant_id) {
ObTransferService *transfer_service = MTL(ObTransferService*);
if (OB_ISNULL(transfer_service)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("transfer service should not be NULL", K(ret), KP(transfer_service));
} else {
transfer_service->wakeup();
}
}
return ret;
}
} //namespace obrpc } //namespace obrpc
namespace storage namespace storage
@ -3700,5 +3740,30 @@ int ObStorageRpc::get_config_change_lock_stat(
return ret; return ret;
} }
int ObStorageRpc::wakeup_transfer_service(
const uint64_t tenant_id,
const ObStorageHASrcInfo &src_info)
{
int ret = OB_SUCCESS;
if (!is_inited_) {
ret = OB_NOT_INIT;
STORAGE_LOG(WARN, "storage rpc is not inited", K(ret));
} else if (tenant_id == OB_INVALID_ID || !src_info.is_valid()) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(ret), K(tenant_id), K(src_info), K(ls_id));
} else {
ObStorageWakeupTransferServiceArg arg;
arg.tenant_id_ = tenant_id;
if (OB_FAIL(rpc_proxy_->to(src_info.src_addr_)
.by(tenant_id)
.dst_cluster_id(src_info.cluster_id_)
.wakeup_transfer_service(arg))) {
LOG_WARN("failed to wakeup transfer service", K(ret), K(src_info), K(arg));
}
}
return ret;
}
} // storage } // storage
} // oceanbase } // oceanbase

View File

@ -715,6 +715,19 @@ private:
DISALLOW_COPY_AND_ASSIGN(ObStorageConfigChangeOpRes); DISALLOW_COPY_AND_ASSIGN(ObStorageConfigChangeOpRes);
}; };
struct ObStorageWakeupTransferServiceArg final
{
OB_UNIS_VERSION(1);
public:
ObStorageWakeupTransferServiceArg();
~ObStorageWakeupTransferServiceArg() {}
bool is_valid() const;
void reset();
TO_STRING_KV(K_(tenant_id));
uint64_t tenant_id_;
};
//src //src
class ObStorageRpcProxy : public obrpc::ObRpcProxy class ObStorageRpcProxy : public obrpc::ObRpcProxy
{ {
@ -748,6 +761,7 @@ public:
RPC_S(PR5 lock_config_change, OB_HA_LOCK_CONFIG_CHANGE, (ObStorageConfigChangeOpArg), ObStorageConfigChangeOpRes); RPC_S(PR5 lock_config_change, OB_HA_LOCK_CONFIG_CHANGE, (ObStorageConfigChangeOpArg), ObStorageConfigChangeOpRes);
RPC_S(PR5 unlock_config_change, OB_HA_UNLOCK_CONFIG_CHANGE, (ObStorageConfigChangeOpArg), ObStorageConfigChangeOpRes); RPC_S(PR5 unlock_config_change, OB_HA_UNLOCK_CONFIG_CHANGE, (ObStorageConfigChangeOpArg), ObStorageConfigChangeOpRes);
RPC_S(PR5 get_config_change_lock_stat, OB_HA_GET_CONFIG_CHANGE_LOCK_STAT, (ObStorageConfigChangeOpArg), ObStorageConfigChangeOpRes); RPC_S(PR5 get_config_change_lock_stat, OB_HA_GET_CONFIG_CHANGE_LOCK_STAT, (ObStorageConfigChangeOpArg), ObStorageConfigChangeOpRes);
RPC_S(PR5 wakeup_transfer_service, OB_HA_WAKEUP_TRANSFER_SERVICE, (ObStorageWakeupTransferServiceArg));
}; };
template <ObRpcPacketCode RPC_CODE> template <ObRpcPacketCode RPC_CODE>
@ -1049,6 +1063,16 @@ protected:
int process(); int process();
}; };
class ObStorageWakeupTransferServiceP:
public ObStorageStreamRpcP<OB_HA_WAKEUP_TRANSFER_SERVICE>
{
public:
explicit ObStorageWakeupTransferServiceP(common::ObInOutBandwidthThrottle *bandwidth_throttle);
virtual ~ObStorageWakeupTransferServiceP() {}
protected:
int process();
};
} // obrpc } // obrpc
@ -1177,6 +1201,9 @@ public:
const share::ObLSID &ls_id, const share::ObLSID &ls_id,
int64_t &palf_lock_owner, int64_t &palf_lock_owner,
bool &is_locked) = 0; bool &is_locked) = 0;
virtual int wakeup_transfer_service(
const uint64_t tenant_id,
const ObStorageHASrcInfo &src_info) = 0;
}; };
class ObStorageRpc: public ObIStorageRpc class ObStorageRpc: public ObIStorageRpc
@ -1299,6 +1326,9 @@ public:
const share::ObLSID &ls_id, const share::ObLSID &ls_id,
int64_t &palf_lock_owner, int64_t &palf_lock_owner,
bool &is_locked); bool &is_locked);
virtual int wakeup_transfer_service(
const uint64_t tenant_id,
const ObStorageHASrcInfo &src_info);
private: private:
bool is_inited_; bool is_inited_;