fix some fatal error for merge groupby/ hash distinct/ px_coord_merge_sort

This commit is contained in:
ls0 2021-09-08 14:50:42 +08:00 committed by wangzelin.wzl
parent ea087416e4
commit 79f1a13d71
3 changed files with 35 additions and 19 deletions

View File

@ -494,7 +494,9 @@ int ObAggregateProcessor::clone_cell(ObDatum& target_cell, const ObDatum& src_ce
int64_t curr_size = 0;
// length + magic num + data
int64_t need_size = sizeof(int64_t) * 2 + (is_number ? ObNumber::MAX_BYTE_LEN : src_cell.len_);
if (target_cell.len_ > 0) {
// we can't decide reuse memory on target_cell.len_, because for null datum we
// also have reserved buffer
if (OB_NOT_NULL(target_cell.ptr_)) {
void* data_ptr = const_cast<char*>(target_cell.ptr_);
const int64_t data_length = target_cell.len_;
if (OB_ISNULL((char*)data_ptr - sizeof(int64_t)) ||
@ -548,10 +550,11 @@ int ObAggregateProcessor::clone_cell(ObDatum& target_cell, const ObDatum& src_ce
}
if (OB_SUCC(ret)) {
int64_t pos = 0;
if (OB_FAIL(target_cell.deep_copy(src_cell, buf, need_size, pos))) {
LOG_WARN("fall to deep_copy", K(src_cell), K(ret));
}
// To reuse prealloc memory, we must use specialize deep_copy method
// Otherwise for null value, we won't reserve its orgin ptr in ObDatum::deep_copy
memcpy(buf, src_cell.ptr_, src_cell.len_);
target_cell.ptr_ = buf;
target_cell.pack_ = src_cell.pack_;
}
OX(LOG_DEBUG("succ to clone cell", K(src_cell), K(target_cell), K(curr_size), K(need_size)));
return ret;

View File

@ -642,7 +642,7 @@ int64_t ObHashDistinct::ObHashDistinctCtx::get_total_based_mem_used()
int ObHashDistinct::ObHashDistinctCtx::open_cur_partition(ObChunkRowStore* row_store)
{
int ret = OB_SUCCESS;
int chunk_size = (NULL == ha_row_store_) ? row_store->get_file_size()
int64_t chunk_size = (NULL == ha_row_store_) ? row_store->get_file_size()
: (ha_row_store_->get_mem_limit() > OB_DEFAULT_MACRO_BLOCK_SIZE
? OB_DEFAULT_MACRO_BLOCK_SIZE
: ha_row_store_->get_mem_limit());

View File

@ -5760,19 +5760,32 @@ int ObLogicalOperator::inner_set_merge_sort(ObLogicalOperator* producer, ObLogic
if (OB_SUCC(ret) && 0 < sort->get_sort_keys().count()) {
// just use sort keys, avoid output_exprs are incorrect
consumer_exchange->set_is_merge_sort(true);
consumer_exchange->set_local_order(!global_order);
need_remove = true;
if (global_order && OB_FAIL(producer_exchange->set_op_ordering(sort->get_sort_keys()))) {
LOG_WARN("failed to set op ordering", K(ret));
} else if (!global_order && OB_FAIL(producer_exchange->set_local_ordering(sort->get_sort_keys()))) {
LOG_WARN("failed to set local ordering", K(ret));
} else if (OB_FAIL(consumer_exchange->set_sort_keys(sort->get_sort_keys()))) {
LOG_WARN("failed to set op ordering", K(ret));
} else {
LOG_TRACE("sort keys of exchange",
K(ret),
K(consumer_exchange->get_sort_keys()),
K(consumer_exchange->is_task_order()));
//for local merge sort in px_coord_merge_sort, we push down the sort operator
//keep the local order in sort op
if (!global_order && ObPQDistributeMethod::MAX_VALUE
== consumer_exchange->get_dist_method()) {
if (OB_FAIL(producer_exchange->allocate_sort_below(0, sort->get_sort_keys()))) {
LOG_WARN("failed to allocate sort", K(ret));
} else {
global_order = true;
static_cast<ObLogSort *>(producer_exchange->get_child(0))->set_local_merge_sort(true);
}
}
if (OB_SUCC(ret)) {
consumer_exchange->set_local_order(!global_order);
need_remove = true;
if (global_order && OB_FAIL(producer_exchange->set_op_ordering(sort->get_sort_keys()))) {
LOG_WARN("failed to set op ordering", K(ret));
} else if (!global_order && OB_FAIL(producer_exchange->set_local_ordering(sort->get_sort_keys()))) {
LOG_WARN("failed to set local ordering", K(ret));
} else if (OB_FAIL(consumer_exchange->set_sort_keys(sort->get_sort_keys()))) {
LOG_WARN("failed to set op ordering", K(ret));
} else {
LOG_TRACE("sort keys of exchange",
K(ret),
K(consumer_exchange->get_sort_keys()),
K(consumer_exchange->is_task_order()));
}
}
}
}