From f4c3e7513b9178da53cf538c7f961d04e3b8dbd4 Mon Sep 17 00:00:00 2001 From: obdev Date: Thu, 7 Nov 2024 05:14:15 +0000 Subject: [PATCH] fix bug of alloc memory --- .../ob_table_load_control_rpc_executor.cpp | 1 + .../table_load/ob_table_load_coordinator.cpp | 157 ++++++++++++------ .../table_load/ob_table_load_coordinator.h | 1 + .../table_load/ob_table_load_service.cpp | 2 +- .../table_load/ob_table_load_store.cpp | 4 +- .../table_load/ob_table_load_struct.h | 9 +- src/share/table/ob_table_load_define.cpp | 3 +- src/share/table/ob_table_load_define.h | 8 +- 8 files changed, 122 insertions(+), 63 deletions(-) diff --git a/src/observer/table_load/control/ob_table_load_control_rpc_executor.cpp b/src/observer/table_load/control/ob_table_load_control_rpc_executor.cpp index e0f3d4077..7ec928a45 100644 --- a/src/observer/table_load/control/ob_table_load_control_rpc_executor.cpp +++ b/src/observer/table_load/control/ob_table_load_control_rpc_executor.cpp @@ -60,6 +60,7 @@ int ObDirectLoadControlPreBeginExecutor::process() param.max_error_row_count_ = arg_.config_.max_error_row_count_; param.column_count_ = arg_.column_count_; param.need_sort_ = arg_.config_.is_need_sort_; + param.task_need_sort_ = arg_.config_.is_task_need_sort_; param.px_mode_ = arg_.px_mode_; param.online_opt_stat_gather_ = arg_.online_opt_stat_gather_; param.dup_action_ = arg_.dup_action_; diff --git a/src/observer/table_load/ob_table_load_coordinator.cpp b/src/observer/table_load/ob_table_load_coordinator.cpp index e00bddb8b..a0d76b14b 100644 --- a/src/observer/table_load/ob_table_load_coordinator.cpp +++ b/src/observer/table_load/ob_table_load_coordinator.cpp @@ -252,6 +252,30 @@ int ObTableLoadCoordinator::init() /** * begin */ +int ObTableLoadCoordinator::check_need_sort_for_lob_or_index(bool &need_sort) const +{ + int ret = OB_SUCCESS; + need_sort = false; + if (ObDirectLoadMethod::is_incremental(ctx_->param_.method_)) { + need_sort = ctx_->schema_.lob_column_idxs_.count() > 0; + if (!need_sort) { + ObSchemaGetterGuard schema_guard; + const share::schema::ObTableSchema *data_table_schema = nullptr; + if (OB_FAIL(ObTableLoadSchema::get_schema_guard(ctx_->param_.tenant_id_, schema_guard))) { + LOG_WARN("fail to get schema guard", KR(ret)); + } else if (OB_FAIL(ObTableLoadSchema::get_table_schema(schema_guard, ctx_->param_.tenant_id_, ctx_->param_.table_id_, data_table_schema))) { + LOG_WARN("fail to get table shema of main table", KR(ret)); + } else if (OB_ISNULL(data_table_schema)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("data table schema is null", KR(ret)); + } else { + need_sort = data_table_schema->get_simple_index_infos().count() > 0; + } + } + } + return ret; +} + int ObTableLoadCoordinator::gen_apply_arg(ObDirectLoadResourceApplyArg &apply_arg) { int ret = OB_SUCCESS; @@ -294,8 +318,8 @@ int ObTableLoadCoordinator::gen_apply_arg(ObDirectLoadResourceApplyArg &apply_ar LOG_WARN("fail to get memory_limit", K(ret)); } else { bool include_cur_addr = false; - bool need_sort = ctx_->param_.need_sort_; - bool main_need_sort = ctx_->param_.need_sort_; + bool task_need_sort = false; // 表示整个导入任务是否会走排序的流程 + bool main_need_sort = false; // 表示主表是否会走排序 int64_t total_partitions = 0; ObArray partitions; int64_t store_server_count = all_leader_info_array.count(); @@ -306,19 +330,16 @@ int ObTableLoadCoordinator::gen_apply_arg(ObDirectLoadResourceApplyArg &apply_ar int64_t total_session_count = MIN(ctx_->param_.parallel_, max_session_count * store_server_count); int64_t remain_session_count = total_session_count; partitions.set_tenant_id(MTL_ID()); + + // 判断coordinator节点是否也作为store节点 for (int64_t i = 0; i < store_server_count; i++) { total_partitions += all_leader_info_array[i].partition_id_array_.count(); if (coordinator_addr == all_leader_info_array[i].addr_) { include_cur_addr = true; } } - // is_heap_table==true,we prioritize non multiple modes that do not require sorting - // FAST_HEAP_TABLE: macroblock_buffer * partition_count * parallel - // is_heap_table==false,we prioritize non multiple modes that do not require sorting - // GENERAL_TABLE_COMPACT:max(sstable_buffer * partition_count * parallel,macroblock_buffer * parallel) - // param_.need_sort_==true,we apply for the minimum required memory(MIN_SORT_MEMORY_PER_TASK) - // MULTIPLE_HEAP_TABLE_COMPACT - // MEM_COMPACT + + // 资源控制先确定线程,第一次遍历先按分区等比例分配线程 for (int64_t i = 0; OB_SUCC(ret) && i < store_server_count; i++) { ObDirectLoadResourceUnit unit; unit.addr_ = all_leader_info_array[i].addr_; @@ -333,18 +354,8 @@ int ObTableLoadCoordinator::gen_apply_arg(ObDirectLoadResourceApplyArg &apply_ar } } } - // 协调节点不存在数据分区时,需要申请线程资源,但不需要申请内存,为零 - if (OB_SUCC(ret) && !include_cur_addr) { - ObDirectLoadResourceUnit unit; - unit.addr_ = coordinator_addr; - unit.thread_count_ = MAX(total_session_count / store_server_count, MIN_THREAD_COUNT); - unit.memory_size_ = 0; - coordinator_session_count = unit.thread_count_; - min_session_count = MIN(min_session_count, unit.thread_count_); - if (OB_FAIL(apply_arg.apply_array_.push_back(unit))) { - LOG_WARN("fail to push back", KR(ret)); - } - } + + // 第一次遍历如果不能分配完所有的线程,继续给每个节点平均分配剩余的线程,直到所有的线程都被分配完 if (OB_SUCC(ret)) { while (remain_session_count > 0) { for (int64_t i = 0; remain_session_count > 0 && i < store_server_count; i++) { @@ -355,72 +366,110 @@ int ObTableLoadCoordinator::gen_apply_arg(ObDirectLoadResourceApplyArg &apply_ar } } } - for (int64_t i = 0; i < store_server_count; i++) { + } + + /* + 协调节点不存在数据分区时,需要申请线程资源,但不需要申请内存,放在申请资源数组的最后一个位置 + coordinator_session_count表示coordinator节点可用线程数,min_session_count表示所有节点可用线程数的最小值 + */ + if (OB_SUCC(ret)) { + if (!include_cur_addr) { + ObDirectLoadResourceUnit unit; + unit.addr_ = coordinator_addr; + unit.thread_count_ = MAX(MIN(max_session_count, total_session_count / store_server_count), MIN_THREAD_COUNT); + unit.memory_size_ = 0; + coordinator_session_count = unit.thread_count_; + min_session_count = MIN(min_session_count, unit.thread_count_); + if (OB_FAIL(apply_arg.apply_array_.push_back(unit))) { + LOG_WARN("fail to push back", KR(ret)); + } + } + for (int64_t i = 0; OB_SUCC(ret) && i < store_server_count; i++) { ObDirectLoadResourceUnit &unit = apply_arg.apply_array_[i]; if (all_leader_info_array[i].addr_ == coordinator_addr) { coordinator_session_count = unit.thread_count_; } min_session_count = MIN(min_session_count, unit.thread_count_); } + } + + /* + 确定write_session_count,表示发送数据阶段store节点可用的线程数 + 对于load data模式,如果 协调节点和数据节点都在同一个节点上,就分出一半的线程用于解析数据,一半的线程用于存储数据 + */ + if (OB_SUCC(ret)) { if (include_cur_addr && ctx_->param_.load_mode_ == ObDirectLoadMode::LOAD_DATA) { - // 协调节点和数据节点都在同一个节点上,对于load data模式,分出一半的线程用于解析数据,一半的线程用于存储数据 write_session_count = MIN(min_session_count, (coordinator_session_count + 1) / 2); } else { write_session_count = min_session_count; } - for (int64_t i = 0; i < store_server_count; i++) { + } + + /* + 先确定主表是否要走排序,对于堆表,sql指定need_sort=true时,如果内存满足不排序,就走不排序流程,只要有一个节点内存不足,整体都要走排序 + 如果主表要走排序,则整个任务按排序方式分配内存,否则再确定是否要做lob_id排序和索引排序,需要的话就按照MAX(主表不排序内存,索引排序内存)来分配内存,否则分配主表不排序需要的内存 + */ + if (OB_SUCC(ret)) { + for (int64_t i = 0; !main_need_sort && i < store_server_count; i++) { ObDirectLoadResourceUnit &unit = apply_arg.apply_array_[i]; + int64_t min_sort_memory = unit.thread_count_ * ObDirectLoadExternalMultiPartitionRowChunk::MIN_MEMORY_LIMIT * 4; int64_t min_unsort_memory = 0; if (ctx_->schema_.is_heap_table_) { // 直接写宏块需要的内存,对于非排序模式,每个分区各自写宏块,所以要乘分区数 min_unsort_memory = MACROBLOCK_BUFFER_SIZE * partitions[i] * write_session_count; - if (min_unsort_memory <= memory_limit) { - main_need_sort = false; - unit.memory_size_ = min_unsort_memory; + if (!ctx_->param_.need_sort_) { + // sql指定need_sort=false,强制不排序 } else { - main_need_sort = ctx_->param_.need_sort_; // allow forced non-sorting - unit.memory_size_ = MIN(ObTableLoadAssignedMemoryManager::MIN_SORT_MEMORY_PER_TASK, memory_limit); + if (min_unsort_memory > memory_limit) { + main_need_sort = true; + } } } else { // 取写宏块或写临时文件需要内存的最小值,对于非排序模式,每个分区各自写临时文件,所以要乘分区数 min_unsort_memory = SSTABLE_BUFFER_SIZE * partitions[i] * unit.thread_count_; - if (main_need_sort) { - unit.memory_size_ = MIN(unit.thread_count_ * ObDirectLoadExternalMultiPartitionRowChunk::MIN_MEMORY_LIMIT * 4, memory_limit); + if (ctx_->param_.need_sort_) { + // sql指定need_sort=true,强制走排序 + main_need_sort = true; } else { - // hint指定不排序,如果不排序内存大于内存上限,要改成走排序模式,一般是分区数较大的场景 - if (min_unsort_memory < memory_limit) { - unit.memory_size_ = MAX(min_unsort_memory, MACROBLOCK_BUFFER_SIZE * write_session_count); - } else { + if (min_unsort_memory > memory_limit) { main_need_sort = true; - unit.memory_size_ = MIN(unit.thread_count_ * ObDirectLoadExternalMultiPartitionRowChunk::MIN_MEMORY_LIMIT * 4, memory_limit); } } } + } - if (main_need_sort) { - break; + task_need_sort = main_need_sort; + if (!task_need_sort) { + if (OB_FAIL(check_need_sort_for_lob_or_index(task_need_sort))) { + LOG_WARN("fail to check need sort for lob or index", KR(ret)); } } - ObSchemaGetterGuard schema_guard; - const ObTableSchema *table_schema = nullptr; - if (OB_FAIL(ObTableLoadSchema::get_table_schema(tenant_id, ctx_->ddl_param_.dest_table_id_, schema_guard, table_schema))) { - LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(ctx_->ddl_param_.dest_table_id_)); - } else if (main_need_sort || (ObDirectLoadMethod::is_incremental(ctx_->param_.method_) && - table_schema->get_simple_index_infos().count() > 0)) { - need_sort = true; - for (int64_t i = 0; i < store_server_count; i++) { - ObDirectLoadResourceUnit &unit = apply_arg.apply_array_[i]; - unit.memory_size_ = MIN(unit.thread_count_ * ObDirectLoadExternalMultiPartitionRowChunk::MIN_MEMORY_LIMIT * 4, memory_limit); + for (int64_t i = 0; OB_SUCC(ret) && i < store_server_count; i++) { + ObDirectLoadResourceUnit &unit = apply_arg.apply_array_[i]; + int64_t min_sort_memory = MIN(unit.thread_count_ * ObDirectLoadExternalMultiPartitionRowChunk::MIN_MEMORY_LIMIT * 4, memory_limit); + int64_t min_unsort_memory = 0; + if (ctx_->schema_.is_heap_table_) { + min_unsort_memory = MIN(MACROBLOCK_BUFFER_SIZE * partitions[i] * write_session_count, memory_limit); + } else { + min_unsort_memory = MIN(MAX(SSTABLE_BUFFER_SIZE * partitions[i] * unit.thread_count_, MACROBLOCK_BUFFER_SIZE * unit.thread_count_), memory_limit); + } + if (task_need_sort) { + if (main_need_sort) { + unit.memory_size_ = min_sort_memory; + } else { + unit.memory_size_ = MAX(min_unsort_memory, min_sort_memory); + } + } else { + unit.memory_size_ = min_unsort_memory; } - } else { - need_sort = false; } } + if (OB_SUCC(ret)) { ObDirectLoadResourceOpRes apply_res; if (OB_FAIL(ObTableLoadResourceService::apply_resource(apply_arg, apply_res))) { if (retry_count % 100 == 0) { - LOG_WARN("fail to apply resource", KR(ret), K(apply_res.error_code_), K(retry_count)); + LOG_WARN("fail to apply resource", KR(ret), K(apply_res.error_code_), K(retry_count), K(param_.exe_mode_), K(main_need_sort), K(task_need_sort), K(partitions), K(coordinator_addr), K(apply_arg)); } if (ret == OB_EAGAIN) { retry_count++; @@ -428,7 +477,8 @@ int ObTableLoadCoordinator::gen_apply_arg(ObDirectLoadResourceApplyArg &apply_ar usleep(RESOURCE_OP_WAIT_INTERVAL_US); } } else { - ctx_->param_.need_sort_ = need_sort; + ctx_->param_.need_sort_ = main_need_sort; + ctx_->param_.task_need_sort_ = task_need_sort; ctx_->param_.session_count_ = coordinator_session_count; ctx_->param_.write_session_count_ = write_session_count; ctx_->param_.exe_mode_ = (ctx_->schema_.is_heap_table_ ? @@ -439,7 +489,7 @@ int ObTableLoadCoordinator::gen_apply_arg(ObDirectLoadResourceApplyArg &apply_ar LOG_WARN("fail to add_assigned_task", KR(ret)); } else { ctx_->set_assigned_resource(); - LOG_INFO("Coordinator::gen_apply_arg", K(retry_count), K(param_.exe_mode_), K(partitions), K(coordinator_addr), K(apply_arg)); + LOG_INFO("Coordinator::gen_apply_arg", K(retry_count), K(param_.exe_mode_), K(main_need_sort), K(task_need_sort), K(partitions), K(coordinator_addr), K(apply_arg)); break; } } @@ -473,6 +523,7 @@ int ObTableLoadCoordinator::pre_begin_peers(ObDirectLoadResourceApplyArg &apply_ arg.config_.max_error_row_count_ = param_.max_error_row_count_; arg.config_.batch_size_ = param_.batch_size_; arg.config_.is_need_sort_ = param_.need_sort_; + arg.config_.is_task_need_sort_ = param_.task_need_sort_; arg.column_count_ = param_.column_count_; arg.dup_action_ = param_.dup_action_; arg.px_mode_ = param_.px_mode_; diff --git a/src/observer/table_load/ob_table_load_coordinator.h b/src/observer/table_load/ob_table_load_coordinator.h index eac3aff5c..2a04e929a 100644 --- a/src/observer/table_load/ob_table_load_coordinator.h +++ b/src/observer/table_load/ob_table_load_coordinator.h @@ -65,6 +65,7 @@ public: int get_status(table::ObTableLoadStatusType &status, int &error_code); int heart_beat(); private: + int check_need_sort_for_lob_or_index(bool &need_sort) const; int gen_apply_arg(ObDirectLoadResourceApplyArg &apply_arg); int pre_begin_peers(ObDirectLoadResourceApplyArg &apply_arg); int confirm_begin_peers(); diff --git a/src/observer/table_load/ob_table_load_service.cpp b/src/observer/table_load/ob_table_load_service.cpp index 2047e923c..f998e2bb1 100644 --- a/src/observer/table_load/ob_table_load_service.cpp +++ b/src/observer/table_load/ob_table_load_service.cpp @@ -821,7 +821,7 @@ int ObTableLoadService::remove_ctx(ObTableLoadTableCtx *table_ctx) LOG_WARN("fail to remove_table_ctx", KR(ret), K(release_arg.task_key_)); } else { if (table_ctx->is_assigned_memory()) { - if (OB_TMP_FAIL(service->assigned_memory_manager_.recycle_memory(table_ctx->param_.need_sort_, table_ctx->param_.avail_memory_))) { + if (OB_TMP_FAIL(service->assigned_memory_manager_.recycle_memory(table_ctx->param_.task_need_sort_, table_ctx->param_.avail_memory_))) { LOG_WARN("fail to recycle_memory", KR(tmp_ret), K(release_arg.task_key_)); } table_ctx->reset_assigned_memory(); diff --git a/src/observer/table_load/ob_table_load_store.cpp b/src/observer/table_load/ob_table_load_store.cpp index ca7b89d7d..8ca36694d 100644 --- a/src/observer/table_load/ob_table_load_store.cpp +++ b/src/observer/table_load/ob_table_load_store.cpp @@ -172,7 +172,7 @@ int ObTableLoadStore::pre_begin() LOG_WARN("ObTableLoadStore not init", KR(ret), KP(this)); } else { LOG_INFO("store pre begin"); - if (OB_FAIL(ObTableLoadService::assign_memory(ctx_->param_.need_sort_, ctx_->param_.avail_memory_))) { + if (OB_FAIL(ObTableLoadService::assign_memory(ctx_->param_.task_need_sort_, ctx_->param_.avail_memory_))) { LOG_WARN("fail to assign_memory", KR(ret)); } else { ctx_->set_assigned_memory(); @@ -500,7 +500,7 @@ int ObTableLoadStore::commit(ObTableLoadResultInfo &result_info, } else { int tmp_ret = OB_SUCCESS; if (ctx_->is_assigned_memory()) { - if (OB_TMP_FAIL(ObTableLoadService::recycle_memory(ctx_->param_.need_sort_, ctx_->param_.avail_memory_))) { + if (OB_TMP_FAIL(ObTableLoadService::recycle_memory(ctx_->param_.task_need_sort_, ctx_->param_.avail_memory_))) { LOG_WARN("fail to recycle memory", KR(tmp_ret)); } ctx_->reset_assigned_memory(); diff --git a/src/observer/table_load/ob_table_load_struct.h b/src/observer/table_load/ob_table_load_struct.h index b166b4319..9c6f7a746 100644 --- a/src/observer/table_load/ob_table_load_struct.h +++ b/src/observer/table_load/ob_table_load_struct.h @@ -116,7 +116,8 @@ public: load_mode_(storage::ObDirectLoadMode::INVALID_MODE), compressor_type_(ObCompressorType::INVALID_COMPRESSOR), online_sample_percent_(1.), - load_level_(storage::ObDirectLoadLevel::INVALID_LEVEL) + load_level_(storage::ObDirectLoadLevel::INVALID_LEVEL), + task_need_sort_(false) { } @@ -176,7 +177,8 @@ public: "direct_load_mode", storage::ObDirectLoadMode::get_type_string(load_mode_), K_(compressor_type), K_(online_sample_percent), - "direct_load_level", storage::ObDirectLoadLevel::get_type_string(load_level_)); + "direct_load_level", storage::ObDirectLoadLevel::get_type_string(load_level_), + K_(task_need_sort)); public: uint64_t tenant_id_; @@ -187,7 +189,7 @@ public: uint64_t max_error_row_count_; uint64_t sql_mode_; // unused int32_t column_count_; - bool need_sort_; + bool need_sort_; // 表示主表是否要排序 bool px_mode_; bool online_opt_stat_gather_; sql::ObLoadDupActionType dup_action_; @@ -200,6 +202,7 @@ public: ObCompressorType compressor_type_; double online_sample_percent_; storage::ObDirectLoadLevel::Type load_level_; + bool task_need_sort_; // 表示导入任务是否会走到排序流程 }; struct ObTableLoadDDLParam diff --git a/src/share/table/ob_table_load_define.cpp b/src/share/table/ob_table_load_define.cpp index 139fe7422..cb49398c1 100644 --- a/src/share/table/ob_table_load_define.cpp +++ b/src/share/table/ob_table_load_define.cpp @@ -24,7 +24,8 @@ OB_SERIALIZE_MEMBER(ObTableLoadConfig, batch_size_, max_error_row_count_, dup_action_, - is_need_sort_); + is_need_sort_, + is_task_need_sort_); OB_SERIALIZE_MEMBER(ObTableLoadSegmentID, id_); diff --git a/src/share/table/ob_table_load_define.h b/src/share/table/ob_table_load_define.h index 67bcd4c3e..17c392468 100644 --- a/src/share/table/ob_table_load_define.h +++ b/src/share/table/ob_table_load_define.h @@ -37,17 +37,19 @@ public: batch_size_(0), max_error_row_count_(0), dup_action_(sql::ObLoadDupActionType::LOAD_INVALID_MODE), - is_need_sort_(false) + is_need_sort_(false), + is_task_need_sort_(false) { } TO_STRING_KV(K_(parallel), K_(batch_size), K_(max_error_row_count), K_(dup_action), - K_(is_need_sort)); + K_(is_need_sort), K_(is_task_need_sort)); public: int32_t parallel_; int32_t batch_size_; uint64_t max_error_row_count_; sql::ObLoadDupActionType dup_action_; - bool is_need_sort_; + bool is_need_sort_; // 表示主表是否要排序 + bool is_task_need_sort_; // 表示导入任务是否会走到排序流程 }; struct ObTableLoadPartitionId