[CP] fix bug of get ls leader addr

This commit is contained in:
obdev 2024-06-17 12:19:42 +00:00 committed by ob-robot
parent 461a59b5a9
commit f8c83cc195
5 changed files with 127 additions and 61 deletions

View File

@ -376,22 +376,16 @@ int ObTableLoadCoordinator::gen_apply_arg(ObDirectLoadResourceApplyArg &apply_ar
}
}
if (OB_SUCC(ret)) {
while (true) {
ret = OB_SUCCESS;
if (THIS_WORKER.is_timeout()) {
ret = OB_TIMEOUT;
LOG_WARN("gen_apply_arg wait too long", KR(ret));
break;
} else if (OB_FAIL(coordinator_ctx_->check_status(ObTableLoadStatusType::INITED))) {
LOG_WARN("fail to check status", KR(ret));
break;
} else if (OB_FAIL(GCTX.location_service_->get_leader_with_retry_until_timeout(GCONF.cluster_id,
apply_arg.tenant_id_,
share::SYS_LS,
leader))) {
LOG_WARN("fail to get ls location leader", KR(ret), K(apply_arg.tenant_id_));
} else if (ctx_->schema_.is_heap_table_ || !ctx_->param_.need_sort_) {
while (OB_SUCC(ret)) {
if (THIS_WORKER.is_timeout()) {
ret = OB_TIMEOUT;
LOG_WARN("gen_apply_arg wait too long", KR(ret));
} else if (OB_FAIL(coordinator_ctx_->check_status(ObTableLoadStatusType::INITED))) {
LOG_WARN("fail to check status", KR(ret));
} else if (OB_FAIL(coordinator_ctx_->exec_ctx_->check_status())) {
LOG_WARN("fail to check status", KR(ret));
} else {
if (ctx_->schema_.is_heap_table_ || !ctx_->param_.need_sort_) {
last_sort = false;
for (int64_t i = 0; !last_sort && i < store_server_count; i++) {
if (min_unsort_memory[i] > memory_limit) {
@ -406,37 +400,33 @@ int ObTableLoadCoordinator::gen_apply_arg(ObDirectLoadResourceApplyArg &apply_ar
: min_unsort_memory[i]);
}
}
if (OB_SUCC(ret)) {
if (ObTableLoadUtils::is_local_addr(leader)) {
ret = ObTableLoadResourceService::apply_resource(apply_arg, apply_res);
} else {
TABLE_LOAD_RESOURCE_RPC_CALL(apply_resource, leader, apply_arg, apply_res);
if (OB_FAIL(ObTableLoadResourceService::apply_resource(apply_arg, apply_res))) {
if (retry_count % 100 == 0) {
LOG_WARN("fail to apply resource", KR(ret), K(apply_res.error_code_), K(retry_count));
}
if (OB_SUCC(ret) && OB_SUCC(apply_res.error_code_)) {
ctx_->param_.need_sort_ = last_sort;
ctx_->param_.session_count_ = coordinator_session_count;
ctx_->param_.write_session_count_ = (include_cur_addr ? MIN(min_session_count, (coordinator_session_count + 1) / 2)
: min_session_count);
ctx_->param_.exe_mode_ = (ctx_->schema_.is_heap_table_ ? (last_sort ? ObTableLoadExeMode::MULTIPLE_HEAP_TABLE_COMPACT
: ObTableLoadExeMode::FAST_HEAP_TABLE)
: (last_sort ? ObTableLoadExeMode::MEM_COMPACT
: ObTableLoadExeMode::GENERAL_TABLE_COMPACT));
if (OB_FAIL(ObTableLoadService::add_assigned_task(apply_arg))) {
LOG_WARN("fail to add_assigned_task", KR(ret));
} else {
ctx_->set_assigned_resource();
LOG_INFO("Coordinator::gen_apply_arg", K(retry_count), K(param_.exe_mode_), K(partitions), K(leader), K(coordinator_addr), K(apply_arg));
break;
}
} else {
if (ret == OB_EAGAIN) {
retry_count++;
if (retry_count % 100 == 0) {
LOG_WARN("fail to apply resource", KR(ret), K(apply_res.error_code_), K(retry_count));
}
ret = OB_SUCCESS;
}
} else {
ctx_->param_.need_sort_ = last_sort;
ctx_->param_.session_count_ = coordinator_session_count;
ctx_->param_.write_session_count_ = (include_cur_addr ? MIN(min_session_count, (coordinator_session_count + 1) / 2)
: min_session_count);
ctx_->param_.exe_mode_ = (ctx_->schema_.is_heap_table_ ? (last_sort ? ObTableLoadExeMode::MULTIPLE_HEAP_TABLE_COMPACT
: ObTableLoadExeMode::FAST_HEAP_TABLE)
: (last_sort ? ObTableLoadExeMode::MEM_COMPACT
: ObTableLoadExeMode::GENERAL_TABLE_COMPACT));
if (OB_FAIL(ObTableLoadService::add_assigned_task(apply_arg))) {
LOG_WARN("fail to add_assigned_task", KR(ret));
} else {
ctx_->set_assigned_resource();
LOG_INFO("Coordinator::gen_apply_arg", K(retry_count), K(param_.exe_mode_), K(partitions), K(leader), K(coordinator_addr), K(apply_arg));
break;
}
usleep(RESOURCE_OP_WAIT_INTERVAL_US);
}
usleep(RESOURCE_OP_WAIT_INTERVAL_US);
}
}
}

View File

@ -613,17 +613,9 @@ int ObTableLoadService::remove_ctx(ObTableLoadTableCtx *table_ctx)
} else if (table_ctx->is_assigned_resource()) {
if (OB_FAIL(service->assigned_task_manager_.delete_assigned_task(release_arg.task_key_))) {
LOG_WARN("fail to delete_assigned_task", KR(ret), K(release_arg.task_key_));
} else if (OB_FAIL(GCTX.location_service_->get_leader_with_retry_until_timeout(GCONF.cluster_id,
release_arg.tenant_id_,
share::SYS_LS,
leader))) {
LOG_WARN("fail to get ls location leader", KR(ret), K(release_arg.tenant_id_));
} else if (ObTableLoadUtils::is_local_addr(leader)) {
if (OB_FAIL(ObTableLoadResourceService::release_resource(release_arg))) {
LOG_WARN("fail to release resource", KR(ret));
}
} else {
TABLE_LOAD_RESOURCE_RPC_CALL(release_resource, leader, release_arg);
} else if (OB_FAIL(ObTableLoadResourceService::release_resource(release_arg))) {
LOG_WARN("fail to release resource", KR(ret));
ret = OB_SUCCESS; // 允许失败,资源管理模块可以回收
}
}
}

View File

@ -42,7 +42,7 @@ int ObDirectLoadResourceApplyExecutor::process()
int ret = OB_SUCCESS;
if (OB_FAIL(ObTableLoadResourceService::check_tenant())) {
LOG_WARN("fail to check tenant", KR(ret));
} else if (OB_FAIL(ObTableLoadResourceService::apply_resource(arg_, res_))) {
} else if (OB_FAIL(ObTableLoadResourceService::local_apply_resource(arg_, res_))) {
LOG_WARN("fail to apply resource", KR(ret));
}
@ -69,7 +69,7 @@ int ObDirectLoadResourceReleaseExecutor::process()
int ret = OB_SUCCESS;
if (OB_FAIL(ObTableLoadResourceService::check_tenant())) {
LOG_WARN("fail to check tenant", KR(ret));
} else if (OB_FAIL(ObTableLoadResourceService::release_resource(arg_))) {
} else if (OB_FAIL(ObTableLoadResourceService::local_release_resource(arg_))) {
LOG_WARN("fail to release resource", KR(ret));
}
@ -96,7 +96,7 @@ int ObDirectLoadResourceUpdateExecutor::process()
int ret = OB_SUCCESS;
if (OB_FAIL(ObTableLoadResourceService::check_tenant())) {
LOG_WARN("fail to check tenant", KR(ret));
} else if (OB_FAIL(ObTableLoadResourceService::update_resource(arg_))) {
} else if (OB_FAIL(ObTableLoadResourceService::local_update_resource(arg_))) {
LOG_WARN("fail to update resource", KR(ret));
}

View File

@ -17,6 +17,7 @@
#include "observer/table_load/ob_table_load_table_ctx.h"
#include "share/rc/ob_tenant_base.h"
#include "share/schema/ob_table_schema.h"
#include "share/location_cache/ob_location_struct.h"
namespace oceanbase
{
@ -247,7 +248,24 @@ int ObTableLoadResourceService::check_tenant()
return ret;
}
int ObTableLoadResourceService::apply_resource(ObDirectLoadResourceApplyArg &arg, ObDirectLoadResourceOpRes &res)
int ObTableLoadResourceService::get_leader_addr(
const uint64_t tenant_id,
const share::ObLSID &ls_id,
ObAddr &leader)
{
int ret = OB_SUCCESS;
if (OB_FAIL(GCTX.location_service_->get_leader_with_retry_until_timeout(
GCONF.cluster_id, tenant_id, ls_id, leader, GET_LEADER_RETRY_TIMEOUT))) {
LOG_WARN("fail to get ls location leader", KR(ret), K(tenant_id), K(ls_id));
if (is_location_service_renew_error(ret)) {
ret = OB_EAGAIN;
}
}
return ret;
}
int ObTableLoadResourceService::local_apply_resource(ObDirectLoadResourceApplyArg &arg, ObDirectLoadResourceOpRes &res)
{
int ret = OB_SUCCESS;
ObTableLoadResourceService *service = nullptr;
@ -264,7 +282,7 @@ int ObTableLoadResourceService::apply_resource(ObDirectLoadResourceApplyArg &arg
return ret;
}
int ObTableLoadResourceService::release_resource(ObDirectLoadResourceReleaseArg &arg)
int ObTableLoadResourceService::local_release_resource(ObDirectLoadResourceReleaseArg &arg)
{
int ret = OB_SUCCESS;
ObTableLoadResourceService *service = nullptr;
@ -281,7 +299,7 @@ int ObTableLoadResourceService::release_resource(ObDirectLoadResourceReleaseArg
return ret;
}
int ObTableLoadResourceService::update_resource(ObDirectLoadResourceUpdateArg &arg)
int ObTableLoadResourceService::local_update_resource(ObDirectLoadResourceUpdateArg &arg)
{
int ret = OB_SUCCESS;
ObTableLoadResourceService *service = nullptr;
@ -298,5 +316,65 @@ int ObTableLoadResourceService::update_resource(ObDirectLoadResourceUpdateArg &a
return ret;
}
int ObTableLoadResourceService::apply_resource(ObDirectLoadResourceApplyArg &arg, ObDirectLoadResourceOpRes &res)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!arg.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(arg), KR(ret));
} else {
ObAddr leader;
if (OB_FAIL(get_leader_addr(arg.tenant_id_, share::SYS_LS, leader))) {
LOG_WARN("fail to get leader addr", KR(ret));
} else if (ObTableLoadUtils::is_local_addr(leader)) {
ret = local_apply_resource(arg, res);
} else {
TABLE_LOAD_RESOURCE_RPC_CALL(apply_resource, leader, arg, res);
}
}
return ret;
}
int ObTableLoadResourceService::release_resource(ObDirectLoadResourceReleaseArg &arg)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!arg.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(arg), KR(ret));
} else {
ObAddr leader;
if (OB_FAIL(get_leader_addr(arg.tenant_id_, share::SYS_LS, leader))) {
LOG_WARN("fail to get leader addr", KR(ret));
} else if (ObTableLoadUtils::is_local_addr(leader)) {
ret = local_release_resource(arg);
} else {
TABLE_LOAD_RESOURCE_RPC_CALL(release_resource, leader, arg);
}
}
return ret;
}
int ObTableLoadResourceService::update_resource(ObDirectLoadResourceUpdateArg &arg)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!arg.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(arg), KR(ret));
} else {
ObAddr leader;
if (OB_FAIL(get_leader_addr(arg.tenant_id_, share::SYS_LS, leader))) {
LOG_WARN("fail to get leader addr", KR(ret));
} else if (ObTableLoadUtils::is_local_addr(leader)) {
ret = local_update_resource(arg);
} else {
TABLE_LOAD_RESOURCE_RPC_CALL(update_resource, leader, arg);
}
}
return ret;
}
} // namespace observer
} // namespace oceanbase

