diff --git a/src/sql/engine/sort/ob_sort_op.cpp b/src/sql/engine/sort/ob_sort_op.cpp index e335638a0..ed1912063 100644 --- a/src/sql/engine/sort/ob_sort_op.cpp +++ b/src/sql/engine/sort/ob_sort_op.cpp @@ -433,10 +433,15 @@ int ObSortOp::init_sort(int64_t tenant_id, int64_t topn_cnt) { int ret = OB_SUCCESS; + int64_t est_rows = MY_SPEC.rows_; /* starts from the row count stored in the spec; passed below as both the 3rd and 4th argument of get_px_size -- presumably input and output, confirm against ObPxEstimateSizeUtil. NOTE(review): a failure here is only logged and ret stays set; OZ below is presumably skipped when ret is already a failure -- confirm */ + if (OB_FAIL(ObPxEstimateSizeUtil::get_px_size( + &ctx_, MY_SPEC.px_est_size_factor_, est_rows, est_rows))) { + LOG_WARN("failed to get px size", K(ret)); + } OZ(sort_impl_.init(tenant_id, &MY_SPEC.sort_collations_, &MY_SPEC.sort_cmp_funs_, &eval_ctx_, &ctx_, MY_SPEC.enable_encode_sortkey_opt_, MY_SPEC.is_local_merge_sort_, false /* need_rewind */, MY_SPEC.part_cnt_, topn_cnt, MY_SPEC.is_fetch_with_ties_, - ObChunkDatumStore::BLOCK_SIZE, MY_SPEC.compress_type_, &MY_SPEC.all_exprs_)); + ObChunkDatumStore::BLOCK_SIZE, MY_SPEC.compress_type_, &MY_SPEC.all_exprs_, est_rows)); if (is_batch) { read_batch_func_ = &ObSortOp::sort_impl_next_batch; } else { diff --git a/src/sql/engine/sort/ob_sort_op_impl.cpp b/src/sql/engine/sort/ob_sort_op_impl.cpp index dad9508db..d7b527e79 100644 --- a/src/sql/engine/sort/ob_sort_op_impl.cpp +++ b/src/sql/engine/sort/ob_sort_op_impl.cpp @@ -576,7 +576,7 @@ ObSortOpImpl::ObSortOpImpl(ObMonitorNode &op_monitor_info) io_event_observer_(nullptr), buckets_(NULL), max_bucket_cnt_(0), part_hash_nodes_(NULL), max_node_cnt_(0), part_cnt_(0), topn_cnt_(INT64_MAX), outputted_rows_cnt_(0), is_fetch_with_ties_(false), topn_heap_(NULL), ties_array_pos_(0), - last_ties_row_(NULL), pt_buckets_(NULL), use_partition_topn_sort_(false), heap_nodes_(), cur_heap_idx_(0), + last_ties_row_(NULL), pt_buckets_(NULL), use_partition_topn_sort_(false), heap_nodes_(), cur_heap_idx_(0), part_group_cnt_(0), rows_(NULL), sort_exprs_(nullptr), compress_type_(NONE_COMPRESSOR) { @@ -598,11 +598,11 @@ int ObSortOpImpl::init_topn() return ret; } -int ObSortOpImpl::init_partition_topn() +int ObSortOpImpl::init_partition_topn(const 
int64_t est_rows) { int ret = OB_SUCCESS; - uint64_t bucket_cnt = 16;//next_pow2(std::max(16L, rows.count())); - uint64_t shift_right = __builtin_clzll(bucket_cnt); + uint64_t bucket_cnt = next_pow2((est_rows < MIN_BUCKET_COUNT) ? MIN_BUCKET_COUNT : + (est_rows > MAX_BUCKET_COUNT) ? MAX_BUCKET_COUNT : est_rows); /* est_rows is clamped into [MIN_BUCKET_COUNT, MAX_BUCKET_COUNT] before being handed to next_pow2 */ ObIAllocator &alloc = mem_context_->get_malloc_allocator(); if (max_bucket_cnt_ < bucket_cnt) { if (NULL != pt_buckets_) { @@ -624,6 +624,58 @@ int ObSortOpImpl::init_partition_topn() return ret; } +/* Rehashes the partition-topn bucket array into one twice as large: allocates and zeroes 2 * max_bucket_cnt_ slots, relinks every PartHeapNode chained in pt_buckets_ into the new array, then frees the old array and updates pt_buckets_ / max_bucket_cnt_. Returns OB_ERR_UNEXPECTED when there is no current bucket array, when a node's heap is empty or its top row is NULL, or when the top row has too few cells; returns OB_ALLOCATE_MEMORY_FAILED when the new array cannot be allocated. On failure the new array is freed and pt_buckets_ / max_bucket_cnt_ keep their old values. NOTE(review): nodes relinked before a mid-loop failure already had hash_node_next_ overwritten, so the old chains are no longer intact -- confirm callers treat any failure here as fatal. */ int ObSortOpImpl::enlarge_partition_topn_buckets() +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(0 == max_bucket_cnt_) || OB_ISNULL(pt_buckets_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected bucket", K(max_bucket_cnt_), K(pt_buckets_)); + } else { + int64_t new_bucket_cnt = max_bucket_cnt_ * 2; + uint64_t shift_right = __builtin_clzll(new_bucket_cnt) + 1; /* for a power-of-two count 2^k this equals 64 - k, so the shift below keeps the top k bits of the 64-bit value as the bucket index; assumes new_bucket_cnt is a power of two -- TODO confirm */ + ObIAllocator &alloc = mem_context_->get_malloc_allocator(); + PartHeapNode **new_pt_buckets = (PartHeapNode **)alloc.alloc(sizeof(PartHeapNode*) * new_bucket_cnt); + int64_t hash_idx = sort_collations_->at(0).field_idx_; /* cell index taken from the first sort collation; presumably the column holding the partition hash value -- confirm against locate_current_heap */ + if (OB_ISNULL(new_pt_buckets)) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("failed to alloc memory", K(ret)); + } else { + MEMSET(new_pt_buckets, 0, sizeof(PartHeapNode*) * new_bucket_cnt); + ObChunkDatumStore::StoredRow *top_row = NULL; + for (int64_t idx = 0; OB_SUCC(ret) && idx < max_bucket_cnt_; ++idx) { + PartHeapNode *hash_node = pt_buckets_[idx]; + PartHeapNode *next_hash_node = NULL; + while(hash_node != NULL) { /* NOTE(review): this inner loop does not test ret, so the rest of the chain is still visited after an error */ + next_hash_node = hash_node->hash_node_next_; /* remember the old-chain successor before relinking overwrites hash_node_next_ */ + TopnHeapNode &cur_heap = hash_node->topn_heap_node_; + if (OB_UNLIKELY(0 == cur_heap.heap_.count()) || + OB_ISNULL(top_row = cur_heap.heap_.top())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected heap", K(cur_heap.heap_), K(top_row)); + } else if (OB_UNLIKELY(top_row->cnt_ <= hash_idx)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected hash idx", K(hash_idx), K(top_row->cnt_)); + } else { + int64_t 
new_pos = top_row->cells()[hash_idx].get_uint64() >> shift_right; /* bucket index of this node in the enlarged array */ + hash_node->hash_node_next_ = new_pt_buckets[new_pos]; + new_pt_buckets[new_pos] = hash_node; /* node becomes the new head of that bucket's chain */ + } + hash_node = next_hash_node; + } + } + if (OB_SUCC(ret)) { + alloc.free(pt_buckets_); + pt_buckets_ = new_pt_buckets; + max_bucket_cnt_ = new_bucket_cnt; + } else { + alloc.free(new_pt_buckets); + new_pt_buckets = NULL; + } + } + } + return ret; +} + // Set the note in ObPrefixSortImpl::init(): %sort_columns may be zero, to compatible with // the wrong generated prefix sort. int ObSortOpImpl::init( @@ -640,7 +692,8 @@ int ObSortOpImpl::init( const bool is_fetch_with_ties /* = false */, const int64_t default_block_size /* = 64KB */, const ObCompressorType compress_type /* = NONE_COMPRESS */, - const ExprFixedArray *exprs /* =nullptr */) + const ExprFixedArray *exprs /* =nullptr */, + const int64_t est_rows /* = 0 */) { int ret = OB_SUCCESS; if (is_inited()) { @@ -691,7 +744,7 @@ int ObSortOpImpl::init( LOG_WARN("init row store failed", K(ret)); } else if (use_heap_sort_ && OB_FAIL(init_topn())) { LOG_WARN("init topn failed", K(ret)); - } else if (use_partition_topn_sort_ && OB_FAIL(init_partition_topn())) { + } else if (use_partition_topn_sort_ && OB_FAIL(init_partition_topn(est_rows))) { LOG_WARN("init partition topn failed", K(ret)); } else if (batch_size > 0 && OB_ISNULL(stored_rows_ = static_cast( @@ -768,6 +821,7 @@ void ObSortOpImpl::reuse() heap_nodes_.reset(); reuse_part_topn_heap(); cur_heap_idx_ = 0; + part_group_cnt_ = 0; topn_heap_ = NULL; } else if (NULL != topn_heap_) { reuse_topn_heap(topn_heap_); @@ -809,6 +863,7 @@ void ObSortOpImpl::reset() sort_exprs_ = nullptr; // for partition topn sort cur_heap_idx_ = 0; + part_group_cnt_ = 0; heap_nodes_.reset(); // for partition topn end if (NULL != mem_context_) { @@ -1165,6 +1220,9 @@ int ObSortOpImpl::add_part_heap_sort_row(const common::ObIArray &exprs, LOG_WARN("failed to locate heap", K(ret)); } else if (OB_FAIL(add_heap_sort_row(exprs, 
store_row))) { LOG_WARN("add heap sort row failed", K(ret)); + } else if (OB_UNLIKELY(part_group_cnt_ > max_bucket_cnt_) && + OB_FAIL(enlarge_partition_topn_buckets())) { /* grow the bucket array once more heap nodes have been created than there are buckets */ + LOG_WARN("failed to enlarge partition topn buckets"); /* NOTE(review): unlike the neighbouring warnings, this one does not log K(ret) */ } return ret; } @@ -1599,6 +1657,7 @@ int ObSortOpImpl::do_dump() if (OB_SUCC(ret) && use_partition_topn_sort_) { heap_nodes_.reset(); reuse_part_topn_heap(); + part_group_cnt_ = 0; topn_heap_ = NULL; got_first_row_ = false; rows_ = &quick_sort_array_; @@ -2062,6 +2121,7 @@ int ObSortOpImpl::locate_current_heap(const common::ObIArray &exprs) new_heap_node->hash_node_next_ = pt_buckets_[pos]; pt_buckets_[pos] = new_heap_node; topn_heap_ = &new_heap_node->topn_heap_node_; + ++part_group_cnt_; /* one more heap node has been linked into pt_buckets_ */ } } else { topn_heap_ = &(exist->topn_heap_node_); diff --git a/src/sql/engine/sort/ob_sort_op_impl.h b/src/sql/engine/sort/ob_sort_op_impl.h index 2080f8143..f64379265 100644 --- a/src/sql/engine/sort/ob_sort_op_impl.h +++ b/src/sql/engine/sort/ob_sort_op_impl.h @@ -262,7 +262,8 @@ public: const bool is_fetch_with_ties = false, const int64_t default_block_size = ObChunkDatumStore::BLOCK_SIZE, const common::ObCompressorType compressor_type = common::NONE_COMPRESSOR, - const ExprFixedArray *exprs = nullptr); + const ExprFixedArray *exprs = nullptr, + const int64_t est_rows = 0); virtual int64_t get_prefix_pos() const { return 0; } // keep initialized, can sort same rows (same cell type, cell count, projector) after reuse. 
@@ -771,7 +772,8 @@ protected: int generate_last_ties_row(const ObChunkDatumStore::StoredRow *orign_row); int adjust_topn_read_rows(ObChunkDatumStore::StoredRow **stored_rows, int64_t &read_cnt); // for partition topn - int init_partition_topn(); + int init_partition_topn(const int64_t est_rows); + int enlarge_partition_topn_buckets(); void reuse_part_topn_heap(); int locate_current_heap(const common::ObIArray &exprs); int locate_current_heap_in_bucket(PartHeapNode *first_node, @@ -800,6 +802,8 @@ protected: static const int64_t MAX_ROW_CNT = 268435456; // (2G / 8) static const int64_t STORE_ROW_HEADER_SIZE = sizeof(SortStoredRow); static const int64_t STORE_ROW_EXTRA_SIZE = sizeof(uint64_t); + static const int64_t MIN_BUCKET_COUNT = 1L << 14; //16384; + static const int64_t MAX_BUCKET_COUNT = 1L << 19; //524288; bool inited_; bool local_merge_sort_; bool need_rewind_; @@ -856,6 +860,7 @@ protected: bool use_partition_topn_sort_; ObSEArray heap_nodes_; int64_t cur_heap_idx_; + int64_t part_group_cnt_; /* count of heap nodes linked into pt_buckets_; incremented in locate_current_heap, zeroed in reuse/reset/do_dump, and compared with max_bucket_cnt_ to trigger enlarge_partition_topn_buckets() */ common::ObIArray *rows_; ObTempBlockStore::BlockHolder compact_blk_holder_; ObChunkDatumStore::IteratedBlockHolder default_blk_holder_;