[fix] hash table insert() may fail, but the error was not handled (#8207)

This commit is contained in:
Zhengguo Yang
2022-03-03 22:33:05 +08:00
committed by GitHub
parent c56a372e06
commit e7c417505c
9 changed files with 56 additions and 52 deletions

View File

@ -50,7 +50,9 @@ Status ExceptNode::open(RuntimeState* state) {
for (int i = 1; i < _children.size(); ++i) {
// rebuild hash table, for first time will rebuild with the no duplicated _hash_tbl,
if (i > 1) { refresh_hash_table<false>(i); }
if (i > 1) {
RETURN_IF_ERROR(refresh_hash_table<false>(i));
}
// probe
_probe_batch.reset(
@ -63,17 +65,12 @@ Status ExceptNode::open(RuntimeState* state) {
RETURN_IF_ERROR(child(i)->get_next(state, _probe_batch.get(), &eos));
RETURN_IF_LIMIT_EXCEEDED(state, " Except , while probing the hash table.");
for (int j = 0; j < _probe_batch->num_rows(); ++j) {
VLOG_ROW << "probe row: "
<< get_row_output_string(_probe_batch->get_row(j), child(i)->row_desc());
_hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j));
if (_hash_tbl_iterator != _hash_tbl->end()) {
if (!_hash_tbl_iterator.matched()) {
_hash_tbl_iterator.set_matched();
_valid_element_in_hash_tbl--;
}
VLOG_ROW << "probe matched: "
<< get_row_output_string(_hash_tbl_iterator.get_row(),
child(0)->row_desc());
}
}
_probe_batch->reset();
@ -101,9 +98,6 @@ Status ExceptNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eos)
out_batch->resize_and_allocate_tuple_buffer(state, &tuple_buf_size, &tuple_buf));
memset(tuple_buf, 0, tuple_buf_size);
while (_hash_tbl_iterator.has_next()) {
VLOG_ROW << "find row: "
<< get_row_output_string(_hash_tbl_iterator.get_row(), child(0)->row_desc())
<< " matched: " << _hash_tbl_iterator.matched();
if (!_hash_tbl_iterator.matched()) {
create_output_row(_hash_tbl_iterator.get_row(), out_batch, tuple_buf);
tuple_buf += _tuple_desc->byte_size();

View File

@ -144,7 +144,8 @@ Status HashJoinNode::prepare(RuntimeState* state) {
(std::find(_is_null_safe_eq_join.begin(), _is_null_safe_eq_join.end(), true) !=
_is_null_safe_eq_join.end());
_hash_tbl.reset(new HashTable(_build_expr_ctxs, _probe_expr_ctxs, _build_tuple_size,
stores_nulls, _is_null_safe_eq_join, id(), mem_tracker(), 1024));
stores_nulls, _is_null_safe_eq_join, id(), mem_tracker(),
state->batch_size() * 2));
_probe_batch.reset(
new RowBatch(child(0)->row_desc(), state->batch_size(), mem_tracker().get()));
@ -762,7 +763,6 @@ Status HashJoinNode::process_build_batch(RuntimeState* state, RowBatch* build_ba
// insert build row into our hash table
if (_build_unique) {
for (int i = 0; i < build_batch->num_rows(); ++i) {
// _hash_tbl->insert_unique(build_batch->get_row(i));
TupleRow* tuple_row = nullptr;
if (_hash_tbl->emplace_key(build_batch->get_row(i), &tuple_row)) {
build_batch->get_row(i)->deep_copy(tuple_row,
@ -775,9 +775,9 @@ Status HashJoinNode::process_build_batch(RuntimeState* state, RowBatch* build_ba
// take ownership of tuple data of build_batch
_build_pool->acquire_data(build_batch->tuple_data_pool(), false);
RETURN_IF_LIMIT_EXCEEDED(state, "Hash join, while constructing the hash table.");
RETURN_IF_ERROR(_hash_tbl->resize_buckets_ahead(build_batch->num_rows()));
for (int i = 0; i < build_batch->num_rows(); ++i) {
_hash_tbl->insert(build_batch->get_row(i));
_hash_tbl->insert_without_check(build_batch->get_row(i));
}
}
return Status::OK();

View File

@ -35,8 +35,6 @@ class TupleRow;
class MemTracker;
class RuntimeState;
using std::vector;
// Hash table implementation designed for hash aggregation and hash joins. This is not
// templatized and is tailored to the usage pattern for aggregation and joins. The
// hash table store TupleRows and allows for different exprs for insertions and finds.
@ -101,20 +99,40 @@ public:
// Insert row into the hash table. Row will be evaluated over _build_expr_ctxs
// This will grow the hash table if necessary
void insert(TupleRow* row) {
Status insert(TupleRow* row) {
if (_num_filled_buckets > _num_buckets_till_resize) {
// TODO: next prime instead of double?
resize_buckets(_num_buckets * 2);
RETURN_IF_ERROR(resize_buckets(_num_buckets * 2));
}
insert_impl(row);
return Status::OK();
}
void insert_without_check(TupleRow* row) { insert_impl(row); }
// Insert row into the hash table. if the row is already exist will not insert
void insert_unique(TupleRow* row) {
Status insert_unique(TupleRow* row) {
if (find(row, false) == end()) {
insert(row);
return insert(row);
}
return Status::OK();
}
void insert_unique_without_check(TupleRow* row) {
if (find(row, false) == end()) {
insert_without_check(row);
}
}
Status resize_buckets_ahead(int64_t estimate_buckets) {
if (_num_filled_buckets + estimate_buckets > _num_buckets_till_resize) {
int64_t new_bucket_size = _num_buckets * 2;
while (new_bucket_size <= _num_filled_buckets + estimate_buckets) {
new_bucket_size = new_bucket_size * 2;
}
return resize_buckets(new_bucket_size);
}
return Status::OK();
}
bool emplace_key(TupleRow* row, TupleRow** key_addr);

View File

@ -24,7 +24,9 @@ namespace doris {
inline bool HashTable::emplace_key(TupleRow* row, TupleRow** dest_addr) {
if (_num_filled_buckets > _num_buckets_till_resize) {
resize_buckets(_num_buckets * 2);
if (!resize_buckets(_num_buckets * 2).ok()) {
return false;
}
}
if (_current_used == _current_capacity) {
grow_node_array();

View File

@ -52,7 +52,9 @@ Status IntersectNode::open(RuntimeState* state) {
bool eos = false;
for (int i = 1; i < _children.size(); ++i) {
if (i > 1) { refresh_hash_table<true>(i); }
if (i > 1) {
RETURN_IF_ERROR(refresh_hash_table<true>(i));
}
_valid_element_in_hash_tbl = 0;
// probe
@ -66,17 +68,12 @@ Status IntersectNode::open(RuntimeState* state) {
RETURN_IF_ERROR(child(i)->get_next(state, _probe_batch.get(), &eos));
RETURN_IF_LIMIT_EXCEEDED(state, " Intersect , while probing the hash table.");
for (int j = 0; j < _probe_batch->num_rows(); ++j) {
VLOG_ROW << "probe row: "
<< get_row_output_string(_probe_batch->get_row(j), child(i)->row_desc());
_hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j));
if (_hash_tbl_iterator != _hash_tbl->end()) {
if (!_hash_tbl_iterator.matched()) {
_valid_element_in_hash_tbl++;
_hash_tbl_iterator.set_matched();
}
VLOG_ROW << "probe matched: "
<< get_row_output_string(_hash_tbl_iterator.get_row(),
child(0)->row_desc());
}
}
_probe_batch->reset();
@ -100,9 +97,6 @@ Status IntersectNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* e
out_batch->resize_and_allocate_tuple_buffer(state, &tuple_buf_size, &tuple_buf));
memset(tuple_buf, 0, tuple_buf_size);
while (_hash_tbl_iterator.has_next()) {
VLOG_ROW << "find row: "
<< get_row_output_string(_hash_tbl_iterator.get_row(), child(0)->row_desc())
<< " matched: " << _hash_tbl_iterator.matched();
if (_hash_tbl_iterator.matched()) {
create_output_row(_hash_tbl_iterator.get_row(), out_batch, tuple_buf);
tuple_buf += _tuple_desc->byte_size();

View File

@ -27,7 +27,10 @@
namespace doris {
SetOperationNode::SetOperationNode(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs, int tuple_id)
: ExecNode(pool, tnode, descs), _tuple_id(tuple_id), _tuple_desc(nullptr), _valid_element_in_hash_tbl(0) {}
: ExecNode(pool, tnode, descs),
_tuple_id(tuple_id),
_tuple_desc(nullptr),
_valid_element_in_hash_tbl(0) {}
Status SetOperationNode::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::init(tnode, state));
@ -142,7 +145,7 @@ Status SetOperationNode::open(RuntimeState* state) {
}
// initial build hash table used for remove duplicated
_hash_tbl.reset(new HashTable(_child_expr_lists[0], _child_expr_lists[1], _build_tuple_size,
true, _find_nulls, id(), mem_tracker(), 1024));
true, _find_nulls, id(), mem_tracker(), state->batch_size() * 2));
RowBatch build_batch(child(0)->row_desc(), state->batch_size(), mem_tracker().get());
RETURN_IF_ERROR(child(0)->open(state));
@ -155,10 +158,9 @@ Status SetOperationNode::open(RuntimeState* state) {
_build_pool->acquire_data(build_batch.tuple_data_pool(), false);
RETURN_IF_LIMIT_EXCEEDED(state, " SetOperation, while constructing the hash table.");
// build hash table and remove duplicate items
RETURN_IF_ERROR(_hash_tbl->resize_buckets_ahead(build_batch.num_rows()));
for (int i = 0; i < build_batch.num_rows(); ++i) {
VLOG_ROW << "build row: "
<< get_row_output_string(build_batch.get_row(i), child(0)->row_desc());
_hash_tbl->insert_unique(build_batch.get_row(i));
_hash_tbl->insert_unique_without_check(build_batch.get_row(i));
}
VLOG_ROW << "hash table content: " << _hash_tbl->debug_string(true, &child(0)->row_desc());
build_batch.reset();

View File

@ -54,7 +54,7 @@ protected:
// TODO: Check whether the hash table should be shrink to reduce necessary refresh
// but may different child has different probe expr which may cause wrong result.
// so we need keep probe expr same in FE to optimize this issue.
void refresh_hash_table(int child);
Status refresh_hash_table(int child);
/// Tuple id resolved in Prepare() to set tuple_desc_;
const int _tuple_id;
@ -81,33 +81,28 @@ protected:
};
template <bool keep_matched>
void SetOperationNode::refresh_hash_table(int child_id) {
Status SetOperationNode::refresh_hash_table(int child_id) {
SCOPED_TIMER(_build_timer);
std::unique_ptr<HashTable> temp_tbl(
new HashTable(_child_expr_lists[0], _child_expr_lists[child_id], _build_tuple_size,
true, _find_nulls, id(), mem_tracker(),
_valid_element_in_hash_tbl / HashTable::MAX_BUCKET_OCCUPANCY_FRACTION + 1));
std::unique_ptr<HashTable> temp_tbl(new HashTable(
_child_expr_lists[0], _child_expr_lists[child_id], _build_tuple_size, true, _find_nulls,
id(), mem_tracker(),
_valid_element_in_hash_tbl / HashTable::MAX_BUCKET_OCCUPANCY_FRACTION + 1));
_hash_tbl_iterator = _hash_tbl->begin();
while (_hash_tbl_iterator.has_next()) {
if constexpr (keep_matched) {
if (_hash_tbl_iterator.matched()) {
VLOG_ROW << "rebuild row: "
<< get_row_output_string(_hash_tbl_iterator.get_row(),
child(0)->row_desc());
temp_tbl->insert(_hash_tbl_iterator.get_row());
RETURN_IF_ERROR(temp_tbl->insert(_hash_tbl_iterator.get_row()));
}
} else {
if (!_hash_tbl_iterator.matched()) {
VLOG_ROW << "rebuild row: "
<< get_row_output_string(_hash_tbl_iterator.get_row(),
child(0)->row_desc());
temp_tbl->insert(_hash_tbl_iterator.get_row());
RETURN_IF_ERROR(temp_tbl->insert(_hash_tbl_iterator.get_row()));
}
}
_hash_tbl_iterator.next<false>();
}
_hash_tbl.swap(temp_tbl);
temp_tbl->close();
return Status::OK();
}
}; // namespace doris
}; // namespace doris

View File

@ -143,7 +143,6 @@ TEST_F(EsPredicateTest, normal) {
std::vector<ExprContext*> conjunct_ctxs;
Status status = build_expr_context_list(conjunct_ctxs);
ASSERT_TRUE(status.ok());
TupleDescriptor* tuple_desc = _desc_tbl->get_tuple_descriptor(0);
std::vector<EsPredicate*> predicates;
for (int i = 0; i < conjunct_ctxs.size(); ++i) {

View File

@ -30,8 +30,8 @@
#include "olap/types.h"
#include "runtime/mem_pool.h"
#include "runtime/mem_tracker.h"
#include "util/debug_util.h"
#include "test_util/test_util.h"
#include "util/debug_util.h"
namespace doris {
namespace segment_v2 {