Fix output results may incorrect when using intersect and except statements (#3228)

output results may  incorrect  when using intersect and except statements
This commit is contained in:
yangzhg
2020-04-01 20:58:43 +08:00
committed by GitHub
parent 34993a69a8
commit 63cee94c5c
7 changed files with 256 additions and 204 deletions

View File

@ -24,13 +24,10 @@
namespace doris {
IntersectNode::IntersectNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
: ExecNode(pool, tnode, descs) {}
: SetOperationNode(pool, tnode, descs, tnode.intersect_node.tuple_id) {}
Status IntersectNode::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::init(tnode, state));
DCHECK(tnode.__isset.intersect_node);
DCHECK_EQ(_conjunct_ctxs.size(), 0);
DCHECK_GE(_children.size(), 2);
RETURN_IF_ERROR(SetOperationNode::init(tnode, state));
// Create result_expr_ctx_lists_ from thrift exprs.
auto& result_texpr_lists = tnode.intersect_node.result_expr_lists;
for (auto& texprs : result_texpr_lists) {
@ -41,54 +38,6 @@ Status IntersectNode::init(const TPlanNode& tnode, RuntimeState* state) {
return Status::OK();
}
Status IntersectNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::prepare(state));
_build_pool.reset(new MemPool(mem_tracker()));
_build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
_probe_timer = ADD_TIMER(runtime_profile(), "ProbeTime");
SCOPED_TIMER(_runtime_profile->total_time_counter());
for (size_t i = 0; i < _child_expr_lists.size(); ++i) {
RETURN_IF_ERROR(Expr::prepare(_child_expr_lists[i], state, child(i)->row_desc(),
expr_mem_tracker()));
}
_build_tuple_size = child(0)->row_desc().tuple_descriptors().size();
_build_tuple_row_size = _build_tuple_size * sizeof(Tuple*);
_build_tuple_idx.reserve(_build_tuple_size);
for (int i = 0; i < _build_tuple_size; ++i) {
TupleDescriptor* build_tuple_desc = child(0)->row_desc().tuple_descriptors()[i];
_build_tuple_idx.push_back(_row_descriptor.get_tuple_idx(build_tuple_desc->id()));
}
_find_nulls = std::vector<bool>(_build_tuple_size, true);
return Status::OK();
}
Status IntersectNode::close(RuntimeState* state) {
if (is_closed()) {
return Status::OK();
}
for (auto& exprs : _child_expr_lists) {
Expr::close(exprs, state);
}
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE));
// Must reset _probe_batch in close() to release resources
_probe_batch.reset(NULL);
if (_memory_used_counter != NULL && _hash_tbl.get() != NULL) {
COUNTER_UPDATE(_memory_used_counter, _build_pool->peak_allocated_bytes());
COUNTER_UPDATE(_memory_used_counter, _hash_tbl->byte_size());
}
if (_hash_tbl.get() != NULL) {
_hash_tbl->close();
}
if (_build_pool.get() != NULL) {
_build_pool->free_all();
}
return ExecNode::close(state);
}
// the actual intersect operation is in this function,
// 1 build a hash table from child(0)
// 2 probe with child(1), then filter the hash table and find the matched item, use them to rebuild a hash table
@ -117,6 +66,8 @@ Status IntersectNode::open(RuntimeState* state) {
_build_pool->acquire_data(build_batch.tuple_data_pool(), false);
RETURN_IF_LIMIT_EXCEEDED(state, " Intersect, while constructing the hash table.");
for (int i = 0; i < build_batch.num_rows(); ++i) {
VLOG_ROW << "build row: "
<< get_row_output_string(build_batch.get_row(i), child(0)->row_desc());
_hash_tbl->insert_unique(build_batch.get_row(i));
}
VLOG_ROW << "hash table content: " << _hash_tbl->debug_string(true, &child(0)->row_desc());
@ -137,6 +88,9 @@ Status IntersectNode::open(RuntimeState* state) {
_hash_tbl_iterator = _hash_tbl->begin();
while (_hash_tbl_iterator.has_next()) {
if (_hash_tbl_iterator.matched()) {
VLOG_ROW << "rebuild row: "
<< get_row_output_string(_hash_tbl_iterator.get_row(),
child(0)->row_desc());
temp_tbl->insert(_hash_tbl_iterator.get_row());
}
_hash_tbl_iterator.next<false>();
@ -160,6 +114,8 @@ Status IntersectNode::open(RuntimeState* state) {
RETURN_IF_ERROR(child(i)->get_next(state, _probe_batch.get(), &eos));
RETURN_IF_LIMIT_EXCEEDED(state, " Intersect , while probing the hash table.");
for (int j = 0; j < _probe_batch->num_rows(); ++j) {
VLOG_ROW << "probe row: "
<< get_row_output_string(_probe_batch->get_row(j), child(i)->row_desc());
_hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j));
if (_hash_tbl_iterator != _hash_tbl->end()) {
_hash_tbl_iterator.set_matched();
@ -180,16 +136,20 @@ Status IntersectNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* e
if (reached_limit()) {
return Status::OK();
}
int64_t tuple_buf_size;
uint8_t* tuple_buf;
RETURN_IF_ERROR(
out_batch->resize_and_allocate_tuple_buffer(state, &tuple_buf_size, &tuple_buf));
memset(tuple_buf, 0, tuple_buf_size);
while (_hash_tbl_iterator.has_next()) {
VLOG_ROW << "find row: "
<< get_row_output_string(_hash_tbl_iterator.get_row(), child(0)->row_desc())
<< " matched: " << _hash_tbl_iterator.matched();
if (_hash_tbl_iterator.matched()) {
int row_idx = out_batch->add_row();
TupleRow* out_row = out_batch->get_row(row_idx);
uint8_t* out_ptr = reinterpret_cast<uint8_t*>(out_row);
memcpy(out_ptr, _hash_tbl_iterator.get_row(), _build_tuple_row_size);
out_batch->commit_last_row();
create_output_row(_hash_tbl_iterator.get_row(), out_batch, tuple_buf);
tuple_buf += _tuple_desc->byte_size();
++_num_rows_returned;
}
_hash_tbl_iterator.next<false>();
*eos = !_hash_tbl_iterator.has_next() || reached_limit();
if (out_batch->is_full() || out_batch->at_resource_limit() || *eos) {
@ -198,5 +158,4 @@ Status IntersectNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* e
}
return Status::OK();
}
} // namespace doris