View File

@ -32,6 +32,8 @@ class ObTableLoadResourceService : public logservice::ObIReplaySubHandler,
public logservice::ObICheckpointSubHandler,
public logservice::ObIRoleChangeSubHandler
{
public:
static const int64_t GET_LEADER_RETRY_TIMEOUT = 1 * 1000LL * 1000LL; // 1s
public:
ObTableLoadResourceService()
: resource_manager_(nullptr),
@ -71,10 +73,14 @@ public:
void switch_to_follower_forcedly();
static int check_tenant();
uint64_t get_tenant_id() const { return tenant_id_; }
static int get_leader_addr(const uint64_t tenant_id, const share::ObLSID &ls_id, common::ObAddr &leader);
static int local_apply_resource(ObDirectLoadResourceApplyArg &arg, ObDirectLoadResourceOpRes &res);
static int local_release_resource(ObDirectLoadResourceReleaseArg &arg);
static int local_update_resource(ObDirectLoadResourceUpdateArg &arg);
static int apply_resource(ObDirectLoadResourceApplyArg &arg, ObDirectLoadResourceOpRes &res);
static int release_resource(ObDirectLoadResourceReleaseArg &arg);
static int update_resource(ObDirectLoadResourceUpdateArg &arg);
uint64_t get_tenant_id() const { return tenant_id_; }
private:
int alloc_resource_manager();
int delete_resource_manager();