[FEAT MERGE] 4.3 optimizer enhancement

Co-authored-by: 2149 <260391947@qq.com>
Co-authored-by: akaError <lzg020616@163.com>
Co-authored-by: jinmaoli <lijinmao.csmaster@gmail.com>
This commit is contained in:
yinyj17
2023-12-16 14:12:43 +00:00
committed by ant-ob-hengtang
parent 438a70b2b8
commit 37fe7ce4eb
50 changed files with 3205 additions and 1059 deletions

View File

@ -575,8 +575,9 @@ ObSortOpImpl::ObSortOpImpl(ObMonitorNode &op_monitor_info)
op_type_(PHY_INVALID), op_id_(UINT64_MAX), exec_ctx_(nullptr), stored_rows_(nullptr),
io_event_observer_(nullptr), buckets_(NULL), max_bucket_cnt_(0), part_hash_nodes_(NULL),
max_node_cnt_(0), part_cnt_(0), topn_cnt_(INT64_MAX), outputted_rows_cnt_(0),
is_fetch_with_ties_(false), topn_heap_(NULL), ties_array_pos_(0), ties_array_(),
last_ties_row_(NULL), rows_(NULL)
is_fetch_with_ties_(false), topn_heap_(), ties_array_pos_(0), last_ties_row_(NULL),
pt_buckets_(NULL), use_partition_topn_sort_(false), heap_nodes_(), cur_heap_idx_(0),
rows_(NULL)
{
}
@ -585,6 +586,43 @@ ObSortOpImpl::~ObSortOpImpl()
reset();
}
int ObSortOpImpl::init_topn()
{
int ret = OB_SUCCESS;
if (OB_ISNULL(topn_heap_ = OB_NEWx(TopnHeapNode, (&mem_context_->get_malloc_allocator()),
comp_, &mem_context_->get_malloc_allocator()))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate memory failed", K(ret));
}
return ret;
}
int ObSortOpImpl::init_partition_topn()
{
int ret = OB_SUCCESS;
uint64_t bucket_cnt = 16;//next_pow2(std::max(16L, rows.count()));
uint64_t shift_right = __builtin_clzll(bucket_cnt);
ObIAllocator &alloc = mem_context_->get_malloc_allocator();
if (max_bucket_cnt_ < bucket_cnt) {
if (NULL != pt_buckets_) {
alloc.free(pt_buckets_);
pt_buckets_ = NULL;
max_bucket_cnt_ = 0;
}
pt_buckets_ = (PartHeapNode **)alloc.alloc(sizeof(PartHeapNode*) * bucket_cnt);
if (OB_ISNULL(pt_buckets_)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to alloc memory", K(ret));
} else {
max_bucket_cnt_ = bucket_cnt;
MEMSET(pt_buckets_, 0, sizeof(PartHeapNode*) * bucket_cnt);
}
} else {
MEMSET(pt_buckets_, 0, sizeof(PartHeapNode*) * bucket_cnt);
}
return ret;
}
// Set the note in ObPrefixSortImpl::init(): %sort_columns may be zero, to compatible with
// the wrong generated prefix sort.
int ObSortOpImpl::init(
@ -627,7 +665,8 @@ int ObSortOpImpl::init(
exec_ctx_ = exec_ctx;
part_cnt_ = part_cnt;
topn_cnt_ = topn_cnt;
use_heap_sort_ = is_topn_sort();
use_heap_sort_ = is_topn_sort() && part_cnt_ == 0;
use_partition_topn_sort_ = is_topn_sort() && part_cnt_ > 0;
is_fetch_with_ties_ = is_fetch_with_ties;
int64_t batch_size = eval_ctx_->max_batch_size_;
lib::ContextParam param;
@ -645,11 +684,10 @@ int ObSortOpImpl::init(
0, /* row_extra_size */
default_block_size))) {
LOG_WARN("init row store failed", K(ret));
} else if (is_topn_sort()
&& OB_ISNULL(topn_heap_ = OB_NEWx(TopnHeap, (&mem_context_->get_malloc_allocator()),
comp_, &mem_context_->get_malloc_allocator()))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate memory failed", K(ret));
} else if (use_heap_sort_ && OB_FAIL(init_topn())) {
LOG_WARN("init topn failed", K(ret));
} else if (use_partition_topn_sort_ && OB_FAIL(init_partition_topn())) {
LOG_WARN("init partition topn failed", K(ret));
} else if (batch_size > 0
&& OB_ISNULL(stored_rows_ = static_cast<ObChunkDatumStore::StoredRow **>(
mem_context_->get_malloc_allocator().alloc(
@ -680,11 +718,14 @@ int ObSortOpImpl::init(
}
if (OB_SUCC(ret)) {
inited_ = true;
if (!is_topn_sort()) {
if (use_partition_topn_sort_) {
//rows_ will be set to heap data of current heap after adding the first row
rows_ = &quick_sort_array_;
} else {
} else if (use_heap_sort_) {
rows_ = &(const_cast<common::ObIArray<ObChunkDatumStore::StoredRow *> &>
(topn_heap_->get_heap_data()));
(topn_heap_->heap_.get_heap_data()));
} else {
rows_ = &quick_sort_array_;
}
}
}
@ -703,14 +744,6 @@ void ObSortOpImpl::reuse()
row_idx_ = 0;
next_stored_row_func_ = &ObSortOpImpl::array_next_stored_row;
ties_array_pos_ = 0;
if (0 != ties_array_.count()) {
for (int64_t i = 0; i < ties_array_.count(); ++i) {
sql_mem_processor_.alloc(-1 * ties_array_[i]->get_max_size());
mem_context_->get_malloc_allocator().free(ties_array_[i]);
ties_array_[i] = NULL;
}
}
ties_array_.reset();
while (!sort_chunks_.is_empty()) {
ObSortOpChunk *chunk = sort_chunks_.remove_first();
chunk->~ObSortOpChunk();
@ -724,15 +757,13 @@ void ObSortOpImpl::reuse()
heap_iter_begin_ = false;
if (NULL != ems_heap_) {
ems_heap_->reset();
}
if (NULL != topn_heap_) {
for (int64_t i = 0; i < topn_heap_->count(); ++i) {
sql_mem_processor_.alloc(-1 *
static_cast<SortStoredRow *>(topn_heap_->at(i))->get_max_size());
mem_context_->get_malloc_allocator().free(static_cast<SortStoredRow *>(topn_heap_->at(i)));
topn_heap_->at(i) = NULL;
}
topn_heap_->reset();
} else if (use_partition_topn_sort_) {
heap_nodes_.reset();
reuse_part_topn_heap();
cur_heap_idx_ = 0;
topn_heap_ = NULL;
} else if (NULL != topn_heap_) {
reuse_topn_heap(topn_heap_);
}
}
@ -767,13 +798,10 @@ void ObSortOpImpl::reset()
is_fetch_with_ties_ = false;
rows_ = NULL;
ties_array_pos_ = 0;
if (0 != ties_array_.count()) {
for (int64_t i = 0; i < ties_array_.count(); ++i) {
mem_context_->get_malloc_allocator().free(ties_array_[i]);
ties_array_[i] = NULL;
}
}
ties_array_.reset();
// for partition topn sort
cur_heap_idx_ = 0;
heap_nodes_.reset();
// for partition topn end
if (NULL != mem_context_) {
if (NULL != imms_heap_) {
imms_heap_->~IMMSHeap();
@ -797,12 +825,12 @@ void ObSortOpImpl::reset()
mem_context_->get_malloc_allocator().free(part_hash_nodes_);
part_hash_nodes_ = NULL;
}
if (NULL != pt_buckets_) {
mem_context_->get_malloc_allocator().free(pt_buckets_);
pt_buckets_ = NULL;
}
if (NULL != topn_heap_) {
for (int64_t i = 0; i < topn_heap_->count(); ++i) {
mem_context_->get_malloc_allocator().free(static_cast<SortStoredRow *>(topn_heap_->at(i)));
topn_heap_->at(i) = NULL;
}
topn_heap_->~TopnHeap();
topn_heap_->~TopnHeapNode();
mem_context_->get_malloc_allocator().free(topn_heap_);
topn_heap_ = NULL;
}
@ -842,8 +870,9 @@ int ObSortOpImpl::build_chunk(const int64_t level, Input &input, int64_t extra_s
chunk->datum_store_.set_allocator(mem_context_->get_malloc_allocator());
chunk->datum_store_.set_callback(&sql_mem_processor_);
chunk->datum_store_.set_io_event_observer(io_event_observer_);
int64_t total_size = 0;
while (OB_SUCC(ret)) {
if (!is_fetch_with_ties_ && stored_row_cnt >= topn_cnt_) {
if (use_heap_sort_ && !is_fetch_with_ties_ && stored_row_cnt >= topn_cnt_) {
break;
} else if (OB_FAIL(input(datum_store, src_store_row))) {
if (OB_ITER_END != ret) {
@ -858,9 +887,9 @@ int ObSortOpImpl::build_chunk(const int64_t level, Input &input, int64_t extra_s
stored_row_cnt++;
op_monitor_info_.otherstat_1_id_ = ObSqlMonitorStatIds::SORT_SORTED_ROW_COUNT;
op_monitor_info_.otherstat_1_value_ += 1;
total_size += src_store_row->row_size_;
}
}
// 必须强制先dump,然后finish dump才有效
if (OB_FAIL(ret)) {
} else if (OB_FAIL(chunk->datum_store_.dump(false, true))) {
@ -1093,7 +1122,7 @@ int ObSortOpImpl::add_row(const common::ObIArray<ObExpr*> &exprs,
const ObChunkDatumStore::StoredRow *&store_row)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(use_heap_sort_ && need_dump())) {
if (OB_UNLIKELY((use_heap_sort_ || use_partition_topn_sort_) && need_dump())) {
bool dumped = false;
if (OB_FAIL(preprocess_dump(dumped))) {
LOG_WARN("failed preprocess dump", K(ret));
@ -1104,12 +1133,33 @@ int ObSortOpImpl::add_row(const common::ObIArray<ObExpr*> &exprs,
if (OB_FAIL(ret)) {
} else if (use_heap_sort_) {
ret = add_heap_sort_row(exprs, store_row);
} else if (use_partition_topn_sort_) {
ret = add_part_heap_sort_row(exprs, store_row);
} else {
ret = add_quick_sort_row(exprs, store_row);
}
return ret;
}
int ObSortOpImpl::add_part_heap_sort_row(const common::ObIArray<ObExpr*> &exprs,
const ObChunkDatumStore::StoredRow *&store_row)
{
int ret = OB_SUCCESS;
bool is_cur_block_row = true;
SortStoredRow *new_row = NULL;
if (OB_ISNULL(mem_context_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("mem_context is not initialized", K(ret));
} else if (topn_cnt_ <= 0) {
ret = OB_ITER_END;
} else if (OB_FAIL(locate_current_heap(exprs))) {
LOG_WARN("failed to locate heap", K(ret));
} else if (OB_FAIL(add_heap_sort_row(exprs, store_row))) {
LOG_WARN("add heap sort row failed", K(ret));
}
return ret;
}
int ObSortOpImpl::add_quick_sort_batch(const common::ObIArray<ObExpr *> &exprs,
const ObBitVector &skip,
const int64_t batch_size,
@ -1143,7 +1193,7 @@ int ObSortOpImpl::add_batch(const common::ObIArray<ObExpr *> &exprs,
int64_t *append_row_count = nullptr)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(use_heap_sort_ && need_dump())) {
if (OB_UNLIKELY((use_heap_sort_ || use_partition_topn_sort_) && need_dump())) {
bool dumped = false;
if (OB_FAIL(preprocess_dump(dumped))) {
LOG_WARN("failed preprocess dump", K(ret));
@ -1154,6 +1204,8 @@ int ObSortOpImpl::add_batch(const common::ObIArray<ObExpr *> &exprs,
if (OB_FAIL(ret)) {
} else if (use_heap_sort_) {
ret = add_heap_sort_batch(exprs, skip, batch_size, start_pos, append_row_count);
} else if (use_partition_topn_sort_) {
ret = add_part_heap_sort_batch(exprs, skip, batch_size, start_pos, append_row_count);
} else {
ret = add_quick_sort_batch(exprs, skip, batch_size, start_pos, append_row_count);
}
@ -1188,7 +1240,7 @@ int ObSortOpImpl::add_batch(const common::ObIArray<ObExpr *> &exprs,
const uint16_t selector[], const int64_t size)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(use_heap_sort_ && need_dump())) {
if (OB_UNLIKELY((use_heap_sort_ || use_partition_topn_sort_) && need_dump())) {
if (OB_FAIL(do_dump())) {
LOG_WARN("failed to do topn dump", K(ret));
}
@ -1196,6 +1248,8 @@ int ObSortOpImpl::add_batch(const common::ObIArray<ObExpr *> &exprs,
if (OB_FAIL(ret)) {
} else if (use_heap_sort_) {
ret = add_heap_sort_batch(exprs, skip, batch_size, selector, size);
} else if (use_partition_topn_sort_) {
ret = add_part_heap_sort_batch(exprs, skip, batch_size, selector, size);
} else {
ret = add_quick_sort_batch(exprs, skip, batch_size, selector, size);
}
@ -1394,9 +1448,75 @@ int ObSortOpImpl::do_partition_sort(common::ObIArray<ObChunkDatumStore::StoredRo
return ret;
}
int ObSortOpImpl::do_partition_topn_sort() {
int ret = OB_SUCCESS;
heap_nodes_.reuse();
for (int64_t idx = 0; OB_SUCC(ret) && idx < max_bucket_cnt_; ++idx) {
PartHeapNode *hash_node = pt_buckets_[idx];
int64_t cur_bucket_cnt = 0;
int64_t hash_expr_cnt = 1;
cur_bucket_cnt = heap_nodes_.count();
while(hash_node != NULL) {
TopnHeapNode &cur_heap = hash_node->topn_heap_node_;
if (0 == cur_heap.heap_.count()) {
//do nothing
} else if (topn_cnt_ < cur_heap.heap_.count()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("topn is less than array_count", K(ret), K(topn_cnt_), K(cur_heap.heap_.count()));
} else if (OB_FAIL(heap_nodes_.push_back(&cur_heap))) {
LOG_WARN("failed to add into heap nodes", K(ret));
}
hash_node = hash_node->hash_node_next_;
}
if (OB_SUCC(ret)) {
//sort heaps which is in the same bucket
comp_.set_cmp_range(0, part_cnt_ + hash_expr_cnt);
std::sort(heap_nodes_.begin() + cur_bucket_cnt,
heap_nodes_.end(),
TopnHeapNodeComparer(comp_));
//sort rows in heap
if (OB_SUCC(ret) && part_cnt_ + hash_expr_cnt < comp_.get_cnt()) {
comp_.set_cmp_range(part_cnt_ + hash_expr_cnt, comp_.get_cnt());
for (int64_t i = cur_bucket_cnt; OB_SUCC(ret) && i < heap_nodes_.count(); ++i) {
TopnHeapNode *cur_heap = heap_nodes_.at(i);
ObIArray<ObChunkDatumStore::StoredRow *> &heap_rows = (const_cast<common::ObIArray<ObChunkDatumStore::StoredRow *> &>
(cur_heap->heap_.get_heap_data()));
if (enable_encode_sortkey_) {
bool can_encode = true;
ObAdaptiveQS aqs(heap_rows, mem_context_->get_malloc_allocator());
if (OB_FAIL(aqs.init(heap_rows, mem_context_->get_malloc_allocator(), 0, cur_heap->heap_.count(), can_encode))) {
LOG_WARN("failed to init aqs", K(ret));
} else if (can_encode) {
aqs.sort(0, cur_heap->heap_.count());
} else {
enable_encode_sortkey_ = false;
comp_.enable_encode_sortkey_ = false;
std::sort(&heap_rows.at(0), &heap_rows.at(0) + heap_rows.count(), CopyableComparer(comp_));
if (OB_SUCCESS != comp_.ret_) {
ret = comp_.ret_;
LOG_WARN("compare failed", K(ret));
}
}
} else {
std::sort(&heap_rows.at(0), &heap_rows.at(0) + heap_rows.count(), CopyableComparer(comp_));
}
op_monitor_info_.otherstat_1_id_ = ObSqlMonitorStatIds::SORT_SORTED_ROW_COUNT;
op_monitor_info_.otherstat_1_value_ += cur_heap->heap_.count();
}
} else {
//partition limit, do nothing
}
}
}
comp_.set_cmp_range(0, comp_.get_cnt());
return ret;
}
int ObSortOpImpl::do_dump()
{
int ret = OB_SUCCESS;
ObSEArray<std::pair<int64_t, int64_t>, 4> pt_bucket_cnts;
int64_t pt_bucket_dumped = 0;
if (!is_inited()) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
@ -1405,22 +1525,40 @@ int ObSortOpImpl::do_dump()
LOG_WARN("invalid argument", K(ret));
} else if (OB_FAIL(sort_inmem_data())) {
LOG_WARN("sort in-memory data failed", K(ret));
} else {
}
if (OB_SUCC(ret)) {
const int64_t level = 0;
if (!need_imms()) {
if (use_partition_topn_sort_) {
int64_t cur_heap_idx = 0;
int64_t row_idx = 0;
const int64_t level = 0;
auto input = [&](ObChunkDatumStore *&rs, const ObChunkDatumStore::StoredRow *&row) {
if (OB_FAIL(part_topn_heap_next(cur_heap_idx, row_idx, row))) {
if (OB_ITER_END != ret) {
LOG_WARN("get row from part topn heap failed", K(ret));
}
} else {
rs = &datum_store_;
}
return ret;
};
if (OB_FAIL(build_chunk(level, input))) {
LOG_WARN("build chunk failed", K(ret));
}
} else if (!need_imms()) {
int64_t row_pos = 0;
int64_t ties_array_pos = 0;
auto input = [&](ObChunkDatumStore *&rs, const ObChunkDatumStore::StoredRow *&row) {
int ret = OB_SUCCESS;
if (row_pos >= rows_->count()
&& ties_array_pos >= ties_array_.count()) {
&& !(NULL != topn_heap_ && ties_array_pos < topn_heap_->ties_array_.count())) {
ret = OB_ITER_END;
} else if (row_pos < rows_->count()) {
row = rows_->at(row_pos);
rs = &datum_store_;
row_pos += 1;
} else {
row = ties_array_.at(ties_array_pos);
} else if (NULL != topn_heap_) {
row = topn_heap_->ties_array_.at(ties_array_pos);
rs = &datum_store_;
ties_array_pos += 1;
}
@ -1445,33 +1583,24 @@ int ObSortOpImpl::do_dump()
LOG_WARN("build chunk failed", K(ret));
}
}
if (OB_SUCC(ret) && use_heap_sort_) {
if (NULL != mem_context_ && NULL != topn_heap_) {
for (int64_t i = 0; i < topn_heap_->count(); ++i) {
sql_mem_processor_.alloc(-1 *
static_cast<SortStoredRow *>(topn_heap_->at(i))->get_max_size());
mem_context_->get_malloc_allocator().free(
static_cast<SortStoredRow *>(topn_heap_->at(i)));
topn_heap_->at(i) = NULL;
}
topn_heap_->~TopnHeap();
mem_context_->get_malloc_allocator().free(topn_heap_);
topn_heap_ = NULL;
}
if (0 != ties_array_.count()) {
for (int64_t i = 0; i < ties_array_.count(); ++i) {
sql_mem_processor_.alloc(-1 * ties_array_[i]->get_max_size());
mem_context_->get_malloc_allocator().free(ties_array_[i]);
ties_array_[i] = NULL;
}
}
ties_array_.reset();
reuse_topn_heap(topn_heap_);
topn_heap_->~TopnHeapNode();
mem_context_->get_malloc_allocator().free(topn_heap_);
topn_heap_ = NULL;
got_first_row_ = false;
use_heap_sort_ = false;
rows_ = &quick_sort_array_;
}
if (OB_SUCC(ret) && use_partition_topn_sort_) {
heap_nodes_.reset();
reuse_part_topn_heap();
topn_heap_ = NULL;
got_first_row_ = false;
rows_ = &quick_sort_array_;
}
if (OB_SUCC(ret)) {
heap_iter_begin_ = false;
row_idx_ = 0;
@ -1486,6 +1615,43 @@ int ObSortOpImpl::do_dump()
return ret;
}
void ObSortOpImpl::reuse_topn_heap(TopnHeapNode *topn_heap) {
if (NULL != topn_heap && NULL != mem_context_) {
for (int64_t i = 0; i < topn_heap->heap_.count(); ++i) {
sql_mem_processor_.alloc(-1 *
static_cast<SortStoredRow *>(topn_heap->heap_.at(i))->get_max_size());
mem_context_->get_malloc_allocator().free(static_cast<SortStoredRow *>(topn_heap->heap_.at(i)));
topn_heap->heap_.at(i) = NULL;
}
topn_heap->heap_.reset();
for (int64_t i = 0; i < topn_heap->ties_array_.count(); ++i) {
sql_mem_processor_.alloc(-1 * topn_heap->ties_array_[i]->get_max_size());
mem_context_->get_malloc_allocator().free(topn_heap->ties_array_[i]);
topn_heap->ties_array_[i] = NULL;
}
topn_heap->ties_array_.reset();
}
}
void ObSortOpImpl::reuse_part_topn_heap() {
for (int64_t i = 0; i < max_bucket_cnt_; ++i) {
if (mem_context_ != NULL) {
PartHeapNode *hash_node = pt_buckets_[i];
while(hash_node != NULL) {
PartHeapNode *cur_node = hash_node;
if (topn_heap_ == &(cur_node->topn_heap_node_)) {
topn_heap_ = NULL;
}
hash_node = cur_node->hash_node_next_;
reuse_topn_heap(&cur_node->topn_heap_node_);
cur_node->~PartHeapNode();
mem_context_->get_malloc_allocator().free(cur_node);
}
pt_buckets_[i] = NULL;
}
}
}
int ObSortOpImpl::build_ems_heap(int64_t &merge_ways)
{
int ret = OB_SUCCESS;
@ -1646,6 +1812,41 @@ int ObSortOpImpl::imms_heap_next(const ObChunkDatumStore::StoredRow *&store_row)
return ret;
}
int ObSortOpImpl::part_topn_heap_next(int64_t &cur_heap_idx,
int64_t &cur_heap_row_idx,
const ObChunkDatumStore::StoredRow *&store_row)
{
int ret = OB_SUCCESS;
TopnHeapNode *cur_heap = NULL;
store_row = NULL;
if (cur_heap_idx < 0) {
ret = OB_ARRAY_OUT_OF_RANGE;
} else {
while (OB_SUCC(ret) && NULL == store_row) {
if (cur_heap_idx >= heap_nodes_.count()) {
ret = OB_ITER_END;
} else {
cur_heap = heap_nodes_.at(cur_heap_idx);
if (OB_ISNULL(cur_heap)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid cur_heap", K(ret));
} else if (cur_heap_row_idx < cur_heap->heap_.count()) {
store_row = cur_heap->heap_.at(cur_heap_row_idx);
cur_heap_row_idx += 1;
} else if (cur_heap_row_idx - cur_heap->heap_.count() < cur_heap->ties_array_.count()) {
store_row = cur_heap->ties_array_.at(cur_heap_row_idx - cur_heap->heap_.count());
cur_heap_row_idx += 1;
} else {
//switch heap
cur_heap_idx += 1;
cur_heap_row_idx = 0;
}
}
}
}
return ret;
}
int ObSortOpImpl::sort_inmem_data()
{
int ret = OB_SUCCESS;
@ -1654,20 +1855,15 @@ int ObSortOpImpl::sort_inmem_data()
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
} else if (!rows_->empty()) {
if (!use_heap_sort_ && (local_merge_sort_ || sorted_)) {
if (!use_heap_sort_ && !use_partition_topn_sort_ &&
(local_merge_sort_ || sorted_)) {
// row already in order, do nothing.
} else {
int64_t begin = 0;
if (need_imms()) {
// is increment sort (rows add after sort()), sort the last add rows
for (int64_t i = rows_->count() - 1; i >= 0; i--) {
if (NULL == rows_->at(i)) {
begin = i + 1;
break;
}
}
}
if (part_cnt_ > 0) {
if (use_partition_topn_sort_) {
heap_nodes_.reuse();
do_partition_topn_sort();
} else if (part_cnt_ > 0) {
do_partition_sort(*rows_, begin, rows_->count());
} else if (enable_encode_sortkey_) {
bool can_encode = true;
@ -1752,8 +1948,14 @@ int ObSortOpImpl::sort()
LOG_WARN("init iterator failed", K(ret));
} else {
if (!need_imms()) {
row_idx_ = 0;
next_stored_row_func_ = &ObSortOpImpl::array_next_stored_row;
if (use_partition_topn_sort_) {
cur_heap_idx_ = 0;
row_idx_ = 0;
next_stored_row_func_ = &ObSortOpImpl::part_heap_next_stored_row;
} else {
row_idx_ = 0;
next_stored_row_func_ = &ObSortOpImpl::array_next_stored_row;
}
} else {
next_stored_row_func_ = &ObSortOpImpl::imms_heap_next_stored_row;
}
@ -1811,6 +2013,7 @@ int ObSortOpImpl::sort()
}
if (OB_SUCC(ret)) {
// set iteration age for batch iteration.
set_blk_holder(&blk_holder_);
next_stored_row_func_ = &ObSortOpImpl::ems_heap_next_stored_row;
}
@ -1818,18 +2021,116 @@ int ObSortOpImpl::sort()
return ret;
}
int ObSortOpImpl::locate_current_heap(const common::ObIArray<ObExpr*> &exprs)
{
int ret = OB_SUCCESS;
PartHeapNode *new_heap_node = NULL;
int64_t hash_idx = sort_collations_->at(0).field_idx_;
uint64_t bucket_cnt = max_bucket_cnt_;
uint64_t shift_right = __builtin_clzll(bucket_cnt) + 1;
ObDatum *part_datum = NULL;
uint64_t pos = 0;
if (OB_ISNULL(exprs.at(hash_idx))) {
pos = 0;
} else if (OB_FAIL(exprs.at(hash_idx)->eval(*eval_ctx_, part_datum))) {
LOG_WARN("expression evaluate failed", K(ret));
} else {
pos = part_datum->get_uint64() >> shift_right; // high n bit
}
if (OB_FAIL(ret)) {
} else if (pos > bucket_cnt - 1) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid pos", K(ret));
} else {
PartHeapNode *exist = NULL;
if (OB_FAIL(locate_current_heap_in_bucket(pt_buckets_[pos], exprs, exist))) {
LOG_WARN("locate current heap in bucket failed", K(ret));
} else if (NULL == exist) {
TopnHeap *new_heap = NULL;
ObIAllocator &alloc = mem_context_->get_malloc_allocator();
if (OB_ISNULL(new_heap_node = OB_NEWx(PartHeapNode, &alloc, comp_, &alloc))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate memory failed", K(ret));
} else {
new_heap_node->hash_node_next_ = pt_buckets_[pos];
pt_buckets_[pos] = new_heap_node;
topn_heap_ = &new_heap_node->topn_heap_node_;
}
} else {
topn_heap_ = &(exist->topn_heap_node_);
}
}
if (OB_SUCC(ret)) {
rows_ = &(const_cast<common::ObIArray<ObChunkDatumStore::StoredRow *> &>
(topn_heap_->heap_.get_heap_data()));
}
return ret;
}
int ObSortOpImpl::locate_current_heap_in_bucket(PartHeapNode *first_node,
const common::ObIArray<ObExpr*> &exprs,
PartHeapNode *&exist)
{
int ret = OB_SUCCESS;
ObSEArray<ObDatum *, 4> part_datums;
exist = first_node;
bool find_same_heap = false;
if (NULL != exist) {
for (int64_t i = 0; OB_SUCC(ret) && i <= part_cnt_; ++i) {
int64_t idx = sort_collations_->at(i).field_idx_;
ObDatum *part_datum = NULL;
if (OB_ISNULL(exprs.at(idx))) {
if (OB_FAIL(part_datums.push_back(NULL))) {
LOG_WARN("push back part datums of new row failed", K(ret));
}
} else if (OB_FAIL(exprs.at(idx)->eval(*eval_ctx_, part_datum))) {
LOG_WARN("expression evaluate failed", K(ret));
} else if (OB_FAIL(part_datums.push_back(part_datum))) {
LOG_WARN("push back part datums of new row failed", K(ret));
}
}
}
while (OB_SUCC(ret) && NULL != exist && !find_same_heap) {
if (exist->topn_heap_node_.heap_.count() > 0) {
const ObChunkDatumStore::StoredRow *top_row = exist->topn_heap_node_.heap_.top();
find_same_heap = true;
if (OB_ISNULL(top_row)) {
find_same_heap = false;
}
for (int64_t i = 0; OB_SUCC(ret) && find_same_heap && i <= part_cnt_; ++i) {
uint32_t idx = sort_collations_->at(i).field_idx_;
int cmp_ret = 0;
if (OB_ISNULL(part_datums.at(i))) {
find_same_heap = top_row->cells()[idx].is_null();
} else if (OB_FAIL(sort_cmp_funs_->at(i).cmp_func_(*part_datums.at(i),
top_row->cells()[idx],
cmp_ret))) {
LOG_WARN("failed to compare", K(ret));
} else {
find_same_heap = (0 == cmp_ret);
}
}
}
if (!find_same_heap) {
exist = exist->hash_node_next_;
}
}
return ret;
}
int ObSortOpImpl::array_next_stored_row(
const ObChunkDatumStore::StoredRow *&sr)
{
int ret = OB_SUCCESS;
if (row_idx_ >= rows_->count()
&& ties_array_pos_ >= ties_array_.count()) {
&& (NULL == topn_heap_
|| ties_array_pos_ >= topn_heap_->ties_array_.count())) {
ret = OB_ITER_END;
} else if (row_idx_ < rows_->count()) {
sr = rows_->at(row_idx_);
row_idx_ += 1;
} else {
sr = ties_array_.at(ties_array_pos_);
sr = topn_heap_->ties_array_.at(ties_array_pos_);
ties_array_pos_ += 1;
}
return ret;
@ -1859,6 +2160,23 @@ int ObSortOpImpl::ems_heap_next_stored_row(
return ret;
}
int ObSortOpImpl::part_heap_next_stored_row(const ObChunkDatumStore::StoredRow *&sr)
{
int ret = OB_SUCCESS;
if (OB_FAIL(part_topn_heap_next(cur_heap_idx_, row_idx_, sr))) {
if (OB_ITER_END != ret) {
LOG_WARN("get row from part topn heap failed", K(ret));
} else {
// Reset status when iterating end, because we will add rows and sort again after dumped to disk.
cur_heap_idx_ = 0;
row_idx_ = 0;
heap_nodes_.reset();
sr = nullptr;
}
}
return ret;
}
int ObSortOpImpl::rewind()
{
int ret = OB_SUCCESS;
@ -1920,7 +2238,7 @@ int ObSortOpImpl::get_next_batch(const common::ObIArray<ObExpr*> &exprs,
if (OB_ITER_END != ret) {
LOG_WARN("failed to get next batch stored rows", K(ret));
}
} else if (read_rows > 0 && OB_FAIL(adjust_topn_read_rows(stored_rows_, read_rows))) {
} else if (read_rows > 0 && !use_partition_topn_sort_ && OB_FAIL(adjust_topn_read_rows(stored_rows_, read_rows))) {
LOG_WARN("failed to adjust read rows with ties", K(ret));
} else {
ObChunkDatumStore::Iterator::attach_rows(exprs, *eval_ctx_,
@ -1951,7 +2269,7 @@ int ObSortOpImpl::add_heap_sort_row(const common::ObIArray<ObExpr*> &exprs,
bool updated = false;
if (OB_FAIL(sql_mem_processor_.update_max_available_mem_size_periodically(
&mem_context_->get_malloc_allocator(),
[&](int64_t cur_cnt){ return topn_heap_->count() > cur_cnt; },
[&](int64_t cur_cnt){ return topn_heap_->heap_.count() > cur_cnt; },
updated))) {
LOG_WARN("failed to get max available memory size", K(ret));
} else if (updated && OB_FAIL(sql_mem_processor_.update_used_mem_size(mem_context_->used()))) {
@ -1960,7 +2278,7 @@ int ObSortOpImpl::add_heap_sort_row(const common::ObIArray<ObExpr*> &exprs,
}
if (OB_FAIL(ret)) {
} else if (topn_heap_->count() == topn_cnt_ - outputted_rows_cnt_) {
} else if (topn_heap_->heap_.count() == topn_cnt_ - outputted_rows_cnt_) {
if (is_fetch_with_ties_ && OB_FAIL(adjust_topn_heap_with_ties(exprs, store_row))) {
LOG_WARN("failed to adjust topn heap with ties", K(ret));
} else if (!is_fetch_with_ties_ && OB_FAIL(adjust_topn_heap(exprs, store_row))) {
@ -1969,12 +2287,12 @@ int ObSortOpImpl::add_heap_sort_row(const common::ObIArray<ObExpr*> &exprs,
} else { // push back array
SortStoredRow *new_row = NULL;
ObIAllocator &alloc = mem_context_->get_malloc_allocator();
int64_t topn_heap_size = topn_heap_->count();
int64_t topn_heap_size = topn_heap_->heap_.count();
if (OB_FAIL(copy_to_row(exprs, alloc, new_row))) {
LOG_WARN("failed to generate new row", K(ret));
} else if (OB_FAIL(topn_heap_->push(new_row))) {
} else if (OB_FAIL(topn_heap_->heap_.push(new_row))) {
LOG_WARN("failed to push back row", K(ret));
if (topn_heap_->count() == topn_heap_size) {
if (topn_heap_->heap_.count() == topn_heap_size) {
mem_context_->get_malloc_allocator().free(new_row);
new_row = NULL;
}
@ -2034,8 +2352,64 @@ int ObSortOpImpl::add_heap_sort_batch(const common::ObIArray<ObExpr *> &exprs,
LOG_WARN("check need sort failed", K(ret));
} else if (OB_NOT_NULL(store_row)) {
stored_rows_[i] = const_cast<ObChunkDatumStore::StoredRow *>(store_row);
} else if (OB_NOT_NULL(topn_heap_) && OB_NOT_NULL(topn_heap_->top())) {
stored_rows_[i] = const_cast<ObChunkDatumStore::StoredRow *>(topn_heap_->top());
} else if (OB_NOT_NULL(topn_heap_) && OB_NOT_NULL(topn_heap_->heap_.top())) {
stored_rows_[i] = const_cast<ObChunkDatumStore::StoredRow *>(topn_heap_->heap_.top());
} else {
ret = OB_ERR_UNEXPECTED;
}
}
return ret;
}
int ObSortOpImpl::add_part_heap_sort_batch(const common::ObIArray<ObExpr *> &exprs,
const ObBitVector &skip,
const int64_t batch_size,
const int64_t start_pos /* 0 */,
int64_t *append_row_count)
{
int ret = OB_SUCCESS;
int64_t row_count = 0;
const ObChunkDatumStore::StoredRow *store_row = NULL;
ObEvalCtx::BatchInfoScopeGuard batch_info_guard(*eval_ctx_);
batch_info_guard.set_batch_size(batch_size);
for (int64_t i = start_pos; OB_SUCC(ret) && i < batch_size; i++) {
if (skip.at(i)) {
continue;
}
batch_info_guard.set_batch_idx(i);
if (OB_FAIL(add_part_heap_sort_row(exprs, store_row))) {
LOG_WARN("failed to add topn row", K(ret));
}
row_count++;
}
if (OB_NOT_NULL(append_row_count)) {
*append_row_count = row_count;
}
return ret;
}
// if less than heap size or replace heap top, store row is new row
// otherwise, store row will not change as last result obtained.
// here, strored_rows_ only used to fetch prev_row in prefix sort batch.
int ObSortOpImpl::add_part_heap_sort_batch(const common::ObIArray<ObExpr *> &exprs,
const ObBitVector &skip,
const int64_t batch_size,
const uint16_t selector[],
const int64_t size)
{
int ret = OB_SUCCESS;
const ObChunkDatumStore::StoredRow *store_row = NULL;
ObEvalCtx::BatchInfoScopeGuard batch_info_guard(*eval_ctx_);
batch_info_guard.set_batch_size(batch_size);
for (int64_t i = 0; i < size && OB_SUCC(ret); i++) {
int64_t idx = selector[i];
batch_info_guard.set_batch_idx(idx);
if (OB_FAIL(add_part_heap_sort_row(exprs, store_row))) {
LOG_WARN("check need sort failed", K(ret));
} else if (OB_NOT_NULL(store_row)) {
stored_rows_[i] = const_cast<ObChunkDatumStore::StoredRow *>(store_row);
} else if (OB_NOT_NULL(topn_heap_) && OB_NOT_NULL(topn_heap_->heap_.top())) {
stored_rows_[i] = const_cast<ObChunkDatumStore::StoredRow *>(topn_heap_->heap_.top());
} else {
ret = OB_ERR_UNEXPECTED;
}
@ -2050,16 +2424,19 @@ int ObSortOpImpl::adjust_topn_heap(const common::ObIArray<ObExpr*> &exprs,
if (OB_ISNULL(mem_context_) || OB_ISNULL(topn_heap_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("mem_context or heap is not initialized", K(ret));
} else if (OB_ISNULL(topn_heap_->top())) {
} else if (OB_ISNULL(topn_heap_->heap_.top())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected error.top of the heap is NULL", K(ret), K(topn_heap_->count()));
} else if (!topn_heap_->empty()) {
if (comp_(&exprs, topn_heap_->top(), *eval_ctx_)) {
LOG_WARN("unexpected error.top of the heap is NULL", K(ret), K(topn_heap_->heap_.count()));
} else if (!topn_heap_->heap_.empty()) {
if (use_partition_topn_sort_) {
comp_.set_cmp_range(part_cnt_ + 1 /*hash expr cnt*/, comp_.get_cnt());
}
if (comp_(&exprs, topn_heap_->heap_.top(), *eval_ctx_)) {
ObIAllocator &alloc = mem_context_->get_malloc_allocator();
SortStoredRow* new_row = NULL;
if (OB_FAIL(copy_to_topn_row(exprs, alloc, new_row))) {
LOG_WARN("failed to generate new row", K(ret));
} else if (OB_FAIL(topn_heap_->replace_top(new_row))) {
} else if (OB_FAIL(topn_heap_->heap_.replace_top(new_row))) {
LOG_WARN("failed to replace top", K(ret));
} else {
store_row = new_row;
@ -2067,6 +2444,9 @@ int ObSortOpImpl::adjust_topn_heap(const common::ObIArray<ObExpr*> &exprs,
} else {
ret = comp_.ret_;
}
if (OB_SUCC(ret) && use_partition_topn_sort_) {
comp_.set_cmp_range(0, comp_.get_cnt());
}
}
return ret;
}
@ -2084,27 +2464,31 @@ int ObSortOpImpl::adjust_topn_heap_with_ties(const common::ObIArray<ObExpr*> &ex
if (OB_ISNULL(mem_context_) || OB_ISNULL(topn_heap_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("mem_context or heap is not initialized", K(ret));
} else if (OB_ISNULL(topn_heap_->top())) {
} else if (OB_ISNULL(topn_heap_->heap_.top())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected error.top of the heap is NULL", K(ret), K(topn_heap_->count()));
} else if (!topn_heap_->empty()) {
int cmp = comp_.with_ties_cmp(&exprs, topn_heap_->top(), *eval_ctx_);
LOG_WARN("unexpected error.top of the heap is NULL", K(ret), K(topn_heap_->heap_.count()));
} else if (!topn_heap_->heap_.empty()) {
int cmp = 0;
bool is_alloced = false;
bool add_ties_array = false;
SortStoredRow *new_row = NULL;
SortStoredRow *copy_pre_heap_top_row = NULL;
ObIAllocator &alloc = mem_context_->get_malloc_allocator();
SortStoredRow *pre_heap_top_row = static_cast<SortStoredRow *>(topn_heap_->top());
SortStoredRow *pre_heap_top_row = static_cast<SortStoredRow *>(topn_heap_->heap_.top());
if (use_partition_topn_sort_) {
comp_.set_cmp_range(part_cnt_ + 1 /*hash expr cnt*/, comp_.get_cnt());
}
cmp = comp_.with_ties_cmp(&exprs, topn_heap_->heap_.top(), *eval_ctx_);
if (OB_FAIL(comp_.ret_) || cmp < 0) {
/* do nothing */
} else if (0 == cmp) {
// equal to heap top, add row to ties array
int64_t ties_array_size = ties_array_.count();
int64_t ties_array_size = topn_heap_->ties_array_.count();
if (OB_FAIL(copy_to_row(exprs, alloc, new_row))) {
LOG_WARN("failed to generate new row", K(ret));
} else if (OB_FAIL(ties_array_.push_back(new_row))) {
} else if (OB_FAIL(topn_heap_->ties_array_.push_back(new_row))) {
LOG_WARN("failed to push back ties array", K(ret));
if (ties_array_size == ties_array_.count()) {
if (ties_array_size == topn_heap_->ties_array_.count()) {
mem_context_->get_malloc_allocator().free(new_row);
new_row = NULL;
}
@ -2116,26 +2500,26 @@ int ObSortOpImpl::adjust_topn_heap_with_ties(const common::ObIArray<ObExpr*> &ex
LOG_WARN("failed to generate new row", K(ret));
} else if (OB_FAIL(copy_to_topn_row(exprs, alloc, new_row))) {
LOG_WARN("failed to generate new row", K(ret));
} else if (OB_FAIL(topn_heap_->replace_top(new_row))) {
} else if (OB_FAIL(topn_heap_->heap_.replace_top(new_row))) {
LOG_WARN("failed to replace top", K(ret));
} else if (OB_FALSE_IT(cmp = comp_.with_ties_cmp(copy_pre_heap_top_row, topn_heap_->top()))) {
} else if (OB_FALSE_IT(cmp = comp_.with_ties_cmp(copy_pre_heap_top_row, topn_heap_->heap_.top()))) {
} else if (OB_FAIL(comp_.ret_)) {
/* do nothing */
} else if (0 != cmp) {
// previous heap top not equal to new heap top, clear ties array
LOG_DEBUG("in memory topn sort with ties clear ties array",
KPC(new_row), KPC(copy_pre_heap_top_row));
if (0 != ties_array_.count()) {
for (int64_t i = 0; i < ties_array_.count(); ++i) {
sql_mem_processor_.alloc(-1 * ties_array_[i]->get_max_size());
inmem_row_size_ -= ties_array_[i]->get_max_size();
mem_context_->get_malloc_allocator().free(ties_array_[i]);
ties_array_[i] = NULL;
if (0 != topn_heap_->ties_array_.count()) {
for (int64_t i = 0; i < topn_heap_->ties_array_.count(); ++i) {
sql_mem_processor_.alloc(-1 * topn_heap_->ties_array_[i]->get_max_size());
inmem_row_size_ -= topn_heap_->ties_array_[i]->get_max_size();
mem_context_->get_malloc_allocator().free(topn_heap_->ties_array_[i]);
topn_heap_->ties_array_[i] = NULL;
}
}
ties_array_.reset();
topn_heap_->ties_array_.reset();
store_row = new_row;
} else if (OB_FAIL(ties_array_.push_back(copy_pre_heap_top_row))) {
} else if (OB_FAIL(topn_heap_->ties_array_.push_back(copy_pre_heap_top_row))) {
LOG_WARN("failed to push back ties array", K(ret));
} else {
// previous heap top equal to new heap top, add previous heap top to ties array
@ -2144,6 +2528,9 @@ int ObSortOpImpl::adjust_topn_heap_with_ties(const common::ObIArray<ObExpr*> &ex
LOG_DEBUG("in memory topn sort with ties add ties array",
KPC(new_row), KPC(copy_pre_heap_top_row));
}
if (OB_SUCC(ret) && use_partition_topn_sort_) {
comp_.set_cmp_range(0, comp_.get_cnt());
}
if (!add_ties_array && OB_NOT_NULL(copy_pre_heap_top_row)) {
sql_mem_processor_.alloc(-1 * copy_pre_heap_top_row->get_max_size());
inmem_row_size_ -= copy_pre_heap_top_row->get_max_size();
@ -2160,12 +2547,12 @@ int ObSortOpImpl::copy_to_topn_row(const common::ObIArray<ObExpr*> &exprs,
SortStoredRow *&new_row)
{
int ret = OB_SUCCESS;
SortStoredRow *top_row = static_cast<SortStoredRow *>(topn_heap_->top());
SortStoredRow *top_row = static_cast<SortStoredRow *>(topn_heap_->heap_.top());
if (OB_FAIL(copy_to_row(exprs, alloc, top_row))) {
LOG_WARN("failed to copy to row", K(ret));
} else {
new_row = top_row;
topn_heap_->top() = static_cast<ObChunkDatumStore::StoredRow *>(top_row);
topn_heap_->heap_.top() = static_cast<ObChunkDatumStore::StoredRow *>(top_row);
}
return ret;
}

View File

@ -107,7 +107,7 @@ public:
int ret = OB_SUCCESS;
if (OB_FAIL(add_row(expr, store_row))) {
SQL_ENG_LOG(WARN, "failed to add row", K(ret));
} else if (use_heap_sort_) {
} else if (use_heap_sort_ || use_partition_topn_sort_) {
sort_need_dump = false;
} else {
sort_need_dump = need_dump();
@ -141,7 +141,7 @@ public:
int ret = OB_SUCCESS;
if (OB_FAIL(add_batch(exprs, skip, batch_size, start_pos, append_row_count))) {
SQL_ENG_LOG(WARN, "failed to add batch", K(ret));
} else if (use_heap_sort_) {
} else if (use_heap_sort_ || use_partition_topn_sort_) {
sort_need_dump = false;
} else {
sort_need_dump = need_dump();
@ -179,7 +179,7 @@ public:
if (!is_inited()) {
ret = common::OB_NOT_INIT;
SQL_ENG_LOG(WARN, "not init", K(ret));
} else if (outputted_rows_cnt_ >= topn_cnt_ && !is_fetch_with_ties_) {
} else if (outputted_rows_cnt_ >= topn_cnt_ && !is_fetch_with_ties_ && !use_partition_topn_sort_) {
ret = OB_ITER_END;
} else {
blk_holder_.release();
@ -188,7 +188,7 @@ public:
reuse();
}
}
if (OB_FAIL(ret)) {
if (OB_FAIL(ret) || use_partition_topn_sort_) {
} else if (NULL != last_ties_row_ && 0 != comp_.with_ties_cmp(sr, last_ties_row_)) {
ret = OB_ITER_END;
} else if (OB_UNLIKELY(OB_FAIL(comp_.ret_))) {
@ -436,6 +436,42 @@ protected:
{ get_extra_info().max_size_ = max_size; }
};
typedef common::ObBinaryHeap<ObChunkDatumStore::StoredRow *, Compare> TopnHeap;
struct TopnHeapNode
{
TopnHeapNode(Compare &cmp, common::ObIAllocator *allocator = NULL): heap_(cmp, allocator), ties_array_() {}
~TopnHeapNode() {
heap_.reset();
ties_array_.reset();
}
TO_STRING_EMPTY();
TopnHeap heap_;
common::ObArray<SortStoredRow *> ties_array_;
};
struct PartHeapNode
{
PartHeapNode(Compare &cmp, common::ObIAllocator *allocator = NULL):
hash_node_next_(NULL),
topn_heap_node_(cmp, allocator) {}
~PartHeapNode() {hash_node_next_ = NULL; topn_heap_node_.~TopnHeapNode(); }
PartHeapNode *hash_node_next_;
TopnHeapNode topn_heap_node_;
TO_STRING_EMPTY();
};
class TopnHeapNodeComparer
{
public:
TopnHeapNodeComparer(Compare &compare) : compare_(compare) {}
bool operator()(const TopnHeapNode *l, const TopnHeapNode *r)
{
return compare_(l->heap_.top(),
r->heap_.top());
}
Compare &compare_;
};
int get_next_row(const common::ObIArray<ObExpr*> &exprs, const ObChunkDatumStore::StoredRow *&sr)
{
int ret = common::OB_SUCCESS;
@ -459,10 +495,12 @@ protected:
bool need_imms() const
{
return !use_heap_sort_ && rows_->count() > datum_store_.get_row_cnt();
return !use_heap_sort_ && !use_partition_topn_sort_
&& rows_->count() > datum_store_.get_row_cnt();
}
int sort_inmem_data();
int do_dump();
int inner_dump_part_topn_heap();
template <typename Input>
int build_chunk(const int64_t level, Input &input, int64_t extra_size = 0);
@ -472,6 +510,9 @@ protected:
int heap_next(Heap &heap, const NextFunc &func, Item &item);
int ems_heap_next(ObSortOpChunk *&chunk);
int imms_heap_next(const ObChunkDatumStore::StoredRow *&store_row);
int imms_partition_topn_next(const ObChunkDatumStore::StoredRow *&store_row);
int part_topn_heap_next(int64_t &cur_heap_array_idx, int64_t &cur_heap_idx,
const ObChunkDatumStore::StoredRow *&store_row);
int array_next_stored_row(
const ObChunkDatumStore::StoredRow *&sr);
@ -479,6 +520,8 @@ protected:
const ObChunkDatumStore::StoredRow *&sr);
int ems_heap_next_stored_row(
const ObChunkDatumStore::StoredRow *&sr);
int part_heap_next_stored_row(
const ObChunkDatumStore::StoredRow *&sr);
// 这里need dump外加两个条件: 1) data_size > expect_size 2) mem_used > global_bound
// 为什么如此,原因在于expect size可能是one pass size,所以数据大于expect size,
@ -510,8 +553,13 @@ protected:
int is_equal_part(const ObChunkDatumStore::StoredRow *l, const ObChunkDatumStore::StoredRow *r, bool &is_equal);
int do_partition_sort(common::ObIArray<ObChunkDatumStore::StoredRow *> &rows,
const int64_t rows_begin, const int64_t rows_end);
int do_partition_topn_sort();
void set_blk_holder(ObChunkDatumStore::IteratedBlockHolder *blk_holder);
bool is_in_same_heap(const SortStoredRow *l,
const SortStoredRow*r);
// for topn sort
int init_topn();
void reuse_topn_heap(TopnHeapNode *topn_heap);
int add_heap_sort_row(const common::ObIArray<ObExpr*> &exprs,
const ObChunkDatumStore::StoredRow *&store_row);
int add_heap_sort_batch(const common::ObIArray<ObExpr *> &exprs,
@ -543,13 +591,32 @@ protected:
SortStoredRow *&new_row);
int generate_last_ties_row(const ObChunkDatumStore::StoredRow *orign_row);
int adjust_topn_read_rows(ObChunkDatumStore::StoredRow **stored_rows, int64_t &read_cnt);
// for partition topn
int init_partition_topn();
void reuse_part_topn_heap();
int locate_current_heap(const common::ObIArray<ObExpr*> &exprs);
int locate_current_heap_in_bucket(PartHeapNode *first_node,
const common::ObIArray<ObExpr*> &exprs,
PartHeapNode *&exist);
void clean_up_topn_heap(TopnHeap *&topn_heap);
int add_part_heap_sort_row(const common::ObIArray<ObExpr*> &exprs,
const ObChunkDatumStore::StoredRow *&store_row);
int add_part_heap_sort_batch(const common::ObIArray<ObExpr *> &exprs,
const ObBitVector &skip,
const int64_t batch_size,
const int64_t start_pos /* 0 */,
int64_t *append_row_count = nullptr);
int add_part_heap_sort_batch(const common::ObIArray<ObExpr *> &exprs,
const ObBitVector &skip,
const int64_t batch_size,
const uint16_t selector[],
const int64_t size);
DISALLOW_COPY_AND_ASSIGN(ObSortOpImpl);
protected:
typedef common::ObBinaryHeap<ObChunkDatumStore::StoredRow **, Compare, 16> IMMSHeap;
typedef common::ObBinaryHeap<ObSortOpChunk *, Compare, MAX_MERGE_WAYS> EMSHeap;
typedef common::ObBinaryHeap<ObChunkDatumStore::StoredRow *, Compare> TopnHeap;
//typedef common::ObBinaryHeap<ObChunkDatumStore::StoredRow *, Compare> TopnHeap;
static const int64_t MAX_ROW_CNT = 268435456; // (2G / 8)
static const int64_t STORE_ROW_HEADER_SIZE = sizeof(SortStoredRow);
static const int64_t STORE_ROW_EXTRA_SIZE = sizeof(uint64_t);
@ -601,10 +668,14 @@ protected:
// for topn sort
bool use_heap_sort_;
bool is_fetch_with_ties_;
TopnHeap *topn_heap_;
TopnHeapNode *topn_heap_;
int64_t ties_array_pos_;
common::ObArray<SortStoredRow *> ties_array_;
ObChunkDatumStore::StoredRow *last_ties_row_;
// for window function partition topn sort
PartHeapNode **pt_buckets_;
bool use_partition_topn_sort_;
ObSEArray<TopnHeapNode*, 16> heap_nodes_;
int64_t cur_heap_idx_;
common::ObIArray<ObChunkDatumStore::StoredRow *> *rows_;
ObChunkDatumStore::IteratedBlockHolder blk_holder_;
};