[optimization] avoid extra memory copy while build hash table (#5301)
avoid extra memory copy while build hash table
This commit is contained in:
@ -40,8 +40,11 @@ HashTable::HashTable(const std::vector<ExprContext*>& build_expr_ctxs,
|
||||
_initial_seed(initial_seed),
|
||||
_node_byte_size(sizeof(Node) + sizeof(Tuple*) * _num_build_tuples),
|
||||
_num_filled_buckets(0),
|
||||
_nodes(NULL),
|
||||
_current_nodes(nullptr),
|
||||
_num_nodes(0),
|
||||
_current_capacity(num_buckets),
|
||||
_current_used(0),
|
||||
_total_capacity(num_buckets),
|
||||
_exceeded_limit(false),
|
||||
_mem_tracker(mem_tracker),
|
||||
_mem_limit_exceeded(false) {
|
||||
@ -61,13 +64,15 @@ HashTable::HashTable(const std::vector<ExprContext*>& build_expr_ctxs,
|
||||
memset(_expr_values_buffer, 0, sizeof(uint8_t) * _results_buffer_size);
|
||||
_expr_value_null_bits = new uint8_t[_build_expr_ctxs.size()];
|
||||
|
||||
_nodes_capacity = 1024;
|
||||
_nodes = reinterpret_cast<uint8_t*>(malloc(_nodes_capacity * _node_byte_size));
|
||||
memset(_nodes, 0, _nodes_capacity * _node_byte_size);
|
||||
_alloc_list.reserve(10);
|
||||
_current_nodes = reinterpret_cast<uint8_t*>(malloc(_current_capacity * _node_byte_size));
|
||||
// TODO: remove memset later
|
||||
memset(_current_nodes, 0, _current_capacity * _node_byte_size);
|
||||
_alloc_list.push_back(_current_nodes);
|
||||
|
||||
_mem_tracker->Consume(_nodes_capacity * _node_byte_size);
|
||||
_mem_tracker->Consume(_current_capacity * _node_byte_size);
|
||||
if (_mem_tracker->limit_exceeded()) {
|
||||
mem_limit_exceeded(_nodes_capacity * _node_byte_size);
|
||||
mem_limit_exceeded(_current_capacity * _node_byte_size);
|
||||
}
|
||||
}
|
||||
|
||||
@ -77,8 +82,10 @@ void HashTable::close() {
|
||||
// TODO: use tr1::array?
|
||||
delete[] _expr_values_buffer;
|
||||
delete[] _expr_value_null_bits;
|
||||
free(_nodes);
|
||||
_mem_tracker->Release(_nodes_capacity * _node_byte_size);
|
||||
for (auto ptr : _alloc_list) {
|
||||
free(ptr);
|
||||
}
|
||||
_mem_tracker->Release(_total_capacity * _node_byte_size);
|
||||
_mem_tracker->Release(_buckets.size() * sizeof(Bucket));
|
||||
}
|
||||
|
||||
@ -199,11 +206,10 @@ void HashTable::resize_buckets(int64_t num_buckets) {
|
||||
Bucket* bucket = &_buckets[i];
|
||||
Bucket* sister_bucket = &_buckets[i + old_num_buckets];
|
||||
Node* last_node = NULL;
|
||||
int node_idx = bucket->_node_idx;
|
||||
Node* node = bucket->_node;
|
||||
|
||||
while (node_idx != -1) {
|
||||
Node* node = get_node(node_idx);
|
||||
int64_t next_idx = node->_next_idx;
|
||||
while (node != nullptr) {
|
||||
Node* next_node = node->_next;
|
||||
uint32_t hash = node->_hash;
|
||||
|
||||
bool node_must_move = true;
|
||||
@ -219,12 +225,12 @@ void HashTable::resize_buckets(int64_t num_buckets) {
|
||||
}
|
||||
|
||||
if (node_must_move) {
|
||||
move_node(bucket, move_to, node_idx, node, last_node);
|
||||
move_node(bucket, move_to, node, last_node);
|
||||
} else {
|
||||
last_node = node;
|
||||
}
|
||||
|
||||
node_idx = next_idx;
|
||||
node = next_node;
|
||||
}
|
||||
}
|
||||
|
||||
@ -233,19 +239,19 @@ void HashTable::resize_buckets(int64_t num_buckets) {
|
||||
}
|
||||
|
||||
void HashTable::grow_node_array() {
|
||||
int64_t old_size = _nodes_capacity * _node_byte_size;
|
||||
_nodes_capacity = _nodes_capacity + _nodes_capacity / 2;
|
||||
int64_t new_size = _nodes_capacity * _node_byte_size;
|
||||
_current_capacity = _total_capacity / 2;
|
||||
_total_capacity += _current_capacity;
|
||||
int64_t alloc_size = _current_capacity * _node_byte_size;
|
||||
_current_nodes = reinterpret_cast<uint8_t*>(malloc(alloc_size));
|
||||
_current_used = 0;
|
||||
// TODO: remove memset later
|
||||
memset(_current_nodes, 0, alloc_size);
|
||||
// add _current_nodes to alloc pool
|
||||
_alloc_list.push_back(_current_nodes);
|
||||
|
||||
uint8_t* new_nodes = reinterpret_cast<uint8_t*>(malloc(new_size));
|
||||
memset(new_nodes, 0, new_size);
|
||||
memcpy(new_nodes, _nodes, old_size);
|
||||
free(_nodes);
|
||||
_nodes = new_nodes;
|
||||
|
||||
_mem_tracker->Consume(new_size - old_size);
|
||||
_mem_tracker->Consume(alloc_size);
|
||||
if (_mem_tracker->limit_exceeded()) {
|
||||
mem_limit_exceeded(new_size - old_size);
|
||||
mem_limit_exceeded(alloc_size);
|
||||
}
|
||||
}
|
||||
|
||||
@ -262,29 +268,27 @@ std::string HashTable::debug_string(bool skip_empty, const RowDescriptor* desc)
|
||||
ss << std::endl;
|
||||
|
||||
for (int i = 0; i < _buckets.size(); ++i) {
|
||||
int64_t node_idx = _buckets[i]._node_idx;
|
||||
Node* node = _buckets[i]._node;
|
||||
bool first = true;
|
||||
|
||||
if (skip_empty && node_idx == -1) {
|
||||
if (skip_empty && node == nullptr) {
|
||||
continue;
|
||||
}
|
||||
|
||||
ss << i << ": ";
|
||||
|
||||
while (node_idx != -1) {
|
||||
Node* node = get_node(node_idx);
|
||||
|
||||
while (node != nullptr) {
|
||||
if (!first) {
|
||||
ss << ",";
|
||||
}
|
||||
|
||||
if (desc == NULL) {
|
||||
ss << node_idx << "(" << (void*)node->data() << ")";
|
||||
ss << node->_hash << "(" << (void*)node->data() << ")";
|
||||
} else {
|
||||
ss << (void*)node->data() << " " << node->data()->to_string(*desc);
|
||||
}
|
||||
|
||||
node_idx = node->_next_idx;
|
||||
node = node->_next;
|
||||
first = false;
|
||||
}
|
||||
|
||||
|
||||
@ -23,6 +23,7 @@
|
||||
|
||||
#include "codegen/doris_ir.h"
|
||||
#include "common/logging.h"
|
||||
#include "common/object_pool.h"
|
||||
#include "util/hash_util.hpp"
|
||||
|
||||
namespace doris {
|
||||
@ -142,7 +143,7 @@ public:
|
||||
|
||||
// Returns the number of bytes allocated to the hash table
|
||||
int64_t byte_size() const {
|
||||
return _node_byte_size * _nodes_capacity + sizeof(Bucket) * _buckets.size();
|
||||
return _node_byte_size * _total_capacity + sizeof(Bucket) * _buckets.size();
|
||||
}
|
||||
|
||||
// Returns the results of the exprs at 'expr_idx' evaluated over the last row
|
||||
@ -172,7 +173,7 @@ public:
|
||||
// stl-like iterator interface.
|
||||
class Iterator {
|
||||
public:
|
||||
Iterator() : _table(NULL), _bucket_idx(-1), _node_idx(-1) {}
|
||||
Iterator() : _table(NULL), _bucket_idx(-1), _node(nullptr) {}
|
||||
|
||||
// Iterates to the next element. In the case where the iterator was
|
||||
// from a Find, this will lazily evaluate that bucket, only returning
|
||||
@ -182,54 +183,52 @@ public:
|
||||
|
||||
// Returns the current row or NULL if at end.
|
||||
TupleRow* get_row() {
|
||||
if (_node_idx == -1) {
|
||||
if (_node == nullptr) {
|
||||
return NULL;
|
||||
}
|
||||
return _table->get_node(_node_idx)->data();
|
||||
return _node->data();
|
||||
}
|
||||
|
||||
// Returns Hash
|
||||
uint32_t get_hash() { return _table->get_node(_node_idx)->_hash; }
|
||||
uint32_t get_hash() { return _node->_hash; }
|
||||
|
||||
// Returns if the iterator is at the end
|
||||
bool has_next() { return _node_idx != -1; }
|
||||
bool has_next() { return _node != nullptr; }
|
||||
|
||||
// Returns true if this iterator is at the end, i.e. get_row() cannot be called.
|
||||
bool at_end() { return _node_idx == -1; }
|
||||
bool at_end() { return _node == nullptr; }
|
||||
|
||||
// Sets as matched the node currently pointed by the iterator. The iterator
|
||||
// cannot be AtEnd().
|
||||
void set_matched() {
|
||||
DCHECK(!at_end());
|
||||
Node* node = _table->get_node(_node_idx);
|
||||
node->matched = true;
|
||||
_node->matched = true;
|
||||
}
|
||||
|
||||
bool matched() {
|
||||
DCHECK(!at_end());
|
||||
Node* node = _table->get_node(_node_idx);
|
||||
return node->matched;
|
||||
return _node->matched;
|
||||
}
|
||||
|
||||
bool operator==(const Iterator& rhs) {
|
||||
return _bucket_idx == rhs._bucket_idx && _node_idx == rhs._node_idx;
|
||||
return _bucket_idx == rhs._bucket_idx && _node == rhs._node;
|
||||
}
|
||||
|
||||
bool operator!=(const Iterator& rhs) {
|
||||
return _bucket_idx != rhs._bucket_idx || _node_idx != rhs._node_idx;
|
||||
return _bucket_idx != rhs._bucket_idx || _node != rhs._node;
|
||||
}
|
||||
|
||||
private:
|
||||
friend class HashTable;
|
||||
|
||||
Iterator(HashTable* table, int bucket_idx, int64_t node, uint32_t hash)
|
||||
: _table(table), _bucket_idx(bucket_idx), _node_idx(node), _scan_hash(hash) {}
|
||||
Iterator(HashTable* table, int bucket_idx, Node* node, uint32_t hash)
|
||||
: _table(table), _bucket_idx(bucket_idx), _node(node), _scan_hash(hash) {}
|
||||
|
||||
HashTable* _table;
|
||||
// Current bucket idx
|
||||
int64_t _bucket_idx;
|
||||
// Current node idx (within current bucket)
|
||||
int64_t _node_idx;
|
||||
// Current node (within current bucket)
|
||||
Node* _node;
|
||||
// cached hash value for the row passed to find()()
|
||||
uint32_t _scan_hash;
|
||||
};
|
||||
@ -241,11 +240,11 @@ private:
|
||||
// Header portion of a Node. The node data (TupleRow) is right after the
|
||||
// node memory to maximize cache hits.
|
||||
struct Node {
|
||||
int64_t _next_idx; // chain to next node for collisions
|
||||
uint32_t _hash; // Cache of the hash for _data
|
||||
Node* _next; // chain to next node for collisions
|
||||
uint32_t _hash; // Cache of the hash for _data
|
||||
bool matched;
|
||||
|
||||
Node() : _next_idx(-1), _hash(-1), matched(false) {}
|
||||
Node() : _next(nullptr), _hash(-1), matched(false) {}
|
||||
|
||||
TupleRow* data() {
|
||||
uint8_t* mem = reinterpret_cast<uint8_t*>(this);
|
||||
@ -255,22 +254,14 @@ private:
|
||||
};
|
||||
|
||||
struct Bucket {
|
||||
int64_t _node_idx;
|
||||
|
||||
Bucket() { _node_idx = -1; }
|
||||
Bucket() { _node = nullptr; }
|
||||
Node* _node;
|
||||
};
|
||||
|
||||
// Returns the next non-empty bucket and updates idx to be the index of that bucket.
|
||||
// If there are no more buckets, returns NULL and sets idx to -1
|
||||
Bucket* next_bucket(int64_t* bucket_idx);
|
||||
|
||||
// Returns node at idx. Tracking structures do not use pointers since they will
|
||||
// change as the HashTable grows.
|
||||
Node* get_node(int64_t idx) {
|
||||
DCHECK_NE(idx, -1);
|
||||
return reinterpret_cast<Node*>(_nodes + _node_byte_size * idx);
|
||||
}
|
||||
|
||||
// Resize the hash table to 'num_buckets'
|
||||
void resize_buckets(int64_t num_buckets);
|
||||
|
||||
@ -279,12 +270,11 @@ private:
|
||||
|
||||
// Chains the node at 'node_idx' to 'bucket'. Nodes in a bucket are chained
|
||||
// as a linked list; this places the new node at the beginning of the list.
|
||||
void add_to_bucket(Bucket* bucket, int64_t node_idx, Node* node);
|
||||
void add_to_bucket(Bucket* bucket, Node* node);
|
||||
|
||||
// Moves a node from one bucket to another. 'previous_node' refers to the
|
||||
// node (if any) that's chained before this node in from_bucket's linked list.
|
||||
void move_node(Bucket* from_bucket, Bucket* to_bucket, int64_t node_idx, Node* node,
|
||||
Node* previous_node);
|
||||
void move_node(Bucket* from_bucket, Bucket* to_bucket, Node* node, Node* previous_node);
|
||||
|
||||
// Evaluate the exprs over row and cache the results in '_expr_values_buffer'.
|
||||
// Returns whether any expr evaluated to NULL
|
||||
@ -354,14 +344,16 @@ private:
|
||||
const int _node_byte_size;
|
||||
// Number of non-empty buckets. Used to determine when to grow and rehash
|
||||
int64_t _num_filled_buckets;
|
||||
// Memory to store node data. This is not allocated from a pool to take advantage
|
||||
// of realloc.
|
||||
// TODO: integrate with mem pools
|
||||
uint8_t* _nodes;
|
||||
// Buffer to store node data.
|
||||
uint8_t* _current_nodes;
|
||||
// number of nodes stored (i.e. size of hash table)
|
||||
int64_t _num_nodes;
|
||||
// max number of nodes that can be stored in '_nodes' before realloc
|
||||
int64_t _nodes_capacity;
|
||||
// current nodes buffer capacity
|
||||
int64_t _current_capacity;
|
||||
// current used
|
||||
int64_t _current_used;
|
||||
// total capacity
|
||||
int64_t _total_capacity;
|
||||
|
||||
bool _exceeded_limit; // true if any of _mem_trackers[].limit_exceeded()
|
||||
|
||||
@ -395,6 +387,8 @@ private:
|
||||
// Use bytes instead of bools to be compatible with llvm. This address must
|
||||
// not change once allocated.
|
||||
uint8_t* _expr_value_null_bits;
|
||||
// node buffer list
|
||||
std::vector<uint8_t*> _alloc_list;
|
||||
};
|
||||
|
||||
} // namespace doris
|
||||
|
||||
@ -33,16 +33,14 @@ inline HashTable::Iterator HashTable::find(TupleRow* probe_row, bool probe) {
|
||||
int64_t bucket_idx = hash & (_num_buckets - 1);
|
||||
|
||||
Bucket* bucket = &_buckets[bucket_idx];
|
||||
int64_t node_idx = bucket->_node_idx;
|
||||
|
||||
while (node_idx != -1) {
|
||||
Node* node = get_node(node_idx);
|
||||
Node* node = bucket->_node;
|
||||
|
||||
while (node != nullptr) {
|
||||
if (node->_hash == hash && equals(node->data())) {
|
||||
return Iterator(this, bucket_idx, node_idx, hash);
|
||||
return Iterator(this, bucket_idx, node, hash);
|
||||
}
|
||||
|
||||
node_idx = node->_next_idx;
|
||||
node = node->_next;
|
||||
}
|
||||
|
||||
return end();
|
||||
@ -53,7 +51,7 @@ inline HashTable::Iterator HashTable::begin() {
|
||||
Bucket* bucket = next_bucket(&bucket_idx);
|
||||
|
||||
if (bucket != NULL) {
|
||||
return Iterator(this, bucket_idx, bucket->_node_idx, 0);
|
||||
return Iterator(this, bucket_idx, bucket->_node, 0);
|
||||
}
|
||||
|
||||
return end();
|
||||
@ -63,7 +61,7 @@ inline HashTable::Bucket* HashTable::next_bucket(int64_t* bucket_idx) {
|
||||
++*bucket_idx;
|
||||
|
||||
for (; *bucket_idx < _num_buckets; ++*bucket_idx) {
|
||||
if (_buckets[*bucket_idx]._node_idx != -1) {
|
||||
if (_buckets[*bucket_idx]._node != nullptr) {
|
||||
return &_buckets[*bucket_idx];
|
||||
}
|
||||
}
|
||||
@ -82,77 +80,78 @@ inline void HashTable::insert_impl(TupleRow* row) {
|
||||
uint32_t hash = hash_current_row();
|
||||
int64_t bucket_idx = hash & (_num_buckets - 1);
|
||||
|
||||
if (_num_nodes == _nodes_capacity) {
|
||||
if (_current_used == _current_capacity) {
|
||||
grow_node_array();
|
||||
}
|
||||
// get a node from memory pool
|
||||
Node* node = reinterpret_cast<Node*>(_current_nodes + _node_byte_size * _current_used++);
|
||||
|
||||
Node* node = get_node(_num_nodes);
|
||||
TupleRow* data = node->data();
|
||||
node->_hash = hash;
|
||||
memcpy(data, row, sizeof(Tuple*) * _num_build_tuples);
|
||||
add_to_bucket(&_buckets[bucket_idx], _num_nodes, node);
|
||||
add_to_bucket(&_buckets[bucket_idx], node);
|
||||
++_num_nodes;
|
||||
}
|
||||
|
||||
inline void HashTable::add_to_bucket(Bucket* bucket, int64_t node_idx, Node* node) {
|
||||
if (bucket->_node_idx == -1) {
|
||||
inline void HashTable::add_to_bucket(Bucket* bucket, Node* node) {
|
||||
if (bucket->_node == nullptr) {
|
||||
++_num_filled_buckets;
|
||||
}
|
||||
|
||||
node->_next_idx = bucket->_node_idx;
|
||||
bucket->_node_idx = node_idx;
|
||||
node->_next = bucket->_node;
|
||||
bucket->_node = node;
|
||||
}
|
||||
|
||||
inline void HashTable::move_node(Bucket* from_bucket, Bucket* to_bucket,
|
||||
int64_t node_idx, Node* node, Node* previous_node) {
|
||||
int64_t next_idx = node->_next_idx;
|
||||
inline void HashTable::move_node(Bucket* from_bucket, Bucket* to_bucket, Node* node,
|
||||
Node* previous_node) {
|
||||
Node* next_node = node->_next;
|
||||
|
||||
if (previous_node != NULL) {
|
||||
previous_node->_next_idx = next_idx;
|
||||
previous_node->_next = next_node;
|
||||
} else {
|
||||
// Update bucket directly
|
||||
from_bucket->_node_idx = next_idx;
|
||||
from_bucket->_node = next_node;
|
||||
|
||||
if (next_idx == -1) {
|
||||
if (next_node == nullptr) {
|
||||
--_num_filled_buckets;
|
||||
}
|
||||
}
|
||||
|
||||
add_to_bucket(to_bucket, node_idx, node);
|
||||
add_to_bucket(to_bucket, node);
|
||||
}
|
||||
|
||||
template<bool check_match>
|
||||
template <bool check_match>
|
||||
inline void HashTable::Iterator::next() {
|
||||
if (_bucket_idx == -1) {
|
||||
return;
|
||||
}
|
||||
|
||||
// TODO: this should prefetch the next tuplerow
|
||||
Node* node = _table->get_node(_node_idx);
|
||||
Node* node = _node;
|
||||
|
||||
// Iterator is not from a full table scan, evaluate equality now. Only the current
|
||||
// bucket needs to be scanned. '_expr_values_buffer' contains the results
|
||||
// for the current probe row.
|
||||
if (check_match) {
|
||||
// TODO: this should prefetch the next node
|
||||
int64_t next_idx = node->_next_idx;
|
||||
Node* next_node = node->_next;
|
||||
|
||||
while (next_idx != -1) {
|
||||
node = _table->get_node(next_idx);
|
||||
while (next_node != nullptr) {
|
||||
node = next_node;
|
||||
|
||||
if (node->_hash == _scan_hash && _table->equals(node->data())) {
|
||||
_node_idx = next_idx;
|
||||
_node = next_node;
|
||||
return;
|
||||
}
|
||||
|
||||
next_idx = node->_next_idx;
|
||||
next_node = node->_next;
|
||||
}
|
||||
|
||||
*this = _table->end();
|
||||
} else {
|
||||
// Move onto the next chained node
|
||||
if (node->_next_idx != -1) {
|
||||
_node_idx = node->_next_idx;
|
||||
if (node->_next != nullptr) {
|
||||
_node = node->_next;
|
||||
return;
|
||||
}
|
||||
|
||||
@ -161,13 +160,13 @@ inline void HashTable::Iterator::next() {
|
||||
|
||||
if (bucket == NULL) {
|
||||
_bucket_idx = -1;
|
||||
_node_idx = -1;
|
||||
_node = nullptr;
|
||||
} else {
|
||||
_node_idx = bucket->_node_idx;
|
||||
_node = bucket->_node;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
} // namespace doris
|
||||
|
||||
#endif
|
||||
|
||||
@ -1,64 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#ifndef DORIS_BE_SRC_QUERY_BE_RUNTIME_DATA_STREAM_RECVR_H
|
||||
#define DORIS_BE_SRC_QUERY_BE_RUNTIME_DATA_STREAM_RECVR_H
|
||||
|
||||
#include "runtime/data_stream_mgr.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
class DataStreamMgr;
|
||||
|
||||
// Single receiver of an m:n data stream.
|
||||
// Incoming row batches are routed to destinations based on the provided
|
||||
// partitioning specification.
|
||||
// Receivers are created via DataStreamMgr::CreateRecvr().
|
||||
class DataStreamRecvr {
|
||||
public:
|
||||
// deregister from _mgr
|
||||
~DataStreamRecvr() {
|
||||
// TODO: log error msg
|
||||
_mgr->deregister_recvr(_cb->fragment_instance_id(), _cb->dest_node_id());
|
||||
}
|
||||
|
||||
// Returns next row batch in data stream; blocks if there aren't any.
|
||||
// Returns NULL if eos (subsequent calls will not return any more batches).
|
||||
// Sets 'is_cancelled' to true if receiver fragment got cancelled, otherwise false.
|
||||
// The caller owns the batch.
|
||||
// TODO: error handling
|
||||
RowBatch* get_batch(bool* is_cancelled) {
|
||||
return _cb->get_batch(is_cancelled);
|
||||
}
|
||||
|
||||
RuntimeProfile* profile() {
|
||||
return _cb->profile();
|
||||
}
|
||||
|
||||
private:
|
||||
friend class DataStreamMgr;
|
||||
DataStreamMgr* _mgr;
|
||||
boost::shared_ptr<DataStreamMgr::StreamControlBlock> _cb;
|
||||
|
||||
DataStreamRecvr(DataStreamMgr* mgr,
|
||||
boost::shared_ptr<DataStreamMgr::StreamControlBlock> cb)
|
||||
: _mgr(mgr), _cb(cb) {}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
#endif
|
||||
@ -24,7 +24,7 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/test/exec")
|
||||
# TODO: why is this test disabled?
|
||||
#ADD_BE_TEST(new_olap_scan_node_test)
|
||||
#ADD_BE_TEST(pre_aggregation_node_test)
|
||||
#ADD_BE_TEST(hash_table_test)
|
||||
ADD_BE_TEST(hash_table_test)
|
||||
# ADD_BE_TEST(partitioned_hash_table_test)
|
||||
#ADD_BE_TEST(olap_scanner_test)
|
||||
#ADD_BE_TEST(olap_meta_reader_test)
|
||||
|
||||
@ -23,46 +23,66 @@
|
||||
|
||||
#include <iostream>
|
||||
#include <map>
|
||||
#include <memory>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
||||
#include "common/compiler_util.h"
|
||||
#include "exprs/expr.h"
|
||||
#include "exprs/slot_ref.h"
|
||||
#include "runtime/exec_env.h"
|
||||
#include "runtime/mem_pool.h"
|
||||
#include "runtime/mem_tracker.h"
|
||||
#include "runtime/runtime_state.h"
|
||||
#include "runtime/string_value.h"
|
||||
#include "runtime/test_env.h"
|
||||
#include "util/cpu_info.h"
|
||||
#include "util/runtime_profile.h"
|
||||
#include "util/time.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
using std::vector;
|
||||
using std::map;
|
||||
|
||||
class HashTableTest : public testing::Test {
|
||||
public:
|
||||
HashTableTest() : _mem_pool() {}
|
||||
HashTableTest() {
|
||||
_tracker = MemTracker::CreateTracker(-1, "root");
|
||||
_pool_tracker = MemTracker::CreateTracker(-1, "mem-pool", _tracker);
|
||||
_mem_pool.reset(new MemPool(_pool_tracker.get()));
|
||||
_state = _pool.add(new RuntimeState(TQueryGlobals()));
|
||||
_state->init_instance_mem_tracker();
|
||||
_state->_exec_env = ExecEnv::GetInstance();
|
||||
}
|
||||
|
||||
protected:
|
||||
RuntimeState* _state;
|
||||
std::shared_ptr<MemTracker> _tracker;
|
||||
std::shared_ptr<MemTracker> _pool_tracker;
|
||||
ObjectPool _pool;
|
||||
MemPool _mem_pool;
|
||||
std::vector<Expr*> _build_expr;
|
||||
std::vector<Expr*> _probe_expr;
|
||||
std::shared_ptr<MemPool> _mem_pool;
|
||||
std::vector<ExprContext*> _build_expr;
|
||||
std::vector<ExprContext*> _probe_expr;
|
||||
|
||||
virtual void SetUp() {
|
||||
RowDescriptor desc;
|
||||
Status status;
|
||||
TypeDescriptor int_desc(TYPE_INT);
|
||||
|
||||
// Not very easy to test complex tuple layouts so this test will use the
|
||||
// simplest. The purpose of these tests is to exercise the hash map
|
||||
// internals so a simple build/probe expr is fine.
|
||||
_build_expr.push_back(_pool.add(new SlotRef(TYPE_INT, 0)));
|
||||
status = Expr::prepare(_build_expr, NULL, desc);
|
||||
auto build_slot_ref = _pool.add(new SlotRef(int_desc, 0));
|
||||
_build_expr.push_back(_pool.add(new ExprContext(build_slot_ref)));
|
||||
status = Expr::prepare(_build_expr, _state, desc, _tracker);
|
||||
EXPECT_TRUE(status.ok());
|
||||
|
||||
_probe_expr.push_back(_pool.add(new SlotRef(TYPE_INT, 0)));
|
||||
status = Expr::prepare(_probe_expr, NULL, desc);
|
||||
auto probe_slot_ref = _pool.add(new SlotRef(int_desc, 0));
|
||||
_probe_expr.push_back(_pool.add(new ExprContext(probe_slot_ref)));
|
||||
status = Expr::prepare(_probe_expr, _state, desc, _tracker);
|
||||
EXPECT_TRUE(status.ok());
|
||||
}
|
||||
|
||||
void TearDown() {
|
||||
Expr::close(_build_expr, _state);
|
||||
Expr::close(_probe_expr, _state);
|
||||
}
|
||||
|
||||
TupleRow* create_tuple_row(int32_t val);
|
||||
|
||||
// Wrapper to call private methods on HashTable
|
||||
@ -118,7 +138,7 @@ protected:
|
||||
EXPECT_TRUE(iter == table->end());
|
||||
} else {
|
||||
if (scan) {
|
||||
map<TupleRow*, bool> matched;
|
||||
std::map<TupleRow*, bool> matched;
|
||||
|
||||
while (iter != table->end()) {
|
||||
EXPECT_TRUE(matched.find(iter.get_row()) == matched.end());
|
||||
@ -143,8 +163,8 @@ protected:
|
||||
};
|
||||
|
||||
TupleRow* HashTableTest::create_tuple_row(int32_t val) {
|
||||
uint8_t* tuple_row_mem = _mem_pool.allocate(sizeof(int32_t*));
|
||||
uint8_t* tuple_mem = _mem_pool.allocate(sizeof(int32_t));
|
||||
uint8_t* tuple_row_mem = _mem_pool->allocate(sizeof(int32_t*));
|
||||
uint8_t* tuple_mem = _mem_pool->allocate(sizeof(int32_t));
|
||||
*reinterpret_cast<int32_t*>(tuple_mem) = val;
|
||||
TupleRow* row = reinterpret_cast<TupleRow*>(tuple_row_mem);
|
||||
row->set_tuple(0, reinterpret_cast<Tuple*>(tuple_mem));
|
||||
@ -173,6 +193,9 @@ TEST_F(HashTableTest, SetupTest) {
|
||||
// testing for probe rows that are both there and not.
|
||||
// The hash table is rehashed a few times and the scans/finds are tested again.
|
||||
TEST_F(HashTableTest, BasicTest) {
|
||||
std::shared_ptr<MemTracker> hash_table_tracker =
|
||||
MemTracker::CreateTracker(-1, "hash-table-basic-tracker", _tracker);
|
||||
|
||||
TupleRow* build_rows[5];
|
||||
TupleRow* scan_rows[5] = {0};
|
||||
|
||||
@ -190,8 +213,11 @@ TEST_F(HashTableTest, BasicTest) {
|
||||
}
|
||||
}
|
||||
|
||||
// Create the hash table and insert the build rows
|
||||
HashTable hash_table(_build_expr, _probe_expr, 1, false, 0);
|
||||
std::vector<bool> is_null_safe = {false};
|
||||
int initial_seed = 1;
|
||||
int64_t num_buckets = 4;
|
||||
HashTable hash_table(_build_expr, _probe_expr, 1, false, is_null_safe, initial_seed,
|
||||
hash_table_tracker, num_buckets);
|
||||
|
||||
for (int i = 0; i < 5; ++i) {
|
||||
hash_table.insert(build_rows[i]);
|
||||
@ -226,11 +252,19 @@ TEST_F(HashTableTest, BasicTest) {
|
||||
memset(scan_rows, 0, sizeof(scan_rows));
|
||||
full_scan(&hash_table, 0, 5, true, scan_rows, build_rows);
|
||||
probe_test(&hash_table, probe_rows, 10, false);
|
||||
hash_table.close();
|
||||
}
|
||||
|
||||
// This tests makes sure we can scan ranges of buckets
|
||||
TEST_F(HashTableTest, ScanTest) {
|
||||
HashTable hash_table(_build_expr, _probe_expr, 1, false, 0);
|
||||
std::shared_ptr<MemTracker> hash_table_tracker =
|
||||
MemTracker::CreateTracker(-1, "hash-table-scan-tracker", _tracker);
|
||||
|
||||
std::vector<bool> is_null_safe = {false};
|
||||
int initial_seed = 1;
|
||||
int64_t num_buckets = 4;
|
||||
HashTable hash_table(_build_expr, _probe_expr, 1, false, is_null_safe, initial_seed,
|
||||
hash_table_tracker, num_buckets);
|
||||
// Add 1 row with val 1, 2 with val 2, etc
|
||||
std::vector<TupleRow*> build_rows;
|
||||
ProbeTestData probe_rows[15];
|
||||
@ -267,6 +301,8 @@ TEST_F(HashTableTest, ScanTest) {
|
||||
resize_table(&hash_table, 2);
|
||||
EXPECT_EQ(hash_table.num_buckets(), 2);
|
||||
probe_test(&hash_table, probe_rows, 15, true);
|
||||
|
||||
hash_table.close();
|
||||
}
|
||||
|
||||
// This test continues adding to the hash table to trigger the resize code paths
|
||||
@ -275,8 +311,13 @@ TEST_F(HashTableTest, GrowTableTest) {
|
||||
int num_to_add = 4;
|
||||
int expected_size = 0;
|
||||
|
||||
auto mem_tracker = std::make_shared<MemTracker>(1024 * 1024);
|
||||
HashTable hash_table(_build_expr, _probe_expr, 1, false, 0, mem_tracker, num_to_add);
|
||||
std::shared_ptr<MemTracker> mem_tracker =
|
||||
MemTracker::CreateTracker(1024 * 1024, "hash-table-grow-tracker", _tracker);
|
||||
std::vector<bool> is_null_safe = {false};
|
||||
int initial_seed = 1;
|
||||
int64_t num_buckets = 4;
|
||||
HashTable hash_table(_build_expr, _probe_expr, 1, false, is_null_safe, initial_seed,
|
||||
mem_tracker, num_buckets);
|
||||
EXPECT_FALSE(mem_tracker->limit_exceeded());
|
||||
|
||||
// This inserts about 5M entries
|
||||
@ -289,6 +330,7 @@ TEST_F(HashTableTest, GrowTableTest) {
|
||||
num_to_add *= 2;
|
||||
EXPECT_EQ(hash_table.size(), expected_size);
|
||||
}
|
||||
LOG(INFO) << "consume:" << mem_tracker->consumption() << ",expected_size:" << expected_size;
|
||||
|
||||
EXPECT_TRUE(mem_tracker->limit_exceeded());
|
||||
|
||||
@ -304,6 +346,7 @@ TEST_F(HashTableTest, GrowTableTest) {
|
||||
EXPECT_TRUE(iter == hash_table.end());
|
||||
}
|
||||
}
|
||||
hash_table.close();
|
||||
}
|
||||
|
||||
// This test continues adding to the hash table to trigger the resize code paths
|
||||
@ -312,37 +355,39 @@ TEST_F(HashTableTest, GrowTableTest2) {
|
||||
int num_to_add = 1024;
|
||||
int expected_size = 0;
|
||||
|
||||
auto mem_tracker = std::make_shared<MemTracker>(1024 * 1024);
|
||||
HashTable hash_table(_build_expr, _probe_expr, 1, false, 0, mem_tracker, num_to_add);
|
||||
std::shared_ptr<MemTracker> mem_tracker =
|
||||
MemTracker::CreateTracker(1024 * 1024, "hash-table-grow2-tracker", _tracker);
|
||||
std::vector<bool> is_null_safe = {false};
|
||||
int initial_seed = 1;
|
||||
int64_t num_buckets = 4;
|
||||
HashTable hash_table(_build_expr, _probe_expr, 1, false, is_null_safe, initial_seed,
|
||||
mem_tracker, num_buckets);
|
||||
|
||||
LOG(INFO) << time(NULL);
|
||||
|
||||
// This inserts about 5M entries
|
||||
for (int i = 0; i < 5 * 1024 * 1024; ++i) {
|
||||
hash_table.insert(create_tuple_row(build_row_val));
|
||||
// constexpr const int test_size = 5 * 1024 * 1024;
|
||||
constexpr const int test_size = 5 * 1024 * 100;
|
||||
|
||||
for (int i = 0; i < test_size; ++i) {
|
||||
hash_table.insert(create_tuple_row(build_row_val++));
|
||||
expected_size += num_to_add;
|
||||
}
|
||||
|
||||
LOG(INFO) << time(NULL);
|
||||
|
||||
// Validate that we can find the entries
|
||||
for (int i = 0; i < 5 * 1024 * 1024; ++i) {
|
||||
TupleRow* probe_row = create_tuple_row(i);
|
||||
for (int i = 0; i < test_size; ++i) {
|
||||
TupleRow* probe_row = create_tuple_row(i++);
|
||||
hash_table.find(probe_row);
|
||||
}
|
||||
|
||||
LOG(INFO) << time(NULL);
|
||||
hash_table.close();
|
||||
}
|
||||
|
||||
} // namespace doris
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
std::string conffile = std::string(getenv("DORIS_HOME")) + "/conf/be.conf";
|
||||
if (!doris::config::init(conffile.c_str(), false)) {
|
||||
fprintf(stderr, "error read config file. \n");
|
||||
return -1;
|
||||
}
|
||||
init_glog("be-test");
|
||||
::testing::InitGoogleTest(&argc, argv);
|
||||
doris::CpuInfo::init();
|
||||
return RUN_ALL_TESTS();
|
||||
|
||||
Reference in New Issue
Block a user