fix bug:dynamically adjust the bucket_cnt value to prevent reporting error-code 4013
This commit is contained in:
@ -90,8 +90,6 @@ int ObTableLoadStoreCtx::init(int64_t ddl_task_id,
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
is_multiple_mode_ = (!ctx_->schema_.is_heap_table_ && ctx_->param_.need_sort_) ||
|
||||
ls_partition_ids_.count() > ObDirectLoadTableStore::MAX_BUCKET_CNT;
|
||||
table_data_desc_.rowkey_column_num_ =
|
||||
(!ctx_->schema_.is_heap_table_ ? ctx_->schema_.rowkey_column_count_ : 0);
|
||||
table_data_desc_.column_count_ = ctx_->param_.column_count_;
|
||||
@ -125,6 +123,22 @@ int ObTableLoadStoreCtx::init(int64_t ddl_task_id,
|
||||
table_data_desc_.heap_table_mem_chunk_size_ = wa_mem_limit / ctx_->param_.session_count_;
|
||||
LOG_INFO("table_data_desc init end", K(table_data_desc_));
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (table_data_desc_.is_heap_table_) {
|
||||
int64_t bucket_cnt = wa_mem_limit / (ctx_->param_.session_count_ * MACRO_BLOCK_WRITER_MEM_SIZE);
|
||||
if (ls_partition_ids_.count() <= bucket_cnt) {
|
||||
is_fast_heap_table_ = true;
|
||||
} else {
|
||||
is_multiple_mode_ = true;
|
||||
}
|
||||
} else {
|
||||
int64_t bucket_cnt = wa_mem_limit / (ctx_->param_.session_count_ *
|
||||
(table_data_desc_.sstable_index_block_size_ +
|
||||
table_data_desc_.sstable_data_block_size_));
|
||||
is_multiple_mode_ = ctx_->param_.need_sort_ || ls_partition_ids_.count() > bucket_cnt;
|
||||
}
|
||||
LOG_INFO("multiple_mode is inited", K(is_multiple_mode_), K(is_fast_heap_table_));
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(insert_table_param.ls_partition_ids_.assign(target_ls_partition_ids_))) {
|
||||
@ -198,19 +212,15 @@ int ObTableLoadStoreCtx::init(int64_t ddl_task_id,
|
||||
else if (ctx_->schema_.has_identity_column_ && OB_FAIL(init_sequence())) {
|
||||
LOG_WARN("fail to init sequence", KR(ret));
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (!is_multiple_mode_ && ctx_->schema_.is_heap_table_) {
|
||||
is_fast_heap_table_ = true;
|
||||
if (OB_ISNULL(fast_heap_table_ctx_ =
|
||||
OB_NEWx(ObDirectLoadFastHeapTableContext, (&allocator_)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("fail to new ObDirectLoadFastHeapTableContext", KR(ret));
|
||||
} else if (OB_FAIL(fast_heap_table_ctx_->init(ctx_->param_.tenant_id_,
|
||||
ls_partition_ids_,
|
||||
target_ls_partition_ids_,
|
||||
ctx_->param_.session_count_))) {
|
||||
LOG_WARN("fail to init fast heap table ctx", KR(ret));
|
||||
}
|
||||
if (OB_SUCC(ret) && is_fast_heap_table_) {
|
||||
if (OB_ISNULL(fast_heap_table_ctx_ =
|
||||
OB_NEWx(ObDirectLoadFastHeapTableContext, (&allocator_)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("fail to new ObDirectLoadFastHeapTableContext", KR(ret));
|
||||
} else if (OB_FAIL(fast_heap_table_ctx_->init(ctx_->param_.tenant_id_, ls_partition_ids_,
|
||||
target_ls_partition_ids_,
|
||||
ctx_->param_.session_count_))) {
|
||||
LOG_WARN("fail to init fast heap table ctx", KR(ret));
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
|
||||
@ -36,6 +36,7 @@ class ObTableLoadErrorRowHandler;
|
||||
|
||||
class ObTableLoadStoreCtx
|
||||
{
|
||||
static const int64_t MACRO_BLOCK_WRITER_MEM_SIZE = 10 * 1024LL * 1024LL;
|
||||
public:
|
||||
ObTableLoadStoreCtx(ObTableLoadTableCtx *ctx);
|
||||
~ObTableLoadStoreCtx();
|
||||
|
||||
Reference in New Issue
Block a user