diff --git a/src/observer/table_load/ob_table_load_coordinator.cpp b/src/observer/table_load/ob_table_load_coordinator.cpp index 18ac85a44..31b220ba5 100644 --- a/src/observer/table_load/ob_table_load_coordinator.cpp +++ b/src/observer/table_load/ob_table_load_coordinator.cpp @@ -253,156 +253,133 @@ int ObTableLoadCoordinator::init() int ObTableLoadCoordinator::gen_apply_arg(ObDirectLoadResourceApplyArg &apply_arg) { int ret = OB_SUCCESS; - int64_t memory_limit = 0; ObTenant *tenant = nullptr; int64_t tenant_id = MTL_ID(); - ObDirectLoadResourceOpRes apply_res; - table::ObTableLoadArray all_leader_info_array; uint64_t cluster_version = ctx_->ddl_param_.cluster_version_; if (OB_FAIL(GCTX.omt_->get_tenant(tenant_id, tenant))) { LOG_INFO("fail to get tenant", KR(ret), K(tenant_id)); } else if (cluster_version < CLUSTER_VERSION_4_2_2_0 || (cluster_version >= CLUSTER_VERSION_4_3_0_0 && cluster_version < CLUSTER_VERSION_4_3_1_0)) { // not support resource manage - ctx_->param_.session_count_ = MIN(ctx_->param_.parallel_, (int64_t)tenant->unit_max_cpu() * 2); - if (ctx_->param_.need_sort_) { - ctx_->param_.session_count_ = MAX(ctx_->param_.session_count_, 2); - } - ctx_->param_.write_session_count_ = ctx_->param_.session_count_; - } else if (OB_FAIL(coordinator_ctx_->partition_location_.get_all_leader_info(all_leader_info_array))) { - LOG_WARN("fail to get all leader info", KR(ret)); - } else if (OB_FAIL(ObTableLoadService::get_memory_limit(memory_limit))) { - LOG_WARN("fail to get memory_limit", K(ret)); - } else { - int64_t retry_count = 0; - common::ObAddr leader; - common::ObAddr coordinator_addr = ObServer::get_instance().get_self(); - bool include_cur_addr = false; - bool last_sort = ctx_->param_.need_sort_; - int64_t total_partitions = 0; - ObArray partitions; - ObArray min_unsort_memory; - int64_t store_server_count = all_leader_info_array.count(); - int64_t total_server_count = store_server_count; - int64_t coordinator_session_count = 0; - int64_t min_session_count = ctx_->param_.parallel_; - int64_t max_session_count = (int64_t)tenant->unit_max_cpu() * 2; - int64_t total_session_count = MIN(ctx_->param_.parallel_, max_session_count * store_server_count); - int64_t remain_session_count = total_session_count; - partitions.set_tenant_id(MTL_ID()); - min_unsort_memory.set_tenant_id(MTL_ID()); - for (int64_t i = 0; i < store_server_count; i++) { - total_partitions += all_leader_info_array[i].partition_id_array_.count(); - if (coordinator_addr == all_leader_info_array[i].addr_) { - include_cur_addr = true; - } - } - if (!include_cur_addr) { - total_server_count++; - } - - if (OB_FAIL(apply_arg.apply_array_.reserve(total_server_count))) { - LOG_WARN("fail to reserve apply_arg.apply_array_", KR(ret)); - } else if (OB_FAIL(partitions.reserve(total_server_count))) { - LOG_WARN("fail to reserve partitions", KR(ret)); - } else if (OB_FAIL(min_unsort_memory.reserve(total_server_count))) { - LOG_WARN("fail to reserve min_unsort_memory", KR(ret)); + if (OB_FAIL(coordinator_ctx_->init_partition_location())) { + LOG_WARN("fail to init partition location", KR(ret)); } else { - apply_arg.tenant_id_ = tenant_id; - apply_arg.task_key_ = ObTableLoadUniqueKey(ctx_->param_.table_id_, ctx_->ddl_param_.task_id_); - // is_heap_table==true,we prioritize non multiple modes that do not require sorting - // FAST_HEAP_TABLE: macroblock_buffer * partition_count * parallel - // is_heap_table==false,we prioritize non multiple modes that do not require sorting - // GENERAL_TABLE_COMPACT:max(sstable_buffer * partition_count * parallel,macroblock_buffer * parallel) - // param_.need_sort_==true,we apply for the minimum required memory(MIN_SORT_MEMORY_PER_TASK) - // MULTIPLE_HEAP_TABLE_COMPACT - // MEM_COMPACT - for (int64_t i = 0; OB_SUCC(ret) && i < store_server_count; i++) { - ObDirectLoadResourceUnit unit; - unit.addr_ = all_leader_info_array[i].addr_; - if (OB_FAIL(partitions.push_back(all_leader_info_array[i].partition_id_array_.count()))) { - LOG_WARN("fail to push back", KR(ret)); - } else { - unit.thread_count_ = MAX((ctx_->param_.need_sort_ && ctx_->param_.px_mode_ == false ? 2 : 1), - MIN(max_session_count, total_session_count * partitions[i] / total_partitions)); + ctx_->param_.session_count_ = MIN(ctx_->param_.parallel_, (int64_t)tenant->unit_max_cpu() * 2); + if (ctx_->param_.need_sort_) { + ctx_->param_.session_count_ = MAX(ctx_->param_.session_count_, 2); + } + ctx_->param_.write_session_count_ = ctx_->param_.session_count_; + } + } else { + apply_arg.tenant_id_ = tenant_id; + apply_arg.task_key_ = ObTableLoadUniqueKey(ctx_->param_.table_id_, ctx_->ddl_param_.task_id_); + int64_t retry_count = 0; + common::ObAddr coordinator_addr = ObServer::get_instance().get_self(); + while (OB_SUCC(ret)) { + apply_arg.apply_array_.reset(); + int64_t memory_limit = 0; + table::ObTableLoadArray all_leader_info_array; + if (THIS_WORKER.is_timeout()) { + ret = OB_TIMEOUT; + LOG_WARN("gen_apply_arg wait too long", KR(ret)); + } else if (OB_FAIL(coordinator_ctx_->check_status(ObTableLoadStatusType::INITED))) { + LOG_WARN("fail to check status", KR(ret)); + } else if (OB_FAIL(coordinator_ctx_->exec_ctx_->check_status())) { + LOG_WARN("fail to check status", KR(ret)); + } else if (OB_FAIL(coordinator_ctx_->init_partition_location())) { + LOG_WARN("fail to init partition location", KR(ret)); + } else if (OB_FAIL(coordinator_ctx_->partition_location_.get_all_leader_info(all_leader_info_array))) { + LOG_WARN("fail to get all leader info", KR(ret)); + } else if (OB_FAIL(ObTableLoadService::get_memory_limit(memory_limit))) { + LOG_WARN("fail to get memory_limit", K(ret)); + } else { + bool include_cur_addr = false; + bool need_sort = ctx_->param_.need_sort_; + int64_t total_partitions = 0; + ObArray partitions; + int64_t store_server_count = all_leader_info_array.count(); + int64_t coordinator_session_count = 0; + int64_t min_session_count = ctx_->param_.parallel_; + int64_t max_session_count = (int64_t)tenant->unit_max_cpu() * 2; + int64_t total_session_count = MIN(ctx_->param_.parallel_, max_session_count * store_server_count); + int64_t remain_session_count = total_session_count; + partitions.set_tenant_id(MTL_ID()); + for (int64_t i = 0; i < store_server_count; i++) { + total_partitions += all_leader_info_array[i].partition_id_array_.count(); + if (coordinator_addr == all_leader_info_array[i].addr_) { + include_cur_addr = true; + } + } + // is_heap_table==true,we prioritize non multiple modes that do not require sorting + // FAST_HEAP_TABLE: macroblock_buffer * partition_count * parallel + // is_heap_table==false,we prioritize non multiple modes that do not require sorting + // GENERAL_TABLE_COMPACT:max(sstable_buffer * partition_count * parallel,macroblock_buffer * parallel) + // param_.need_sort_==true,we apply for the minimum required memory(MIN_SORT_MEMORY_PER_TASK) + // MULTIPLE_HEAP_TABLE_COMPACT + // MEM_COMPACT + for (int64_t i = 0; OB_SUCC(ret) && i < store_server_count; i++) { + ObDirectLoadResourceUnit unit; + unit.addr_ = all_leader_info_array[i].addr_; + if (OB_FAIL(partitions.push_back(all_leader_info_array[i].partition_id_array_.count()))) { + LOG_WARN("fail to push back", KR(ret)); + } else { + unit.thread_count_ = MAX((ctx_->param_.need_sort_ && ctx_->param_.px_mode_ == false ? 2 : 1), + MIN(max_session_count, total_session_count * partitions[i] / total_partitions)); + if (OB_FAIL(apply_arg.apply_array_.push_back(unit))) { + LOG_WARN("fail to push back", KR(ret)); + } else { + remain_session_count -= unit.thread_count_; + } + } + } + if (OB_SUCC(ret) && !include_cur_addr) { + ObDirectLoadResourceUnit unit; + unit.addr_ = coordinator_addr; + unit.thread_count_ = MAX(1, total_session_count / store_server_count); + unit.memory_size_ = 0; + coordinator_session_count = unit.thread_count_; + min_session_count = MIN(min_session_count, unit.thread_count_); if (OB_FAIL(apply_arg.apply_array_.push_back(unit))) { LOG_WARN("fail to push back", KR(ret)); - } else { - remain_session_count -= unit.thread_count_; } } - } - if (OB_SUCC(ret) && !include_cur_addr) { - ObDirectLoadResourceUnit unit; - unit.addr_ = coordinator_addr; - unit.thread_count_ = MAX(1, total_session_count / store_server_count); - unit.memory_size_ = 0; - coordinator_session_count = unit.thread_count_; - min_session_count = MIN(min_session_count, unit.thread_count_); - if (OB_FAIL(apply_arg.apply_array_.push_back(unit))) { - LOG_WARN("fail to push back", KR(ret)); - } - } - - if (OB_SUCC(ret)) { - while (remain_session_count > 0) { - for (int64_t i = 0; remain_session_count > 0 && i < store_server_count; i++) { - ObDirectLoadResourceUnit &unit = apply_arg.apply_array_[i]; - if (unit.thread_count_ < max_session_count) { - unit.thread_count_++; - remain_session_count--; - } - } - } - } - - for (int64_t i = 0; OB_SUCC(ret) && i < store_server_count; i++) { - ObDirectLoadResourceUnit &unit = apply_arg.apply_array_[i]; - if (all_leader_info_array[i].addr_ == coordinator_addr) { - coordinator_session_count = unit.thread_count_; - } - min_session_count = MIN(min_session_count, unit.thread_count_); - if (ctx_->schema_.is_heap_table_) { - if (OB_FAIL(min_unsort_memory.push_back(MACROBLOCK_BUFFER_SIZE * partitions[i] * unit.thread_count_))) { - LOG_WARN("fail to push back", KR(ret)); - } else { - unit.memory_size_ = min_unsort_memory[i]; - } - } else { - if (OB_FAIL(min_unsort_memory.push_back(MAX(SSTABLE_BUFFER_SIZE * partitions[i], MACROBLOCK_BUFFER_SIZE) * unit.thread_count_))) { - LOG_WARN("fail to push back", KR(ret)); - } else { - unit.memory_size_ = (last_sort ? MIN(MAX(ObTableLoadAssignedMemoryManager::MIN_SORT_MEMORY_PER_TASK, - MACROBLOCK_BUFFER_SIZE * unit.thread_count_), memory_limit) - : min_unsort_memory[i]); - } - } - } - - while (OB_SUCC(ret)) { - if (THIS_WORKER.is_timeout()) { - ret = OB_TIMEOUT; - LOG_WARN("gen_apply_arg wait too long", KR(ret)); - } else if (OB_FAIL(coordinator_ctx_->check_status(ObTableLoadStatusType::INITED))) { - LOG_WARN("fail to check status", KR(ret)); - } else if (OB_FAIL(coordinator_ctx_->exec_ctx_->check_status())) { - LOG_WARN("fail to check status", KR(ret)); - } else { - if (ctx_->schema_.is_heap_table_ || !ctx_->param_.need_sort_) { - last_sort = false; - for (int64_t i = 0; !last_sort && i < store_server_count; i++) { - if (min_unsort_memory[i] > memory_limit) { - last_sort = true; + if (OB_SUCC(ret)) { + while (remain_session_count > 0) { + for (int64_t i = 0; remain_session_count > 0 && i < store_server_count; i++) { + ObDirectLoadResourceUnit &unit = apply_arg.apply_array_[i]; + if (unit.thread_count_ < max_session_count) { + unit.thread_count_++; + remain_session_count--; } } - for (int64_t i = 0; i < store_server_count; i++) { - ObDirectLoadResourceUnit &unit = apply_arg.apply_array_[i]; - unit.thread_count_ = MAX((last_sort ? 2 : 1), unit.thread_count_); - unit.memory_size_ = (last_sort ? MIN(MAX(ObTableLoadAssignedMemoryManager::MIN_SORT_MEMORY_PER_TASK, - MACROBLOCK_BUFFER_SIZE * unit.thread_count_), memory_limit) - : min_unsort_memory[i]); + } + } + for (int64_t i = 0; OB_SUCC(ret) && i < store_server_count; i++) { + ObDirectLoadResourceUnit &unit = apply_arg.apply_array_[i]; + if (all_leader_info_array[i].addr_ == coordinator_addr) { + coordinator_session_count = unit.thread_count_; + } + min_session_count = MIN(min_session_count, unit.thread_count_); + int64_t min_unsort_memory = MACROBLOCK_BUFFER_SIZE * partitions[i] * unit.thread_count_; + if (ctx_->schema_.is_heap_table_) { + if (min_unsort_memory <= memory_limit) { + need_sort = false; + unit.memory_size_ = min_unsort_memory; + } else { + need_sort = true; + unit.memory_size_ = MIN(ObTableLoadAssignedMemoryManager::MIN_SORT_MEMORY_PER_TASK, memory_limit); + } + } else { + if (need_sort) { + unit.memory_size_ = MIN(ObTableLoadAssignedMemoryManager::MIN_SORT_MEMORY_PER_TASK, memory_limit); + } else { + unit.memory_size_ = MIN(min_unsort_memory, memory_limit); } } + } + + if (OB_SUCC(ret)) { + ObDirectLoadResourceOpRes apply_res; if (OB_FAIL(ObTableLoadResourceService::apply_resource(apply_arg, apply_res))) { if (retry_count % 100 == 0) { LOG_WARN("fail to apply resource", KR(ret), K(apply_res.error_code_), K(retry_count)); @@ -410,26 +387,24 @@ int ObTableLoadCoordinator::gen_apply_arg(ObDirectLoadResourceApplyArg &apply_ar if (ret == OB_EAGAIN) { retry_count++; ret = OB_SUCCESS; + usleep(RESOURCE_OP_WAIT_INTERVAL_US); } } else { - ctx_->param_.need_sort_ = last_sort; + ctx_->param_.need_sort_ = need_sort; ctx_->param_.session_count_ = coordinator_session_count; - ctx_->param_.write_session_count_ = (include_cur_addr ? MIN(min_session_count, (coordinator_session_count + 1) / 2) - : min_session_count); - ctx_->param_.exe_mode_ = (ctx_->schema_.is_heap_table_ ? (last_sort ? ObTableLoadExeMode::MULTIPLE_HEAP_TABLE_COMPACT - : ObTableLoadExeMode::FAST_HEAP_TABLE) - : (last_sort ? ObTableLoadExeMode::MEM_COMPACT - : ObTableLoadExeMode::GENERAL_TABLE_COMPACT)); - + ctx_->param_.write_session_count_ = + (include_cur_addr ? MIN(min_session_count, (coordinator_session_count + 1) / 2) : min_session_count); + ctx_->param_.exe_mode_ = (ctx_->schema_.is_heap_table_ ? + (need_sort ? ObTableLoadExeMode::MULTIPLE_HEAP_TABLE_COMPACT : ObTableLoadExeMode::FAST_HEAP_TABLE) : + (need_sort ? ObTableLoadExeMode::MEM_COMPACT : ObTableLoadExeMode::GENERAL_TABLE_COMPACT)); if (OB_FAIL(ObTableLoadService::add_assigned_task(apply_arg))) { LOG_WARN("fail to add_assigned_task", KR(ret)); } else { ctx_->set_assigned_resource(); - LOG_INFO("Coordinator::gen_apply_arg", K(retry_count), K(param_.exe_mode_), K(partitions), K(leader), K(coordinator_addr), K(apply_arg)); + LOG_INFO("Coordinator::gen_apply_arg", K(retry_count), K(param_.exe_mode_), K(partitions), K(coordinator_addr), K(apply_arg)); break; } } - usleep(RESOURCE_OP_WAIT_INTERVAL_US); } } } diff --git a/src/observer/table_load/ob_table_load_coordinator_ctx.cpp b/src/observer/table_load/ob_table_load_coordinator_ctx.cpp index 6a32d58b2..840e038a7 100644 --- a/src/observer/table_load/ob_table_load_coordinator_ctx.cpp +++ b/src/observer/table_load/ob_table_load_coordinator_ctx.cpp @@ -62,12 +62,12 @@ int ObTableLoadCoordinatorCtx::init_partition_location() int retry = 0; bool flag = false; while (retry < 3 && OB_SUCC(ret)) { + partition_location_.reset(); + target_partition_location_.reset(); // init partition_location_ - if (OB_FAIL(partition_location_.init(ctx_->param_.tenant_id_, ctx_->schema_.partition_ids_, - allocator_))) { + if (OB_FAIL(partition_location_.init(ctx_->param_.tenant_id_, ctx_->schema_.partition_ids_))) { LOG_WARN("fail to init partition location", KR(ret)); - } else if (OB_FAIL(target_partition_location_.init(ctx_->param_.tenant_id_, - target_schema_.partition_ids_, allocator_))) { + } else if (OB_FAIL(target_partition_location_.init(ctx_->param_.tenant_id_, target_schema_.partition_ids_))) { LOG_WARN("fail to init origin partition location", KR(ret)); } else if (OB_FAIL(partition_location_.check_tablet_has_same_leader(target_partition_location_, flag))) { LOG_WARN("fail to check_tablet_has_same_leader", KR(ret)); @@ -79,14 +79,12 @@ int ObTableLoadCoordinatorCtx::init_partition_location() LOG_WARN("invalid leader info, maybe change master"); } } - partition_location_.reset(); - target_partition_location_.reset(); retry ++; } if (OB_SUCC(ret)) { if (!flag) { - ret = OB_INVALID_ARGUMENT; + ret = OB_EAGAIN; LOG_WARN("invalid leader info", KR(ret)); } } @@ -116,10 +114,6 @@ int ObTableLoadCoordinatorCtx::init(const ObIArray &column_ids, else if (OB_FAIL(init_column_idxs(column_ids))) { LOG_WARN("fail to init column idxs", KR(ret), K(column_ids)); } - // init partition_location_ - else if (OB_FAIL(init_partition_location())) { - LOG_WARN("fail to init partition location", KR(ret)); - } // init partition_calc_ else if (OB_FAIL( partition_calc_.init(ctx_->param_, ctx_->session_info_))) { diff --git a/src/observer/table_load/ob_table_load_coordinator_ctx.h b/src/observer/table_load/ob_table_load_coordinator_ctx.h index 09bb425ba..cb4fd717b 100644 --- a/src/observer/table_load/ob_table_load_coordinator_ctx.h +++ b/src/observer/table_load/ob_table_load_coordinator_ctx.h @@ -114,6 +114,7 @@ public: common::ObIAllocator &allocator) const; int check_exist_trans(bool &is_exist) const; int check_exist_committed_trans(bool &is_exist) const; + int init_partition_location(); int init_complete(); private: int alloc_trans_ctx(const table::ObTableLoadTransId &trans_id, ObTableLoadTransCtx *&trans_ctx); @@ -123,7 +124,6 @@ private: int init_session_ctx_array(); int generate_autoinc_params(share::AutoincParam &autoinc_param); int init_sequence(); - int init_partition_location(); public: ObTableLoadTableCtx * const ctx_; common::ObArenaAllocator allocator_; diff --git a/src/observer/table_load/ob_table_load_partition_location.cpp b/src/observer/table_load/ob_table_load_partition_location.cpp index 8c46ab83c..2f5eb7773 100644 --- a/src/observer/table_load/ob_table_load_partition_location.cpp +++ b/src/observer/table_load/ob_table_load_partition_location.cpp @@ -207,8 +207,7 @@ int ObTableLoadPartitionLocation::fetch_tablet_handle(uint64_t tenant_id, } int ObTableLoadPartitionLocation::init( - uint64_t tenant_id, const ObTableLoadArray &partition_ids, - ObIAllocator &allocator) + uint64_t tenant_id, const ObTableLoadArray &partition_ids) { int ret = OB_SUCCESS; if (IS_INIT) { @@ -220,9 +219,9 @@ int ObTableLoadPartitionLocation::init( } else { if (OB_FAIL(partition_map_.create(1024, "TLD_PartLoc", "TLD_PartLoc", tenant_id))) { LOG_WARN("fail to create map", KR(ret)); - } else if (OB_FAIL(init_all_partition_location(tenant_id, partition_ids, allocator))) { + } else if (OB_FAIL(init_all_partition_location(tenant_id, partition_ids))) { LOG_WARN("fail to init all partition location", KR(ret)); - } else if (OB_FAIL(init_all_leader_info(allocator))) { + } else if (OB_FAIL(init_all_leader_info())) { LOG_WARN("fail to init all leader info", KR(ret)); } else { is_inited_ = true; @@ -232,8 +231,7 @@ int ObTableLoadPartitionLocation::init( } int ObTableLoadPartitionLocation::init_all_partition_location( - uint64_t tenant_id, const ObTableLoadArray &partition_ids, - ObIAllocator &allocator) + uint64_t tenant_id, const ObTableLoadArray &partition_ids) { int ret = OB_SUCCESS; if (OB_FAIL(fetch_ls_locations(tenant_id, partition_ids))) { @@ -242,7 +240,7 @@ int ObTableLoadPartitionLocation::init_all_partition_location( return ret; } -int ObTableLoadPartitionLocation::init_all_leader_info(ObIAllocator &allocator) +int ObTableLoadPartitionLocation::init_all_leader_info() { int ret = OB_SUCCESS; ObArenaAllocator tmp_allocator("TLD_PL_Tmp"); @@ -298,9 +296,9 @@ int ObTableLoadPartitionLocation::init_all_leader_info(ObIAllocator &allocator) } // 将set中的addr存到array中 if (OB_SUCC(ret)) { - if (OB_FAIL(all_leader_addr_array_.create(addr_map.size(), allocator))) { + if (OB_FAIL(all_leader_addr_array_.create(addr_map.size(), allocator_))) { LOG_WARN("fail to create leader addr array", KR(ret)); - } else if (OB_FAIL(all_leader_info_array_.create(addr_map.size(), allocator))) { + } else if (OB_FAIL(all_leader_info_array_.create(addr_map.size(), allocator_))) { LOG_WARN("fail to create leader info array", KR(ret)); } } @@ -327,7 +325,7 @@ int ObTableLoadPartitionLocation::init_all_leader_info(ObIAllocator &allocator) LeaderInfo &leader_info = all_leader_info_array_[i]; leader_info.addr_ = addr; if (OB_FAIL(ObTableLoadUtils::deep_copy(*partition_id_array, leader_info.partition_id_array_, - allocator))) { + allocator_))) { LOG_WARN("fail to deep copy partition id array", KR(ret)); } partition_id_array->~ObIArray(); diff --git a/src/observer/table_load/ob_table_load_partition_location.h b/src/observer/table_load/ob_table_load_partition_location.h index a5e0679a6..847e31fdd 100644 --- a/src/observer/table_load/ob_table_load_partition_location.h +++ b/src/observer/table_load/ob_table_load_partition_location.h @@ -54,10 +54,15 @@ public: TO_STRING_KV(K_(addr), KP_(partition_id_array_ptr)); }; public: - ObTableLoadPartitionLocation() : is_inited_(false) { tablet_ids_.set_tenant_id(MTL_ID()); } + ObTableLoadPartitionLocation() + : allocator_("TLD_PL"), + is_inited_(false) + { + allocator_.set_tenant_id(MTL_ID()); + tablet_ids_.set_tenant_id(MTL_ID()); + } int init(uint64_t tenant_id, - const table::ObTableLoadArray &partition_ids, - common::ObIAllocator &allocator); + const table::ObTableLoadArray &partition_ids); int get_leader(common::ObTabletID tablet_id, PartitionLocationInfo &info) const; int get_all_leader(table::ObTableLoadArray &addr_array) const; int get_all_leader_info(table::ObTableLoadArray &info_array) const; @@ -66,6 +71,7 @@ public: partition_map_.destroy(); all_leader_addr_array_.reset(); all_leader_info_array_.reset(); + allocator_.reset(); is_inited_ = false; } int check_tablet_has_same_leader(const ObTableLoadPartitionLocation &other, bool &result); @@ -85,13 +91,13 @@ public: storage::ObTabletHandle &tablet_handle); private: int init_all_partition_location( - uint64_t tenant_id, const table::ObTableLoadArray &partition_ids, - common::ObIAllocator &allocator); - int init_all_leader_info(common::ObIAllocator &allocator); + uint64_t tenant_id, const table::ObTableLoadArray &partition_ids); + int init_all_leader_info(); int fetch_ls_locations( uint64_t tenant_id, const table::ObTableLoadArray &partition_ids); private: + common::ObArenaAllocator allocator_; common::ObArray tablet_ids_; //保证遍历partition_map_的时候顺序不变 common::hash::ObHashMap partition_map_; table::ObTableLoadArray all_leader_addr_array_; diff --git a/src/observer/table_load/resource/ob_table_load_resource_manager.cpp b/src/observer/table_load/resource/ob_table_load_resource_manager.cpp index 7ed3d7b1f..7e8564349 100644 --- a/src/observer/table_load/resource/ob_table_load_resource_manager.cpp +++ b/src/observer/table_load/resource/ob_table_load_resource_manager.cpp @@ -207,7 +207,11 @@ int ObTableLoadResourceManager::apply_resource(ObDirectLoadResourceApplyArg &arg for (int64_t i = 0; OB_SUCC(ret) && i < arg.apply_array_.count(); i++) { ObDirectLoadResourceUnit &apply_unit = arg.apply_array_[i]; if (OB_FAIL(resource_pool_.get_refactored(apply_unit.addr_, ctx))) { - LOG_WARN("fail to get refactored", K(apply_unit.addr_)); + LOG_WARN("fail to get refactored", KR(ret), K(apply_unit.addr_)); + if (ret == OB_HASH_NOT_EXIST) { + // 第一次切主需要初始化,通过内部sql查询ACTIVE状态的observer可能不完整,期间若有导入任务进来时需要重试 + ret = OB_EAGAIN; + } } else if (apply_unit.thread_count_ > ctx.thread_remain_ || apply_unit.memory_size_ > ctx.memory_remain_) { ret = OB_EAGAIN; } @@ -234,7 +238,7 @@ int ObTableLoadResourceManager::apply_resource(ObDirectLoadResourceApplyArg &arg } } } else { - LOG_WARN("fail to get refactored", K(arg.task_key_)); + LOG_WARN("fail to get refactored", KR(ret), K(arg.task_key_)); } } else { LOG_INFO("resource has been assigned", K(arg.task_key_));