Fix expect may produce incorrect values (#3381)
This commit is contained in:
@ -39,43 +39,14 @@ Status ExceptNode::init(const TPlanNode& tnode, RuntimeState* state) {
|
||||
}
|
||||
|
||||
Status ExceptNode::open(RuntimeState* state) {
|
||||
RETURN_IF_ERROR(ExecNode::open(state));
|
||||
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
|
||||
SCOPED_TIMER(_runtime_profile->total_time_counter());
|
||||
RETURN_IF_CANCELLED(state);
|
||||
// open result expr lists.
|
||||
for (const vector<ExprContext*>& exprs : _child_expr_lists) {
|
||||
RETURN_IF_ERROR(Expr::open(exprs, state));
|
||||
}
|
||||
// initial build hash table, use _child_expr_lists[0] as probe is used for remove duplicted
|
||||
_hash_tbl.reset(new HashTable(_child_expr_lists[0], _child_expr_lists[0], _build_tuple_size,
|
||||
true, _find_nulls, id(), mem_tracker(), 1024));
|
||||
RowBatch build_batch(child(0)->row_desc(), state->batch_size(), mem_tracker());
|
||||
RETURN_IF_ERROR(child(0)->open(state));
|
||||
|
||||
bool eos = false;
|
||||
while (!eos) {
|
||||
SCOPED_TIMER(_build_timer);
|
||||
RETURN_IF_CANCELLED(state);
|
||||
RETURN_IF_ERROR(child(0)->get_next(state, &build_batch, &eos));
|
||||
// take ownership of tuple data of build_batch
|
||||
_build_pool->acquire_data(build_batch.tuple_data_pool(), false);
|
||||
RETURN_IF_LIMIT_EXCEEDED(state, " Except, while constructing the hash table.");
|
||||
// build hash table and remove duplicate items
|
||||
for (int i = 0; i < build_batch.num_rows(); ++i) {
|
||||
VLOG_ROW << "build row: "
|
||||
<< get_row_output_string(build_batch.get_row(i), child(0)->row_desc());
|
||||
_hash_tbl->insert_unique(build_batch.get_row(i));
|
||||
}
|
||||
VLOG_ROW << "hash table content: " << _hash_tbl->debug_string(true, &child(0)->row_desc());
|
||||
build_batch.reset();
|
||||
}
|
||||
RETURN_IF_ERROR(SetOperationNode::open(state));
|
||||
// if a table is empty, the result must be empty
|
||||
|
||||
if (_hash_tbl->size() == 0) {
|
||||
_hash_tbl_iterator = _hash_tbl->begin();
|
||||
return Status::OK();
|
||||
}
|
||||
bool eos = false;
|
||||
|
||||
for (int i = 1; i < _children.size(); ++i) {
|
||||
// rebuid hash table, for first time will rebuild with the no duplicated _hash_tbl,
|
||||
@ -116,6 +87,8 @@ Status ExceptNode::open(RuntimeState* state) {
|
||||
_hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j));
|
||||
if (_hash_tbl_iterator != _hash_tbl->end()) {
|
||||
_hash_tbl_iterator.set_matched();
|
||||
VLOG_ROW << "probe matched: "
|
||||
<< get_row_output_string(_hash_tbl_iterator.get_row(), child(0)->row_desc());
|
||||
}
|
||||
}
|
||||
_probe_batch->reset();
|
||||
|
||||
@ -43,41 +43,13 @@ Status IntersectNode::init(const TPlanNode& tnode, RuntimeState* state) {
|
||||
// 2 probe with child(1), then filter the hash table and find the matched item, use them to rebuild a hash table
|
||||
// repeat [2] this for all the rest child
|
||||
Status IntersectNode::open(RuntimeState* state) {
|
||||
RETURN_IF_ERROR(ExecNode::open(state));
|
||||
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
|
||||
SCOPED_TIMER(_runtime_profile->total_time_counter());
|
||||
RETURN_IF_CANCELLED(state);
|
||||
// open result expr lists.
|
||||
for (const vector<ExprContext*>& exprs : _child_expr_lists) {
|
||||
RETURN_IF_ERROR(Expr::open(exprs, state));
|
||||
}
|
||||
|
||||
// initial build hash table
|
||||
_hash_tbl.reset(new HashTable(_child_expr_lists[0], _child_expr_lists[1], _build_tuple_size,
|
||||
true, _find_nulls, id(), mem_tracker(), 1024));
|
||||
RowBatch build_batch(child(0)->row_desc(), state->batch_size(), mem_tracker());
|
||||
RETURN_IF_ERROR(child(0)->open(state));
|
||||
bool eos = false;
|
||||
while (!eos) {
|
||||
SCOPED_TIMER(_build_timer);
|
||||
RETURN_IF_CANCELLED(state);
|
||||
RETURN_IF_ERROR(child(0)->get_next(state, &build_batch, &eos));
|
||||
// take ownership of tuple data of build_batch
|
||||
_build_pool->acquire_data(build_batch.tuple_data_pool(), false);
|
||||
RETURN_IF_LIMIT_EXCEEDED(state, " Intersect, while constructing the hash table.");
|
||||
for (int i = 0; i < build_batch.num_rows(); ++i) {
|
||||
VLOG_ROW << "build row: "
|
||||
<< get_row_output_string(build_batch.get_row(i), child(0)->row_desc());
|
||||
_hash_tbl->insert_unique(build_batch.get_row(i));
|
||||
}
|
||||
VLOG_ROW << "hash table content: " << _hash_tbl->debug_string(true, &child(0)->row_desc());
|
||||
build_batch.reset();
|
||||
}
|
||||
RETURN_IF_ERROR(SetOperationNode::open(state));
|
||||
// if a table is empty, the result must be empty
|
||||
if (_hash_tbl->size() == 0) {
|
||||
_hash_tbl_iterator = _hash_tbl->begin();
|
||||
return Status::OK();
|
||||
}
|
||||
bool eos = false;
|
||||
|
||||
for (int i = 1; i < _children.size(); ++i) {
|
||||
if (i > 1) {
|
||||
@ -119,6 +91,8 @@ Status IntersectNode::open(RuntimeState* state) {
|
||||
_hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j));
|
||||
if (_hash_tbl_iterator != _hash_tbl->end()) {
|
||||
_hash_tbl_iterator.set_matched();
|
||||
VLOG_ROW << "probe matched: "
|
||||
<< get_row_output_string(_hash_tbl_iterator.get_row(), child(0)->row_desc());
|
||||
}
|
||||
}
|
||||
_probe_batch->reset();
|
||||
|
||||
@ -133,4 +133,39 @@ bool SetOperationNode::equals(TupleRow* row, TupleRow* other) {
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
Status SetOperationNode::open(RuntimeState* state) {
|
||||
RETURN_IF_ERROR(ExecNode::open(state));
|
||||
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
|
||||
SCOPED_TIMER(_runtime_profile->total_time_counter());
|
||||
RETURN_IF_CANCELLED(state);
|
||||
// open result expr lists.
|
||||
for (const vector<ExprContext*>& exprs : _child_expr_lists) {
|
||||
RETURN_IF_ERROR(Expr::open(exprs, state));
|
||||
}
|
||||
// initial build hash table used for remove duplicted
|
||||
_hash_tbl.reset(new HashTable(_child_expr_lists[0], _child_expr_lists[1], _build_tuple_size,
|
||||
true, _find_nulls, id(), mem_tracker(), 1024));
|
||||
RowBatch build_batch(child(0)->row_desc(), state->batch_size(), mem_tracker());
|
||||
RETURN_IF_ERROR(child(0)->open(state));
|
||||
|
||||
bool eos = false;
|
||||
while (!eos) {
|
||||
SCOPED_TIMER(_build_timer);
|
||||
RETURN_IF_CANCELLED(state);
|
||||
RETURN_IF_ERROR(child(0)->get_next(state, &build_batch, &eos));
|
||||
// take ownership of tuple data of build_batch
|
||||
_build_pool->acquire_data(build_batch.tuple_data_pool(), false);
|
||||
RETURN_IF_LIMIT_EXCEEDED(state, " SetOperation, while constructing the hash table.");
|
||||
// build hash table and remove duplicate items
|
||||
for (int i = 0; i < build_batch.num_rows(); ++i) {
|
||||
VLOG_ROW << "build row: "
|
||||
<< get_row_output_string(build_batch.get_row(i), child(0)->row_desc());
|
||||
_hash_tbl->insert_unique(build_batch.get_row(i));
|
||||
}
|
||||
VLOG_ROW << "hash table content: " << _hash_tbl->debug_string(true, &child(0)->row_desc());
|
||||
build_batch.reset();
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
} // namespace doris
|
||||
|
||||
@ -41,6 +41,8 @@ public:
|
||||
virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr);
|
||||
virtual Status prepare(RuntimeState* state);
|
||||
virtual Status close(RuntimeState* state);
|
||||
virtual Status open(RuntimeState* state);
|
||||
|
||||
|
||||
protected:
|
||||
std::string get_row_output_string(TupleRow* row, const RowDescriptor& row_desc);
|
||||
|
||||
Reference in New Issue
Block a user