Observer adapted rs wake-up transfer task retry mechanism
This commit is contained in:
parent
b414155cd5
commit
8309758e9a
@ -73,6 +73,7 @@
|
||||
#include "sql/session/ob_sql_session_info.h"
|
||||
#include "sql/session/ob_sess_info_verify.h"
|
||||
#include "observer/table/ttl/ob_ttl_service.h"
|
||||
#include "storage/high_availability/ob_storage_ha_utils.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -2580,8 +2581,10 @@ int ObRpcStartTransferTaskP::process()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTransferService *transfer_service = nullptr;
|
||||
|
||||
if (OB_UNLIKELY(arg_.get_tenant_id() != MTL_ID())) {
|
||||
const share::ObLSID &src_ls = arg_.get_src_ls();
|
||||
const uint64_t tenant_id = arg_.get_tenant_id();
|
||||
bool is_leader = false;
|
||||
if (OB_UNLIKELY(tenant_id != MTL_ID())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("ObRpcStartTransferTaskP::process tenant not match", KR(ret), K_(arg));
|
||||
} else if (OB_UNLIKELY(!arg_.is_valid())) {
|
||||
@ -2590,6 +2593,11 @@ int ObRpcStartTransferTaskP::process()
|
||||
} else if (OB_ISNULL(transfer_service = MTL(ObTransferService *))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("ls service should not be null", K(ret), KP(transfer_service));
|
||||
} else if (OB_FAIL(storage::ObStorageHAUtils::check_ls_is_leader(tenant_id, src_ls, is_leader))) {
|
||||
LOG_WARN("fail to check ls is leader", K(ret), K(tenant_id), K(src_ls));
|
||||
} else if (!is_leader) {
|
||||
ret = OB_NOT_MASTER;
|
||||
LOG_WARN("ls is not leader, please retry", K(ret), K(is_leader));
|
||||
} else {
|
||||
transfer_service->wakeup();
|
||||
}
|
||||
|
@ -9686,23 +9686,23 @@ int ObRlsContextDDLArg::assign(const ObRlsContextDDLArg &other)
|
||||
return ret;
|
||||
}
|
||||
|
||||
OB_SERIALIZE_MEMBER(ObStartTransferTaskArg, tenant_id_, task_id_, dest_ls_);
|
||||
OB_SERIALIZE_MEMBER(ObStartTransferTaskArg, tenant_id_, task_id_, src_ls_);
|
||||
|
||||
int ObStartTransferTaskArg::init(
|
||||
const uint64_t tenant_id,
|
||||
const ObTransferTaskID &task_id,
|
||||
const ObLSID &dest_ls)
|
||||
const ObLSID &src_ls)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id)
|
||||
|| ! task_id.is_valid())
|
||||
|| !dest_ls.is_valid()) {
|
||||
|| !src_ls.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid args", KR(ret), K(tenant_id), K(task_id), K(dest_ls));
|
||||
LOG_WARN("invalid args", KR(ret), K(tenant_id), K(task_id), K(src_ls));
|
||||
} else {
|
||||
tenant_id_ = tenant_id;
|
||||
task_id_ = task_id;
|
||||
dest_ls_ = dest_ls;
|
||||
src_ls_ = src_ls;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -9712,7 +9712,7 @@ int ObStartTransferTaskArg::assign(const ObStartTransferTaskArg &other)
|
||||
int ret = OB_SUCCESS;
|
||||
tenant_id_ = other.tenant_id_;
|
||||
task_id_ = other.task_id_;
|
||||
dest_ls_ = other.dest_ls_;
|
||||
src_ls_ = other.src_ls_;
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -9708,20 +9708,20 @@ struct ObStartTransferTaskArg final
|
||||
{
|
||||
OB_UNIS_VERSION(1);
|
||||
public:
|
||||
ObStartTransferTaskArg(): tenant_id_(OB_INVALID_TENANT_ID), task_id_(), dest_ls_() {}
|
||||
ObStartTransferTaskArg(): tenant_id_(OB_INVALID_TENANT_ID), task_id_(), src_ls_() {}
|
||||
~ObStartTransferTaskArg() {}
|
||||
int init(const uint64_t tenant_id, const share::ObTransferTaskID &task_id, const share::ObLSID &dest_ls);
|
||||
int init(const uint64_t tenant_id, const share::ObTransferTaskID &task_id, const share::ObLSID &src_ls);
|
||||
uint64_t get_tenant_id() const { return tenant_id_; }
|
||||
const share::ObTransferTaskID get_task_id() const { return task_id_; }
|
||||
const share::ObLSID &get_dest_ls() { return dest_ls_; }
|
||||
const share::ObLSID &get_src_ls() { return src_ls_; }
|
||||
bool is_valid() const { return is_valid_tenant_id(tenant_id_) && task_id_.is_valid(); }
|
||||
int assign(const ObStartTransferTaskArg &other);
|
||||
TO_STRING_KV(K_(tenant_id), K_(task_id), K_(dest_ls));
|
||||
TO_STRING_KV(K_(tenant_id), K_(task_id), K_(src_ls));
|
||||
|
||||
private:
|
||||
uint64_t tenant_id_;
|
||||
share::ObTransferTaskID task_id_;
|
||||
share::ObLSID dest_ls_;
|
||||
share::ObLSID src_ls_;
|
||||
|
||||
DISALLOW_COPY_AND_ASSIGN(ObStartTransferTaskArg);
|
||||
};
|
||||
|
@ -961,23 +961,10 @@ int ObTxFinishTransfer::report_result_(
|
||||
int ObTxFinishTransfer::check_self_ls_leader_(const share::ObLSID &ls_id, bool &is_leader)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObRole role = ObRole::INVALID_ROLE;
|
||||
int64_t proposal_id = 0;
|
||||
ObLSHandle ls_handle;
|
||||
ObLS *ls = nullptr;
|
||||
const uint64_t tenant_id = MTL_ID();
|
||||
is_leader = false;
|
||||
if (OB_FAIL(get_ls_handle_(tenant_id, ls_id, ls_handle))) {
|
||||
LOG_WARN("failed to get ls handle", K(ret), K(tenant_id), K(ls_id));
|
||||
} else if (OB_ISNULL(ls = ls_handle.get_ls())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("ls should not be NULL", K(ret), K(tenant_id), K(ls_id));
|
||||
} else if (OB_FAIL(ls->get_log_handler()->get_role(role, proposal_id))) {
|
||||
LOG_WARN("failed to get role", K(ret), K(tenant_id), K(ls_id));
|
||||
} else if (is_strong_leader(role)) {
|
||||
is_leader = true;
|
||||
} else {
|
||||
is_leader = false;
|
||||
if (OB_FAIL(ObStorageHAUtils::check_ls_is_leader(tenant_id, ls_id, is_leader))) {
|
||||
LOG_WARN("failed to check ls leader", K(ret), K(tenant_id), K(ls_id));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
@ -29,6 +29,7 @@
|
||||
#include "share/ls/ob_ls_table_operator.h"
|
||||
#include "ob_rebuild_service.h"
|
||||
#include "share/ob_cluster_version.h"
|
||||
#include "ob_storage_ha_utils.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -1042,9 +1043,8 @@ int ObStartMigrationTask::deal_with_local_ls_()
|
||||
int ret = OB_SUCCESS;
|
||||
ObLSHandle ls_handle;
|
||||
ObLS *ls = nullptr;
|
||||
ObRole role;
|
||||
int64_t proposal_id = 0;
|
||||
ObLSMeta local_ls_meta;
|
||||
bool is_leader = false;
|
||||
if (!is_inited_) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("start migration task do not init", K(ret));
|
||||
@ -1053,18 +1053,19 @@ int ObStartMigrationTask::deal_with_local_ls_()
|
||||
} else if (OB_ISNULL(ls = ls_handle.get_ls())) {
|
||||
ret = OB_ERR_SYS;
|
||||
LOG_ERROR("log stream should not be NULL", K(ret), K(*ctx_));
|
||||
} else if (OB_FAIL(ls->get_log_handler()->get_role(role, proposal_id))) {
|
||||
LOG_WARN("failed to get role", K(ret), "arg", ctx_->arg_);
|
||||
} else if (is_strong_leader(role)) {
|
||||
} else if (OB_FAIL(ObStorageHAUtils::check_ls_is_leader(
|
||||
ctx_->tenant_id_, ctx_->arg_.ls_id_, is_leader))) {
|
||||
LOG_WARN("failed to check ls leader", K(ret), KPC(ctx_));
|
||||
} else if (is_leader) {
|
||||
if (ObMigrationOpType::REBUILD_LS_OP == ctx_->arg_.type_) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("leader can not as rebuild dst", K(ret), K(role), "myaddr", MYADDR, "arg", ctx_->arg_);
|
||||
LOG_ERROR("leader can not as rebuild dst", K(ret), K(is_leader), "myaddr", MYADDR, "arg", ctx_->arg_);
|
||||
} else if (ObMigrationOpType::ADD_LS_OP == ctx_->arg_.type_
|
||||
|| ObMigrationOpType::MIGRATE_LS_OP == ctx_->arg_.type_
|
||||
|| ObMigrationOpType::CHANGE_LS_OP == ctx_->arg_.type_) {
|
||||
ret = OB_ERR_SYS;
|
||||
LOG_WARN("leader cannot as add, migrate, change dst",
|
||||
K(ret), K(role), "myaddr", MYADDR, "arg", ctx_->arg_);
|
||||
K(ret), K(is_leader), "myaddr", MYADDR, "arg", ctx_->arg_);
|
||||
}
|
||||
} else if (OB_FAIL(ls->offline())) {
|
||||
LOG_WARN("failed to disable log", K(ret), KPC(ctx_));
|
||||
|
@ -20,6 +20,7 @@
|
||||
#include "ob_transfer_service.h"
|
||||
#include "storage/tablet/ob_tablet.h"
|
||||
#include "ob_rebuild_service.h"
|
||||
#include "ob_storage_ha_utils.h"
|
||||
|
||||
using namespace oceanbase;
|
||||
using namespace common;
|
||||
@ -756,8 +757,7 @@ int ObStartPrepareMigrationTask::deal_with_local_ls_()
|
||||
int ret = OB_SUCCESS;
|
||||
ObLSHandle ls_handle;
|
||||
ObLS *ls = nullptr;
|
||||
ObRole role;
|
||||
int64_t proposal_id = 0;
|
||||
bool is_leader = false;
|
||||
ObLSSavedInfo saved_info;
|
||||
|
||||
if (!is_inited_) {
|
||||
@ -768,18 +768,19 @@ int ObStartPrepareMigrationTask::deal_with_local_ls_()
|
||||
} else if (OB_ISNULL(ls = ls_handle.get_ls())) {
|
||||
ret = OB_ERR_SYS;
|
||||
LOG_ERROR("log stream should not be NULL", K(ret), K(*ctx_));
|
||||
} else if (OB_FAIL(ls->get_log_handler()->get_role(role, proposal_id))) {
|
||||
LOG_WARN("failed to get role", K(ret), "arg", ctx_->arg_);
|
||||
} else if (is_strong_leader(role)) {
|
||||
} else if (OB_FAIL(ObStorageHAUtils::check_ls_is_leader(
|
||||
ctx_->tenant_id_, ctx_->arg_.ls_id_, is_leader))) {
|
||||
LOG_WARN("failed to check ls leader", K(ret), KPC(ctx_));
|
||||
} else if (is_leader) {
|
||||
if (ObMigrationOpType::REBUILD_LS_OP == ctx_->arg_.type_) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("leader can not as rebuild dst", K(ret), K(role), "myaddr", MYADDR, "arg", ctx_->arg_);
|
||||
LOG_ERROR("leader can not as rebuild dst", K(ret), K(is_leader), "myaddr", MYADDR, "arg", ctx_->arg_);
|
||||
} else if (ObMigrationOpType::ADD_LS_OP == ctx_->arg_.type_
|
||||
|| ObMigrationOpType::MIGRATE_LS_OP == ctx_->arg_.type_
|
||||
|| ObMigrationOpType::CHANGE_LS_OP == ctx_->arg_.type_) {
|
||||
ret = OB_ERR_SYS;
|
||||
LOG_WARN("leader cannot as add, migrate, change dst",
|
||||
K(ret), K(role), "myaddr", MYADDR, "arg", ctx_->arg_);
|
||||
K(ret), K(is_leader), "myaddr", MYADDR, "arg", ctx_->arg_);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -34,6 +34,7 @@
|
||||
#include "rootserver/ob_tenant_info_loader.h"
|
||||
#include "src/observer/omt/ob_tenant_config.h"
|
||||
#include "common/errsim_module/ob_errsim_module_type.h"
|
||||
#include "common/ob_role.h"
|
||||
|
||||
using namespace oceanbase::share;
|
||||
|
||||
@ -382,6 +383,38 @@ int ObStorageHAUtils::calc_tablet_sstable_macro_block_cnt(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObStorageHAUtils::check_ls_is_leader(
|
||||
const uint64_t tenant_id,
|
||||
const share::ObLSID &ls_id,
|
||||
bool &is_leader)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObLSService *ls_srv = NULL;
|
||||
common::ObRole role = common::ObRole::INVALID_ROLE;
|
||||
int64_t proposal_id = 0;
|
||||
ObLSHandle ls_handle;
|
||||
ObLS *ls = nullptr;
|
||||
is_leader = false;
|
||||
if (OB_INVALID_ID == tenant_id || !ls_id.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(ls_id));
|
||||
} else if (OB_ISNULL(ls_srv = MTL_WITH_CHECK_TENANT(ObLSService *, tenant_id))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("log stream service is NULL", K(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(ls_srv->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD))) {
|
||||
LOG_WARN("failed to get log stream", K(ret), K(tenant_id), K(ls_id));
|
||||
} else if (OB_ISNULL(ls = ls_handle.get_ls())) {
|
||||
LOG_WARN("ls should not be null", K(ret), KP(ls));
|
||||
} else if (OB_FAIL(ls->get_log_handler()->get_role(role, proposal_id))) {
|
||||
LOG_WARN("failed to get role", K(ret), KP(ls));
|
||||
} else if (is_strong_leader(role)) {
|
||||
is_leader = true;
|
||||
} else {
|
||||
is_leader = false;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool ObTransferUtils::is_need_retry_error(const int err)
|
||||
{
|
||||
bool bool_ret = false;
|
||||
|
@ -46,6 +46,7 @@ public:
|
||||
static int64_t get_rpc_timeout();
|
||||
static int check_is_primary_tenant(const uint64_t tenant_id, bool &is_primary_tenant);
|
||||
static int check_disk_space();
|
||||
static int check_ls_is_leader(const uint64_t tenant_id, const share::ObLSID &ls_id, bool &is_leader);
|
||||
|
||||
static int calc_tablet_sstable_macro_block_cnt(
|
||||
const ObTabletHandle &tablet_handle, int64_t &data_macro_block_count);
|
||||
|
Loading…
x
Reference in New Issue
Block a user