fix partition topn sort use fixed hash bucket size

This commit is contained in:
obdev 2024-04-08 05:55:08 +00:00 committed by ob-robot
parent f296a63615
commit a1d20eb244
3 changed files with 79 additions and 9 deletions

View File

@ -433,10 +433,15 @@ int ObSortOp::init_sort(int64_t tenant_id,
int64_t topn_cnt)
{
int ret = OB_SUCCESS;
int64_t est_rows = MY_SPEC.rows_;
if (OB_FAIL(ObPxEstimateSizeUtil::get_px_size(
&ctx_, MY_SPEC.px_est_size_factor_, est_rows, est_rows))) {
LOG_WARN("failed to get px size", K(ret));
}
OZ(sort_impl_.init(tenant_id, &MY_SPEC.sort_collations_, &MY_SPEC.sort_cmp_funs_,
&eval_ctx_, &ctx_, MY_SPEC.enable_encode_sortkey_opt_, MY_SPEC.is_local_merge_sort_,
false /* need_rewind */, MY_SPEC.part_cnt_, topn_cnt, MY_SPEC.is_fetch_with_ties_,
ObChunkDatumStore::BLOCK_SIZE, MY_SPEC.compress_type_, &MY_SPEC.all_exprs_));
ObChunkDatumStore::BLOCK_SIZE, MY_SPEC.compress_type_, &MY_SPEC.all_exprs_, est_rows));
if (is_batch) {
read_batch_func_ = &ObSortOp::sort_impl_next_batch;
} else {

View File

@ -576,7 +576,7 @@ ObSortOpImpl::ObSortOpImpl(ObMonitorNode &op_monitor_info)
io_event_observer_(nullptr), buckets_(NULL), max_bucket_cnt_(0), part_hash_nodes_(NULL),
max_node_cnt_(0), part_cnt_(0), topn_cnt_(INT64_MAX), outputted_rows_cnt_(0),
is_fetch_with_ties_(false), topn_heap_(NULL), ties_array_pos_(0),
last_ties_row_(NULL), pt_buckets_(NULL), use_partition_topn_sort_(false), heap_nodes_(), cur_heap_idx_(0),
last_ties_row_(NULL), pt_buckets_(NULL), use_partition_topn_sort_(false), heap_nodes_(), cur_heap_idx_(0), part_group_cnt_(0),
rows_(NULL), sort_exprs_(nullptr),
compress_type_(NONE_COMPRESSOR)
{
@ -598,11 +598,11 @@ int ObSortOpImpl::init_topn()
return ret;
}
int ObSortOpImpl::init_partition_topn()
int ObSortOpImpl::init_partition_topn(const int64_t est_rows)
{
int ret = OB_SUCCESS;
uint64_t bucket_cnt = 16;//next_pow2(std::max(16L, rows.count()));
uint64_t shift_right = __builtin_clzll(bucket_cnt);
uint64_t bucket_cnt = next_pow2((est_rows < MIN_BUCKET_COUNT) ? MIN_BUCKET_COUNT :
(est_rows > MAX_BUCKET_COUNT) ? MAX_BUCKET_COUNT : est_rows);
ObIAllocator &alloc = mem_context_->get_malloc_allocator();
if (max_bucket_cnt_ < bucket_cnt) {
if (NULL != pt_buckets_) {
@ -624,6 +624,58 @@ int ObSortOpImpl::init_partition_topn()
return ret;
}
int ObSortOpImpl::enlarge_partition_topn_buckets()
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(0 == max_bucket_cnt_) || OB_ISNULL(pt_buckets_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected bucket", K(max_bucket_cnt_), K(pt_buckets_));
} else {
int64_t new_bucket_cnt = max_bucket_cnt_ * 2;
uint64_t shift_right = __builtin_clzll(new_bucket_cnt) + 1;
ObIAllocator &alloc = mem_context_->get_malloc_allocator();
PartHeapNode **new_pt_buckets = (PartHeapNode **)alloc.alloc(sizeof(PartHeapNode*) * new_bucket_cnt);
int64_t hash_idx = sort_collations_->at(0).field_idx_;
if (OB_ISNULL(new_pt_buckets)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to alloc memory", K(ret));
} else {
MEMSET(new_pt_buckets, 0, sizeof(PartHeapNode*) * new_bucket_cnt);
ObChunkDatumStore::StoredRow *top_row = NULL;
for (int64_t idx = 0; OB_SUCC(ret) && idx < max_bucket_cnt_; ++idx) {
PartHeapNode *hash_node = pt_buckets_[idx];
PartHeapNode *next_hash_node = NULL;
while(hash_node != NULL) {
next_hash_node = hash_node->hash_node_next_;
TopnHeapNode &cur_heap = hash_node->topn_heap_node_;
if (OB_UNLIKELY(0 == cur_heap.heap_.count()) ||
OB_ISNULL(top_row = cur_heap.heap_.top())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected heap", K(cur_heap.heap_), K(top_row));
} else if (OB_UNLIKELY(top_row->cnt_ <= hash_idx)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected hash idx", K(hash_idx), K(top_row->cnt_));
} else {
int64_t new_pos = top_row->cells()[hash_idx].get_uint64() >> shift_right;
hash_node->hash_node_next_ = new_pt_buckets[new_pos];
new_pt_buckets[new_pos] = hash_node;
}
hash_node = next_hash_node;
}
}
if (OB_SUCC(ret)) {
alloc.free(pt_buckets_);
pt_buckets_ = new_pt_buckets;
max_bucket_cnt_ = new_bucket_cnt;
} else {
alloc.free(new_pt_buckets);
new_pt_buckets = NULL;
}
}
}
return ret;
}
// Set the note in ObPrefixSortImpl::init(): %sort_columns may be zero, to compatible with
// the wrong generated prefix sort.
int ObSortOpImpl::init(
@ -640,7 +692,8 @@ int ObSortOpImpl::init(
const bool is_fetch_with_ties /* = false */,
const int64_t default_block_size /* = 64KB */,
const ObCompressorType compress_type /* = NONE_COMPRESS */,
const ExprFixedArray *exprs /* =nullptr */)
const ExprFixedArray *exprs /* =nullptr */,
const int64_t est_rows /* = 0 */)
{
int ret = OB_SUCCESS;
if (is_inited()) {
@ -691,7 +744,7 @@ int ObSortOpImpl::init(
LOG_WARN("init row store failed", K(ret));
} else if (use_heap_sort_ && OB_FAIL(init_topn())) {
LOG_WARN("init topn failed", K(ret));
} else if (use_partition_topn_sort_ && OB_FAIL(init_partition_topn())) {
} else if (use_partition_topn_sort_ && OB_FAIL(init_partition_topn(est_rows))) {
LOG_WARN("init partition topn failed", K(ret));
} else if (batch_size > 0
&& OB_ISNULL(stored_rows_ = static_cast<ObChunkDatumStore::StoredRow **>(
@ -768,6 +821,7 @@ void ObSortOpImpl::reuse()
heap_nodes_.reset();
reuse_part_topn_heap();
cur_heap_idx_ = 0;
part_group_cnt_ = 0;
topn_heap_ = NULL;
} else if (NULL != topn_heap_) {
reuse_topn_heap(topn_heap_);
@ -809,6 +863,7 @@ void ObSortOpImpl::reset()
sort_exprs_ = nullptr;
// for partition topn sort
cur_heap_idx_ = 0;
part_group_cnt_ = 0;
heap_nodes_.reset();
// for partition topn end
if (NULL != mem_context_) {
@ -1165,6 +1220,9 @@ int ObSortOpImpl::add_part_heap_sort_row(const common::ObIArray<ObExpr*> &exprs,
LOG_WARN("failed to locate heap", K(ret));
} else if (OB_FAIL(add_heap_sort_row(exprs, store_row))) {
LOG_WARN("add heap sort row failed", K(ret));
} else if (OB_UNLIKELY(part_group_cnt_ > max_bucket_cnt_) &&
OB_FAIL(enlarge_partition_topn_buckets())) {
LOG_WARN("failed to enlarge partition topn buckets");
}
return ret;
}
@ -1599,6 +1657,7 @@ int ObSortOpImpl::do_dump()
if (OB_SUCC(ret) && use_partition_topn_sort_) {
heap_nodes_.reset();
reuse_part_topn_heap();
part_group_cnt_ = 0;
topn_heap_ = NULL;
got_first_row_ = false;
rows_ = &quick_sort_array_;
@ -2062,6 +2121,7 @@ int ObSortOpImpl::locate_current_heap(const common::ObIArray<ObExpr*> &exprs)
new_heap_node->hash_node_next_ = pt_buckets_[pos];
pt_buckets_[pos] = new_heap_node;
topn_heap_ = &new_heap_node->topn_heap_node_;
++part_group_cnt_;
}
} else {
topn_heap_ = &(exist->topn_heap_node_);

View File

@ -262,7 +262,8 @@ public:
const bool is_fetch_with_ties = false,
const int64_t default_block_size = ObChunkDatumStore::BLOCK_SIZE,
const common::ObCompressorType compressor_type = common::NONE_COMPRESSOR,
const ExprFixedArray *exprs = nullptr);
const ExprFixedArray *exprs = nullptr,
const int64_t est_rows = 0);
virtual int64_t get_prefix_pos() const { return 0; }
// keep initialized, can sort same rows (same cell type, cell count, projector) after reuse.
@ -771,7 +772,8 @@ protected:
int generate_last_ties_row(const ObChunkDatumStore::StoredRow *orign_row);
int adjust_topn_read_rows(ObChunkDatumStore::StoredRow **stored_rows, int64_t &read_cnt);
// for partition topn
int init_partition_topn();
int init_partition_topn(const int64_t est_rows);
int enlarge_partition_topn_buckets();
void reuse_part_topn_heap();
int locate_current_heap(const common::ObIArray<ObExpr*> &exprs);
int locate_current_heap_in_bucket(PartHeapNode *first_node,
@ -800,6 +802,8 @@ protected:
static const int64_t MAX_ROW_CNT = 268435456; // (2G / 8)
static const int64_t STORE_ROW_HEADER_SIZE = sizeof(SortStoredRow);
static const int64_t STORE_ROW_EXTRA_SIZE = sizeof(uint64_t);
static const int64_t MIN_BUCKET_COUNT = 1L << 14; //16384;
static const int64_t MAX_BUCKET_COUNT = 1L << 19; //524288;
bool inited_;
bool local_merge_sort_;
bool need_rewind_;
@ -856,6 +860,7 @@ protected:
bool use_partition_topn_sort_;
ObSEArray<TopnHeapNode*, 16> heap_nodes_;
int64_t cur_heap_idx_;
int64_t part_group_cnt_;
common::ObIArray<ObChunkDatumStore::StoredRow *> *rows_;
ObTempBlockStore::BlockHolder compact_blk_holder_;
ObChunkDatumStore::IteratedBlockHolder default_blk_holder_;