fix transfer abort stuck when abort fail first and migration happen
This commit is contained in:
@ -472,6 +472,9 @@ PCODE_DEF(OB_HA_BLOCK_TX, 0x4B4)
|
|||||||
PCODE_DEF(OB_HA_KILL_TX, 0x4B5)
|
PCODE_DEF(OB_HA_KILL_TX, 0x4B5)
|
||||||
PCODE_DEF(OB_HA_UNBLOCK_TX, 0x4B6)
|
PCODE_DEF(OB_HA_UNBLOCK_TX, 0x4B6)
|
||||||
PCODE_DEF(OB_HA_SWITCH_LEARNER_TO_ACCEPTOR, 0x4B7)
|
PCODE_DEF(OB_HA_SWITCH_LEARNER_TO_ACCEPTOR, 0x4B7)
|
||||||
|
PCODE_DEF(OB_HA_LOCK_CONFIG_CHANGE, 0x4B8)
|
||||||
|
PCODE_DEF(OB_HA_UNLOCK_CONFIG_CHANGE, 0x4B9)
|
||||||
|
PCODE_DEF(OB_HA_GET_CONFIG_CHANGE_LOCK_STAT, 0x4BA)
|
||||||
|
|
||||||
// sql, including executor
|
// sql, including executor
|
||||||
|
|
||||||
|
|||||||
@ -159,6 +159,9 @@ void oceanbase::observer::init_srv_xlator_for_migration(ObSrvRpcXlator *xlator)
|
|||||||
RPC_PROCESSOR(ObStorageBlockTxP, gctx_.bandwidth_throttle_);
|
RPC_PROCESSOR(ObStorageBlockTxP, gctx_.bandwidth_throttle_);
|
||||||
RPC_PROCESSOR(ObStorageKillTxP, gctx_.bandwidth_throttle_);
|
RPC_PROCESSOR(ObStorageKillTxP, gctx_.bandwidth_throttle_);
|
||||||
RPC_PROCESSOR(ObStorageUnBlockTxP, gctx_.bandwidth_throttle_);
|
RPC_PROCESSOR(ObStorageUnBlockTxP, gctx_.bandwidth_throttle_);
|
||||||
|
RPC_PROCESSOR(ObStorageLockConfigChangeP, gctx_.bandwidth_throttle_);
|
||||||
|
RPC_PROCESSOR(ObStorageUnlockConfigChangeP, gctx_.bandwidth_throttle_);
|
||||||
|
RPC_PROCESSOR(ObStorageGetLogConfigStatP, gctx_.bandwidth_throttle_);
|
||||||
}
|
}
|
||||||
|
|
||||||
void oceanbase::observer::init_srv_xlator_for_others(ObSrvRpcXlator *xlator) {
|
void oceanbase::observer::init_srv_xlator_for_others(ObSrvRpcXlator *xlator) {
|
||||||
|
|||||||
@ -791,16 +791,9 @@ int ObTxFinishTransfer::lock_ls_member_list_(const uint64_t tenant_id, const sha
|
|||||||
const common::ObMemberList &member_list, const ObTransferLockStatus &status)
|
const common::ObMemberList &member_list, const ObTransferLockStatus &status)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
storage::ObLS *ls = NULL;
|
if (OB_FAIL(ObMemberListLockUtils::lock_ls_member_list(
|
||||||
storage::ObLSHandle ls_handle;
|
tenant_id, ls_id, task_id_.id(), member_list, status, *sql_proxy_))) {
|
||||||
if (OB_FAIL(get_ls_handle_(tenant_id, ls_id, ls_handle))) {
|
LOG_WARN("failed to unlock ls member list", K(ret), K(ls_id), K(member_list));
|
||||||
LOG_WARN("failed to get ls", K(ret), K(tenant_id), K(ls_id));
|
|
||||||
} else if (OB_ISNULL(ls = ls_handle.get_ls())) {
|
|
||||||
ret = OB_ERR_UNEXPECTED;
|
|
||||||
LOG_WARN("log stream not exist", K(ret), K(ls_id));
|
|
||||||
} else if (OB_FAIL(ObMemberListLockUtils::lock_ls_member_list(
|
|
||||||
tenant_id, ls_id, task_id_.id(), member_list, status, ls, *sql_proxy_))) {
|
|
||||||
LOG_WARN("failed to unlock ls member list", K(ret), K(ls_id), K(member_list), KPC(ls));
|
|
||||||
} else {
|
} else {
|
||||||
#ifdef ERRSIM
|
#ifdef ERRSIM
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
|
|||||||
@ -20,6 +20,7 @@
|
|||||||
#include "storage/ob_common_id_utils.h"
|
#include "storage/ob_common_id_utils.h"
|
||||||
#include "share/ob_common_id.h"
|
#include "share/ob_common_id.h"
|
||||||
#include "observer/ob_server_event_history_table_operator.h"
|
#include "observer/ob_server_event_history_table_operator.h"
|
||||||
|
#include "storage/high_availability/ob_storage_ha_utils.h"
|
||||||
|
|
||||||
using namespace oceanbase::share;
|
using namespace oceanbase::share;
|
||||||
using namespace oceanbase::common;
|
using namespace oceanbase::common;
|
||||||
@ -59,18 +60,7 @@ int ObMemberListLockUtils::batch_lock_ls_member_list(const uint64_t tenant_id, c
|
|||||||
std::sort(sorted_ls_list.begin(), sorted_ls_list.end());
|
std::sort(sorted_ls_list.begin(), sorted_ls_list.end());
|
||||||
for (int64_t i = 0; OB_SUCC(ret) && i < sorted_ls_list.count(); ++i) {
|
for (int64_t i = 0; OB_SUCC(ret) && i < sorted_ls_list.count(); ++i) {
|
||||||
const share::ObLSID &ls_id = sorted_ls_list.at(i);
|
const share::ObLSID &ls_id = sorted_ls_list.at(i);
|
||||||
ObLSService *ls_srv = NULL;
|
if (OB_FAIL(lock_ls_member_list(tenant_id, ls_id, task_id, member_list, status, sql_proxy))) {
|
||||||
ObLSHandle ls_handle;
|
|
||||||
ObLS *ls = NULL;
|
|
||||||
if (OB_ISNULL(ls_srv = MTL(ObLSService *))) {
|
|
||||||
ret = OB_ERR_UNEXPECTED;
|
|
||||||
LOG_ERROR("ls srv should not be NULL", K(ret), KP(ls_srv));
|
|
||||||
} else if (OB_FAIL(ls_srv->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD))) {
|
|
||||||
LOG_ERROR("ls_srv->get_ls() fail", KR(ret));
|
|
||||||
} else if (OB_ISNULL(ls = ls_handle.get_ls())) {
|
|
||||||
ret = OB_ERR_UNEXPECTED;
|
|
||||||
LOG_WARN("ls is NULL", KR(ret), K(ls_handle));
|
|
||||||
} else if (OB_FAIL(lock_ls_member_list(tenant_id, ls_id, task_id, member_list, status, ls, sql_proxy))) {
|
|
||||||
LOG_WARN("failed to lock ls member list", K(ret), K(ls_id), K(member_list));
|
LOG_WARN("failed to lock ls member list", K(ret), K(ls_id), K(member_list));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -79,7 +69,7 @@ int ObMemberListLockUtils::batch_lock_ls_member_list(const uint64_t tenant_id, c
|
|||||||
}
|
}
|
||||||
|
|
||||||
int ObMemberListLockUtils::lock_ls_member_list(const uint64_t tenant_id, const share::ObLSID &ls_id,
|
int ObMemberListLockUtils::lock_ls_member_list(const uint64_t tenant_id, const share::ObLSID &ls_id,
|
||||||
const int64_t task_id, const common::ObMemberList &member_list, const ObTransferLockStatus &status, ObLS *ls,
|
const int64_t task_id, const common::ObMemberList &member_list, const ObTransferLockStatus &status,
|
||||||
common::ObMySQLProxy &sql_proxy)
|
common::ObMySQLProxy &sql_proxy)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
@ -87,10 +77,7 @@ int ObMemberListLockUtils::lock_ls_member_list(const uint64_t tenant_id, const s
|
|||||||
int64_t real_lock_owner = -1;
|
int64_t real_lock_owner = -1;
|
||||||
ObSqlString member_list_str;
|
ObSqlString member_list_str;
|
||||||
const int64_t lock_timeout = CONFIG_CHANGE_TIMEOUT;
|
const int64_t lock_timeout = CONFIG_CHANGE_TIMEOUT;
|
||||||
if (OB_ISNULL(ls)) {
|
if (!ls_id.is_valid() || !member_list.is_valid()) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
|
||||||
LOG_WARN("ls should not be null", K(ret), KP(ls));
|
|
||||||
} else if (!ls_id.is_valid() || !member_list.is_valid()) {
|
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("get invalid args", K(ret), K(ls_id));
|
LOG_WARN("get invalid args", K(ret), K(ls_id));
|
||||||
} else if (OB_FAIL(unlock_ls_member_list(tenant_id, ls_id, task_id, member_list, status, sql_proxy))) {
|
} else if (OB_FAIL(unlock_ls_member_list(tenant_id, ls_id, task_id, member_list, status, sql_proxy))) {
|
||||||
@ -115,11 +102,11 @@ int ObMemberListLockUtils::lock_ls_member_list(const uint64_t tenant_id, const s
|
|||||||
bool need_lock_palf = false;
|
bool need_lock_palf = false;
|
||||||
if (OB_FAIL(lock_info.set(tenant_id, ls_id, task_id, status, real_lock_owner, member_list_str.ptr()))) {
|
if (OB_FAIL(lock_info.set(tenant_id, ls_id, task_id, status, real_lock_owner, member_list_str.ptr()))) {
|
||||||
LOG_WARN("failed to set lock info", K(ret), K(tenant_id), K(ls_id), K(task_id), K(status), K(real_lock_owner));
|
LOG_WARN("failed to set lock info", K(ret), K(tenant_id), K(ls_id), K(task_id), K(status), K(real_lock_owner));
|
||||||
} else if (OB_FAIL(ls->get_config_change_lock_stat(palf_lock_owner, palf_is_locked))) {
|
} else if (OB_FAIL(get_config_change_lock_stat_(lock_info, palf_lock_owner, palf_is_locked))) {
|
||||||
LOG_WARN("failed to get config change lock stat", K(ret), KPC(ls), K(tenant_id), K(ls_id));
|
LOG_WARN("failed to get config change lock stat", K(ret), K(tenant_id), K(ls_id));
|
||||||
} else if (OB_FAIL(check_lock_status_(palf_is_locked, palf_lock_owner, real_lock_owner, need_lock_palf))) {
|
} else if (OB_FAIL(check_lock_status_(palf_is_locked, palf_lock_owner, real_lock_owner, need_lock_palf))) {
|
||||||
LOG_WARN("failed to check lock status", K(ret), K(palf_is_locked), K(palf_lock_owner), K(real_lock_owner));
|
LOG_WARN("failed to check lock status", K(ret), K(palf_is_locked), K(palf_lock_owner), K(real_lock_owner));
|
||||||
} else if (need_lock_palf && OB_FAIL(try_lock_config_change(lock_info, lock_timeout, ls))) {
|
} else if (need_lock_palf && OB_FAIL(try_lock_config_change_(lock_info, lock_timeout))) {
|
||||||
LOG_WARN("failed to try lock config config",
|
LOG_WARN("failed to try lock config config",
|
||||||
K(ret),
|
K(ret),
|
||||||
K(tenant_id),
|
K(tenant_id),
|
||||||
@ -162,8 +149,6 @@ int ObMemberListLockUtils::unlock_ls_member_list(const uint64_t tenant_id, const
|
|||||||
int64_t palf_lock_owner = -1;
|
int64_t palf_lock_owner = -1;
|
||||||
bool need_unlock = true;
|
bool need_unlock = true;
|
||||||
bool need_relock_before_unlock = false;
|
bool need_relock_before_unlock = false;
|
||||||
storage::ObLSHandle ls_handle;
|
|
||||||
storage::ObLS *ls = NULL;
|
|
||||||
|
|
||||||
if (OB_FAIL(ObTransferLockInfoOperator::get(row_key, task_id, status, for_update, lock_info, trans))) {
|
if (OB_FAIL(ObTransferLockInfoOperator::get(row_key, task_id, status, for_update, lock_info, trans))) {
|
||||||
if (OB_ENTRY_NOT_EXIST == ret) { // palf need to be unlocked
|
if (OB_ENTRY_NOT_EXIST == ret) { // palf need to be unlocked
|
||||||
@ -178,21 +163,16 @@ int ObMemberListLockUtils::unlock_ls_member_list(const uint64_t tenant_id, const
|
|||||||
} else {
|
} else {
|
||||||
LOG_WARN("failed to get lock info", K(ret), K(tenant_id), K(row_key));
|
LOG_WARN("failed to get lock info", K(ret), K(tenant_id), K(row_key));
|
||||||
}
|
}
|
||||||
} else if (OB_FAIL(get_ls_handle(tenant_id, ls_id, ls_handle))) {
|
} else if (OB_FAIL(get_config_change_lock_stat_(lock_info, palf_lock_owner, palf_is_locked))) {
|
||||||
LOG_WARN("failed to get ls handle", K(ret), K(tenant_id), K(ls_id));
|
LOG_WARN("failed to get config change lock stat");
|
||||||
} else if (OB_ISNULL(ls = ls_handle.get_ls())) {
|
|
||||||
ret = OB_ERR_UNEXPECTED;
|
|
||||||
LOG_WARN("ls should not be null", K(ret));
|
|
||||||
} else if (OB_FAIL(ls->get_config_change_lock_stat(palf_lock_owner, palf_is_locked))) {
|
|
||||||
LOG_WARN("failed to get config change lock stat", K(ret));
|
|
||||||
} else if (OB_FAIL(check_unlock_status_(palf_is_locked, palf_lock_owner, lock_info.lock_owner_,
|
} else if (OB_FAIL(check_unlock_status_(palf_is_locked, palf_lock_owner, lock_info.lock_owner_,
|
||||||
need_unlock, need_relock_before_unlock))) {
|
need_unlock, need_relock_before_unlock))) {
|
||||||
LOG_WARN("failed to check unlock status", K(ret), K(palf_is_locked), K(palf_lock_owner), K(lock_info));
|
LOG_WARN("failed to check unlock status", K(ret), K(palf_is_locked), K(palf_lock_owner), K(lock_info));
|
||||||
} else if (FALSE_IT(lock_owner = lock_info.lock_owner_)) {
|
} else if (FALSE_IT(lock_owner = lock_info.lock_owner_)) {
|
||||||
// assign lock owner
|
// assign lock owner
|
||||||
} else if (need_relock_before_unlock && OB_FAIL(relock_before_unlock_(lock_info, palf_lock_owner, lock_timeout, ls))) {
|
} else if (need_relock_before_unlock && OB_FAIL(relock_before_unlock_(lock_info, palf_lock_owner, lock_timeout))) {
|
||||||
LOG_WARN("failed to relock config change", K(ret), K(lock_info), K(palf_lock_owner), KPC(ls));
|
LOG_WARN("failed to relock config change", K(ret), K(lock_info), K(palf_lock_owner));
|
||||||
} else if (need_unlock && OB_FAIL(unlock_config_change(lock_info, lock_timeout, ls))) {
|
} else if (need_unlock && OB_FAIL(unlock_config_change_(lock_info, lock_timeout))) {
|
||||||
LOG_WARN("failed to get paxos member list", K(ret), K(lock_info));
|
LOG_WARN("failed to get paxos member list", K(ret), K(lock_info));
|
||||||
} else if (OB_FAIL(ObTransferLockInfoOperator::remove(tenant_id, ls_id, task_id, status, trans))) {
|
} else if (OB_FAIL(ObTransferLockInfoOperator::remove(tenant_id, ls_id, task_id, status, trans))) {
|
||||||
LOG_WARN("failed to update lock info", K(ret), K(row_key), K(lock_info), K(palf_lock_owner), K(palf_is_locked));
|
LOG_WARN("failed to update lock info", K(ret), K(row_key), K(lock_info), K(palf_lock_owner), K(palf_is_locked));
|
||||||
@ -232,92 +212,220 @@ int ObMemberListLockUtils::unlock_ls_member_list(const uint64_t tenant_id, const
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
#ifdef ERRSIM
|
||||||
|
if (OB_FAIL(ret)) {
|
||||||
|
SERVER_EVENT_SYNC_ADD("transfer", "unlock_ls_member_list_failed",
|
||||||
|
"tenant_id", tenant_id,
|
||||||
|
"ls_id", ls_id.id(),
|
||||||
|
"task_id", task_id,
|
||||||
|
"status", status,
|
||||||
|
"result", ret);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObMemberListLockUtils::try_lock_config_change(
|
int ObMemberListLockUtils::try_lock_config_change_(
|
||||||
const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout, ObLS *ls)
|
const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (OB_ISNULL(ls)) {
|
bool ls_exist = false;
|
||||||
|
ObLSService *ls_svr = NULL;
|
||||||
|
const uint64_t tenant_id = lock_info.tenant_id_;
|
||||||
|
const share::ObLSID &ls_id = lock_info.ls_id_;
|
||||||
|
const int64_t lock_owner = lock_info.lock_owner_;
|
||||||
|
if (OB_ISNULL(ls_svr = MTL(ObLSService*))) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("ls should not be null", K(ret), KP(ls));
|
LOG_WARN("tenant storage ptr is null", KR(ret), K(tenant_id));
|
||||||
} else if (!lock_info.is_valid()) {
|
} else if (OB_FAIL(ls_svr->check_ls_exist(ls_id, ls_exist))) {
|
||||||
|
LOG_WARN("fail to check log stream exist", KR(ret), K(ls_id));
|
||||||
|
} else if (!ls_exist) {
|
||||||
|
if (OB_FAIL(try_lock_config_change_fallback_(lock_info, lock_timeout))) {
|
||||||
|
LOG_WARN("failed to try lock config change fallback", K(ret), K(lock_info));
|
||||||
|
} else {
|
||||||
|
LOG_INFO("try lock config change fallback", K(lock_info), K(lock_timeout));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
ObLSHandle ls_handle;
|
||||||
|
if (OB_FAIL(get_ls_handle(tenant_id, ls_id, ls_handle))) {
|
||||||
|
LOG_WARN("failed to get ls handle", K(ret), K(lock_info));
|
||||||
|
} else if (OB_FAIL(ls_handle.get_ls()->try_lock_config_change(lock_owner, lock_timeout))) {
|
||||||
|
LOG_WARN("failed to try lock config change", K(ret), K(lock_info));
|
||||||
|
} else {
|
||||||
|
LOG_INFO("try lock config change", K(lock_info), K(lock_timeout));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#ifdef ERRSIM
|
||||||
|
SERVER_EVENT_ADD("TRANSFER_LOCK", "LOCK_CONFIG_CHANGE",
|
||||||
|
"tenant_id", lock_info.tenant_id_,
|
||||||
|
"ls_id", lock_info.ls_id_.id(),
|
||||||
|
"task_id", lock_info.task_id_,
|
||||||
|
"status", lock_info.status_.str(),
|
||||||
|
"lock_owner", lock_info.lock_owner_,
|
||||||
|
"lock_member_list", lock_info.comment_);
|
||||||
|
#endif
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int ObMemberListLockUtils::try_lock_config_change_fallback_(
|
||||||
|
const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
ObLSService *ls_svr = NULL;
|
||||||
|
ObStorageRpc *storage_rpc = NULL;
|
||||||
|
ObStorageHASrcInfo src_info;
|
||||||
|
src_info.cluster_id_ = GCONF.cluster_id;
|
||||||
|
if (!lock_info.is_valid()) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("get invalid args", K(ret), K(lock_info));
|
LOG_WARN("get invalid args", K(ret), K(lock_info));
|
||||||
} else if (OB_FAIL(ls->try_lock_config_change(lock_info.lock_owner_, lock_timeout))) {
|
} else if (OB_ISNULL(ls_svr = (MTL(ObLSService *)))) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("ls service should not be NULL", K(ret), KP(ls_svr));
|
||||||
|
} else if (OB_ISNULL(storage_rpc = ls_svr->get_storage_rpc())) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("storage rpc should not be NULL", K(ret), KP(storage_rpc));
|
||||||
|
} else if (OB_FAIL(ObStorageHAUtils::get_ls_leader(lock_info.tenant_id_, lock_info.ls_id_, src_info.src_addr_))) {
|
||||||
|
LOG_WARN("failed to get ls leader", K(ret), K(lock_info));
|
||||||
|
} else if (OB_FAIL(storage_rpc->lock_config_change(lock_info.tenant_id_, src_info, lock_info.ls_id_,
|
||||||
|
lock_info.lock_owner_, lock_timeout))) {
|
||||||
LOG_WARN("failed to try lock config config", K(ret), K(lock_info));
|
LOG_WARN("failed to try lock config config", K(ret), K(lock_info));
|
||||||
} else {
|
|
||||||
#ifdef ERRSIM
|
|
||||||
SERVER_EVENT_ADD("TRANSFER_LOCK", "LOCK_CONFIG_CHANGE",
|
|
||||||
"tenant_id", lock_info.tenant_id_,
|
|
||||||
"ls_id", lock_info.ls_id_.id(),
|
|
||||||
"task_id", lock_info.task_id_,
|
|
||||||
"status", lock_info.status_.str(),
|
|
||||||
"lock_owner", lock_info.lock_owner_,
|
|
||||||
"lock_member_list", lock_info.comment_);
|
|
||||||
#endif
|
|
||||||
int tmp_ret = OB_SUCCESS;
|
|
||||||
if (OB_TMP_FAIL(ObMemberListLockUtils::record_config_change_lock_stat(lock_info, ls))) {
|
|
||||||
LOG_WARN("failed to get config change lock stat", K(ret), K(lock_info));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObMemberListLockUtils::record_config_change_lock_stat(const ObTransferTaskLockInfo &lock_info, ObLS *ls)
|
int ObMemberListLockUtils::get_config_change_lock_stat_(
|
||||||
|
const ObTransferTaskLockInfo &lock_info, int64_t &palf_lock_owner, bool &is_locked)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
int64_t palf_lock_owner = -1;
|
bool ls_exist = false;
|
||||||
bool is_locked = false;
|
ObLSService *ls_svr = NULL;
|
||||||
if (OB_ISNULL(ls)) {
|
const uint64_t tenant_id = lock_info.tenant_id_;
|
||||||
|
const share::ObLSID &ls_id = lock_info.ls_id_;
|
||||||
|
if (OB_ISNULL(ls_svr = MTL(ObLSService*))) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("ls should not be null", K(ret), KP(ls));
|
LOG_WARN("tenant storage ptr is null", KR(ret), K(tenant_id));
|
||||||
} else if (!lock_info.is_valid()) {
|
} else if (OB_FAIL(ls_svr->check_ls_exist(ls_id, ls_exist))) {
|
||||||
|
LOG_WARN("fail to check log stream exist", KR(ret), K(ls_id));
|
||||||
|
} else if (!ls_exist) {
|
||||||
|
if (OB_FAIL(get_config_change_lock_stat_fallback_(lock_info, palf_lock_owner, is_locked))) {
|
||||||
|
LOG_WARN("failed to get lock config change fallback", K(ret), K(lock_info));
|
||||||
|
} else {
|
||||||
|
LOG_INFO("get lock config change stat fallback", K(lock_info), K(palf_lock_owner), K(is_locked));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
ObLSHandle ls_handle;
|
||||||
|
if (OB_FAIL(get_ls_handle(tenant_id, ls_id, ls_handle))) {
|
||||||
|
LOG_WARN("failed to get ls handle", K(ret), K(lock_info));
|
||||||
|
} else if (OB_FAIL(ls_handle.get_ls()->get_config_change_lock_stat(palf_lock_owner, is_locked))) {
|
||||||
|
LOG_WARN("failed to try lock config change", K(ret), K(lock_info));
|
||||||
|
} else {
|
||||||
|
LOG_INFO("get lock config change stat", K(lock_info), K(palf_lock_owner), K(is_locked));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#ifdef ERRSIM
|
||||||
|
SERVER_EVENT_ADD("TRANSFER_LOCK", "GET_CONFIG_CHANGE_LOCK_STAT",
|
||||||
|
"tenant_id", lock_info.tenant_id_,
|
||||||
|
"ls_id", lock_info.ls_id_.id(),
|
||||||
|
"task_id", lock_info.task_id_,
|
||||||
|
"status", lock_info.status_.str(),
|
||||||
|
"palf_lock_owner", palf_lock_owner,
|
||||||
|
"is_locked", is_locked);
|
||||||
|
#endif
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int ObMemberListLockUtils::get_config_change_lock_stat_fallback_(
|
||||||
|
const ObTransferTaskLockInfo &lock_info, int64_t &palf_lock_owner, bool &is_locked)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
palf_lock_owner = -1;
|
||||||
|
is_locked = false;
|
||||||
|
ObLSService *ls_svr = NULL;
|
||||||
|
ObStorageRpc *storage_rpc = NULL;
|
||||||
|
ObStorageHASrcInfo src_info;
|
||||||
|
src_info.cluster_id_ = GCONF.cluster_id;
|
||||||
|
if (!lock_info.is_valid()) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("get invalid args", K(ret), K(lock_info));
|
LOG_WARN("get invalid args", K(ret), K(lock_info));
|
||||||
} else if (OB_FAIL(ls->get_config_change_lock_stat(palf_lock_owner, is_locked))) {
|
} else if (OB_ISNULL(ls_svr = (MTL(ObLSService *)))) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("ls service should not be NULL", K(ret), KP(ls_svr));
|
||||||
|
} else if (OB_ISNULL(storage_rpc = ls_svr->get_storage_rpc())) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("storage rpc should not be NULL", K(ret), KP(storage_rpc));
|
||||||
|
} else if (OB_FAIL(ObStorageHAUtils::get_ls_leader(lock_info.tenant_id_, lock_info.ls_id_, src_info.src_addr_))) {
|
||||||
|
LOG_WARN("failed to get ls leader", K(ret), K(lock_info));
|
||||||
|
} else if (OB_FAIL(storage_rpc->get_config_change_lock_stat(lock_info.tenant_id_, src_info,
|
||||||
|
lock_info.ls_id_, palf_lock_owner, is_locked))) {
|
||||||
LOG_WARN("failed to get config change lock stat", K(ret), K(lock_info));
|
LOG_WARN("failed to get config change lock stat", K(ret), K(lock_info));
|
||||||
} else {
|
|
||||||
#ifdef ERRSIM
|
|
||||||
SERVER_EVENT_ADD("TRANSFER_LOCK", "GET_CONFIG_CHANGE_LOCK_STAT",
|
|
||||||
"tenant_id", lock_info.tenant_id_,
|
|
||||||
"ls_id", lock_info.ls_id_.id(),
|
|
||||||
"task_id", lock_info.task_id_,
|
|
||||||
"status", lock_info.status_.str(),
|
|
||||||
"palf_lock_owner", palf_lock_owner,
|
|
||||||
"is_locked", is_locked);
|
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObMemberListLockUtils::unlock_config_change(
|
int ObMemberListLockUtils::unlock_config_change_(
|
||||||
const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout, ObLS *ls)
|
const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (OB_ISNULL(ls)) {
|
bool ls_exist = false;
|
||||||
|
ObLSService *ls_svr = NULL;
|
||||||
|
const uint64_t tenant_id = lock_info.tenant_id_;
|
||||||
|
const share::ObLSID &ls_id = lock_info.ls_id_;
|
||||||
|
const int64_t lock_owner = lock_info.lock_owner_;
|
||||||
|
if (OB_ISNULL(ls_svr = MTL(ObLSService*))) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("ls should not be null", K(ret), KP(ls));
|
LOG_WARN("tenant storage ptr is null", KR(ret), K(tenant_id));
|
||||||
} else if (!lock_info.is_valid()) {
|
} else if (OB_FAIL(ls_svr->check_ls_exist(ls_id, ls_exist))) {
|
||||||
|
LOG_WARN("fail to check log stream exist", KR(ret), K(ls_id));
|
||||||
|
} else if (!ls_exist) {
|
||||||
|
if (OB_FAIL(unlock_config_change_fallback_(lock_info, lock_timeout))) {
|
||||||
|
LOG_WARN("failed to try lock config change fallback", K(ret), K(lock_info));
|
||||||
|
} else {
|
||||||
|
LOG_INFO("unlock lock config change fallback", K(lock_info), K(lock_timeout));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
ObLSHandle ls_handle;
|
||||||
|
if (OB_FAIL(get_ls_handle(tenant_id, ls_id, ls_handle))) {
|
||||||
|
LOG_WARN("failed to get ls handle", K(ret), K(lock_info));
|
||||||
|
} else if (OB_FAIL(ls_handle.get_ls()->unlock_config_change(lock_owner, lock_timeout))) {
|
||||||
|
LOG_WARN("failed to try lock config change", K(ret), K(lock_info));
|
||||||
|
} else {
|
||||||
|
LOG_INFO("unlock lock config change", K(lock_info), K(lock_timeout));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#ifdef ERRSIM
|
||||||
|
SERVER_EVENT_ADD("TRANSFER_LOCK", "UNLOCK_CONFIG_CHANGE",
|
||||||
|
"tenant_id", lock_info.tenant_id_,
|
||||||
|
"ls_id", lock_info.ls_id_.id(),
|
||||||
|
"task_id", lock_info.task_id_,
|
||||||
|
"status", lock_info.status_.str(),
|
||||||
|
"lock_owner", lock_info.lock_owner_,
|
||||||
|
"unlock_member_list", lock_info.comment_);
|
||||||
|
#endif
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int ObMemberListLockUtils::unlock_config_change_fallback_(
|
||||||
|
const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
ObLSService *ls_svr = NULL;
|
||||||
|
ObStorageRpc *storage_rpc = NULL;
|
||||||
|
ObStorageHASrcInfo src_info;
|
||||||
|
src_info.cluster_id_ = GCONF.cluster_id;
|
||||||
|
if (!lock_info.is_valid()) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("get invalid args", K(ret), K(lock_info));
|
LOG_WARN("get invalid args", K(ret), K(lock_info));
|
||||||
} else if (OB_FAIL(ls->unlock_config_change(lock_info.lock_owner_, lock_timeout))) {
|
} else if (OB_ISNULL(ls_svr = (MTL(ObLSService *)))) {
|
||||||
LOG_WARN("failed to unlock config config", K(ret), K(lock_info));
|
ret = OB_ERR_UNEXPECTED;
|
||||||
} else {
|
LOG_WARN("ls service should not be NULL", K(ret), KP(ls_svr));
|
||||||
#ifdef ERRSIM
|
} else if (OB_ISNULL(storage_rpc = ls_svr->get_storage_rpc())) {
|
||||||
SERVER_EVENT_ADD("TRANSFER_LOCK", "UNLOCK_CONFIG_CHANGE",
|
ret = OB_ERR_UNEXPECTED;
|
||||||
"tenant_id", lock_info.tenant_id_,
|
LOG_WARN("storage rpc should not be NULL", K(ret), KP(storage_rpc));
|
||||||
"ls_id", lock_info.ls_id_.id(),
|
} else if (OB_FAIL(ObStorageHAUtils::get_ls_leader(lock_info.tenant_id_, lock_info.ls_id_, src_info.src_addr_))) {
|
||||||
"task_id", lock_info.task_id_,
|
LOG_WARN("failed to get ls leader", K(ret), K(lock_info));
|
||||||
"status", lock_info.status_.str(),
|
} else if (OB_FAIL(storage_rpc->unlock_config_change(lock_info.tenant_id_, src_info, lock_info.ls_id_,
|
||||||
"lock_owner", lock_info.lock_owner_,
|
lock_info.lock_owner_, lock_timeout))) {
|
||||||
"unlock_member_list", lock_info.comment_);
|
LOG_WARN("failed to try lock config config", K(ret), K(lock_info));
|
||||||
#endif
|
|
||||||
int tmp_ret = OB_SUCCESS;
|
|
||||||
if (OB_TMP_FAIL(ObMemberListLockUtils::record_config_change_lock_stat(lock_info, ls))) {
|
|
||||||
LOG_WARN("failed to get config change lock stat", K(ret), K(lock_info));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -487,13 +595,13 @@ int ObMemberListLockUtils::check_unlock_status_(
|
|||||||
}
|
}
|
||||||
|
|
||||||
int ObMemberListLockUtils::relock_before_unlock_(const ObTransferTaskLockInfo &lock_info,
|
int ObMemberListLockUtils::relock_before_unlock_(const ObTransferTaskLockInfo &lock_info,
|
||||||
const int64_t palf_lock_owner, const int64_t lock_timeout, ObLS *ls)
|
const int64_t palf_lock_owner, const int64_t lock_timeout)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (OB_FAIL(try_lock_config_change(lock_info, lock_timeout, ls))) {
|
if (OB_FAIL(try_lock_config_change_(lock_info, lock_timeout))) {
|
||||||
LOG_WARN("failed to try lock config change", K(ret), K(lock_info));
|
LOG_WARN("failed to try lock config change", K(ret), K(lock_info));
|
||||||
} else {
|
} else {
|
||||||
LOG_WARN("relock before unlock", K(ret), K(lock_info), KPC(ls));
|
LOG_WARN("relock before unlock", K(ret), K(lock_info));
|
||||||
}
|
}
|
||||||
#ifdef ERRSIM
|
#ifdef ERRSIM
|
||||||
SERVER_EVENT_ADD("TRANSFER_LOCK", "RELOCK_BEFORE_UNLOCK",
|
SERVER_EVENT_ADD("TRANSFER_LOCK", "RELOCK_BEFORE_UNLOCK",
|
||||||
|
|||||||
@ -29,8 +29,7 @@ public:
|
|||||||
const common::ObArray<share::ObLSID> &lock_ls_list, const common::ObMemberList &member_list,
|
const common::ObArray<share::ObLSID> &lock_ls_list, const common::ObMemberList &member_list,
|
||||||
const ObTransferLockStatus &status, common::ObMySQLProxy &sql_proxy);
|
const ObTransferLockStatus &status, common::ObMySQLProxy &sql_proxy);
|
||||||
static int lock_ls_member_list(const uint64_t tenant_id, const share::ObLSID &ls_id, const int64_t task_id,
|
static int lock_ls_member_list(const uint64_t tenant_id, const share::ObLSID &ls_id, const int64_t task_id,
|
||||||
const common::ObMemberList &member_list, const ObTransferLockStatus &status, ObLS *ls,
|
const common::ObMemberList &member_list, const ObTransferLockStatus &status, common::ObMySQLProxy &sql_proxy);
|
||||||
common::ObMySQLProxy &sql_proxy);
|
|
||||||
static int unlock_ls_member_list(const uint64_t tenant_id, const share::ObLSID &ls_id, const int64_t task_id,
|
static int unlock_ls_member_list(const uint64_t tenant_id, const share::ObLSID &ls_id, const int64_t task_id,
|
||||||
const common::ObMemberList &member_list, const ObTransferLockStatus &status, common::ObMySQLProxy &sql_proxy);
|
const common::ObMemberList &member_list, const ObTransferLockStatus &status, common::ObMySQLProxy &sql_proxy);
|
||||||
|
|
||||||
@ -50,9 +49,12 @@ private:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
/* palf lock config*/
|
/* palf lock config*/
|
||||||
static int try_lock_config_change(const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout, ObLS *ls);
|
static int try_lock_config_change_(const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout);
|
||||||
static int record_config_change_lock_stat(const ObTransferTaskLockInfo &lock_info, ObLS *ls);
|
static int try_lock_config_change_fallback_(const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout);
|
||||||
static int unlock_config_change(const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout, ObLS *ls);
|
static int get_config_change_lock_stat_(const ObTransferTaskLockInfo &lock_info, int64_t &palf_lock_owner, bool &is_locked);
|
||||||
|
static int get_config_change_lock_stat_fallback_(const ObTransferTaskLockInfo &lock_info, int64_t &palf_lock_owner, bool &is_locked);
|
||||||
|
static int unlock_config_change_(const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout);
|
||||||
|
static int unlock_config_change_fallback_(const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
static int check_lock_status_(
|
static int check_lock_status_(
|
||||||
@ -60,7 +62,7 @@ private:
|
|||||||
static int check_unlock_status_(const bool palf_is_locked, const int64_t palf_lock_owner,
|
static int check_unlock_status_(const bool palf_is_locked, const int64_t palf_lock_owner,
|
||||||
const int64_t inner_table_lock_owner, bool &need_unlock, bool &need_relock_before_unlock);
|
const int64_t inner_table_lock_owner, bool &need_unlock, bool &need_relock_before_unlock);
|
||||||
static int relock_before_unlock_(const ObTransferTaskLockInfo &lock_info, const int64_t palf_lock_owner,
|
static int relock_before_unlock_(const ObTransferTaskLockInfo &lock_info, const int64_t palf_lock_owner,
|
||||||
const int64_t lock_timeout, ObLS *ls);
|
const int64_t lock_timeout);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
static const int64_t CONFIG_CHANGE_TIMEOUT = 10 * 1000 * 1000L; // 10s
|
static const int64_t CONFIG_CHANGE_TIMEOUT = 10 * 1000 * 1000L; // 10s
|
||||||
|
|||||||
@ -1120,6 +1120,51 @@ void ObStorageUnBlockTxArg::reset()
|
|||||||
OB_SERIALIZE_MEMBER(ObStorageUnBlockTxArg, tenant_id_, ls_id_, gts_);
|
OB_SERIALIZE_MEMBER(ObStorageUnBlockTxArg, tenant_id_, ls_id_, gts_);
|
||||||
|
|
||||||
|
|
||||||
|
ObStorageConfigChangeOpArg::ObStorageConfigChangeOpArg()
|
||||||
|
: tenant_id_(OB_INVALID_ID),
|
||||||
|
ls_id_(),
|
||||||
|
type_(MAX),
|
||||||
|
lock_owner_(0),
|
||||||
|
lock_timeout_(0)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
bool ObStorageConfigChangeOpArg::is_valid() const
|
||||||
|
{
|
||||||
|
return OB_INVALID_ID != tenant_id_
|
||||||
|
&& ls_id_.is_valid()
|
||||||
|
&& type_ >= LOCK_CONFIG_CHANGE
|
||||||
|
&& type_ < MAX;
|
||||||
|
}
|
||||||
|
|
||||||
|
void ObStorageConfigChangeOpArg::reset()
|
||||||
|
{
|
||||||
|
tenant_id_ = OB_INVALID_ID;
|
||||||
|
ls_id_.reset();
|
||||||
|
type_ = MAX;
|
||||||
|
lock_owner_ = 0;
|
||||||
|
lock_timeout_ = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
OB_SERIALIZE_MEMBER(ObStorageConfigChangeOpArg, tenant_id_,
|
||||||
|
ls_id_, type_, lock_owner_, lock_timeout_);
|
||||||
|
|
||||||
|
ObStorageConfigChangeOpRes::ObStorageConfigChangeOpRes()
|
||||||
|
: palf_lock_owner_(0),
|
||||||
|
is_locked_(false),
|
||||||
|
op_succ_(false)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
void ObStorageConfigChangeOpRes::reset()
|
||||||
|
{
|
||||||
|
palf_lock_owner_ = 0;
|
||||||
|
is_locked_ = false;
|
||||||
|
op_succ_ = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
OB_SERIALIZE_MEMBER(ObStorageConfigChangeOpRes, palf_lock_owner_, is_locked_, op_succ_);
|
||||||
|
|
||||||
template <ObRpcPacketCode RPC_CODE>
|
template <ObRpcPacketCode RPC_CODE>
|
||||||
ObStorageStreamRpcP<RPC_CODE>::ObStorageStreamRpcP(common::ObInOutBandwidthThrottle *bandwidth_throttle)
|
ObStorageStreamRpcP<RPC_CODE>::ObStorageStreamRpcP(common::ObInOutBandwidthThrottle *bandwidth_throttle)
|
||||||
: bandwidth_throttle_(bandwidth_throttle),
|
: bandwidth_throttle_(bandwidth_throttle),
|
||||||
@ -3050,6 +3095,126 @@ int ObStorageUnBlockTxP::process()
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ObStorageLockConfigChangeP::ObStorageLockConfigChangeP(
|
||||||
|
common::ObInOutBandwidthThrottle *bandwidth_throttle)
|
||||||
|
: ObStorageStreamRpcP(bandwidth_throttle)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
int ObStorageLockConfigChangeP::process()
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
const uint64_t tenant_id = arg_.tenant_id_;
|
||||||
|
const share::ObLSID &ls_id = arg_.ls_id_;
|
||||||
|
const int64_t lock_owner = arg_.lock_owner_;
|
||||||
|
const int64_t lock_timeout = arg_.lock_timeout_;
|
||||||
|
MTL_SWITCH(tenant_id) {
|
||||||
|
ObLSHandle ls_handle;
|
||||||
|
ObLSService *ls_service = NULL;
|
||||||
|
ObLS *ls = NULL;
|
||||||
|
if (!arg_.is_valid()) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
LOG_WARN("get invalid args", K(ret), K_(arg));
|
||||||
|
} else if (arg_.type_ != ObStorageConfigChangeOpArg::LOCK_CONFIG_CHANGE) {
|
||||||
|
ret = OB_ERR_SYS;
|
||||||
|
LOG_WARN("type not match", K(ret), K_(arg));
|
||||||
|
} else if (OB_ISNULL(ls_service = MTL(ObLSService *))) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("ls service should not be null", K(ret), KP(ls_service));
|
||||||
|
} else if (OB_FAIL(ls_service->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD))) {
|
||||||
|
LOG_WARN("fail to get log stream", KR(ret), K(arg_));
|
||||||
|
} else if (OB_ISNULL(ls = ls_handle.get_ls())) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("log stream should not be NULL", KR(ret), K(arg_), KP(ls));
|
||||||
|
} else if (OB_FAIL(ls->try_lock_config_change(lock_owner, lock_timeout))) {
|
||||||
|
LOG_WARN("failed to try lock config config", K(ret), K_(arg));
|
||||||
|
} else {
|
||||||
|
result_.op_succ_ = true;
|
||||||
|
LOG_INFO("lock config change success", K(arg_));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
ObStorageUnlockConfigChangeP::ObStorageUnlockConfigChangeP(
|
||||||
|
common::ObInOutBandwidthThrottle *bandwidth_throttle)
|
||||||
|
: ObStorageStreamRpcP(bandwidth_throttle)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
int ObStorageUnlockConfigChangeP::process()
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
const uint64_t tenant_id = arg_.tenant_id_;
|
||||||
|
const share::ObLSID &ls_id = arg_.ls_id_;
|
||||||
|
const int64_t lock_owner = arg_.lock_owner_;
|
||||||
|
const int64_t lock_timeout = arg_.lock_timeout_;
|
||||||
|
MTL_SWITCH(tenant_id) {
|
||||||
|
ObLSHandle ls_handle;
|
||||||
|
ObLSService *ls_service = NULL;
|
||||||
|
ObLS *ls = NULL;
|
||||||
|
if (!arg_.is_valid()) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
LOG_WARN("get invalid args", K(ret), K_(arg));
|
||||||
|
} else if (arg_.type_ != ObStorageConfigChangeOpArg::UNLOCK_CONFIG_CHANGE) {
|
||||||
|
ret = OB_ERR_SYS;
|
||||||
|
LOG_WARN("type not match", K(ret), K_(arg));
|
||||||
|
} else if (OB_ISNULL(ls_service = MTL(ObLSService *))) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("ls service should not be null", K(ret), KP(ls_service));
|
||||||
|
} else if (OB_FAIL(ls_service->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD))) {
|
||||||
|
LOG_WARN("fail to get log stream", KR(ret), K(arg_));
|
||||||
|
} else if (OB_ISNULL(ls = ls_handle.get_ls())) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("log stream should not be NULL", KR(ret), K(arg_), KP(ls));
|
||||||
|
} else if (OB_FAIL(ls->unlock_config_change(lock_owner, lock_timeout))) {
|
||||||
|
LOG_WARN("failed to try lock config config", K(ret), K_(arg));
|
||||||
|
} else {
|
||||||
|
result_.op_succ_ = true;
|
||||||
|
LOG_INFO("unlock config change success", K(arg_));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
ObStorageGetLogConfigStatP::ObStorageGetLogConfigStatP(
|
||||||
|
common::ObInOutBandwidthThrottle *bandwidth_throttle)
|
||||||
|
: ObStorageStreamRpcP(bandwidth_throttle)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
int ObStorageGetLogConfigStatP::process()
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
const uint64_t tenant_id = arg_.tenant_id_;
|
||||||
|
const share::ObLSID &ls_id = arg_.ls_id_;
|
||||||
|
MTL_SWITCH(tenant_id) {
|
||||||
|
ObLSHandle ls_handle;
|
||||||
|
ObLSService *ls_service = NULL;
|
||||||
|
ObLS *ls = NULL;
|
||||||
|
if (!arg_.is_valid()) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
LOG_WARN("get invalid args", K(ret), K_(arg));
|
||||||
|
} else if (arg_.type_ != ObStorageConfigChangeOpArg::GET_CONFIG_CHANGE_LOCK_STAT) {
|
||||||
|
ret = OB_ERR_SYS;
|
||||||
|
LOG_WARN("type not match", K(ret), K_(arg));
|
||||||
|
} else if (OB_ISNULL(ls_service = MTL(ObLSService *))) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("ls service should not be null", K(ret), KP(ls_service));
|
||||||
|
} else if (OB_FAIL(ls_service->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD))) {
|
||||||
|
LOG_WARN("fail to get log stream", KR(ret), K(arg_));
|
||||||
|
} else if (OB_ISNULL(ls = ls_handle.get_ls())) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("log stream should not be NULL", KR(ret), K(arg_), KP(ls));
|
||||||
|
} else if (OB_FAIL(ls->get_config_change_lock_stat(result_.palf_lock_owner_, result_.is_locked_))) {
|
||||||
|
LOG_WARN("failed to try lock config config", K(ret), K_(arg));
|
||||||
|
} else {
|
||||||
|
result_.op_succ_ = true;
|
||||||
|
LOG_INFO("get config change lock stat success", K(arg_), K(result_));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
} //namespace obrpc
|
} //namespace obrpc
|
||||||
|
|
||||||
@ -3645,6 +3810,106 @@ int ObStorageRpc::unblock_tx(
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ObStorageRpc::lock_config_change(
|
||||||
|
const uint64_t tenant_id,
|
||||||
|
const ObStorageHASrcInfo &src_info,
|
||||||
|
const share::ObLSID &ls_id,
|
||||||
|
const int64_t lock_owner,
|
||||||
|
const int64_t lock_timeout)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
if (!is_inited_) {
|
||||||
|
ret = OB_NOT_INIT;
|
||||||
|
STORAGE_LOG(WARN, "storage rpc is not inited", K(ret));
|
||||||
|
} else if (tenant_id == OB_INVALID_ID || !src_info.is_valid() || !ls_id.is_valid()) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
STORAGE_LOG(WARN, "invalid argument", K(ret), K(tenant_id), K(src_info), K(ls_id));
|
||||||
|
} else {
|
||||||
|
ObStorageConfigChangeOpArg arg;
|
||||||
|
ObStorageConfigChangeOpRes res;
|
||||||
|
arg.tenant_id_ = tenant_id;
|
||||||
|
arg.ls_id_ = ls_id;
|
||||||
|
arg.type_ = ObStorageConfigChangeOpArg::LOCK_CONFIG_CHANGE;
|
||||||
|
arg.lock_owner_ = lock_owner;
|
||||||
|
arg.lock_timeout_ = lock_timeout;
|
||||||
|
const int64_t timeout = GCONF.sys_bkgd_migration_change_member_list_timeout;
|
||||||
|
if (OB_FAIL(rpc_proxy_->to(src_info.src_addr_)
|
||||||
|
.timeout(timeout)
|
||||||
|
.dst_cluster_id(src_info.cluster_id_)
|
||||||
|
.lock_config_change(arg, res))) {
|
||||||
|
LOG_WARN("failed to replace member", K(ret), K(src_info), K(arg));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int ObStorageRpc::unlock_config_change(
|
||||||
|
const uint64_t tenant_id,
|
||||||
|
const ObStorageHASrcInfo &src_info,
|
||||||
|
const share::ObLSID &ls_id,
|
||||||
|
const int64_t lock_owner,
|
||||||
|
const int64_t lock_timeout)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
if (!is_inited_) {
|
||||||
|
ret = OB_NOT_INIT;
|
||||||
|
STORAGE_LOG(WARN, "storage rpc is not inited", K(ret));
|
||||||
|
} else if (tenant_id == OB_INVALID_ID || !src_info.is_valid() || !ls_id.is_valid()) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
STORAGE_LOG(WARN, "invalid argument", K(ret), K(tenant_id), K(src_info), K(ls_id));
|
||||||
|
} else {
|
||||||
|
ObStorageConfigChangeOpArg arg;
|
||||||
|
ObStorageConfigChangeOpRes res;
|
||||||
|
arg.tenant_id_ = tenant_id;
|
||||||
|
arg.ls_id_ = ls_id;
|
||||||
|
arg.type_ = ObStorageConfigChangeOpArg::UNLOCK_CONFIG_CHANGE;
|
||||||
|
arg.lock_owner_ = lock_owner;
|
||||||
|
arg.lock_timeout_ = lock_timeout;
|
||||||
|
const int64_t timeout = GCONF.sys_bkgd_migration_change_member_list_timeout;
|
||||||
|
if (OB_FAIL(rpc_proxy_->to(src_info.src_addr_)
|
||||||
|
.timeout(timeout)
|
||||||
|
.dst_cluster_id(src_info.cluster_id_)
|
||||||
|
.unlock_config_change(arg, res))) {
|
||||||
|
LOG_WARN("failed to replace member", K(ret), K(src_info), K(arg));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int ObStorageRpc::get_config_change_lock_stat(
|
||||||
|
const uint64_t tenant_id,
|
||||||
|
const ObStorageHASrcInfo &src_info,
|
||||||
|
const share::ObLSID &ls_id,
|
||||||
|
int64_t &palf_lock_owner,
|
||||||
|
bool &is_locked)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
if (!is_inited_) {
|
||||||
|
ret = OB_NOT_INIT;
|
||||||
|
STORAGE_LOG(WARN, "storage rpc is not inited", K(ret));
|
||||||
|
} else if (tenant_id == OB_INVALID_ID || !src_info.is_valid() || !ls_id.is_valid()) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
STORAGE_LOG(WARN, "invalid argument", K(ret), K(tenant_id), K(src_info), K(ls_id));
|
||||||
|
} else {
|
||||||
|
ObStorageConfigChangeOpArg arg;
|
||||||
|
ObStorageConfigChangeOpRes res;
|
||||||
|
arg.tenant_id_ = tenant_id;
|
||||||
|
arg.ls_id_ = ls_id;
|
||||||
|
arg.type_ = ObStorageConfigChangeOpArg::GET_CONFIG_CHANGE_LOCK_STAT;
|
||||||
|
const int64_t timeout = GCONF.sys_bkgd_migration_change_member_list_timeout;
|
||||||
|
if (OB_FAIL(rpc_proxy_->to(src_info.src_addr_)
|
||||||
|
.by(tenant_id)
|
||||||
|
.timeout(timeout)
|
||||||
|
.dst_cluster_id(src_info.cluster_id_)
|
||||||
|
.get_config_change_lock_stat(arg, res))) {
|
||||||
|
LOG_WARN("failed to replace member", K(ret), K(src_info), K(arg));
|
||||||
|
} else {
|
||||||
|
palf_lock_owner = res.palf_lock_owner_;
|
||||||
|
is_locked = res.is_locked_;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
} // storage
|
} // storage
|
||||||
} // oceanbase
|
} // oceanbase
|
||||||
|
|||||||
@ -712,6 +712,47 @@ public:
|
|||||||
share::SCN gts_;
|
share::SCN gts_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
struct ObStorageConfigChangeOpArg final
|
||||||
|
{
|
||||||
|
OB_UNIS_VERSION(1);
|
||||||
|
public:
|
||||||
|
enum TYPE
|
||||||
|
{
|
||||||
|
LOCK_CONFIG_CHANGE = 0,
|
||||||
|
UNLOCK_CONFIG_CHANGE = 1,
|
||||||
|
GET_CONFIG_CHANGE_LOCK_STAT = 2,
|
||||||
|
MAX,
|
||||||
|
};
|
||||||
|
public:
|
||||||
|
ObStorageConfigChangeOpArg();
|
||||||
|
~ObStorageConfigChangeOpArg() {}
|
||||||
|
bool is_valid() const;
|
||||||
|
void reset();
|
||||||
|
TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(lock_owner), K_(lock_timeout));
|
||||||
|
uint64_t tenant_id_;
|
||||||
|
share::ObLSID ls_id_;
|
||||||
|
TYPE type_;
|
||||||
|
int64_t lock_owner_;
|
||||||
|
int64_t lock_timeout_;
|
||||||
|
private:
|
||||||
|
DISALLOW_COPY_AND_ASSIGN(ObStorageConfigChangeOpArg);
|
||||||
|
};
|
||||||
|
|
||||||
|
struct ObStorageConfigChangeOpRes final
|
||||||
|
{
|
||||||
|
OB_UNIS_VERSION(1);
|
||||||
|
public:
|
||||||
|
ObStorageConfigChangeOpRes();
|
||||||
|
~ObStorageConfigChangeOpRes() {}
|
||||||
|
void reset();
|
||||||
|
TO_STRING_KV(K_(palf_lock_owner), K_(is_locked), K_(op_succ));
|
||||||
|
int64_t palf_lock_owner_;
|
||||||
|
bool is_locked_;
|
||||||
|
bool op_succ_;
|
||||||
|
private:
|
||||||
|
DISALLOW_COPY_AND_ASSIGN(ObStorageConfigChangeOpRes);
|
||||||
|
};
|
||||||
|
|
||||||
//src
|
//src
|
||||||
class ObStorageRpcProxy : public obrpc::ObRpcProxy
|
class ObStorageRpcProxy : public obrpc::ObRpcProxy
|
||||||
{
|
{
|
||||||
@ -744,6 +785,9 @@ public:
|
|||||||
RPC_S(PR5 block_tx, OB_HA_BLOCK_TX, (ObStorageBlockTxArg));
|
RPC_S(PR5 block_tx, OB_HA_BLOCK_TX, (ObStorageBlockTxArg));
|
||||||
RPC_S(PR5 kill_tx, OB_HA_KILL_TX, (ObStorageKillTxArg));
|
RPC_S(PR5 kill_tx, OB_HA_KILL_TX, (ObStorageKillTxArg));
|
||||||
RPC_S(PR5 unblock_tx, OB_HA_UNBLOCK_TX, (ObStorageUnBlockTxArg));
|
RPC_S(PR5 unblock_tx, OB_HA_UNBLOCK_TX, (ObStorageUnBlockTxArg));
|
||||||
|
RPC_S(PR5 lock_config_change, OB_HA_LOCK_CONFIG_CHANGE, (ObStorageConfigChangeOpArg), ObStorageConfigChangeOpRes);
|
||||||
|
RPC_S(PR5 unlock_config_change, OB_HA_UNLOCK_CONFIG_CHANGE, (ObStorageConfigChangeOpArg), ObStorageConfigChangeOpRes);
|
||||||
|
RPC_S(PR5 get_config_change_lock_stat, OB_HA_GET_CONFIG_CHANGE_LOCK_STAT, (ObStorageConfigChangeOpArg), ObStorageConfigChangeOpRes);
|
||||||
};
|
};
|
||||||
|
|
||||||
template <ObRpcPacketCode RPC_CODE>
|
template <ObRpcPacketCode RPC_CODE>
|
||||||
@ -1035,6 +1079,36 @@ protected:
|
|||||||
int process();
|
int process();
|
||||||
};
|
};
|
||||||
|
|
||||||
|
class ObStorageLockConfigChangeP:
|
||||||
|
public ObStorageStreamRpcP<OB_HA_LOCK_CONFIG_CHANGE>
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
explicit ObStorageLockConfigChangeP(common::ObInOutBandwidthThrottle *bandwidth_throttle);
|
||||||
|
virtual ~ObStorageLockConfigChangeP() {}
|
||||||
|
protected:
|
||||||
|
int process();
|
||||||
|
};
|
||||||
|
|
||||||
|
class ObStorageUnlockConfigChangeP:
|
||||||
|
public ObStorageStreamRpcP<OB_HA_UNLOCK_CONFIG_CHANGE>
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
explicit ObStorageUnlockConfigChangeP(common::ObInOutBandwidthThrottle *bandwidth_throttle);
|
||||||
|
virtual ~ObStorageUnlockConfigChangeP() {}
|
||||||
|
protected:
|
||||||
|
int process();
|
||||||
|
};
|
||||||
|
|
||||||
|
class ObStorageGetLogConfigStatP:
|
||||||
|
public ObStorageStreamRpcP<OB_HA_GET_CONFIG_CHANGE_LOCK_STAT>
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
explicit ObStorageGetLogConfigStatP(common::ObInOutBandwidthThrottle *bandwidth_throttle);
|
||||||
|
virtual ~ObStorageGetLogConfigStatP() {}
|
||||||
|
protected:
|
||||||
|
int process();
|
||||||
|
};
|
||||||
|
|
||||||
} // obrpc
|
} // obrpc
|
||||||
|
|
||||||
|
|
||||||
@ -1164,6 +1238,24 @@ public:
|
|||||||
const ObStorageHASrcInfo &src_info,
|
const ObStorageHASrcInfo &src_info,
|
||||||
const share::ObLSID &ls_id,
|
const share::ObLSID &ls_id,
|
||||||
const share::SCN >s) = 0;
|
const share::SCN >s) = 0;
|
||||||
|
virtual int lock_config_change(
|
||||||
|
const uint64_t tenant_id,
|
||||||
|
const ObStorageHASrcInfo &src_info,
|
||||||
|
const share::ObLSID &ls_id,
|
||||||
|
const int64_t lock_owner,
|
||||||
|
const int64_t lock_timeout) = 0;
|
||||||
|
virtual int unlock_config_change(
|
||||||
|
const uint64_t tenant_id,
|
||||||
|
const ObStorageHASrcInfo &src_info,
|
||||||
|
const share::ObLSID &ls_id,
|
||||||
|
const int64_t lock_owner,
|
||||||
|
const int64_t lock_timeout) = 0;
|
||||||
|
virtual int get_config_change_lock_stat(
|
||||||
|
const uint64_t tenant_id,
|
||||||
|
const ObStorageHASrcInfo &src_info,
|
||||||
|
const share::ObLSID &ls_id,
|
||||||
|
int64_t &palf_lock_owner,
|
||||||
|
bool &is_locked) = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
class ObStorageRpc: public ObIStorageRpc
|
class ObStorageRpc: public ObIStorageRpc
|
||||||
@ -1286,6 +1378,24 @@ public:
|
|||||||
const ObStorageHASrcInfo &src_info,
|
const ObStorageHASrcInfo &src_info,
|
||||||
const share::ObLSID &ls_id,
|
const share::ObLSID &ls_id,
|
||||||
const share::SCN >s);
|
const share::SCN >s);
|
||||||
|
virtual int lock_config_change(
|
||||||
|
const uint64_t tenant_id,
|
||||||
|
const ObStorageHASrcInfo &src_info,
|
||||||
|
const share::ObLSID &ls_id,
|
||||||
|
const int64_t lock_owner,
|
||||||
|
const int64_t lock_timeout);
|
||||||
|
virtual int unlock_config_change(
|
||||||
|
const uint64_t tenant_id,
|
||||||
|
const ObStorageHASrcInfo &src_info,
|
||||||
|
const share::ObLSID &ls_id,
|
||||||
|
const int64_t lock_owner,
|
||||||
|
const int64_t lock_timeout);
|
||||||
|
virtual int get_config_change_lock_stat(
|
||||||
|
const uint64_t tenant_id,
|
||||||
|
const ObStorageHASrcInfo &src_info,
|
||||||
|
const share::ObLSID &ls_id,
|
||||||
|
int64_t &palf_lock_owner,
|
||||||
|
bool &is_locked);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
bool is_inited_;
|
bool is_inited_;
|
||||||
|
|||||||
Reference in New Issue
Block a user