[CP] release resource when the merging is complete
This commit is contained in:
parent
63e6b43c77
commit
1925efa4bd
@ -100,9 +100,6 @@ int ObDirectLoadControlPreBeginExecutor::create_table_ctx(const ObTableLoadParam
|
||||
LOG_WARN("fail to alloc table ctx", KR(ret), K(param));
|
||||
} else if (OB_FAIL(table_ctx->init(param, ddl_param, arg_.session_info_))) {
|
||||
LOG_WARN("fail to init table ctx", KR(ret));
|
||||
} else if (OB_FAIL(ObTableLoadService::assign_memory(param.need_sort_, param.avail_memory_))) {
|
||||
LOG_WARN("fail to assign_memory", KR(ret));
|
||||
} else if (FALSE_IT(table_ctx->set_assigned_memory())) {
|
||||
} else if (OB_FAIL(ObTableLoadStore::init_ctx(table_ctx, arg_.partition_id_array_,
|
||||
arg_.target_partition_id_array_))) {
|
||||
LOG_WARN("fail to store init ctx", KR(ret));
|
||||
@ -111,12 +108,6 @@ int ObDirectLoadControlPreBeginExecutor::create_table_ctx(const ObTableLoadParam
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
if (nullptr != table_ctx) {
|
||||
if (table_ctx->is_assigned_memory()) {
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_TMP_FAIL(ObTableLoadService::recycle_memory(param.need_sort_, param.avail_memory_))) {
|
||||
LOG_WARN("fail to recycle_memory", KR(tmp_ret), K(param));
|
||||
}
|
||||
}
|
||||
ObTableLoadService::free_ctx(table_ctx);
|
||||
table_ctx = nullptr;
|
||||
}
|
||||
|
@ -466,19 +466,14 @@ int ObTableLoadCoordinator::pre_begin_peers(ObDirectLoadResourceApplyArg &apply_
|
||||
if (ObTableLoadUtils::is_local_addr(addr)) { // 本机
|
||||
ctx_->param_.session_count_ = arg.config_.parallel_;
|
||||
ctx_->param_.avail_memory_ = arg.avail_memory_;
|
||||
if (OB_FAIL(ObTableLoadService::assign_memory(ctx_->param_.need_sort_, arg.avail_memory_))) {
|
||||
LOG_WARN("fail to assign_memory", KR(ret));
|
||||
if (OB_FAIL(ObTableLoadStore::init_ctx(ctx_, arg.partition_id_array_, arg.target_partition_id_array_))) {
|
||||
LOG_WARN("fail to store init ctx", KR(ret));
|
||||
} else {
|
||||
ctx_->set_assigned_memory();
|
||||
if (OB_FAIL(ObTableLoadStore::init_ctx(ctx_, arg.partition_id_array_, arg.target_partition_id_array_))) {
|
||||
LOG_WARN("fail to store init ctx", KR(ret));
|
||||
} else {
|
||||
ObTableLoadStore store(ctx_);
|
||||
if (OB_FAIL(store.init())) {
|
||||
LOG_WARN("fail to init store", KR(ret));
|
||||
} else if (OB_FAIL(store.pre_begin())) {
|
||||
LOG_WARN("fail to store pre begin", KR(ret));
|
||||
}
|
||||
ObTableLoadStore store(ctx_);
|
||||
if (OB_FAIL(store.init())) {
|
||||
LOG_WARN("fail to init store", KR(ret));
|
||||
} else if (OB_FAIL(store.pre_begin())) {
|
||||
LOG_WARN("fail to store pre begin", KR(ret));
|
||||
}
|
||||
}
|
||||
} else { // 对端, 发送rpc
|
||||
@ -1022,6 +1017,18 @@ int ObTableLoadCoordinator::commit_peers(ObTableLoadSqlStatistics &sql_statistic
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (ctx_->is_assigned_resource()) {
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
ObDirectLoadResourceReleaseArg release_arg;
|
||||
release_arg.tenant_id_ = MTL_ID();
|
||||
release_arg.task_key_ = ObTableLoadUniqueKey(ctx_->param_.table_id_, ctx_->ddl_param_.task_id_);
|
||||
if (OB_TMP_FAIL(ObTableLoadService::delete_assigned_task(release_arg))) {
|
||||
LOG_WARN("fail to delete assigned task", KR(tmp_ret), K(release_arg));
|
||||
}
|
||||
ctx_->reset_assigned_resource();
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
@ -665,23 +665,24 @@ int ObTableLoadService::remove_ctx(ObTableLoadTableCtx *table_ctx)
|
||||
ret = OB_ERR_SYS;
|
||||
LOG_WARN("null table load service", KR(ret));
|
||||
} else {
|
||||
common::ObAddr leader;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
ObDirectLoadResourceReleaseArg release_arg;
|
||||
release_arg.tenant_id_ = MTL_ID();
|
||||
release_arg.task_key_ = ObTableLoadUniqueKey(table_ctx->param_.table_id_, table_ctx->ddl_param_.task_id_);
|
||||
bool is_sort = (table_ctx->param_.exe_mode_ == ObTableLoadExeMode::MULTIPLE_HEAP_TABLE_COMPACT ||
|
||||
table_ctx->param_.exe_mode_ == ObTableLoadExeMode::MEM_COMPACT);
|
||||
if (OB_FAIL(service->get_manager().remove_table_ctx(release_arg.task_key_, table_ctx))) {
|
||||
LOG_WARN("fail to remove_table_ctx", KR(ret), K(release_arg.task_key_));
|
||||
} else if (table_ctx->is_assigned_memory() &&
|
||||
OB_FAIL(service->assigned_memory_manager_.recycle_memory(is_sort, table_ctx->param_.avail_memory_))) {
|
||||
LOG_WARN("fail to recycle_memory", KR(ret), K(release_arg.task_key_));
|
||||
} else if (table_ctx->is_assigned_resource()) {
|
||||
if (OB_FAIL(service->assigned_task_manager_.delete_assigned_task(release_arg.task_key_))) {
|
||||
LOG_WARN("fail to delete_assigned_task", KR(ret), K(release_arg.task_key_));
|
||||
} else if (OB_FAIL(ObTableLoadResourceService::release_resource(release_arg))) {
|
||||
LOG_WARN("fail to release resource", KR(ret));
|
||||
ret = OB_SUCCESS; // 允许失败,资源管理模块可以回收
|
||||
} else {
|
||||
if (table_ctx->is_assigned_memory()) {
|
||||
if (OB_TMP_FAIL(service->assigned_memory_manager_.recycle_memory(table_ctx->param_.need_sort_, table_ctx->param_.avail_memory_))) {
|
||||
LOG_WARN("fail to recycle_memory", KR(tmp_ret), K(release_arg.task_key_));
|
||||
}
|
||||
table_ctx->reset_assigned_memory();
|
||||
}
|
||||
if (table_ctx->is_assigned_resource()) {
|
||||
if (OB_TMP_FAIL(ObTableLoadService::delete_assigned_task(release_arg))) {
|
||||
LOG_WARN("fail to delete assigned task", KR(tmp_ret), K(release_arg));
|
||||
}
|
||||
table_ctx->reset_assigned_resource();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -958,6 +959,25 @@ int ObTableLoadService::add_assigned_task(ObDirectLoadResourceApplyArg &arg)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadService::delete_assigned_task(ObDirectLoadResourceReleaseArg &arg)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTableLoadService *service = nullptr;
|
||||
if (OB_ISNULL(service = MTL(ObTableLoadService *))) {
|
||||
ret = OB_ERR_SYS;
|
||||
LOG_WARN("null table load service", KR(ret));
|
||||
} else {
|
||||
if (OB_FAIL(service->assigned_task_manager_.delete_assigned_task(arg.task_key_))) {
|
||||
LOG_WARN("fail to delete_assigned_task", KR(ret), K(arg.task_key_));
|
||||
} else if (OB_FAIL(ObTableLoadResourceService::release_resource(arg))) {
|
||||
LOG_WARN("fail to release resource", KR(ret));
|
||||
ret = OB_SUCCESS; // 允许失败,资源管理模块可以回收
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadService::assign_memory(bool is_sort, int64_t assign_memory)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
@ -74,6 +74,7 @@ public:
|
||||
}
|
||||
static int get_memory_limit(int64_t &memory_limit);
|
||||
static int add_assigned_task(ObDirectLoadResourceApplyArg &arg);
|
||||
static int delete_assigned_task(ObDirectLoadResourceReleaseArg &arg);
|
||||
static int assign_memory(bool is_sort, int64_t assign_memory);
|
||||
static int recycle_memory(bool is_sort, int64_t assign_memory);
|
||||
static int get_sort_memory(int64_t &sort_memory);
|
||||
|
@ -142,7 +142,11 @@ int ObTableLoadStore::pre_begin()
|
||||
LOG_WARN("ObTableLoadStore not init", KR(ret), KP(this));
|
||||
} else {
|
||||
LOG_INFO("store pre begin");
|
||||
// do nothing
|
||||
if (OB_FAIL(ObTableLoadService::assign_memory(ctx_->param_.need_sort_, ctx_->param_.avail_memory_))) {
|
||||
LOG_WARN("fail to assign_memory", KR(ret));
|
||||
} else {
|
||||
ctx_->set_assigned_memory();
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
@ -461,6 +465,13 @@ int ObTableLoadStore::commit(ObTableLoadResultInfo &result_info,
|
||||
} else if (OB_FAIL(store_ctx_->set_status_commit())) {
|
||||
LOG_WARN("fail to set store status commit", KR(ret));
|
||||
} else {
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (ctx_->is_assigned_memory()) {
|
||||
if (OB_TMP_FAIL(ObTableLoadService::recycle_memory(ctx_->param_.need_sort_, ctx_->param_.avail_memory_))) {
|
||||
LOG_WARN("fail to recycle memory", KR(tmp_ret));
|
||||
}
|
||||
ctx_->reset_assigned_memory();
|
||||
}
|
||||
store_ctx_->set_enable_heart_beat_check(false);
|
||||
result_info = store_ctx_->result_info_;
|
||||
}
|
||||
|
@ -47,8 +47,10 @@ public:
|
||||
int64_t dec_ref_count() { return ATOMIC_AAF(&ref_count_, -1); }
|
||||
bool is_assigned_resource() const { return is_assigned_resource_; }
|
||||
void set_assigned_resource() { is_assigned_resource_ = true; }
|
||||
void reset_assigned_resource() { is_assigned_resource_ = false; }
|
||||
bool is_assigned_memory() const { return is_assigned_memory_; }
|
||||
void set_assigned_memory() { is_assigned_memory_ = true; }
|
||||
void reset_assigned_memory() { is_assigned_memory_ = false; }
|
||||
bool is_dirty() const { return is_dirty_; }
|
||||
void set_dirty() { is_dirty_ = true; }
|
||||
bool is_mark_delete() const { return mark_delete_; }
|
||||
|
Loading…
x
Reference in New Issue
Block a user