fix bug of alloc memory
This commit is contained in:
parent
d4f4b28921
commit
f4c3e7513b
@ -60,6 +60,7 @@ int ObDirectLoadControlPreBeginExecutor::process()
|
||||
param.max_error_row_count_ = arg_.config_.max_error_row_count_;
|
||||
param.column_count_ = arg_.column_count_;
|
||||
param.need_sort_ = arg_.config_.is_need_sort_;
|
||||
param.task_need_sort_ = arg_.config_.is_task_need_sort_;
|
||||
param.px_mode_ = arg_.px_mode_;
|
||||
param.online_opt_stat_gather_ = arg_.online_opt_stat_gather_;
|
||||
param.dup_action_ = arg_.dup_action_;
|
||||
|
@ -252,6 +252,30 @@ int ObTableLoadCoordinator::init()
|
||||
/**
|
||||
* begin
|
||||
*/
|
||||
int ObTableLoadCoordinator::check_need_sort_for_lob_or_index(bool &need_sort) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
need_sort = false;
|
||||
if (ObDirectLoadMethod::is_incremental(ctx_->param_.method_)) {
|
||||
need_sort = ctx_->schema_.lob_column_idxs_.count() > 0;
|
||||
if (!need_sort) {
|
||||
ObSchemaGetterGuard schema_guard;
|
||||
const share::schema::ObTableSchema *data_table_schema = nullptr;
|
||||
if (OB_FAIL(ObTableLoadSchema::get_schema_guard(ctx_->param_.tenant_id_, schema_guard))) {
|
||||
LOG_WARN("fail to get schema guard", KR(ret));
|
||||
} else if (OB_FAIL(ObTableLoadSchema::get_table_schema(schema_guard, ctx_->param_.tenant_id_, ctx_->param_.table_id_, data_table_schema))) {
|
||||
LOG_WARN("fail to get table shema of main table", KR(ret));
|
||||
} else if (OB_ISNULL(data_table_schema)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("data table schema is null", KR(ret));
|
||||
} else {
|
||||
need_sort = data_table_schema->get_simple_index_infos().count() > 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadCoordinator::gen_apply_arg(ObDirectLoadResourceApplyArg &apply_arg)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -294,8 +318,8 @@ int ObTableLoadCoordinator::gen_apply_arg(ObDirectLoadResourceApplyArg &apply_ar
|
||||
LOG_WARN("fail to get memory_limit", K(ret));
|
||||
} else {
|
||||
bool include_cur_addr = false;
|
||||
bool need_sort = ctx_->param_.need_sort_;
|
||||
bool main_need_sort = ctx_->param_.need_sort_;
|
||||
bool task_need_sort = false; // 表示整个导入任务是否会走排序的流程
|
||||
bool main_need_sort = false; // 表示主表是否会走排序
|
||||
int64_t total_partitions = 0;
|
||||
ObArray<int64_t> partitions;
|
||||
int64_t store_server_count = all_leader_info_array.count();
|
||||
@ -306,19 +330,16 @@ int ObTableLoadCoordinator::gen_apply_arg(ObDirectLoadResourceApplyArg &apply_ar
|
||||
int64_t total_session_count = MIN(ctx_->param_.parallel_, max_session_count * store_server_count);
|
||||
int64_t remain_session_count = total_session_count;
|
||||
partitions.set_tenant_id(MTL_ID());
|
||||
|
||||
// 判断coordinator节点是否也作为store节点
|
||||
for (int64_t i = 0; i < store_server_count; i++) {
|
||||
total_partitions += all_leader_info_array[i].partition_id_array_.count();
|
||||
if (coordinator_addr == all_leader_info_array[i].addr_) {
|
||||
include_cur_addr = true;
|
||||
}
|
||||
}
|
||||
// is_heap_table==true,we prioritize non multiple modes that do not require sorting
|
||||
// FAST_HEAP_TABLE: macroblock_buffer * partition_count * parallel
|
||||
// is_heap_table==false,we prioritize non multiple modes that do not require sorting
|
||||
// GENERAL_TABLE_COMPACT:max(sstable_buffer * partition_count * parallel,macroblock_buffer * parallel)
|
||||
// param_.need_sort_==true,we apply for the minimum required memory(MIN_SORT_MEMORY_PER_TASK)
|
||||
// MULTIPLE_HEAP_TABLE_COMPACT
|
||||
// MEM_COMPACT
|
||||
|
||||
// 资源控制先确定线程,第一次遍历先按分区等比例分配线程
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < store_server_count; i++) {
|
||||
ObDirectLoadResourceUnit unit;
|
||||
unit.addr_ = all_leader_info_array[i].addr_;
|
||||
@ -333,18 +354,8 @@ int ObTableLoadCoordinator::gen_apply_arg(ObDirectLoadResourceApplyArg &apply_ar
|
||||
}
|
||||
}
|
||||
}
|
||||
// 协调节点不存在数据分区时,需要申请线程资源,但不需要申请内存,为零
|
||||
if (OB_SUCC(ret) && !include_cur_addr) {
|
||||
ObDirectLoadResourceUnit unit;
|
||||
unit.addr_ = coordinator_addr;
|
||||
unit.thread_count_ = MAX(total_session_count / store_server_count, MIN_THREAD_COUNT);
|
||||
unit.memory_size_ = 0;
|
||||
coordinator_session_count = unit.thread_count_;
|
||||
min_session_count = MIN(min_session_count, unit.thread_count_);
|
||||
if (OB_FAIL(apply_arg.apply_array_.push_back(unit))) {
|
||||
LOG_WARN("fail to push back", KR(ret));
|
||||
}
|
||||
}
|
||||
|
||||
// 第一次遍历如果不能分配完所有的线程,继续给每个节点平均分配剩余的线程,直到所有的线程都被分配完
|
||||
if (OB_SUCC(ret)) {
|
||||
while (remain_session_count > 0) {
|
||||
for (int64_t i = 0; remain_session_count > 0 && i < store_server_count; i++) {
|
||||
@ -355,72 +366,110 @@ int ObTableLoadCoordinator::gen_apply_arg(ObDirectLoadResourceApplyArg &apply_ar
|
||||
}
|
||||
}
|
||||
}
|
||||
for (int64_t i = 0; i < store_server_count; i++) {
|
||||
}
|
||||
|
||||
/*
|
||||
协调节点不存在数据分区时,需要申请线程资源,但不需要申请内存,放在申请资源数组的最后一个位置
|
||||
coordinator_session_count表示coordinator节点可用线程数,min_session_count表示所有节点可用线程数的最小值
|
||||
*/
|
||||
if (OB_SUCC(ret)) {
|
||||
if (!include_cur_addr) {
|
||||
ObDirectLoadResourceUnit unit;
|
||||
unit.addr_ = coordinator_addr;
|
||||
unit.thread_count_ = MAX(MIN(max_session_count, total_session_count / store_server_count), MIN_THREAD_COUNT);
|
||||
unit.memory_size_ = 0;
|
||||
coordinator_session_count = unit.thread_count_;
|
||||
min_session_count = MIN(min_session_count, unit.thread_count_);
|
||||
if (OB_FAIL(apply_arg.apply_array_.push_back(unit))) {
|
||||
LOG_WARN("fail to push back", KR(ret));
|
||||
}
|
||||
}
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < store_server_count; i++) {
|
||||
ObDirectLoadResourceUnit &unit = apply_arg.apply_array_[i];
|
||||
if (all_leader_info_array[i].addr_ == coordinator_addr) {
|
||||
coordinator_session_count = unit.thread_count_;
|
||||
}
|
||||
min_session_count = MIN(min_session_count, unit.thread_count_);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
确定write_session_count,表示发送数据阶段store节点可用的线程数
|
||||
对于load data模式,如果 协调节点和数据节点都在同一个节点上,就分出一半的线程用于解析数据,一半的线程用于存储数据
|
||||
*/
|
||||
if (OB_SUCC(ret)) {
|
||||
if (include_cur_addr && ctx_->param_.load_mode_ == ObDirectLoadMode::LOAD_DATA) {
|
||||
// 协调节点和数据节点都在同一个节点上,对于load data模式,分出一半的线程用于解析数据,一半的线程用于存储数据
|
||||
write_session_count = MIN(min_session_count, (coordinator_session_count + 1) / 2);
|
||||
} else {
|
||||
write_session_count = min_session_count;
|
||||
}
|
||||
for (int64_t i = 0; i < store_server_count; i++) {
|
||||
}
|
||||
|
||||
/*
|
||||
先确定主表是否要走排序,对于堆表,sql指定need_sort=true时,如果内存满足不排序,就走不排序流程,只要有一个节点内存不足,整体都要走排序
|
||||
如果主表要走排序,则整个任务按排序方式分配内存,否则再确定是否要做lob_id排序和索引排序,需要的话就按照MAX(主表不排序内存,索引排序内存)来分配内存,否则分配主表不排序需要的内存
|
||||
*/
|
||||
if (OB_SUCC(ret)) {
|
||||
for (int64_t i = 0; !main_need_sort && i < store_server_count; i++) {
|
||||
ObDirectLoadResourceUnit &unit = apply_arg.apply_array_[i];
|
||||
int64_t min_sort_memory = unit.thread_count_ * ObDirectLoadExternalMultiPartitionRowChunk::MIN_MEMORY_LIMIT * 4;
|
||||
int64_t min_unsort_memory = 0;
|
||||
if (ctx_->schema_.is_heap_table_) {
|
||||
// 直接写宏块需要的内存,对于非排序模式,每个分区各自写宏块,所以要乘分区数
|
||||
min_unsort_memory = MACROBLOCK_BUFFER_SIZE * partitions[i] * write_session_count;
|
||||
if (min_unsort_memory <= memory_limit) {
|
||||
main_need_sort = false;
|
||||
unit.memory_size_ = min_unsort_memory;
|
||||
if (!ctx_->param_.need_sort_) {
|
||||
// sql指定need_sort=false,强制不排序
|
||||
} else {
|
||||
main_need_sort = ctx_->param_.need_sort_; // allow forced non-sorting
|
||||
unit.memory_size_ = MIN(ObTableLoadAssignedMemoryManager::MIN_SORT_MEMORY_PER_TASK, memory_limit);
|
||||
if (min_unsort_memory > memory_limit) {
|
||||
main_need_sort = true;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// 取写宏块或写临时文件需要内存的最小值,对于非排序模式,每个分区各自写临时文件,所以要乘分区数
|
||||
min_unsort_memory = SSTABLE_BUFFER_SIZE * partitions[i] * unit.thread_count_;
|
||||
if (main_need_sort) {
|
||||
unit.memory_size_ = MIN(unit.thread_count_ * ObDirectLoadExternalMultiPartitionRowChunk::MIN_MEMORY_LIMIT * 4, memory_limit);
|
||||
if (ctx_->param_.need_sort_) {
|
||||
// sql指定need_sort=true,强制走排序
|
||||
main_need_sort = true;
|
||||
} else {
|
||||
// hint指定不排序,如果不排序内存大于内存上限,要改成走排序模式,一般是分区数较大的场景
|
||||
if (min_unsort_memory < memory_limit) {
|
||||
unit.memory_size_ = MAX(min_unsort_memory, MACROBLOCK_BUFFER_SIZE * write_session_count);
|
||||
} else {
|
||||
if (min_unsort_memory > memory_limit) {
|
||||
main_need_sort = true;
|
||||
unit.memory_size_ = MIN(unit.thread_count_ * ObDirectLoadExternalMultiPartitionRowChunk::MIN_MEMORY_LIMIT * 4, memory_limit);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (main_need_sort) {
|
||||
break;
|
||||
task_need_sort = main_need_sort;
|
||||
if (!task_need_sort) {
|
||||
if (OB_FAIL(check_need_sort_for_lob_or_index(task_need_sort))) {
|
||||
LOG_WARN("fail to check need sort for lob or index", KR(ret));
|
||||
}
|
||||
}
|
||||
ObSchemaGetterGuard schema_guard;
|
||||
const ObTableSchema *table_schema = nullptr;
|
||||
if (OB_FAIL(ObTableLoadSchema::get_table_schema(tenant_id, ctx_->ddl_param_.dest_table_id_, schema_guard, table_schema))) {
|
||||
LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(ctx_->ddl_param_.dest_table_id_));
|
||||
} else if (main_need_sort || (ObDirectLoadMethod::is_incremental(ctx_->param_.method_) &&
|
||||
table_schema->get_simple_index_infos().count() > 0)) {
|
||||
need_sort = true;
|
||||
for (int64_t i = 0; i < store_server_count; i++) {
|
||||
ObDirectLoadResourceUnit &unit = apply_arg.apply_array_[i];
|
||||
unit.memory_size_ = MIN(unit.thread_count_ * ObDirectLoadExternalMultiPartitionRowChunk::MIN_MEMORY_LIMIT * 4, memory_limit);
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < store_server_count; i++) {
|
||||
ObDirectLoadResourceUnit &unit = apply_arg.apply_array_[i];
|
||||
int64_t min_sort_memory = MIN(unit.thread_count_ * ObDirectLoadExternalMultiPartitionRowChunk::MIN_MEMORY_LIMIT * 4, memory_limit);
|
||||
int64_t min_unsort_memory = 0;
|
||||
if (ctx_->schema_.is_heap_table_) {
|
||||
min_unsort_memory = MIN(MACROBLOCK_BUFFER_SIZE * partitions[i] * write_session_count, memory_limit);
|
||||
} else {
|
||||
min_unsort_memory = MIN(MAX(SSTABLE_BUFFER_SIZE * partitions[i] * unit.thread_count_, MACROBLOCK_BUFFER_SIZE * unit.thread_count_), memory_limit);
|
||||
}
|
||||
if (task_need_sort) {
|
||||
if (main_need_sort) {
|
||||
unit.memory_size_ = min_sort_memory;
|
||||
} else {
|
||||
unit.memory_size_ = MAX(min_unsort_memory, min_sort_memory);
|
||||
}
|
||||
} else {
|
||||
unit.memory_size_ = min_unsort_memory;
|
||||
}
|
||||
} else {
|
||||
need_sort = false;
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
ObDirectLoadResourceOpRes apply_res;
|
||||
if (OB_FAIL(ObTableLoadResourceService::apply_resource(apply_arg, apply_res))) {
|
||||
if (retry_count % 100 == 0) {
|
||||
LOG_WARN("fail to apply resource", KR(ret), K(apply_res.error_code_), K(retry_count));
|
||||
LOG_WARN("fail to apply resource", KR(ret), K(apply_res.error_code_), K(retry_count), K(param_.exe_mode_), K(main_need_sort), K(task_need_sort), K(partitions), K(coordinator_addr), K(apply_arg));
|
||||
}
|
||||
if (ret == OB_EAGAIN) {
|
||||
retry_count++;
|
||||
@ -428,7 +477,8 @@ int ObTableLoadCoordinator::gen_apply_arg(ObDirectLoadResourceApplyArg &apply_ar
|
||||
usleep(RESOURCE_OP_WAIT_INTERVAL_US);
|
||||
}
|
||||
} else {
|
||||
ctx_->param_.need_sort_ = need_sort;
|
||||
ctx_->param_.need_sort_ = main_need_sort;
|
||||
ctx_->param_.task_need_sort_ = task_need_sort;
|
||||
ctx_->param_.session_count_ = coordinator_session_count;
|
||||
ctx_->param_.write_session_count_ = write_session_count;
|
||||
ctx_->param_.exe_mode_ = (ctx_->schema_.is_heap_table_ ?
|
||||
@ -439,7 +489,7 @@ int ObTableLoadCoordinator::gen_apply_arg(ObDirectLoadResourceApplyArg &apply_ar
|
||||
LOG_WARN("fail to add_assigned_task", KR(ret));
|
||||
} else {
|
||||
ctx_->set_assigned_resource();
|
||||
LOG_INFO("Coordinator::gen_apply_arg", K(retry_count), K(param_.exe_mode_), K(partitions), K(coordinator_addr), K(apply_arg));
|
||||
LOG_INFO("Coordinator::gen_apply_arg", K(retry_count), K(param_.exe_mode_), K(main_need_sort), K(task_need_sort), K(partitions), K(coordinator_addr), K(apply_arg));
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -473,6 +523,7 @@ int ObTableLoadCoordinator::pre_begin_peers(ObDirectLoadResourceApplyArg &apply_
|
||||
arg.config_.max_error_row_count_ = param_.max_error_row_count_;
|
||||
arg.config_.batch_size_ = param_.batch_size_;
|
||||
arg.config_.is_need_sort_ = param_.need_sort_;
|
||||
arg.config_.is_task_need_sort_ = param_.task_need_sort_;
|
||||
arg.column_count_ = param_.column_count_;
|
||||
arg.dup_action_ = param_.dup_action_;
|
||||
arg.px_mode_ = param_.px_mode_;
|
||||
|
@ -65,6 +65,7 @@ public:
|
||||
int get_status(table::ObTableLoadStatusType &status, int &error_code);
|
||||
int heart_beat();
|
||||
private:
|
||||
int check_need_sort_for_lob_or_index(bool &need_sort) const;
|
||||
int gen_apply_arg(ObDirectLoadResourceApplyArg &apply_arg);
|
||||
int pre_begin_peers(ObDirectLoadResourceApplyArg &apply_arg);
|
||||
int confirm_begin_peers();
|
||||
|
@ -821,7 +821,7 @@ int ObTableLoadService::remove_ctx(ObTableLoadTableCtx *table_ctx)
|
||||
LOG_WARN("fail to remove_table_ctx", KR(ret), K(release_arg.task_key_));
|
||||
} else {
|
||||
if (table_ctx->is_assigned_memory()) {
|
||||
if (OB_TMP_FAIL(service->assigned_memory_manager_.recycle_memory(table_ctx->param_.need_sort_, table_ctx->param_.avail_memory_))) {
|
||||
if (OB_TMP_FAIL(service->assigned_memory_manager_.recycle_memory(table_ctx->param_.task_need_sort_, table_ctx->param_.avail_memory_))) {
|
||||
LOG_WARN("fail to recycle_memory", KR(tmp_ret), K(release_arg.task_key_));
|
||||
}
|
||||
table_ctx->reset_assigned_memory();
|
||||
|
@ -172,7 +172,7 @@ int ObTableLoadStore::pre_begin()
|
||||
LOG_WARN("ObTableLoadStore not init", KR(ret), KP(this));
|
||||
} else {
|
||||
LOG_INFO("store pre begin");
|
||||
if (OB_FAIL(ObTableLoadService::assign_memory(ctx_->param_.need_sort_, ctx_->param_.avail_memory_))) {
|
||||
if (OB_FAIL(ObTableLoadService::assign_memory(ctx_->param_.task_need_sort_, ctx_->param_.avail_memory_))) {
|
||||
LOG_WARN("fail to assign_memory", KR(ret));
|
||||
} else {
|
||||
ctx_->set_assigned_memory();
|
||||
@ -500,7 +500,7 @@ int ObTableLoadStore::commit(ObTableLoadResultInfo &result_info,
|
||||
} else {
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (ctx_->is_assigned_memory()) {
|
||||
if (OB_TMP_FAIL(ObTableLoadService::recycle_memory(ctx_->param_.need_sort_, ctx_->param_.avail_memory_))) {
|
||||
if (OB_TMP_FAIL(ObTableLoadService::recycle_memory(ctx_->param_.task_need_sort_, ctx_->param_.avail_memory_))) {
|
||||
LOG_WARN("fail to recycle memory", KR(tmp_ret));
|
||||
}
|
||||
ctx_->reset_assigned_memory();
|
||||
|
@ -116,7 +116,8 @@ public:
|
||||
load_mode_(storage::ObDirectLoadMode::INVALID_MODE),
|
||||
compressor_type_(ObCompressorType::INVALID_COMPRESSOR),
|
||||
online_sample_percent_(1.),
|
||||
load_level_(storage::ObDirectLoadLevel::INVALID_LEVEL)
|
||||
load_level_(storage::ObDirectLoadLevel::INVALID_LEVEL),
|
||||
task_need_sort_(false)
|
||||
{
|
||||
}
|
||||
|
||||
@ -176,7 +177,8 @@ public:
|
||||
"direct_load_mode", storage::ObDirectLoadMode::get_type_string(load_mode_),
|
||||
K_(compressor_type),
|
||||
K_(online_sample_percent),
|
||||
"direct_load_level", storage::ObDirectLoadLevel::get_type_string(load_level_));
|
||||
"direct_load_level", storage::ObDirectLoadLevel::get_type_string(load_level_),
|
||||
K_(task_need_sort));
|
||||
|
||||
public:
|
||||
uint64_t tenant_id_;
|
||||
@ -187,7 +189,7 @@ public:
|
||||
uint64_t max_error_row_count_;
|
||||
uint64_t sql_mode_; // unused
|
||||
int32_t column_count_;
|
||||
bool need_sort_;
|
||||
bool need_sort_; // 表示主表是否要排序
|
||||
bool px_mode_;
|
||||
bool online_opt_stat_gather_;
|
||||
sql::ObLoadDupActionType dup_action_;
|
||||
@ -200,6 +202,7 @@ public:
|
||||
ObCompressorType compressor_type_;
|
||||
double online_sample_percent_;
|
||||
storage::ObDirectLoadLevel::Type load_level_;
|
||||
bool task_need_sort_; // 表示导入任务是否会走到排序流程
|
||||
};
|
||||
|
||||
struct ObTableLoadDDLParam
|
||||
|
@ -24,7 +24,8 @@ OB_SERIALIZE_MEMBER(ObTableLoadConfig,
|
||||
batch_size_,
|
||||
max_error_row_count_,
|
||||
dup_action_,
|
||||
is_need_sort_);
|
||||
is_need_sort_,
|
||||
is_task_need_sort_);
|
||||
|
||||
OB_SERIALIZE_MEMBER(ObTableLoadSegmentID,
|
||||
id_);
|
||||
|
@ -37,17 +37,19 @@ public:
|
||||
batch_size_(0),
|
||||
max_error_row_count_(0),
|
||||
dup_action_(sql::ObLoadDupActionType::LOAD_INVALID_MODE),
|
||||
is_need_sort_(false)
|
||||
is_need_sort_(false),
|
||||
is_task_need_sort_(false)
|
||||
{
|
||||
}
|
||||
TO_STRING_KV(K_(parallel), K_(batch_size), K_(max_error_row_count), K_(dup_action),
|
||||
K_(is_need_sort));
|
||||
K_(is_need_sort), K_(is_task_need_sort));
|
||||
public:
|
||||
int32_t parallel_;
|
||||
int32_t batch_size_;
|
||||
uint64_t max_error_row_count_;
|
||||
sql::ObLoadDupActionType dup_action_;
|
||||
bool is_need_sort_;
|
||||
bool is_need_sort_; // 表示主表是否要排序
|
||||
bool is_task_need_sort_; // 表示导入任务是否会走到排序流程
|
||||
};
|
||||
|
||||
struct ObTableLoadPartitionId
|
||||
|
Loading…
x
Reference in New Issue
Block a user