[Optimize][Set Operation Node] Reduce the memory expansion operation of the hash table in ExceptNode and IntersectNode (#6915)

Reduce the memory expansion operation of the hash table in ExceptNode and IntersectNode
This commit is contained in:
HappenLee
2021-11-12 10:39:59 +08:00
committed by GitHub
parent 35da149ebe
commit 047b83b987
6 changed files with 68 additions and 52 deletions

View File

@ -41,33 +41,17 @@ Status ExceptNode::init(const TPlanNode& tnode, RuntimeState* state) {
Status ExceptNode::open(RuntimeState* state) {
RETURN_IF_ERROR(SetOperationNode::open(state));
// if a table is empty, the result must be empty
if (_hash_tbl->size() == 0) {
_hash_tbl_iterator = _hash_tbl->begin();
return Status::OK();
}
bool eos = false;
_valid_element_in_hash_tbl = _hash_tbl->num_filled_buckets();
for (int i = 1; i < _children.size(); ++i) {
// rebuild hash table, for first time will rebuild with the no duplicated _hash_tbl,
if (i > 1) {
SCOPED_TIMER(_build_timer);
std::unique_ptr<HashTable> temp_tbl(
new HashTable(_child_expr_lists[0], _child_expr_lists[i], _build_tuple_size,
true, _find_nulls, id(), mem_tracker(), 1024));
_hash_tbl_iterator = _hash_tbl->begin();
while (_hash_tbl_iterator.has_next()) {
if (!_hash_tbl_iterator.matched()) {
VLOG_ROW << "rebuild row: "
<< get_row_output_string(_hash_tbl_iterator.get_row(),
child(0)->row_desc());
temp_tbl->insert(_hash_tbl_iterator.get_row());
}
_hash_tbl_iterator.next<false>();
}
_hash_tbl.swap(temp_tbl);
temp_tbl->close();
}
if (i > 1) { refresh_hash_table<false>(i); }
// probe
_probe_batch.reset(
new RowBatch(child(i)->row_desc(), state->batch_size(), mem_tracker().get()));
@ -83,7 +67,10 @@ Status ExceptNode::open(RuntimeState* state) {
<< get_row_output_string(_probe_batch->get_row(j), child(i)->row_desc());
_hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j));
if (_hash_tbl_iterator != _hash_tbl->end()) {
_hash_tbl_iterator.set_matched();
if (!_hash_tbl_iterator.matched()) {
_hash_tbl_iterator.set_matched();
_valid_element_in_hash_tbl--;
}
VLOG_ROW << "probe matched: "
<< get_row_output_string(_hash_tbl_iterator.get_row(),
child(0)->row_desc());

View File

@ -27,8 +27,6 @@
namespace doris {
const float HashTable::MAX_BUCKET_OCCUPANCY_FRACTION = 0.75f;
HashTable::HashTable(const std::vector<ExprContext*>& build_expr_ctxs,
const std::vector<ExprContext*>& probe_expr_ctxs, int num_build_tuples,
bool stores_nulls, const std::vector<bool>& finds_nulls, int32_t initial_seed,

View File

@ -137,6 +137,14 @@ public:
// Returns the number of buckets
int64_t num_buckets() { return _buckets.size(); }
// Returns the number of filled buckets
int64_t num_filled_buckets() { return _num_filled_buckets; }
// Check the hash table should be shrink
bool should_be_shrink(int64_t valid_row) {
return valid_row < MAX_BUCKET_OCCUPANCY_FRACTION * (_buckets.size() / 2.0);
}
// true if any of the MemTrackers was exceeded
bool exceeded_limit() const { return _exceeded_limit; }
@ -174,6 +182,10 @@ public:
inline std::pair<int64_t, int64_t> minmax_node();
// Load factor that will trigger growing the hash table on insert. This is
// defined as the number of non-empty buckets / total_buckets
static constexpr float MAX_BUCKET_OCCUPANCY_FRACTION = 0.75f;
// stl-like iterator interface.
class Iterator {
public:
@ -348,10 +360,6 @@ private:
// brought us over the mem limit.
void mem_limit_exceeded(int64_t allocation_size);
// Load factor that will trigger growing the hash table on insert. This is
// defined as the number of non-empty buckets / total_buckets
static const float MAX_BUCKET_OCCUPANCY_FRACTION;
const std::vector<ExprContext*>& _build_expr_ctxs;
const std::vector<ExprContext*>& _probe_expr_ctxs;

View File

@ -52,30 +52,9 @@ Status IntersectNode::open(RuntimeState* state) {
bool eos = false;
for (int i = 1; i < _children.size(); ++i) {
if (i > 1) {
SCOPED_TIMER(_build_timer);
std::unique_ptr<HashTable> temp_tbl(
new HashTable(_child_expr_lists[0], _child_expr_lists[i], _build_tuple_size,
true, _find_nulls, id(), mem_tracker(), 1024));
_hash_tbl_iterator = _hash_tbl->begin();
while (_hash_tbl_iterator.has_next()) {
if (_hash_tbl_iterator.matched()) {
VLOG_ROW << "rebuild row: "
<< get_row_output_string(_hash_tbl_iterator.get_row(),
child(0)->row_desc());
temp_tbl->insert(_hash_tbl_iterator.get_row());
}
_hash_tbl_iterator.next<false>();
}
_hash_tbl.swap(temp_tbl);
temp_tbl->close();
VLOG_ROW << "hash table content: "
<< _hash_tbl->debug_string(true, &child(0)->row_desc());
// if a table is empty, the result must be empty
if (_hash_tbl->size() == 0) {
break;
}
}
if (i > 1) { refresh_hash_table<true>(i); }
_valid_element_in_hash_tbl = 0;
// probe
_probe_batch.reset(
new RowBatch(child(i)->row_desc(), state->batch_size(), mem_tracker().get()));
@ -91,7 +70,10 @@ Status IntersectNode::open(RuntimeState* state) {
<< get_row_output_string(_probe_batch->get_row(j), child(i)->row_desc());
_hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j));
if (_hash_tbl_iterator != _hash_tbl->end()) {
_hash_tbl_iterator.set_matched();
if (!_hash_tbl_iterator.matched()) {
_valid_element_in_hash_tbl++;
_hash_tbl_iterator.set_matched();
}
VLOG_ROW << "probe matched: "
<< get_row_output_string(_hash_tbl_iterator.get_row(),
child(0)->row_desc());

View File

@ -27,7 +27,7 @@
namespace doris {
SetOperationNode::SetOperationNode(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs, int tuple_id)
: ExecNode(pool, tnode, descs), _tuple_id(tuple_id), _tuple_desc(nullptr) {}
: ExecNode(pool, tnode, descs), _tuple_id(tuple_id), _tuple_desc(nullptr), _valid_element_in_hash_tbl(0) {}
Status SetOperationNode::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::init(tnode, state));
@ -163,6 +163,8 @@ Status SetOperationNode::open(RuntimeState* state) {
VLOG_ROW << "hash table content: " << _hash_tbl->debug_string(true, &child(0)->row_desc());
build_batch.reset();
}
return Status::OK();
}
} // namespace doris

View File

@ -49,6 +49,13 @@ protected:
// Returns true if the values of row and other are equal
bool equals(TupleRow* row, TupleRow* other);
template <bool keep_matched>
// Refresh the hash table and probe expr, before we dispose data of next child
// TODO: Check whether the hash table should be shrink to reduce necessary refresh
// but may different child has different probe expr which may cause wrong result.
// so we need keep probe expr same in FE to optimize this issue.
void refresh_hash_table(int child);
/// Tuple id resolved in Prepare() to set tuple_desc_;
const int _tuple_id;
/// Descriptor for tuples this union node constructs.
@ -58,6 +65,8 @@ protected:
std::unique_ptr<HashTable> _hash_tbl;
HashTable::Iterator _hash_tbl_iterator;
int64_t _valid_element_in_hash_tbl;
std::unique_ptr<RowBatch> _probe_batch;
// holds everything referenced in _hash_tbl
std::unique_ptr<MemPool> _build_pool;
@ -71,4 +80,34 @@ protected:
RuntimeProfile::Counter* _probe_timer; // time to probe
};
template <bool keep_matched>
void SetOperationNode::refresh_hash_table(int child_id) {
SCOPED_TIMER(_build_timer);
std::unique_ptr<HashTable> temp_tbl(
new HashTable(_child_expr_lists[0], _child_expr_lists[child_id], _build_tuple_size,
true, _find_nulls, id(), mem_tracker(),
_valid_element_in_hash_tbl / HashTable::MAX_BUCKET_OCCUPANCY_FRACTION + 1));
_hash_tbl_iterator = _hash_tbl->begin();
while (_hash_tbl_iterator.has_next()) {
if constexpr (keep_matched) {
if (_hash_tbl_iterator.matched()) {
VLOG_ROW << "rebuild row: "
<< get_row_output_string(_hash_tbl_iterator.get_row(),
child(0)->row_desc());
temp_tbl->insert(_hash_tbl_iterator.get_row());
}
} else {
if (!_hash_tbl_iterator.matched()) {
VLOG_ROW << "rebuild row: "
<< get_row_output_string(_hash_tbl_iterator.get_row(),
child(0)->row_desc());
temp_tbl->insert(_hash_tbl_iterator.get_row());
}
}
_hash_tbl_iterator.next<false>();
}
_hash_tbl.swap(temp_tbl);
temp_tbl->close();
}
}; // namespace doris