fix primary switchover to standby do not clear member list lock issue
This commit is contained in:
@ -29,6 +29,7 @@
|
||||
#include "observer/ob_inner_sql_connection.h"//ObInnerSQLConnection
|
||||
#include "storage/tx/ob_trans_service.h" //ObTransService
|
||||
#include "storage/tx/ob_timestamp_service.h" // ObTimestampService
|
||||
#include "storage/high_availability/ob_transfer_lock_utils.h" // ObMemberListLockUtils
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -37,6 +38,7 @@ using namespace common;
|
||||
using namespace obrpc;
|
||||
using namespace share;
|
||||
using namespace rootserver;
|
||||
using namespace storage;
|
||||
|
||||
namespace standby
|
||||
{
|
||||
@ -484,6 +486,8 @@ int ObPrimaryStandbyService::switch_to_standby(
|
||||
(void)role_transition_service.set_switchover_epoch(tenant_info.get_switchover_epoch());
|
||||
if (OB_FAIL(role_transition_service.do_switch_access_mode_to_raw_rw(tenant_info))) {
|
||||
LOG_WARN("failed to do_switch_access_mode", KR(ret), K(tenant_id), K(tenant_info));
|
||||
} else if (OB_FAIL(ObMemberListLockUtils::unlock_member_list_when_switch_to_standby(tenant_id, *sql_proxy_))) {
|
||||
LOG_WARN("failed to unlock member list when switch to standby", K(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(role_transition_service.switchover_update_tenant_status(tenant_id,
|
||||
false /* switch_to_standby */,
|
||||
share::STANDBY_TENANT_ROLE,
|
||||
|
||||
@ -142,6 +142,36 @@ int ObTransferLockInfoOperator::get(const ObTransferLockInfoRowKey &row_key, con
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTransferLockInfoOperator::fetch_all(common::ObISQLClient &sql_proxy, const uint64_t tenant_id,
|
||||
common::ObArray<ObTransferTaskLockInfo> &lock_infos)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_INVALID_ID == tenant_id) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(tenant_id));
|
||||
} else {
|
||||
ObSqlString sql;
|
||||
SMART_VAR(ObISQLClient::ReadResult, result)
|
||||
{
|
||||
if (OB_FAIL(sql.assign_fmt("SELECT * FROM %s WHERE tenant_id = %lu",
|
||||
OB_ALL_LS_TRANSFER_MEMBER_LIST_LOCK_INFO_TNAME,
|
||||
tenant_id))) {
|
||||
LOG_WARN("fail to assign sql", K(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(sql_proxy.read(result, gen_meta_tenant_id(tenant_id), sql.ptr()))) {
|
||||
LOG_WARN("execute sql failed", K(ret), K(tenant_id), K(sql));
|
||||
} else if (OB_ISNULL(result.get_result())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("get mysql result failed", K(ret), K(tenant_id), K(sql));
|
||||
} else if (OB_FAIL(parse_sql_results_(*result.get_result(), lock_infos))) {
|
||||
LOG_WARN("construct transfer task failed", K(ret), K(tenant_id), K(sql));
|
||||
} else {
|
||||
LOG_INFO("get lock info success", K(tenant_id), K(lock_infos), K(sql));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTransferLockInfoOperator::fill_dml_splicer_(const ObTransferTaskLockInfo &lock_info, ObDMLSqlSplicer &dml_splicer)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -221,5 +251,28 @@ int ObTransferLockInfoOperator::parse_sql_result_(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTransferLockInfoOperator::parse_sql_results_(
|
||||
common::sqlclient::ObMySQLResult &result, common::ObArray<ObTransferTaskLockInfo> &lock_infos)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
lock_infos.reset();
|
||||
while (OB_SUCC(ret)) {
|
||||
ObTransferTaskLockInfo lock_info;
|
||||
if (OB_FAIL(result.next())) {
|
||||
if (OB_ITER_END == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
break;
|
||||
} else {
|
||||
LOG_WARN("failed to get next row", K(ret));
|
||||
}
|
||||
} else if (OB_FAIL(parse_sql_result_(result, lock_info))) {
|
||||
LOG_WARN("failed to parse sql result", K(ret));
|
||||
} else if (OB_FAIL(lock_infos.push_back(lock_info))) {
|
||||
LOG_WARN("failed to push back lock info", K(ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
} // namespace storage
|
||||
} // namespace oceanbase
|
||||
@ -33,11 +33,14 @@ public:
|
||||
const ObTransferLockStatus &status, common::ObISQLClient &sql_proxy);
|
||||
static int get(const ObTransferLockInfoRowKey &row_key, const int64_t task_id, const ObTransferLockStatus &status,
|
||||
const bool for_update, ObTransferTaskLockInfo &lock_info, common::ObISQLClient &sql_proxy);
|
||||
static int fetch_all(common::ObISQLClient &sql_proxy, const uint64_t tenant_id,
|
||||
common::ObArray<ObTransferTaskLockInfo> &lock_infos);
|
||||
|
||||
private:
|
||||
static int fill_dml_splicer_(const ObTransferTaskLockInfo &lock_info, share::ObDMLSqlSplicer &dml_splicer);
|
||||
static int construct_result_(common::sqlclient::ObMySQLResult &res, ObTransferTaskLockInfo &lock_info);
|
||||
static int parse_sql_result_(common::sqlclient::ObMySQLResult &res, ObTransferTaskLockInfo &lock_info);
|
||||
static int parse_sql_results_(common::sqlclient::ObMySQLResult &res, common::ObArray<ObTransferTaskLockInfo> &lock_infos);
|
||||
};
|
||||
|
||||
} // namespace storage
|
||||
|
||||
@ -21,6 +21,9 @@
|
||||
#include "share/ob_common_id.h"
|
||||
#include "observer/ob_server_event_history_table_operator.h"
|
||||
#include "storage/high_availability/ob_storage_ha_utils.h"
|
||||
#include "storage/ob_storage_rpc.h"
|
||||
#include "observer/ob_srv_network_frame.h"
|
||||
#include "share/ob_tenant_info_proxy.h"
|
||||
|
||||
using namespace oceanbase::share;
|
||||
using namespace oceanbase::common;
|
||||
@ -225,6 +228,35 @@ int ObMemberListLockUtils::unlock_ls_member_list(const uint64_t tenant_id, const
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObMemberListLockUtils::unlock_member_list_when_switch_to_standby(const uint64_t tenant_id, common::ObMySQLProxy &sql_proxy)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObArray<ObTransferTaskLockInfo> lock_infos;
|
||||
if (OB_FAIL(ObTransferLockInfoOperator::fetch_all(sql_proxy, tenant_id, lock_infos))) {
|
||||
LOG_WARN("failed to fetch all lock info", K(ret), K(tenant_id));
|
||||
} else if (lock_infos.empty()) {
|
||||
LOG_INFO("no need unlock member list when switch to standby", K(tenant_id));
|
||||
} else {
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < lock_infos.count(); ++i) {
|
||||
const ObTransferTaskLockInfo &lock_info = lock_infos.at(i);
|
||||
const uint64_t tenant_id = lock_info.tenant_id_;
|
||||
const share::ObLSID &ls_id = lock_info.ls_id_;
|
||||
const int64_t task_id = lock_info.task_id_;
|
||||
const ObTransferLockStatus status = lock_info.status_;
|
||||
ObMemberList fake_member_list;
|
||||
if (OB_FAIL(unlock_ls_member_list(tenant_id,
|
||||
ls_id,
|
||||
task_id,
|
||||
fake_member_list,
|
||||
status,
|
||||
sql_proxy))) {
|
||||
LOG_WARN("failed to unlock ls member list", K(ret), K(lock_info));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObMemberListLockUtils::try_lock_config_change_(
|
||||
const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout)
|
||||
{
|
||||
@ -234,27 +266,16 @@ int ObMemberListLockUtils::try_lock_config_change_(
|
||||
const uint64_t tenant_id = lock_info.tenant_id_;
|
||||
const share::ObLSID &ls_id = lock_info.ls_id_;
|
||||
const int64_t lock_owner = lock_info.lock_owner_;
|
||||
if (OB_ISNULL(ls_svr = MTL(ObLSService*))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("tenant storage ptr is null", KR(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(ls_svr->check_ls_exist(ls_id, ls_exist))) {
|
||||
LOG_WARN("fail to check log stream exist", KR(ret), K(ls_id));
|
||||
} else if (!ls_exist) {
|
||||
if (OB_FAIL(try_lock_config_change_fallback_(lock_info, lock_timeout))) {
|
||||
obrpc::ObStorageRpcProxy storage_svr_rpc_proxy;
|
||||
storage::ObStorageRpc storage_rpc;
|
||||
if (OB_FAIL(init_storage_rpc_(storage_svr_rpc_proxy, storage_rpc))) {
|
||||
LOG_WARN("failed to init storage rpc", K(ret));
|
||||
} else if (OB_FAIL(try_lock_config_change_fallback_(lock_info, lock_timeout, storage_rpc))) {
|
||||
LOG_WARN("failed to try lock config change fallback", K(ret), K(lock_info));
|
||||
} else {
|
||||
LOG_INFO("try lock config change fallback", K(lock_info), K(lock_timeout));
|
||||
}
|
||||
} else {
|
||||
ObLSHandle ls_handle;
|
||||
if (OB_FAIL(get_ls_handle(tenant_id, ls_id, ls_handle))) {
|
||||
LOG_WARN("failed to get ls handle", K(ret), K(lock_info));
|
||||
} else if (OB_FAIL(ls_handle.get_ls()->try_lock_config_change(lock_owner, lock_timeout))) {
|
||||
LOG_WARN("failed to try lock config change", K(ret), K(lock_info));
|
||||
} else {
|
||||
LOG_INFO("try lock config change", K(lock_info), K(lock_timeout));
|
||||
}
|
||||
}
|
||||
destory_storage_rpc_(storage_svr_rpc_proxy, storage_rpc);
|
||||
#ifdef ERRSIM
|
||||
SERVER_EVENT_ADD("TRANSFER_LOCK", "LOCK_CONFIG_CHANGE",
|
||||
"tenant_id", lock_info.tenant_id_,
|
||||
@ -268,25 +289,18 @@ int ObMemberListLockUtils::try_lock_config_change_(
|
||||
}
|
||||
|
||||
int ObMemberListLockUtils::try_lock_config_change_fallback_(
|
||||
const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout)
|
||||
const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout,
|
||||
storage::ObStorageRpc &storage_rpc)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObLSService *ls_svr = NULL;
|
||||
ObStorageRpc *storage_rpc = NULL;
|
||||
ObStorageHASrcInfo src_info;
|
||||
src_info.cluster_id_ = GCONF.cluster_id;
|
||||
if (!lock_info.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("get invalid args", K(ret), K(lock_info));
|
||||
} else if (OB_ISNULL(ls_svr = (MTL(ObLSService *)))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("ls service should not be NULL", K(ret), KP(ls_svr));
|
||||
} else if (OB_ISNULL(storage_rpc = ls_svr->get_storage_rpc())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("storage rpc should not be NULL", K(ret), KP(storage_rpc));
|
||||
} else if (OB_FAIL(ObStorageHAUtils::get_ls_leader(lock_info.tenant_id_, lock_info.ls_id_, src_info.src_addr_))) {
|
||||
LOG_WARN("failed to get ls leader", K(ret), K(lock_info));
|
||||
} else if (OB_FAIL(storage_rpc->lock_config_change(lock_info.tenant_id_, src_info, lock_info.ls_id_,
|
||||
} else if (OB_FAIL(storage_rpc.lock_config_change(lock_info.tenant_id_, src_info, lock_info.ls_id_,
|
||||
lock_info.lock_owner_, lock_timeout))) {
|
||||
LOG_WARN("failed to try lock config config", K(ret), K(lock_info));
|
||||
}
|
||||
@ -301,27 +315,16 @@ int ObMemberListLockUtils::get_config_change_lock_stat_(
|
||||
ObLSService *ls_svr = NULL;
|
||||
const uint64_t tenant_id = lock_info.tenant_id_;
|
||||
const share::ObLSID &ls_id = lock_info.ls_id_;
|
||||
if (OB_ISNULL(ls_svr = MTL(ObLSService*))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("tenant storage ptr is null", KR(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(ls_svr->check_ls_exist(ls_id, ls_exist))) {
|
||||
LOG_WARN("fail to check log stream exist", KR(ret), K(ls_id));
|
||||
} else if (!ls_exist) {
|
||||
if (OB_FAIL(get_config_change_lock_stat_fallback_(lock_info, palf_lock_owner, is_locked))) {
|
||||
obrpc::ObStorageRpcProxy storage_svr_rpc_proxy;
|
||||
storage::ObStorageRpc storage_rpc;
|
||||
if (OB_FAIL(init_storage_rpc_(storage_svr_rpc_proxy, storage_rpc))) {
|
||||
LOG_WARN("failed to init storage rpc", K(ret));
|
||||
} else if (OB_FAIL(get_config_change_lock_stat_fallback_(lock_info, palf_lock_owner, is_locked, storage_rpc))) {
|
||||
LOG_WARN("failed to get lock config change fallback", K(ret), K(lock_info));
|
||||
} else {
|
||||
LOG_INFO("get lock config change stat fallback", K(lock_info), K(palf_lock_owner), K(is_locked));
|
||||
}
|
||||
} else {
|
||||
ObLSHandle ls_handle;
|
||||
if (OB_FAIL(get_ls_handle(tenant_id, ls_id, ls_handle))) {
|
||||
LOG_WARN("failed to get ls handle", K(ret), K(lock_info));
|
||||
} else if (OB_FAIL(ls_handle.get_ls()->get_config_change_lock_stat(palf_lock_owner, is_locked))) {
|
||||
LOG_WARN("failed to try lock config change", K(ret), K(lock_info));
|
||||
} else {
|
||||
LOG_INFO("get lock config change stat", K(lock_info), K(palf_lock_owner), K(is_locked));
|
||||
}
|
||||
}
|
||||
destory_storage_rpc_(storage_svr_rpc_proxy, storage_rpc);
|
||||
#ifdef ERRSIM
|
||||
SERVER_EVENT_ADD("TRANSFER_LOCK", "GET_CONFIG_CHANGE_LOCK_STAT",
|
||||
"tenant_id", lock_info.tenant_id_,
|
||||
@ -335,27 +338,20 @@ int ObMemberListLockUtils::get_config_change_lock_stat_(
|
||||
}
|
||||
|
||||
int ObMemberListLockUtils::get_config_change_lock_stat_fallback_(
|
||||
const ObTransferTaskLockInfo &lock_info, int64_t &palf_lock_owner, bool &is_locked)
|
||||
const ObTransferTaskLockInfo &lock_info, int64_t &palf_lock_owner,
|
||||
bool &is_locked, storage::ObStorageRpc &storage_rpc)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
palf_lock_owner = -1;
|
||||
is_locked = false;
|
||||
ObLSService *ls_svr = NULL;
|
||||
ObStorageRpc *storage_rpc = NULL;
|
||||
ObStorageHASrcInfo src_info;
|
||||
src_info.cluster_id_ = GCONF.cluster_id;
|
||||
if (!lock_info.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("get invalid args", K(ret), K(lock_info));
|
||||
} else if (OB_ISNULL(ls_svr = (MTL(ObLSService *)))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("ls service should not be NULL", K(ret), KP(ls_svr));
|
||||
} else if (OB_ISNULL(storage_rpc = ls_svr->get_storage_rpc())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("storage rpc should not be NULL", K(ret), KP(storage_rpc));
|
||||
} else if (OB_FAIL(ObStorageHAUtils::get_ls_leader(lock_info.tenant_id_, lock_info.ls_id_, src_info.src_addr_))) {
|
||||
LOG_WARN("failed to get ls leader", K(ret), K(lock_info));
|
||||
} else if (OB_FAIL(storage_rpc->get_config_change_lock_stat(lock_info.tenant_id_, src_info,
|
||||
} else if (OB_FAIL(storage_rpc.get_config_change_lock_stat(lock_info.tenant_id_, src_info,
|
||||
lock_info.ls_id_, palf_lock_owner, is_locked))) {
|
||||
LOG_WARN("failed to get config change lock stat", K(ret), K(lock_info));
|
||||
}
|
||||
@ -371,27 +367,16 @@ int ObMemberListLockUtils::unlock_config_change_(
|
||||
const uint64_t tenant_id = lock_info.tenant_id_;
|
||||
const share::ObLSID &ls_id = lock_info.ls_id_;
|
||||
const int64_t lock_owner = lock_info.lock_owner_;
|
||||
if (OB_ISNULL(ls_svr = MTL(ObLSService*))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("tenant storage ptr is null", KR(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(ls_svr->check_ls_exist(ls_id, ls_exist))) {
|
||||
LOG_WARN("fail to check log stream exist", KR(ret), K(ls_id));
|
||||
} else if (!ls_exist) {
|
||||
if (OB_FAIL(unlock_config_change_fallback_(lock_info, lock_timeout))) {
|
||||
obrpc::ObStorageRpcProxy storage_svr_rpc_proxy;
|
||||
storage::ObStorageRpc storage_rpc;
|
||||
if (OB_FAIL(init_storage_rpc_(storage_svr_rpc_proxy, storage_rpc))) {
|
||||
LOG_WARN("failed to init storage rpc", K(ret));
|
||||
} else if (OB_FAIL(unlock_config_change_fallback_(lock_info, lock_timeout, storage_rpc))) {
|
||||
LOG_WARN("failed to try lock config change fallback", K(ret), K(lock_info));
|
||||
} else {
|
||||
LOG_INFO("unlock lock config change fallback", K(lock_info), K(lock_timeout));
|
||||
}
|
||||
} else {
|
||||
ObLSHandle ls_handle;
|
||||
if (OB_FAIL(get_ls_handle(tenant_id, ls_id, ls_handle))) {
|
||||
LOG_WARN("failed to get ls handle", K(ret), K(lock_info));
|
||||
} else if (OB_FAIL(ls_handle.get_ls()->unlock_config_change(lock_owner, lock_timeout))) {
|
||||
LOG_WARN("failed to try lock config change", K(ret), K(lock_info));
|
||||
} else {
|
||||
LOG_INFO("unlock lock config change", K(lock_info), K(lock_timeout));
|
||||
}
|
||||
}
|
||||
destory_storage_rpc_(storage_svr_rpc_proxy, storage_rpc);
|
||||
#ifdef ERRSIM
|
||||
SERVER_EVENT_ADD("TRANSFER_LOCK", "UNLOCK_CONFIG_CHANGE",
|
||||
"tenant_id", lock_info.tenant_id_,
|
||||
@ -405,25 +390,17 @@ int ObMemberListLockUtils::unlock_config_change_(
|
||||
}
|
||||
|
||||
int ObMemberListLockUtils::unlock_config_change_fallback_(
|
||||
const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout)
|
||||
const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout, storage::ObStorageRpc &storage_rpc)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObLSService *ls_svr = NULL;
|
||||
ObStorageRpc *storage_rpc = NULL;
|
||||
ObStorageHASrcInfo src_info;
|
||||
src_info.cluster_id_ = GCONF.cluster_id;
|
||||
if (!lock_info.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("get invalid args", K(ret), K(lock_info));
|
||||
} else if (OB_ISNULL(ls_svr = (MTL(ObLSService *)))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("ls service should not be NULL", K(ret), KP(ls_svr));
|
||||
} else if (OB_ISNULL(storage_rpc = ls_svr->get_storage_rpc())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("storage rpc should not be NULL", K(ret), KP(storage_rpc));
|
||||
} else if (OB_FAIL(ObStorageHAUtils::get_ls_leader(lock_info.tenant_id_, lock_info.ls_id_, src_info.src_addr_))) {
|
||||
LOG_WARN("failed to get ls leader", K(ret), K(lock_info));
|
||||
} else if (OB_FAIL(storage_rpc->unlock_config_change(lock_info.tenant_id_, src_info, lock_info.ls_id_,
|
||||
} else if (OB_FAIL(storage_rpc.unlock_config_change(lock_info.tenant_id_, src_info, lock_info.ls_id_,
|
||||
lock_info.lock_owner_, lock_timeout))) {
|
||||
LOG_WARN("failed to try lock config config", K(ret), K(lock_info));
|
||||
}
|
||||
@ -480,15 +457,27 @@ int ObMemberListLockUtils::insert_lock_info(const uint64_t tenant_id, const shar
|
||||
int64_t &real_lock_owner, common::ObISQLClient &sql_proxy)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
real_lock_owner = -1;
|
||||
ObMySQLTransaction trans;
|
||||
ObTransferTaskLockInfo lock_info;
|
||||
ObAllTenantInfo tenant_info;
|
||||
const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id);
|
||||
if (OB_FAIL(lock_info.set(tenant_id, ls_id, task_id, status, lock_owner, comment))) {
|
||||
LOG_WARN("failed to set lock info", K(ret), K(tenant_id), K(ls_id), K(status), K(real_lock_owner));
|
||||
} else if (OB_FAIL(ObTransferLockInfoOperator::insert(lock_info, sql_proxy))) {
|
||||
} else if (OB_FAIL(trans.start(&sql_proxy, meta_tenant_id))) {
|
||||
LOG_WARN("failed to start trans", K(ret), K(meta_tenant_id));
|
||||
} else {
|
||||
if (OB_FAIL(ObAllTenantInfoProxy::load_tenant_info(tenant_id, &trans, true/*for_update*/, tenant_info))) {
|
||||
LOG_WARN("failed to load tenant info", K(ret), K(tenant_id));
|
||||
} else if (!tenant_info.is_primary()) {
|
||||
ret = OB_OP_NOT_ALLOW;
|
||||
LOG_WARN("tenant is not primary, do not allow insert lock info", K(tenant_id), K(tenant_info));
|
||||
} else if (OB_FAIL(ObTransferLockInfoOperator::insert(lock_info, trans))) {
|
||||
if (OB_ENTRY_EXIST == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
ObTransferTaskLockInfo tmp_lock_info;
|
||||
if (OB_FAIL(get_lock_info(tenant_id, ls_id, task_id, status, tmp_lock_info, sql_proxy))) {
|
||||
if (OB_FAIL(get_lock_info(tenant_id, ls_id, task_id, status, tmp_lock_info, trans))) {
|
||||
LOG_WARN("failed to get lock info", K(ret), K(ls_id));
|
||||
} else {
|
||||
real_lock_owner = tmp_lock_info.lock_owner_;
|
||||
@ -500,6 +489,10 @@ int ObMemberListLockUtils::insert_lock_info(const uint64_t tenant_id, const shar
|
||||
} else {
|
||||
real_lock_owner = lock_owner;
|
||||
}
|
||||
if (OB_TMP_FAIL(trans.end(OB_SUCC(ret)))) {
|
||||
LOG_WARN("failed to end trans", K(tmp_ret), K(ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -615,5 +608,27 @@ int ObMemberListLockUtils::relock_before_unlock_(const ObTransferTaskLockInfo &l
|
||||
return ret;
|
||||
}
|
||||
|
||||
// TODO(yangyi.yyy): change the use of storage rpc later
|
||||
int ObMemberListLockUtils::init_storage_rpc_(
|
||||
obrpc::ObStorageRpcProxy &storage_svr_rpc_proxy,
|
||||
storage::ObStorageRpc &storage_rpc)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(storage_svr_rpc_proxy.init(GCTX.net_frame_->get_req_transport(), GCTX.self_addr()))) {
|
||||
LOG_WARN("failed to init storage svr rpc proxy", K(ret));
|
||||
} else if (OB_FAIL(storage_rpc.init(&storage_svr_rpc_proxy, GCTX.self_addr(), GCTX.rs_rpc_proxy_))) {
|
||||
STORAGE_LOG(WARN, "fail to init partition service rpc", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObMemberListLockUtils::destory_storage_rpc_(
|
||||
obrpc::ObStorageRpcProxy &storage_svr_rpc_proxy,
|
||||
storage::ObStorageRpc &storage_rpc)
|
||||
{
|
||||
storage_svr_rpc_proxy.destroy();
|
||||
storage_rpc.destroy();
|
||||
}
|
||||
|
||||
} // namespace storage
|
||||
} // namespace oceanbase
|
||||
|
||||
@ -33,6 +33,10 @@ public:
|
||||
static int unlock_ls_member_list(const uint64_t tenant_id, const share::ObLSID &ls_id, const int64_t task_id,
|
||||
const common::ObMemberList &member_list, const ObTransferLockStatus &status, common::ObMySQLProxy &sql_proxy);
|
||||
|
||||
public:
|
||||
/* interface used for primary switch over to standby */
|
||||
static int unlock_member_list_when_switch_to_standby(const uint64_t tenant_id, common::ObMySQLProxy &sql_proxy);
|
||||
|
||||
private:
|
||||
/* sql operator */
|
||||
static int insert_lock_info(const uint64_t tenant_id, const share::ObLSID &ls_id, const int64_t task_id,
|
||||
@ -50,11 +54,15 @@ private:
|
||||
private:
|
||||
/* palf lock config*/
|
||||
static int try_lock_config_change_(const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout);
|
||||
static int try_lock_config_change_fallback_(const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout);
|
||||
static int get_config_change_lock_stat_(const ObTransferTaskLockInfo &lock_info, int64_t &palf_lock_owner, bool &is_locked);
|
||||
static int get_config_change_lock_stat_fallback_(const ObTransferTaskLockInfo &lock_info, int64_t &palf_lock_owner, bool &is_locked);
|
||||
static int try_lock_config_change_fallback_(const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout,
|
||||
storage::ObStorageRpc &storage_rpc);
|
||||
static int get_config_change_lock_stat_(const ObTransferTaskLockInfo &lock_info,
|
||||
int64_t &palf_lock_owner, bool &is_locked);
|
||||
static int get_config_change_lock_stat_fallback_(const ObTransferTaskLockInfo &lock_info,
|
||||
int64_t &palf_lock_owner, bool &is_locked, storage::ObStorageRpc &storage_rpc);
|
||||
static int unlock_config_change_(const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout);
|
||||
static int unlock_config_change_fallback_(const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout);
|
||||
static int unlock_config_change_fallback_(const ObTransferTaskLockInfo &lock_info, const int64_t lock_timeout,
|
||||
storage::ObStorageRpc &storage_rpc);
|
||||
|
||||
private:
|
||||
static int check_lock_status_(
|
||||
@ -63,6 +71,8 @@ private:
|
||||
const int64_t inner_table_lock_owner, bool &need_unlock, bool &need_relock_before_unlock);
|
||||
static int relock_before_unlock_(const ObTransferTaskLockInfo &lock_info, const int64_t palf_lock_owner,
|
||||
const int64_t lock_timeout);
|
||||
static int init_storage_rpc_(obrpc::ObStorageRpcProxy &storage_svr_rpc_proxy, storage::ObStorageRpc &storage_rpc);
|
||||
static void destory_storage_rpc_(obrpc::ObStorageRpcProxy &storage_svr_rpc_proxy, storage::ObStorageRpc &storage_rpc);
|
||||
|
||||
private:
|
||||
static const int64_t CONFIG_CHANGE_TIMEOUT = 10 * 1000 * 1000L; // 10s
|
||||
|
||||
@ -537,6 +537,23 @@ bool ObTransferTaskLockInfo::is_valid() const
|
||||
return OB_INVALID_ID != tenant_id_ && ls_id_.is_valid() && task_id_ >= 0 && status_.is_valid()
|
||||
&& lock_owner_ > 0;
|
||||
}
|
||||
|
||||
int ObTransferTaskLockInfo::assign(const ObTransferTaskLockInfo &other)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (!other.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid args", K(ret), K(other));
|
||||
} else {
|
||||
tenant_id_ = other.tenant_id_;
|
||||
ls_id_ = other.ls_id_;
|
||||
task_id_ = other.task_id_;
|
||||
status_ = other.status_;
|
||||
lock_owner_ = other.lock_owner_;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTransferTaskLockInfo::set(const uint64_t tenant_id, const share::ObLSID &ls_id, const int64_t task_id,
|
||||
const ObTransferLockStatus &status, const int64_t lock_owner, const common::ObString &comment)
|
||||
{
|
||||
|
||||
@ -204,6 +204,7 @@ public:
|
||||
~ObTransferTaskLockInfo() = default;
|
||||
void reset();
|
||||
bool is_valid() const;
|
||||
int assign(const ObTransferTaskLockInfo &other);
|
||||
int set(const uint64_t tenant_id, const share::ObLSID &ls_id, const int64_t task_id, const ObTransferLockStatus &status,
|
||||
const int64_t lock_owner, const common::ObString &comment);
|
||||
TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(task_id), K_(status), K_(lock_owner), K_(comment));
|
||||
|
||||
Reference in New Issue
Block a user