From 1925efa4bd5f173e53b1a84e4e9a667f74d0c847 Mon Sep 17 00:00:00 2001 From: obdev Date: Mon, 15 Jul 2024 09:46:59 +0000 Subject: [PATCH] [CP] release resource when the merging is complete --- .../ob_table_load_control_rpc_executor.cpp | 9 ---- .../table_load/ob_table_load_coordinator.cpp | 31 ++++++++----- .../table_load/ob_table_load_service.cpp | 44 ++++++++++++++----- .../table_load/ob_table_load_service.h | 1 + .../table_load/ob_table_load_store.cpp | 13 +++++- .../table_load/ob_table_load_table_ctx.h | 2 + 6 files changed, 66 insertions(+), 34 deletions(-) diff --git a/src/observer/table_load/control/ob_table_load_control_rpc_executor.cpp b/src/observer/table_load/control/ob_table_load_control_rpc_executor.cpp index 0ea1f456e..137240cd2 100644 --- a/src/observer/table_load/control/ob_table_load_control_rpc_executor.cpp +++ b/src/observer/table_load/control/ob_table_load_control_rpc_executor.cpp @@ -100,9 +100,6 @@ int ObDirectLoadControlPreBeginExecutor::create_table_ctx(const ObTableLoadParam LOG_WARN("fail to alloc table ctx", KR(ret), K(param)); } else if (OB_FAIL(table_ctx->init(param, ddl_param, arg_.session_info_))) { LOG_WARN("fail to init table ctx", KR(ret)); - } else if (OB_FAIL(ObTableLoadService::assign_memory(param.need_sort_, param.avail_memory_))) { - LOG_WARN("fail to assign_memory", KR(ret)); - } else if (FALSE_IT(table_ctx->set_assigned_memory())) { } else if (OB_FAIL(ObTableLoadStore::init_ctx(table_ctx, arg_.partition_id_array_, arg_.target_partition_id_array_))) { LOG_WARN("fail to store init ctx", KR(ret)); @@ -111,12 +108,6 @@ int ObDirectLoadControlPreBeginExecutor::create_table_ctx(const ObTableLoadParam } if (OB_FAIL(ret)) { if (nullptr != table_ctx) { - if (table_ctx->is_assigned_memory()) { - int tmp_ret = OB_SUCCESS; - if (OB_TMP_FAIL(ObTableLoadService::recycle_memory(param.need_sort_, param.avail_memory_))) { - LOG_WARN("fail to recycle_memory", KR(tmp_ret), K(param)); - } - } ObTableLoadService::free_ctx(table_ctx); table_ctx = nullptr; } diff --git a/src/observer/table_load/ob_table_load_coordinator.cpp b/src/observer/table_load/ob_table_load_coordinator.cpp index fd7a0d364..dc8752ed2 100644 --- a/src/observer/table_load/ob_table_load_coordinator.cpp +++ b/src/observer/table_load/ob_table_load_coordinator.cpp @@ -466,19 +466,14 @@ int ObTableLoadCoordinator::pre_begin_peers(ObDirectLoadResourceApplyArg &apply_ if (ObTableLoadUtils::is_local_addr(addr)) { // 本机 ctx_->param_.session_count_ = arg.config_.parallel_; ctx_->param_.avail_memory_ = arg.avail_memory_; - if (OB_FAIL(ObTableLoadService::assign_memory(ctx_->param_.need_sort_, arg.avail_memory_))) { - LOG_WARN("fail to assign_memory", KR(ret)); + if (OB_FAIL(ObTableLoadStore::init_ctx(ctx_, arg.partition_id_array_, arg.target_partition_id_array_))) { + LOG_WARN("fail to store init ctx", KR(ret)); } else { - ctx_->set_assigned_memory(); - if (OB_FAIL(ObTableLoadStore::init_ctx(ctx_, arg.partition_id_array_, arg.target_partition_id_array_))) { - LOG_WARN("fail to store init ctx", KR(ret)); - } else { - ObTableLoadStore store(ctx_); - if (OB_FAIL(store.init())) { - LOG_WARN("fail to init store", KR(ret)); - } else if (OB_FAIL(store.pre_begin())) { - LOG_WARN("fail to store pre begin", KR(ret)); - } + ObTableLoadStore store(ctx_); + if (OB_FAIL(store.init())) { + LOG_WARN("fail to init store", KR(ret)); + } else if (OB_FAIL(store.pre_begin())) { + LOG_WARN("fail to store pre begin", KR(ret)); } } } else { // 对端, 发送rpc @@ -1022,6 +1017,18 @@ int ObTableLoadCoordinator::commit_peers(ObTableLoadSqlStatistics &sql_statistic } } } + if (OB_SUCC(ret)) { + if (ctx_->is_assigned_resource()) { + int tmp_ret = OB_SUCCESS; + ObDirectLoadResourceReleaseArg release_arg; + release_arg.tenant_id_ = MTL_ID(); + release_arg.task_key_ = ObTableLoadUniqueKey(ctx_->param_.table_id_, ctx_->ddl_param_.task_id_); + if (OB_TMP_FAIL(ObTableLoadService::delete_assigned_task(release_arg))) { + LOG_WARN("fail to delete assigned task", KR(tmp_ret), K(release_arg)); + } + ctx_->reset_assigned_resource(); + } + } } return ret; } diff --git a/src/observer/table_load/ob_table_load_service.cpp b/src/observer/table_load/ob_table_load_service.cpp index a8647794a..e5542401e 100644 --- a/src/observer/table_load/ob_table_load_service.cpp +++ b/src/observer/table_load/ob_table_load_service.cpp @@ -665,23 +665,24 @@ int ObTableLoadService::remove_ctx(ObTableLoadTableCtx *table_ctx) ret = OB_ERR_SYS; LOG_WARN("null table load service", KR(ret)); } else { - common::ObAddr leader; + int tmp_ret = OB_SUCCESS; ObDirectLoadResourceReleaseArg release_arg; release_arg.tenant_id_ = MTL_ID(); release_arg.task_key_ = ObTableLoadUniqueKey(table_ctx->param_.table_id_, table_ctx->ddl_param_.task_id_); - bool is_sort = (table_ctx->param_.exe_mode_ == ObTableLoadExeMode::MULTIPLE_HEAP_TABLE_COMPACT || - table_ctx->param_.exe_mode_ == ObTableLoadExeMode::MEM_COMPACT); if (OB_FAIL(service->get_manager().remove_table_ctx(release_arg.task_key_, table_ctx))) { LOG_WARN("fail to remove_table_ctx", KR(ret), K(release_arg.task_key_)); - } else if (table_ctx->is_assigned_memory() && - OB_FAIL(service->assigned_memory_manager_.recycle_memory(is_sort, table_ctx->param_.avail_memory_))) { - LOG_WARN("fail to recycle_memory", KR(ret), K(release_arg.task_key_)); - } else if (table_ctx->is_assigned_resource()) { - if (OB_FAIL(service->assigned_task_manager_.delete_assigned_task(release_arg.task_key_))) { - LOG_WARN("fail to delete_assigned_task", KR(ret), K(release_arg.task_key_)); - } else if (OB_FAIL(ObTableLoadResourceService::release_resource(release_arg))) { - LOG_WARN("fail to release resource", KR(ret)); - ret = OB_SUCCESS; // 允许失败,资源管理模块可以回收 + } else { + if (table_ctx->is_assigned_memory()) { + if (OB_TMP_FAIL(service->assigned_memory_manager_.recycle_memory(table_ctx->param_.need_sort_, table_ctx->param_.avail_memory_))) { + LOG_WARN("fail to recycle_memory", KR(tmp_ret), K(release_arg.task_key_)); + } + table_ctx->reset_assigned_memory(); + } + if (table_ctx->is_assigned_resource()) { + if (OB_TMP_FAIL(ObTableLoadService::delete_assigned_task(release_arg))) { + LOG_WARN("fail to delete assigned task", KR(tmp_ret), K(release_arg)); + } + table_ctx->reset_assigned_resource(); } } } @@ -958,6 +959,25 @@ int ObTableLoadService::add_assigned_task(ObDirectLoadResourceApplyArg &arg) return ret; } +int ObTableLoadService::delete_assigned_task(ObDirectLoadResourceReleaseArg &arg) +{ + int ret = OB_SUCCESS; + ObTableLoadService *service = nullptr; + if (OB_ISNULL(service = MTL(ObTableLoadService *))) { + ret = OB_ERR_SYS; + LOG_WARN("null table load service", KR(ret)); + } else { + if (OB_FAIL(service->assigned_task_manager_.delete_assigned_task(arg.task_key_))) { + LOG_WARN("fail to delete_assigned_task", KR(ret), K(arg.task_key_)); + } else if (OB_FAIL(ObTableLoadResourceService::release_resource(arg))) { + LOG_WARN("fail to release resource", KR(ret)); + ret = OB_SUCCESS; // 允许失败,资源管理模块可以回收 + } + } + + return ret; +} + int ObTableLoadService::assign_memory(bool is_sort, int64_t assign_memory) { int ret = OB_SUCCESS; diff --git a/src/observer/table_load/ob_table_load_service.h b/src/observer/table_load/ob_table_load_service.h index 16654330d..2e2c8655d 100644 --- a/src/observer/table_load/ob_table_load_service.h +++ b/src/observer/table_load/ob_table_load_service.h @@ -74,6 +74,7 @@ public: } static int get_memory_limit(int64_t &memory_limit); static int add_assigned_task(ObDirectLoadResourceApplyArg &arg); + static int delete_assigned_task(ObDirectLoadResourceReleaseArg &arg); static int assign_memory(bool is_sort, int64_t assign_memory); static int recycle_memory(bool is_sort, int64_t assign_memory); static int get_sort_memory(int64_t &sort_memory); diff --git a/src/observer/table_load/ob_table_load_store.cpp b/src/observer/table_load/ob_table_load_store.cpp index 14de3f4b2..b062848d9 100644 --- a/src/observer/table_load/ob_table_load_store.cpp +++ b/src/observer/table_load/ob_table_load_store.cpp @@ -142,7 +142,11 @@ int ObTableLoadStore::pre_begin() LOG_WARN("ObTableLoadStore not init", KR(ret), KP(this)); } else { LOG_INFO("store pre begin"); - // do nothing + if (OB_FAIL(ObTableLoadService::assign_memory(ctx_->param_.need_sort_, ctx_->param_.avail_memory_))) { + LOG_WARN("fail to assign_memory", KR(ret)); + } else { + ctx_->set_assigned_memory(); + } } return ret; @@ -461,6 +465,13 @@ int ObTableLoadStore::commit(ObTableLoadResultInfo &result_info, } else if (OB_FAIL(store_ctx_->set_status_commit())) { LOG_WARN("fail to set store status commit", KR(ret)); } else { + int tmp_ret = OB_SUCCESS; + if (ctx_->is_assigned_memory()) { + if (OB_TMP_FAIL(ObTableLoadService::recycle_memory(ctx_->param_.need_sort_, ctx_->param_.avail_memory_))) { + LOG_WARN("fail to recycle memory", KR(tmp_ret)); + } + ctx_->reset_assigned_memory(); + } store_ctx_->set_enable_heart_beat_check(false); result_info = store_ctx_->result_info_; } diff --git a/src/observer/table_load/ob_table_load_table_ctx.h b/src/observer/table_load/ob_table_load_table_ctx.h index be692f211..a79d8fe07 100644 --- a/src/observer/table_load/ob_table_load_table_ctx.h +++ b/src/observer/table_load/ob_table_load_table_ctx.h @@ -47,8 +47,10 @@ public: int64_t dec_ref_count() { return ATOMIC_AAF(&ref_count_, -1); } bool is_assigned_resource() const { return is_assigned_resource_; } void set_assigned_resource() { is_assigned_resource_ = true; } + void reset_assigned_resource() { is_assigned_resource_ = false; } bool is_assigned_memory() const { return is_assigned_memory_; } void set_assigned_memory() { is_assigned_memory_ = true; } + void reset_assigned_memory() { is_assigned_memory_ = false; } bool is_dirty() const { return is_dirty_; } void set_dirty() { is_dirty_ = true; } bool is_mark_delete() const { return mark_delete_; }