[fix] Fix the query result error caused by the grouping sets statemen… (#11316)
* [fix] Fix the query result error caused by the grouping sets statement grouping as an expression
This commit is contained in:
@ -34,7 +34,7 @@ RepeatNode::RepeatNode(ObjectPool* pool, const TPlanNode& tnode, const Descripto
|
||||
_repeat_id_list(tnode.repeat_node.repeat_id_list),
|
||||
_grouping_list(tnode.repeat_node.grouping_list),
|
||||
_output_tuple_id(tnode.repeat_node.output_tuple_id),
|
||||
_tuple_desc(nullptr),
|
||||
_output_tuple_desc(nullptr),
|
||||
_child_row_batch(nullptr),
|
||||
_child_eos(false),
|
||||
_repeat_id_idx(0),
|
||||
@ -42,16 +42,30 @@ RepeatNode::RepeatNode(ObjectPool* pool, const TPlanNode& tnode, const Descripto
|
||||
|
||||
RepeatNode::~RepeatNode() {}
|
||||
|
||||
Status RepeatNode::init(const TPlanNode& tnode, RuntimeState* state) {
|
||||
RETURN_IF_ERROR(ExecNode::init(tnode, state));
|
||||
const RowDescriptor& row_desc = child(0)->row_desc();
|
||||
RETURN_IF_ERROR(Expr::create(tnode.repeat_node.exprs, row_desc, state, &_exprs));
|
||||
DCHECK(!_exprs.empty());
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status RepeatNode::prepare(RuntimeState* state) {
|
||||
SCOPED_TIMER(_runtime_profile->total_time_counter());
|
||||
RETURN_IF_ERROR(ExecNode::prepare(state));
|
||||
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
|
||||
_runtime_state = state;
|
||||
_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
|
||||
if (_tuple_desc == nullptr) {
|
||||
_output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
|
||||
if (_output_tuple_desc == nullptr) {
|
||||
return Status::InternalError("Failed to get tuple descriptor.");
|
||||
}
|
||||
|
||||
for (int i = 0; i < _exprs.size(); i++) {
|
||||
ExprContext* context = _pool->add(new ExprContext(_exprs[i]));
|
||||
RETURN_IF_ERROR(context->prepare(state, child(0)->row_desc()));
|
||||
_expr_evals.push_back(context);
|
||||
}
|
||||
DCHECK_EQ(_exprs.size(), _expr_evals.size());
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -59,6 +73,10 @@ Status RepeatNode::open(RuntimeState* state) {
|
||||
SCOPED_TIMER(_runtime_profile->total_time_counter());
|
||||
RETURN_IF_ERROR(ExecNode::open(state));
|
||||
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
|
||||
|
||||
for (int i = 0; i < _expr_evals.size(); i++) {
|
||||
RETURN_IF_ERROR(_expr_evals[i]->open(state));
|
||||
}
|
||||
RETURN_IF_CANCELLED(state);
|
||||
RETURN_IF_ERROR(child(0)->open(state));
|
||||
return Status::OK();
|
||||
@ -77,66 +95,14 @@ Status RepeatNode::get_repeated_batch(RowBatch* child_row_batch, int repeat_id_i
|
||||
|
||||
// Fill all slots according to child
|
||||
MemPool* tuple_pool = row_batch->tuple_data_pool();
|
||||
const std::vector<TupleDescriptor*>& src_tuple_descs =
|
||||
child_row_batch->row_desc().tuple_descriptors();
|
||||
const std::vector<TupleDescriptor*>& dst_tuple_descs =
|
||||
row_batch->row_desc().tuple_descriptors();
|
||||
std::vector<Tuple*> dst_tuples(src_tuple_descs.size(), nullptr);
|
||||
for (int i = 0; i < child_row_batch->num_rows(); ++i) {
|
||||
Tuple* tuple = nullptr;
|
||||
for (int row_index = 0; row_index < child_row_batch->num_rows(); ++row_index) {
|
||||
int row_idx = row_batch->add_row();
|
||||
TupleRow* dst_row = row_batch->get_row(row_idx);
|
||||
TupleRow* src_row = child_row_batch->get_row(i);
|
||||
TupleRow* src_row = child_row_batch->get_row(row_index);
|
||||
|
||||
auto src_it = src_tuple_descs.begin();
|
||||
auto dst_it = dst_tuple_descs.begin();
|
||||
for (int j = 0; src_it != src_tuple_descs.end() && dst_it != dst_tuple_descs.end();
|
||||
++src_it, ++dst_it, ++j) {
|
||||
Tuple* src_tuple = src_row->get_tuple(j);
|
||||
if (src_tuple == nullptr) {
|
||||
dst_row->set_tuple(j, nullptr);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (dst_tuples[j] == nullptr) {
|
||||
int size = row_batch->capacity() * (*dst_it)->byte_size();
|
||||
void* tuple_buffer = tuple_pool->allocate(size);
|
||||
if (tuple_buffer == nullptr) {
|
||||
return Status::InternalError("Allocate memory for row batch failed.");
|
||||
}
|
||||
dst_tuples[j] = reinterpret_cast<Tuple*>(tuple_buffer);
|
||||
} else {
|
||||
char* new_tuple = reinterpret_cast<char*>(dst_tuples[j]);
|
||||
new_tuple += (*dst_it)->byte_size();
|
||||
dst_tuples[j] = reinterpret_cast<Tuple*>(new_tuple);
|
||||
}
|
||||
dst_row->set_tuple(j, dst_tuples[j]);
|
||||
memset(dst_tuples[j], 0, (*dst_it)->num_null_bytes());
|
||||
src_tuple->deep_copy(dst_tuples[j], **dst_it, tuple_pool);
|
||||
for (int k = 0; k < (*src_it)->slots().size(); k++) {
|
||||
SlotDescriptor* src_slot_desc = (*src_it)->slots()[k];
|
||||
SlotDescriptor* dst_slot_desc = (*dst_it)->slots()[k];
|
||||
DCHECK_EQ(src_slot_desc->type().type, dst_slot_desc->type().type);
|
||||
DCHECK_EQ(src_slot_desc->col_name(), dst_slot_desc->col_name());
|
||||
// set null base on repeated list
|
||||
if (_all_slot_ids.find(src_slot_desc->id()) != _all_slot_ids.end()) {
|
||||
std::set<SlotId>& repeat_ids = _slot_id_set_list[repeat_id_idx];
|
||||
if (repeat_ids.find(src_slot_desc->id()) == repeat_ids.end()) {
|
||||
dst_tuples[j]->set_null(dst_slot_desc->null_indicator_offset());
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
row_batch->commit_last_row();
|
||||
}
|
||||
Tuple* tuple = nullptr;
|
||||
// Fill grouping ID to tuple
|
||||
for (int i = 0; i < child_row_batch->num_rows(); ++i) {
|
||||
int row_idx = i;
|
||||
TupleRow* row = row_batch->get_row(row_idx);
|
||||
|
||||
if (tuple == nullptr) {
|
||||
int size = row_batch->capacity() * _tuple_desc->byte_size();
|
||||
if (UNLIKELY(tuple == nullptr)) {
|
||||
int size = row_batch->capacity() * _output_tuple_desc->byte_size();
|
||||
void* tuple_buffer = tuple_pool->allocate(size);
|
||||
if (tuple_buffer == nullptr) {
|
||||
return Status::InternalError("Allocate memory for row batch failed.");
|
||||
@ -144,21 +110,38 @@ Status RepeatNode::get_repeated_batch(RowBatch* child_row_batch, int repeat_id_i
|
||||
tuple = reinterpret_cast<Tuple*>(tuple_buffer);
|
||||
} else {
|
||||
char* new_tuple = reinterpret_cast<char*>(tuple);
|
||||
new_tuple += _tuple_desc->byte_size();
|
||||
new_tuple += _output_tuple_desc->byte_size();
|
||||
tuple = reinterpret_cast<Tuple*>(new_tuple);
|
||||
}
|
||||
dst_row->set_tuple(0, tuple);
|
||||
memset(tuple, 0, _output_tuple_desc->num_null_bytes());
|
||||
|
||||
row->set_tuple(src_tuple_descs.size(), tuple);
|
||||
memset(tuple, 0, _tuple_desc->num_null_bytes());
|
||||
int slot_index = 0;
|
||||
for (; slot_index < _expr_evals.size(); ++slot_index) {
|
||||
const SlotDescriptor* slot_desc = _output_tuple_desc->slots()[slot_index];
|
||||
// set null base on repeated list
|
||||
if (_all_slot_ids.find(slot_desc->id()) != _all_slot_ids.end()) {
|
||||
std::set<SlotId>& repeat_ids = _slot_id_set_list[repeat_id_idx];
|
||||
if (repeat_ids.find(slot_desc->id()) == repeat_ids.end()) {
|
||||
tuple->set_null(slot_desc->null_indicator_offset());
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
for (size_t slot_idx = 0; slot_idx < _grouping_list.size(); ++slot_idx) {
|
||||
int64_t val = _grouping_list[slot_idx][repeat_id_idx];
|
||||
DCHECK_LT(slot_idx, _tuple_desc->slots().size())
|
||||
<< "TupleDescriptor: " << _tuple_desc->debug_string();
|
||||
const SlotDescriptor* slot_desc = _tuple_desc->slots()[slot_idx];
|
||||
void* val = _expr_evals[slot_index]->get_value(src_row);
|
||||
tuple->set_not_null(slot_desc->null_indicator_offset());
|
||||
RawValue::write(val, tuple, slot_desc, tuple_pool);
|
||||
}
|
||||
|
||||
DCHECK_EQ(slot_index + _grouping_list.size(), _output_tuple_desc->slots().size());
|
||||
for (int i = 0; slot_index < _output_tuple_desc->slots().size(); ++i, ++slot_index) {
|
||||
const SlotDescriptor* slot_desc = _output_tuple_desc->slots()[slot_index];
|
||||
tuple->set_not_null(slot_desc->null_indicator_offset());
|
||||
|
||||
int64_t val = _grouping_list[i][repeat_id_idx];
|
||||
RawValue::write(&val, tuple, slot_desc, tuple_pool);
|
||||
}
|
||||
row_batch->commit_last_row();
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
@ -207,6 +190,11 @@ Status RepeatNode::close(RuntimeState* state) {
|
||||
return Status::OK();
|
||||
}
|
||||
_child_row_batch.reset(nullptr);
|
||||
for (int i = 0; i < _expr_evals.size(); i++) {
|
||||
_expr_evals[i]->close(state);
|
||||
}
|
||||
_expr_evals.clear();
|
||||
Expr::close(_exprs);
|
||||
RETURN_IF_ERROR(child(0)->close(state));
|
||||
return ExecNode::close(state);
|
||||
}
|
||||
@ -216,6 +204,7 @@ void RepeatNode::debug_string(int indentation_level, std::stringstream* out) con
|
||||
*out << "RepeatNode(";
|
||||
*out << "repeat pattern: [" << JoinElements(_repeat_id_list, ",") << "]\n";
|
||||
*out << "add " << _grouping_list.size() << " columns. \n";
|
||||
*out << "_exprs: " << Expr::debug_string(_exprs);
|
||||
*out << "added column values: ";
|
||||
for (const std::vector<int64_t>& v : _grouping_list) {
|
||||
*out << "[" << JoinElements(v, ",") << "] ";
|
||||
|
||||
Reference in New Issue
Block a user