Recalculate partitions before applying resource
This commit is contained in:
parent
8d9ac1a4d5
commit
50de6c3997
@ -253,156 +253,133 @@ int ObTableLoadCoordinator::init()
|
||||
int ObTableLoadCoordinator::gen_apply_arg(ObDirectLoadResourceApplyArg &apply_arg)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t memory_limit = 0;
|
||||
ObTenant *tenant = nullptr;
|
||||
int64_t tenant_id = MTL_ID();
|
||||
ObDirectLoadResourceOpRes apply_res;
|
||||
table::ObTableLoadArray<ObTableLoadPartitionLocation::LeaderInfo> all_leader_info_array;
|
||||
uint64_t cluster_version = ctx_->ddl_param_.cluster_version_;
|
||||
if (OB_FAIL(GCTX.omt_->get_tenant(tenant_id, tenant))) {
|
||||
LOG_INFO("fail to get tenant", KR(ret), K(tenant_id));
|
||||
} else if (cluster_version < CLUSTER_VERSION_4_2_2_0 ||
|
||||
(cluster_version >= CLUSTER_VERSION_4_3_0_0 && cluster_version < CLUSTER_VERSION_4_3_1_0)) {
|
||||
// not support resource manage
|
||||
ctx_->param_.session_count_ = MIN(ctx_->param_.parallel_, (int64_t)tenant->unit_max_cpu() * 2);
|
||||
if (ctx_->param_.need_sort_) {
|
||||
ctx_->param_.session_count_ = MAX(ctx_->param_.session_count_, 2);
|
||||
}
|
||||
ctx_->param_.write_session_count_ = ctx_->param_.session_count_;
|
||||
} else if (OB_FAIL(coordinator_ctx_->partition_location_.get_all_leader_info(all_leader_info_array))) {
|
||||
LOG_WARN("fail to get all leader info", KR(ret));
|
||||
} else if (OB_FAIL(ObTableLoadService::get_memory_limit(memory_limit))) {
|
||||
LOG_WARN("fail to get memory_limit", K(ret));
|
||||
} else {
|
||||
int64_t retry_count = 0;
|
||||
common::ObAddr leader;
|
||||
common::ObAddr coordinator_addr = ObServer::get_instance().get_self();
|
||||
bool include_cur_addr = false;
|
||||
bool last_sort = ctx_->param_.need_sort_;
|
||||
int64_t total_partitions = 0;
|
||||
ObArray<int64_t> partitions;
|
||||
ObArray<int64_t> min_unsort_memory;
|
||||
int64_t store_server_count = all_leader_info_array.count();
|
||||
int64_t total_server_count = store_server_count;
|
||||
int64_t coordinator_session_count = 0;
|
||||
int64_t min_session_count = ctx_->param_.parallel_;
|
||||
int64_t max_session_count = (int64_t)tenant->unit_max_cpu() * 2;
|
||||
int64_t total_session_count = MIN(ctx_->param_.parallel_, max_session_count * store_server_count);
|
||||
int64_t remain_session_count = total_session_count;
|
||||
partitions.set_tenant_id(MTL_ID());
|
||||
min_unsort_memory.set_tenant_id(MTL_ID());
|
||||
for (int64_t i = 0; i < store_server_count; i++) {
|
||||
total_partitions += all_leader_info_array[i].partition_id_array_.count();
|
||||
if (coordinator_addr == all_leader_info_array[i].addr_) {
|
||||
include_cur_addr = true;
|
||||
}
|
||||
}
|
||||
if (!include_cur_addr) {
|
||||
total_server_count++;
|
||||
}
|
||||
|
||||
if (OB_FAIL(apply_arg.apply_array_.reserve(total_server_count))) {
|
||||
LOG_WARN("fail to reserve apply_arg.apply_array_", KR(ret));
|
||||
} else if (OB_FAIL(partitions.reserve(total_server_count))) {
|
||||
LOG_WARN("fail to reserve partitions", KR(ret));
|
||||
} else if (OB_FAIL(min_unsort_memory.reserve(total_server_count))) {
|
||||
LOG_WARN("fail to reserve min_unsort_memory", KR(ret));
|
||||
if (OB_FAIL(coordinator_ctx_->init_partition_location())) {
|
||||
LOG_WARN("fail to init partition location", KR(ret));
|
||||
} else {
|
||||
apply_arg.tenant_id_ = tenant_id;
|
||||
apply_arg.task_key_ = ObTableLoadUniqueKey(ctx_->param_.table_id_, ctx_->ddl_param_.task_id_);
|
||||
// is_heap_table==true,we prioritize non multiple modes that do not require sorting
|
||||
// FAST_HEAP_TABLE: macroblock_buffer * partition_count * parallel
|
||||
// is_heap_table==false,we prioritize non multiple modes that do not require sorting
|
||||
// GENERAL_TABLE_COMPACT:max(sstable_buffer * partition_count * parallel,macroblock_buffer * parallel)
|
||||
// param_.need_sort_==true,we apply for the minimum required memory(MIN_SORT_MEMORY_PER_TASK)
|
||||
// MULTIPLE_HEAP_TABLE_COMPACT
|
||||
// MEM_COMPACT
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < store_server_count; i++) {
|
||||
ObDirectLoadResourceUnit unit;
|
||||
unit.addr_ = all_leader_info_array[i].addr_;
|
||||
if (OB_FAIL(partitions.push_back(all_leader_info_array[i].partition_id_array_.count()))) {
|
||||
LOG_WARN("fail to push back", KR(ret));
|
||||
} else {
|
||||
unit.thread_count_ = MAX((ctx_->param_.need_sort_ && ctx_->param_.px_mode_ == false ? 2 : 1),
|
||||
MIN(max_session_count, total_session_count * partitions[i] / total_partitions));
|
||||
ctx_->param_.session_count_ = MIN(ctx_->param_.parallel_, (int64_t)tenant->unit_max_cpu() * 2);
|
||||
if (ctx_->param_.need_sort_) {
|
||||
ctx_->param_.session_count_ = MAX(ctx_->param_.session_count_, 2);
|
||||
}
|
||||
ctx_->param_.write_session_count_ = ctx_->param_.session_count_;
|
||||
}
|
||||
} else {
|
||||
apply_arg.tenant_id_ = tenant_id;
|
||||
apply_arg.task_key_ = ObTableLoadUniqueKey(ctx_->param_.table_id_, ctx_->ddl_param_.task_id_);
|
||||
int64_t retry_count = 0;
|
||||
common::ObAddr coordinator_addr = ObServer::get_instance().get_self();
|
||||
while (OB_SUCC(ret)) {
|
||||
apply_arg.apply_array_.reset();
|
||||
int64_t memory_limit = 0;
|
||||
table::ObTableLoadArray<ObTableLoadPartitionLocation::LeaderInfo> all_leader_info_array;
|
||||
if (THIS_WORKER.is_timeout()) {
|
||||
ret = OB_TIMEOUT;
|
||||
LOG_WARN("gen_apply_arg wait too long", KR(ret));
|
||||
} else if (OB_FAIL(coordinator_ctx_->check_status(ObTableLoadStatusType::INITED))) {
|
||||
LOG_WARN("fail to check status", KR(ret));
|
||||
} else if (OB_FAIL(coordinator_ctx_->exec_ctx_->check_status())) {
|
||||
LOG_WARN("fail to check status", KR(ret));
|
||||
} else if (OB_FAIL(coordinator_ctx_->init_partition_location())) {
|
||||
LOG_WARN("fail to init partition location", KR(ret));
|
||||
} else if (OB_FAIL(coordinator_ctx_->partition_location_.get_all_leader_info(all_leader_info_array))) {
|
||||
LOG_WARN("fail to get all leader info", KR(ret));
|
||||
} else if (OB_FAIL(ObTableLoadService::get_memory_limit(memory_limit))) {
|
||||
LOG_WARN("fail to get memory_limit", K(ret));
|
||||
} else {
|
||||
bool include_cur_addr = false;
|
||||
bool need_sort = ctx_->param_.need_sort_;
|
||||
int64_t total_partitions = 0;
|
||||
ObArray<int64_t> partitions;
|
||||
int64_t store_server_count = all_leader_info_array.count();
|
||||
int64_t coordinator_session_count = 0;
|
||||
int64_t min_session_count = ctx_->param_.parallel_;
|
||||
int64_t max_session_count = (int64_t)tenant->unit_max_cpu() * 2;
|
||||
int64_t total_session_count = MIN(ctx_->param_.parallel_, max_session_count * store_server_count);
|
||||
int64_t remain_session_count = total_session_count;
|
||||
partitions.set_tenant_id(MTL_ID());
|
||||
for (int64_t i = 0; i < store_server_count; i++) {
|
||||
total_partitions += all_leader_info_array[i].partition_id_array_.count();
|
||||
if (coordinator_addr == all_leader_info_array[i].addr_) {
|
||||
include_cur_addr = true;
|
||||
}
|
||||
}
|
||||
// is_heap_table==true,we prioritize non multiple modes that do not require sorting
|
||||
// FAST_HEAP_TABLE: macroblock_buffer * partition_count * parallel
|
||||
// is_heap_table==false,we prioritize non multiple modes that do not require sorting
|
||||
// GENERAL_TABLE_COMPACT:max(sstable_buffer * partition_count * parallel,macroblock_buffer * parallel)
|
||||
// param_.need_sort_==true,we apply for the minimum required memory(MIN_SORT_MEMORY_PER_TASK)
|
||||
// MULTIPLE_HEAP_TABLE_COMPACT
|
||||
// MEM_COMPACT
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < store_server_count; i++) {
|
||||
ObDirectLoadResourceUnit unit;
|
||||
unit.addr_ = all_leader_info_array[i].addr_;
|
||||
if (OB_FAIL(partitions.push_back(all_leader_info_array[i].partition_id_array_.count()))) {
|
||||
LOG_WARN("fail to push back", KR(ret));
|
||||
} else {
|
||||
unit.thread_count_ = MAX((ctx_->param_.need_sort_ && ctx_->param_.px_mode_ == false ? 2 : 1),
|
||||
MIN(max_session_count, total_session_count * partitions[i] / total_partitions));
|
||||
if (OB_FAIL(apply_arg.apply_array_.push_back(unit))) {
|
||||
LOG_WARN("fail to push back", KR(ret));
|
||||
} else {
|
||||
remain_session_count -= unit.thread_count_;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret) && !include_cur_addr) {
|
||||
ObDirectLoadResourceUnit unit;
|
||||
unit.addr_ = coordinator_addr;
|
||||
unit.thread_count_ = MAX(1, total_session_count / store_server_count);
|
||||
unit.memory_size_ = 0;
|
||||
coordinator_session_count = unit.thread_count_;
|
||||
min_session_count = MIN(min_session_count, unit.thread_count_);
|
||||
if (OB_FAIL(apply_arg.apply_array_.push_back(unit))) {
|
||||
LOG_WARN("fail to push back", KR(ret));
|
||||
} else {
|
||||
remain_session_count -= unit.thread_count_;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret) && !include_cur_addr) {
|
||||
ObDirectLoadResourceUnit unit;
|
||||
unit.addr_ = coordinator_addr;
|
||||
unit.thread_count_ = MAX(1, total_session_count / store_server_count);
|
||||
unit.memory_size_ = 0;
|
||||
coordinator_session_count = unit.thread_count_;
|
||||
min_session_count = MIN(min_session_count, unit.thread_count_);
|
||||
if (OB_FAIL(apply_arg.apply_array_.push_back(unit))) {
|
||||
LOG_WARN("fail to push back", KR(ret));
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
while (remain_session_count > 0) {
|
||||
for (int64_t i = 0; remain_session_count > 0 && i < store_server_count; i++) {
|
||||
ObDirectLoadResourceUnit &unit = apply_arg.apply_array_[i];
|
||||
if (unit.thread_count_ < max_session_count) {
|
||||
unit.thread_count_++;
|
||||
remain_session_count--;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < store_server_count; i++) {
|
||||
ObDirectLoadResourceUnit &unit = apply_arg.apply_array_[i];
|
||||
if (all_leader_info_array[i].addr_ == coordinator_addr) {
|
||||
coordinator_session_count = unit.thread_count_;
|
||||
}
|
||||
min_session_count = MIN(min_session_count, unit.thread_count_);
|
||||
if (ctx_->schema_.is_heap_table_) {
|
||||
if (OB_FAIL(min_unsort_memory.push_back(MACROBLOCK_BUFFER_SIZE * partitions[i] * unit.thread_count_))) {
|
||||
LOG_WARN("fail to push back", KR(ret));
|
||||
} else {
|
||||
unit.memory_size_ = min_unsort_memory[i];
|
||||
}
|
||||
} else {
|
||||
if (OB_FAIL(min_unsort_memory.push_back(MAX(SSTABLE_BUFFER_SIZE * partitions[i], MACROBLOCK_BUFFER_SIZE) * unit.thread_count_))) {
|
||||
LOG_WARN("fail to push back", KR(ret));
|
||||
} else {
|
||||
unit.memory_size_ = (last_sort ? MIN(MAX(ObTableLoadAssignedMemoryManager::MIN_SORT_MEMORY_PER_TASK,
|
||||
MACROBLOCK_BUFFER_SIZE * unit.thread_count_), memory_limit)
|
||||
: min_unsort_memory[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
while (OB_SUCC(ret)) {
|
||||
if (THIS_WORKER.is_timeout()) {
|
||||
ret = OB_TIMEOUT;
|
||||
LOG_WARN("gen_apply_arg wait too long", KR(ret));
|
||||
} else if (OB_FAIL(coordinator_ctx_->check_status(ObTableLoadStatusType::INITED))) {
|
||||
LOG_WARN("fail to check status", KR(ret));
|
||||
} else if (OB_FAIL(coordinator_ctx_->exec_ctx_->check_status())) {
|
||||
LOG_WARN("fail to check status", KR(ret));
|
||||
} else {
|
||||
if (ctx_->schema_.is_heap_table_ || !ctx_->param_.need_sort_) {
|
||||
last_sort = false;
|
||||
for (int64_t i = 0; !last_sort && i < store_server_count; i++) {
|
||||
if (min_unsort_memory[i] > memory_limit) {
|
||||
last_sort = true;
|
||||
if (OB_SUCC(ret)) {
|
||||
while (remain_session_count > 0) {
|
||||
for (int64_t i = 0; remain_session_count > 0 && i < store_server_count; i++) {
|
||||
ObDirectLoadResourceUnit &unit = apply_arg.apply_array_[i];
|
||||
if (unit.thread_count_ < max_session_count) {
|
||||
unit.thread_count_++;
|
||||
remain_session_count--;
|
||||
}
|
||||
}
|
||||
for (int64_t i = 0; i < store_server_count; i++) {
|
||||
ObDirectLoadResourceUnit &unit = apply_arg.apply_array_[i];
|
||||
unit.thread_count_ = MAX((last_sort ? 2 : 1), unit.thread_count_);
|
||||
unit.memory_size_ = (last_sort ? MIN(MAX(ObTableLoadAssignedMemoryManager::MIN_SORT_MEMORY_PER_TASK,
|
||||
MACROBLOCK_BUFFER_SIZE * unit.thread_count_), memory_limit)
|
||||
: min_unsort_memory[i]);
|
||||
}
|
||||
}
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < store_server_count; i++) {
|
||||
ObDirectLoadResourceUnit &unit = apply_arg.apply_array_[i];
|
||||
if (all_leader_info_array[i].addr_ == coordinator_addr) {
|
||||
coordinator_session_count = unit.thread_count_;
|
||||
}
|
||||
min_session_count = MIN(min_session_count, unit.thread_count_);
|
||||
int64_t min_unsort_memory = MACROBLOCK_BUFFER_SIZE * partitions[i] * unit.thread_count_;
|
||||
if (ctx_->schema_.is_heap_table_) {
|
||||
if (min_unsort_memory <= memory_limit) {
|
||||
need_sort = false;
|
||||
unit.memory_size_ = min_unsort_memory;
|
||||
} else {
|
||||
need_sort = true;
|
||||
unit.memory_size_ = MIN(ObTableLoadAssignedMemoryManager::MIN_SORT_MEMORY_PER_TASK, memory_limit);
|
||||
}
|
||||
} else {
|
||||
if (need_sort) {
|
||||
unit.memory_size_ = MIN(ObTableLoadAssignedMemoryManager::MIN_SORT_MEMORY_PER_TASK, memory_limit);
|
||||
} else {
|
||||
unit.memory_size_ = MIN(min_unsort_memory, memory_limit);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
ObDirectLoadResourceOpRes apply_res;
|
||||
if (OB_FAIL(ObTableLoadResourceService::apply_resource(apply_arg, apply_res))) {
|
||||
if (retry_count % 100 == 0) {
|
||||
LOG_WARN("fail to apply resource", KR(ret), K(apply_res.error_code_), K(retry_count));
|
||||
@ -410,26 +387,24 @@ int ObTableLoadCoordinator::gen_apply_arg(ObDirectLoadResourceApplyArg &apply_ar
|
||||
if (ret == OB_EAGAIN) {
|
||||
retry_count++;
|
||||
ret = OB_SUCCESS;
|
||||
usleep(RESOURCE_OP_WAIT_INTERVAL_US);
|
||||
}
|
||||
} else {
|
||||
ctx_->param_.need_sort_ = last_sort;
|
||||
ctx_->param_.need_sort_ = need_sort;
|
||||
ctx_->param_.session_count_ = coordinator_session_count;
|
||||
ctx_->param_.write_session_count_ = (include_cur_addr ? MIN(min_session_count, (coordinator_session_count + 1) / 2)
|
||||
: min_session_count);
|
||||
ctx_->param_.exe_mode_ = (ctx_->schema_.is_heap_table_ ? (last_sort ? ObTableLoadExeMode::MULTIPLE_HEAP_TABLE_COMPACT
|
||||
: ObTableLoadExeMode::FAST_HEAP_TABLE)
|
||||
: (last_sort ? ObTableLoadExeMode::MEM_COMPACT
|
||||
: ObTableLoadExeMode::GENERAL_TABLE_COMPACT));
|
||||
|
||||
ctx_->param_.write_session_count_ =
|
||||
(include_cur_addr ? MIN(min_session_count, (coordinator_session_count + 1) / 2) : min_session_count);
|
||||
ctx_->param_.exe_mode_ = (ctx_->schema_.is_heap_table_ ?
|
||||
(need_sort ? ObTableLoadExeMode::MULTIPLE_HEAP_TABLE_COMPACT : ObTableLoadExeMode::FAST_HEAP_TABLE) :
|
||||
(need_sort ? ObTableLoadExeMode::MEM_COMPACT : ObTableLoadExeMode::GENERAL_TABLE_COMPACT));
|
||||
if (OB_FAIL(ObTableLoadService::add_assigned_task(apply_arg))) {
|
||||
LOG_WARN("fail to add_assigned_task", KR(ret));
|
||||
} else {
|
||||
ctx_->set_assigned_resource();
|
||||
LOG_INFO("Coordinator::gen_apply_arg", K(retry_count), K(param_.exe_mode_), K(partitions), K(leader), K(coordinator_addr), K(apply_arg));
|
||||
LOG_INFO("Coordinator::gen_apply_arg", K(retry_count), K(param_.exe_mode_), K(partitions), K(coordinator_addr), K(apply_arg));
|
||||
break;
|
||||
}
|
||||
}
|
||||
usleep(RESOURCE_OP_WAIT_INTERVAL_US);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -62,12 +62,12 @@ int ObTableLoadCoordinatorCtx::init_partition_location()
|
||||
int retry = 0;
|
||||
bool flag = false;
|
||||
while (retry < 3 && OB_SUCC(ret)) {
|
||||
partition_location_.reset();
|
||||
target_partition_location_.reset();
|
||||
// init partition_location_
|
||||
if (OB_FAIL(partition_location_.init(ctx_->param_.tenant_id_, ctx_->schema_.partition_ids_,
|
||||
allocator_))) {
|
||||
if (OB_FAIL(partition_location_.init(ctx_->param_.tenant_id_, ctx_->schema_.partition_ids_))) {
|
||||
LOG_WARN("fail to init partition location", KR(ret));
|
||||
} else if (OB_FAIL(target_partition_location_.init(ctx_->param_.tenant_id_,
|
||||
target_schema_.partition_ids_, allocator_))) {
|
||||
} else if (OB_FAIL(target_partition_location_.init(ctx_->param_.tenant_id_, target_schema_.partition_ids_))) {
|
||||
LOG_WARN("fail to init origin partition location", KR(ret));
|
||||
} else if (OB_FAIL(partition_location_.check_tablet_has_same_leader(target_partition_location_, flag))) {
|
||||
LOG_WARN("fail to check_tablet_has_same_leader", KR(ret));
|
||||
@ -79,14 +79,12 @@ int ObTableLoadCoordinatorCtx::init_partition_location()
|
||||
LOG_WARN("invalid leader info, maybe change master");
|
||||
}
|
||||
}
|
||||
partition_location_.reset();
|
||||
target_partition_location_.reset();
|
||||
retry ++;
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
if (!flag) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
ret = OB_EAGAIN;
|
||||
LOG_WARN("invalid leader info", KR(ret));
|
||||
}
|
||||
}
|
||||
@ -116,10 +114,6 @@ int ObTableLoadCoordinatorCtx::init(const ObIArray<uint64_t> &column_ids,
|
||||
else if (OB_FAIL(init_column_idxs(column_ids))) {
|
||||
LOG_WARN("fail to init column idxs", KR(ret), K(column_ids));
|
||||
}
|
||||
// init partition_location_
|
||||
else if (OB_FAIL(init_partition_location())) {
|
||||
LOG_WARN("fail to init partition location", KR(ret));
|
||||
}
|
||||
// init partition_calc_
|
||||
else if (OB_FAIL(
|
||||
partition_calc_.init(ctx_->param_, ctx_->session_info_))) {
|
||||
|
@ -114,6 +114,7 @@ public:
|
||||
common::ObIAllocator &allocator) const;
|
||||
int check_exist_trans(bool &is_exist) const;
|
||||
int check_exist_committed_trans(bool &is_exist) const;
|
||||
int init_partition_location();
|
||||
int init_complete();
|
||||
private:
|
||||
int alloc_trans_ctx(const table::ObTableLoadTransId &trans_id, ObTableLoadTransCtx *&trans_ctx);
|
||||
@ -123,7 +124,6 @@ private:
|
||||
int init_session_ctx_array();
|
||||
int generate_autoinc_params(share::AutoincParam &autoinc_param);
|
||||
int init_sequence();
|
||||
int init_partition_location();
|
||||
public:
|
||||
ObTableLoadTableCtx * const ctx_;
|
||||
common::ObArenaAllocator allocator_;
|
||||
|
@ -207,8 +207,7 @@ int ObTableLoadPartitionLocation::fetch_tablet_handle(uint64_t tenant_id,
|
||||
}
|
||||
|
||||
int ObTableLoadPartitionLocation::init(
|
||||
uint64_t tenant_id, const ObTableLoadArray<ObTableLoadPartitionId> &partition_ids,
|
||||
ObIAllocator &allocator)
|
||||
uint64_t tenant_id, const ObTableLoadArray<ObTableLoadPartitionId> &partition_ids)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_INIT) {
|
||||
@ -220,9 +219,9 @@ int ObTableLoadPartitionLocation::init(
|
||||
} else {
|
||||
if (OB_FAIL(partition_map_.create(1024, "TLD_PartLoc", "TLD_PartLoc", tenant_id))) {
|
||||
LOG_WARN("fail to create map", KR(ret));
|
||||
} else if (OB_FAIL(init_all_partition_location(tenant_id, partition_ids, allocator))) {
|
||||
} else if (OB_FAIL(init_all_partition_location(tenant_id, partition_ids))) {
|
||||
LOG_WARN("fail to init all partition location", KR(ret));
|
||||
} else if (OB_FAIL(init_all_leader_info(allocator))) {
|
||||
} else if (OB_FAIL(init_all_leader_info())) {
|
||||
LOG_WARN("fail to init all leader info", KR(ret));
|
||||
} else {
|
||||
is_inited_ = true;
|
||||
@ -232,8 +231,7 @@ int ObTableLoadPartitionLocation::init(
|
||||
}
|
||||
|
||||
int ObTableLoadPartitionLocation::init_all_partition_location(
|
||||
uint64_t tenant_id, const ObTableLoadArray<ObTableLoadPartitionId> &partition_ids,
|
||||
ObIAllocator &allocator)
|
||||
uint64_t tenant_id, const ObTableLoadArray<ObTableLoadPartitionId> &partition_ids)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(fetch_ls_locations(tenant_id, partition_ids))) {
|
||||
@ -242,7 +240,7 @@ int ObTableLoadPartitionLocation::init_all_partition_location(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadPartitionLocation::init_all_leader_info(ObIAllocator &allocator)
|
||||
int ObTableLoadPartitionLocation::init_all_leader_info()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObArenaAllocator tmp_allocator("TLD_PL_Tmp");
|
||||
@ -298,9 +296,9 @@ int ObTableLoadPartitionLocation::init_all_leader_info(ObIAllocator &allocator)
|
||||
}
|
||||
// 将set中的addr存到array中
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(all_leader_addr_array_.create(addr_map.size(), allocator))) {
|
||||
if (OB_FAIL(all_leader_addr_array_.create(addr_map.size(), allocator_))) {
|
||||
LOG_WARN("fail to create leader addr array", KR(ret));
|
||||
} else if (OB_FAIL(all_leader_info_array_.create(addr_map.size(), allocator))) {
|
||||
} else if (OB_FAIL(all_leader_info_array_.create(addr_map.size(), allocator_))) {
|
||||
LOG_WARN("fail to create leader info array", KR(ret));
|
||||
}
|
||||
}
|
||||
@ -327,7 +325,7 @@ int ObTableLoadPartitionLocation::init_all_leader_info(ObIAllocator &allocator)
|
||||
LeaderInfo &leader_info = all_leader_info_array_[i];
|
||||
leader_info.addr_ = addr;
|
||||
if (OB_FAIL(ObTableLoadUtils::deep_copy(*partition_id_array, leader_info.partition_id_array_,
|
||||
allocator))) {
|
||||
allocator_))) {
|
||||
LOG_WARN("fail to deep copy partition id array", KR(ret));
|
||||
}
|
||||
partition_id_array->~ObIArray<ObTableLoadLSIdAndPartitionId>();
|
||||
|
@ -54,10 +54,15 @@ public:
|
||||
TO_STRING_KV(K_(addr), KP_(partition_id_array_ptr));
|
||||
};
|
||||
public:
|
||||
ObTableLoadPartitionLocation() : is_inited_(false) { tablet_ids_.set_tenant_id(MTL_ID()); }
|
||||
ObTableLoadPartitionLocation()
|
||||
: allocator_("TLD_PL"),
|
||||
is_inited_(false)
|
||||
{
|
||||
allocator_.set_tenant_id(MTL_ID());
|
||||
tablet_ids_.set_tenant_id(MTL_ID());
|
||||
}
|
||||
int init(uint64_t tenant_id,
|
||||
const table::ObTableLoadArray<table::ObTableLoadPartitionId> &partition_ids,
|
||||
common::ObIAllocator &allocator);
|
||||
const table::ObTableLoadArray<table::ObTableLoadPartitionId> &partition_ids);
|
||||
int get_leader(common::ObTabletID tablet_id, PartitionLocationInfo &info) const;
|
||||
int get_all_leader(table::ObTableLoadArray<common::ObAddr> &addr_array) const;
|
||||
int get_all_leader_info(table::ObTableLoadArray<LeaderInfo> &info_array) const;
|
||||
@ -66,6 +71,7 @@ public:
|
||||
partition_map_.destroy();
|
||||
all_leader_addr_array_.reset();
|
||||
all_leader_info_array_.reset();
|
||||
allocator_.reset();
|
||||
is_inited_ = false;
|
||||
}
|
||||
int check_tablet_has_same_leader(const ObTableLoadPartitionLocation &other, bool &result);
|
||||
@ -85,13 +91,13 @@ public:
|
||||
storage::ObTabletHandle &tablet_handle);
|
||||
private:
|
||||
int init_all_partition_location(
|
||||
uint64_t tenant_id, const table::ObTableLoadArray<table::ObTableLoadPartitionId> &partition_ids,
|
||||
common::ObIAllocator &allocator);
|
||||
int init_all_leader_info(common::ObIAllocator &allocator);
|
||||
uint64_t tenant_id, const table::ObTableLoadArray<table::ObTableLoadPartitionId> &partition_ids);
|
||||
int init_all_leader_info();
|
||||
int fetch_ls_locations(
|
||||
uint64_t tenant_id,
|
||||
const table::ObTableLoadArray<table::ObTableLoadPartitionId> &partition_ids);
|
||||
private:
|
||||
common::ObArenaAllocator allocator_;
|
||||
common::ObArray<common::ObTabletID> tablet_ids_; //保证遍历partition_map_的时候顺序不变
|
||||
common::hash::ObHashMap<common::ObTabletID, PartitionLocationInfo> partition_map_;
|
||||
table::ObTableLoadArray<common::ObAddr> all_leader_addr_array_;
|
||||
|
@ -207,7 +207,11 @@ int ObTableLoadResourceManager::apply_resource(ObDirectLoadResourceApplyArg &arg
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < arg.apply_array_.count(); i++) {
|
||||
ObDirectLoadResourceUnit &apply_unit = arg.apply_array_[i];
|
||||
if (OB_FAIL(resource_pool_.get_refactored(apply_unit.addr_, ctx))) {
|
||||
LOG_WARN("fail to get refactored", K(apply_unit.addr_));
|
||||
LOG_WARN("fail to get refactored", KR(ret), K(apply_unit.addr_));
|
||||
if (ret == OB_HASH_NOT_EXIST) {
|
||||
// 第一次切主需要初始化,通过内部sql查询ACTIVE状态的observer可能不完整,期间若有导入任务进来时需要重试
|
||||
ret = OB_EAGAIN;
|
||||
}
|
||||
} else if (apply_unit.thread_count_ > ctx.thread_remain_ || apply_unit.memory_size_ > ctx.memory_remain_) {
|
||||
ret = OB_EAGAIN;
|
||||
}
|
||||
@ -234,7 +238,7 @@ int ObTableLoadResourceManager::apply_resource(ObDirectLoadResourceApplyArg &arg
|
||||
}
|
||||
}
|
||||
} else {
|
||||
LOG_WARN("fail to get refactored", K(arg.task_key_));
|
||||
LOG_WARN("fail to get refactored", KR(ret), K(arg.task_key_));
|
||||
}
|
||||
} else {
|
||||
LOG_INFO("resource has been assigned", K(arg.task_key_));
|
||||
|
Loading…
x
Reference in New Issue
Block a user