From 5f004cb00904b8022705a386d0337c1dc864f083 Mon Sep 17 00:00:00 2001
From: lichaoyong
Date: Fri, 20 Mar 2020 14:42:40 +0800
Subject: [PATCH] Revert "[CodeStyle] Remove unused PartitionedHashTable (#3156)" (#3159)

This reverts commit d3fd44f0a2fe076d2c62851babc162fcebe4d63b.
---
 be/src/exec/CMakeLists.txt                  |   2 +
 be/src/exec/hash_join_node.cpp              |   2 +-
 be/src/exec/partitioned_hash_table.cc       | 440 +++++++++++++
 be/src/exec/partitioned_hash_table.h        | 673 ++++++++++++++++++++
 be/src/exec/partitioned_hash_table.inline.h | 379 +++++++++++
 5 files changed, 1495 insertions(+), 1 deletion(-)
 create mode 100644 be/src/exec/partitioned_hash_table.cc
 create mode 100644 be/src/exec/partitioned_hash_table.h
 create mode 100644 be/src/exec/partitioned_hash_table.inline.h

diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 3c3fd7128a..20e00ef968 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -83,6 +83,8 @@ set(EXEC_FILES
     schema_scanner/schema_charsets_scanner.cpp
     schema_scanner/schema_collations_scanner.cpp
     schema_scanner/schema_helper.cpp
+    partitioned_hash_table.cc
+    partitioned_hash_table_ir.cc
     new_partitioned_hash_table.cc
     new_partitioned_hash_table_ir.cc
     partitioned_aggregation_node.cc
diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp
index 72db1be8bf..d03dd3e49c 100644
--- a/be/src/exec/hash_join_node.cpp
+++ b/be/src/exec/hash_join_node.cpp
@@ -46,7 +46,7 @@ HashJoinNode::HashJoinNode(
     _is_push_down = tnode.hash_join_node.is_push_down;
     _build_unique = _join_op == TJoinOp::LEFT_ANTI_JOIN || _join_op == TJoinOp::RIGHT_ANTI_JOIN
         || _join_op == TJoinOp::RIGHT_SEMI_JOIN || _join_op == TJoinOp::LEFT_SEMI_JOIN
-        || _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN
+        || _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
 }

 HashJoinNode::~HashJoinNode() {
diff --git a/be/src/exec/partitioned_hash_table.cc b/be/src/exec/partitioned_hash_table.cc
new file mode 100644
index 0000000000..61d9d2e6e7
--- /dev/null
+++ b/be/src/exec/partitioned_hash_table.cc
@@ -0,0 +1,440 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/partitioned_hash_table.inline.h"
+
+#include "exprs/expr.h"
+#include "exprs/expr_context.h"
+#include "exprs/slot_ref.h"
+#include "runtime/buffered_block_mgr.h"
+#include "runtime/mem_tracker.h"
+#include "runtime/raw_value.h"
+#include "runtime/runtime_state.h"
+#include "runtime/string_value.hpp"
+#include "util/doris_metrics.h"
+
+// DEFINE_bool(enable_quadratic_probing, true, "Enable quadratic probing hash table");
+
+using std::string;
+using std::stringstream;
+using std::vector;
+using std::endl;
+
+namespace doris {
+
+// Random primes to multiply the seed with.
+static uint32_t SEED_PRIMES[] = {
+    1, // First seed must be 1, level 0 is used by other operators in the fragment.
+    1431655781,
+    1183186591,
+    622729787,
+    472882027,
+    338294347,
+    275604541,
+    41161739,
+    29999999,
+    27475109,
+    611603,
+    16313357,
+    11380003,
+    21261403,
+    33393119,
+    101,
+    71043403
+};
+
+// Put a non-zero constant in the result location for NULL.
+// We don't want (NULL, 1) to hash to the same as (0, 1).
+// This needs to be as big as the biggest primitive type since the bytes
+// get copied directly.
+// TODO: find a better approach, since primitives like CHAR(N) can be up to 128 bytes
+static int64_t NULL_VALUE[] = { HashUtil::FNV_SEED, HashUtil::FNV_SEED,
+                                HashUtil::FNV_SEED, HashUtil::FNV_SEED,
+                                HashUtil::FNV_SEED, HashUtil::FNV_SEED,
+                                HashUtil::FNV_SEED, HashUtil::FNV_SEED,
+                                HashUtil::FNV_SEED, HashUtil::FNV_SEED,
+                                HashUtil::FNV_SEED, HashUtil::FNV_SEED,
+                                HashUtil::FNV_SEED, HashUtil::FNV_SEED,
+                                HashUtil::FNV_SEED, HashUtil::FNV_SEED };
+
+// The first NUM_SMALL_BLOCKS of _nodes are made of blocks less than the IO size (of 8MB)
+// to reduce the memory footprint of small queries. In particular, we always first use a
+// 64KB and a 512KB block before starting using IO-sized blocks.
+static const int64_t INITIAL_DATA_PAGE_SIZES[] = { 64 * 1024, 512 * 1024 };
+static const int NUM_SMALL_DATA_PAGES = sizeof(INITIAL_DATA_PAGE_SIZES) / sizeof(int64_t);
+
+PartitionedHashTableCtx::PartitionedHashTableCtx(
+        const vector<ExprContext*>& build_expr_ctxs, const vector<ExprContext*>& probe_expr_ctxs,
+        bool stores_nulls, bool finds_nulls,
+        int32_t initial_seed, int max_levels, int num_build_tuples) :
+    _build_expr_ctxs(build_expr_ctxs),
+    _probe_expr_ctxs(probe_expr_ctxs),
+    _stores_nulls(stores_nulls),
+    _finds_nulls(finds_nulls),
+    _level(0),
+    _row(reinterpret_cast<TupleRow*>(malloc(sizeof(Tuple*) * num_build_tuples))) {
+    // Compute the layout and buffer size to store the evaluated expr results
+    DCHECK_EQ(_build_expr_ctxs.size(), _probe_expr_ctxs.size());
+    DCHECK(!_build_expr_ctxs.empty());
+    _results_buffer_size = Expr::compute_results_layout(_build_expr_ctxs,
+        &_expr_values_buffer_offsets, &_var_result_begin);
+    _expr_values_buffer = new uint8_t[_results_buffer_size];
+    memset(_expr_values_buffer, 0, sizeof(uint8_t) * _results_buffer_size);
+    _expr_value_null_bits = new uint8_t[build_expr_ctxs.size()];
+
+    // Populate the seeds to use for all the levels. TODO: revisit how we generate these.
+    DCHECK_GE(max_levels, 0);
+    DCHECK_LT(max_levels, sizeof(SEED_PRIMES) / sizeof(SEED_PRIMES[0]));
+    DCHECK_NE(initial_seed, 0);
+    _seeds.resize(max_levels + 1);
+    _seeds[0] = initial_seed;
+    for (int i = 1; i <= max_levels; ++i) {
+        _seeds[i] = _seeds[i - 1] * SEED_PRIMES[i];
+    }
+}
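The constructor above derives one hash seed per repartitioning level by cumulatively multiplying the initial seed with SEED_PRIMES, so the same key hashes differently at every level. A minimal standalone sketch of that behavior, not part of the patch; the seeded FNV-1a mixer below is an illustrative stand-in for HashUtil::hash:

    #include <cstdint>
    #include <cstdio>

    // Stand-in hash: seeded FNV-1a over raw bytes. Illustrative only.
    static uint32_t fnv1a(const void* data, int len, uint32_t seed) {
        const uint8_t* p = static_cast<const uint8_t*>(data);
        uint32_t h = seed;
        for (int i = 0; i < len; ++i) { h ^= p[i]; h *= 16777619U; }
        return h;
    }

    int main() {
        // Mirrors the ctor: _seeds[0] = initial_seed, _seeds[i] = _seeds[i-1] * SEED_PRIMES[i].
        uint32_t seed_primes[] = {1, 1431655781, 1183186591, 622729787};
        uint32_t seeds[4];
        seeds[0] = 99170201; // any non-zero initial seed (hypothetical value)
        for (int i = 1; i < 4; ++i) seeds[i] = seeds[i - 1] * seed_primes[i];

        int64_t key = 42;
        for (int level = 0; level < 4; ++level) {
            // Same key, different level => a different bucket choice after each resize level.
            printf("level %d: hash=%u\n", level, fnv1a(&key, sizeof(key), seeds[level]));
        }
        return 0;
    }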
+
+void PartitionedHashTableCtx::close() {
+    // TODO: use tr1::array?
+    DCHECK(_expr_values_buffer != NULL);
+    delete[] _expr_values_buffer;
+    _expr_values_buffer = NULL;
+    DCHECK(_expr_value_null_bits != NULL);
+    delete[] _expr_value_null_bits;
+    _expr_value_null_bits = NULL;
+    free(_row);
+    _row = NULL;
+}
+
+bool PartitionedHashTableCtx::eval_row(TupleRow* row, const vector<ExprContext*>& ctxs) {
+    bool has_null = false;
+    for (int i = 0; i < ctxs.size(); ++i) {
+        void* loc = _expr_values_buffer + _expr_values_buffer_offsets[i];
+        void* val = ctxs[i]->get_value(row);
+        if (val == NULL) {
+            // If the table doesn't store nulls, no reason to keep evaluating
+            if (!_stores_nulls) {
+                return true;
+            }
+            _expr_value_null_bits[i] = true;
+            val = reinterpret_cast<void*>(&NULL_VALUE);
+            has_null = true;
+        } else {
+            _expr_value_null_bits[i] = false;
+        }
+        DCHECK_LE(_build_expr_ctxs[i]->root()->type().get_slot_size(),
+                  sizeof(NULL_VALUE));
+        RawValue::write(val, loc, _build_expr_ctxs[i]->root()->type(), NULL);
+    }
+    return has_null;
+}
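eval_row() writes the NULL_VALUE sentinel into the results buffer for NULL slots, so a row like (NULL, 1) produces different buffer bytes, and hence a different hash, than (0, 1), while _expr_value_null_bits records true nullness for the later equality check. A self-contained sketch of the idea, simplified to two int64 columns; the sentinel constant here is assumed to match HashUtil::FNV_SEED:

    #include <cstdint>
    #include <cstring>
    #include <cstdio>

    int main() {
        const int64_t NULL_SENTINEL = 0x811c9dc5; // FNV seed, standing in for NULL_VALUE
        // Buffer layout: two contiguous int64 slots, as compute_results_layout would give.
        int64_t row_null_then_1[2];
        int64_t row_0_then_1[2] = {0, 1};
        bool null_bits[2] = {true, false}; // first column is NULL

        // A NULL slot gets the sentinel, not 0, so the raw bytes differ from a real 0.
        row_null_then_1[0] = NULL_SENTINEL;
        row_null_then_1[1] = 1;

        bool bytes_equal = memcmp(row_null_then_1, row_0_then_1, sizeof(row_0_then_1)) == 0;
        printf("buffers equal: %s\n", bytes_equal ? "yes" : "no"); // "no": hashes will differ
        // Equality must still consult null_bits: sentinel bytes alone do not prove NULL.
        printf("col0 is NULL: %s\n", null_bits[0] ? "yes" : "no");
        return 0;
    }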
+
+uint32_t PartitionedHashTableCtx::hash_variable_len_row() {
+    uint32_t hash_val = _seeds[_level];
+    // Hash the non-var length portions (if there are any)
+    if (_var_result_begin != 0) {
+        hash_val = hash_help(_expr_values_buffer, _var_result_begin, hash_val);
+    }
+
+    for (int i = 0; i < _build_expr_ctxs.size(); ++i) {
+        // non-string and null slots are already part of expr_values_buffer
+        // if (_build_expr_ctxs[i]->root()->type().type != TYPE_STRING &&
+        if (_build_expr_ctxs[i]->root()->type().type != TYPE_VARCHAR) {
+            continue;
+        }
+
+        void* loc = _expr_values_buffer + _expr_values_buffer_offsets[i];
+        if (_expr_value_null_bits[i]) {
+            // Hash the null random seed values at 'loc'
+            hash_val = hash_help(loc, sizeof(StringValue), hash_val);
+        } else {
+            // Hash the string
+            // TODO: when using CRC hash on empty string, this only swaps bytes.
+            StringValue* str = reinterpret_cast<StringValue*>(loc);
+            hash_val = hash_help(str->ptr, str->len, hash_val);
+        }
+    }
+    return hash_val;
+}
+
+bool PartitionedHashTableCtx::equals(TupleRow* build_row) {
+    for (int i = 0; i < _build_expr_ctxs.size(); ++i) {
+        void* val = _build_expr_ctxs[i]->get_value(build_row);
+        if (val == NULL) {
+            if (!_stores_nulls) {
+                return false;
+            }
+            if (!_expr_value_null_bits[i]) {
+                return false;
+            }
+            continue;
+        } else {
+            if (_expr_value_null_bits[i]) {
+                return false;
+            }
+        }
+
+        void* loc = _expr_values_buffer + _expr_values_buffer_offsets[i];
+        if (!RawValue::eq(loc, val, _build_expr_ctxs[i]->root()->type())) {
+            return false;
+        }
+    }
+    return true;
+}
+
+const double PartitionedHashTable::MAX_FILL_FACTOR = 0.75f;
+
+PartitionedHashTable* PartitionedHashTable::create(RuntimeState* state,
+        BufferedBlockMgr2::Client* client, int num_build_tuples,
+        BufferedTupleStream2* tuple_stream, int64_t max_num_buckets,
+        int64_t initial_num_buckets) {
+    // return new PartitionedHashTable(FLAGS_enable_quadratic_probing, state, client,
+    //         num_build_tuples, tuple_stream, max_num_buckets, initial_num_buckets);
+    return new PartitionedHashTable(config::enable_quadratic_probing, state, client,
+            num_build_tuples, tuple_stream, max_num_buckets, initial_num_buckets);
+}
+
+PartitionedHashTable::PartitionedHashTable(bool quadratic_probing, RuntimeState* state,
+        BufferedBlockMgr2::Client* client, int num_build_tuples, BufferedTupleStream2* stream,
+        int64_t max_num_buckets, int64_t num_buckets) :
+    _state(state),
+    _block_mgr_client(client),
+    _tuple_stream(stream),
+    _stores_tuples(num_build_tuples == 1),
+    _quadratic_probing(quadratic_probing),
+    _total_data_page_size(0),
+    _next_node(NULL),
+    _node_remaining_current_page(0),
+    _num_duplicate_nodes(0),
+    _max_num_buckets(max_num_buckets),
+    _buckets(NULL),
+    _num_buckets(num_buckets),
+    _num_filled_buckets(0),
+    _num_buckets_with_duplicates(0),
+    _num_build_tuples(num_build_tuples),
+    _has_matches(false),
+    _num_probes(0),
+    _num_failed_probes(0),
+    _travel_length(0),
+    _num_hash_collisions(0),
+    _num_resizes(0) {
+    DCHECK_EQ((num_buckets & (num_buckets - 1)), 0) << "num_buckets must be a power of 2";
+    DCHECK_GT(num_buckets, 0) << "num_buckets must be larger than 0";
+    DCHECK(_stores_tuples || stream != NULL);
+    DCHECK(client != NULL);
+}
+
+bool PartitionedHashTable::init() {
+    int64_t buckets_byte_size = _num_buckets * sizeof(Bucket);
+    if (!_state->block_mgr2()->consume_memory(_block_mgr_client, buckets_byte_size)) {
+        _num_buckets = 0;
+        return false;
+    }
+    _buckets = reinterpret_cast<Bucket*>(malloc(buckets_byte_size));
+    memset(_buckets, 0, buckets_byte_size);
+    return true;
+}
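The constructor and init() above DCHECK that the bucket count is a power of two; probe() can then reduce a 32-bit hash to a bucket index with a bitmask instead of an integer modulo. A quick standalone check of that equivalence:

    #include <cstdint>
    #include <cstdio>

    int main() {
        int64_t num_buckets = 1024;                 // power of two, as DCHECKed above
        uint32_t hash = 0xDEADBEEF;
        int64_t by_mod  = hash % num_buckets;
        int64_t by_mask = hash & (num_buckets - 1); // what probe() does
        printf("%ld %ld\n", (long)by_mod, (long)by_mask); // identical for power-of-two sizes
        return 0;
    }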
+
+void PartitionedHashTable::close() {
+    // Print statistics only for the large or heavily used hash tables.
+    // TODO: Tweak these numbers/conditions, or print them always?
+    const int64_t LARGE_HT = 128 * 1024;
+    const int64_t HEAVILY_USED = 1024 * 1024;
+    // TODO: These statistics should go to the runtime profile as well.
+    if ((_num_buckets > LARGE_HT) || (_num_probes > HEAVILY_USED)) {
+        VLOG(2) << print_stats();
+    }
+    for (int i = 0; i < _data_pages.size(); ++i) {
+        _data_pages[i]->del();
+    }
+#if 0
+    if (DorisMetrics::hash_table_total_bytes() != NULL) {
+        DorisMetrics::hash_table_total_bytes()->increment(-_total_data_page_size);
+    }
+#endif
+    _data_pages.clear();
+    if (_buckets != NULL) {
+        free(_buckets);
+    }
+    _state->block_mgr2()->release_memory(_block_mgr_client, _num_buckets * sizeof(Bucket));
+}
+
+int64_t PartitionedHashTable::current_mem_size() const {
+    return _num_buckets * sizeof(Bucket) + _num_duplicate_nodes * sizeof(DuplicateNode);
+}
+
+bool PartitionedHashTable::check_and_resize(
+        uint64_t buckets_to_fill, PartitionedHashTableCtx* ht_ctx) {
+    uint64_t shift = 0;
+    while (_num_filled_buckets + buckets_to_fill >
+            (_num_buckets << shift) * MAX_FILL_FACTOR) {
+        // TODO: next prime instead of double?
+        ++shift;
+    }
+    if (shift > 0) {
+        return resize_buckets(_num_buckets << shift, ht_ctx);
+    }
+    return true;
+}
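check_and_resize() projects the fill factor after 'buckets_to_fill' more inserts and keeps doubling (left-shifting) until the projection drops to MAX_FILL_FACTOR or below. The same loop, extracted into a runnable sketch with made-up counts:

    #include <cstdint>
    #include <cstdio>

    int main() {
        const double MAX_FILL_FACTOR = 0.75;
        int64_t num_buckets = 1024;
        int64_t num_filled = 700;
        int64_t to_fill = 200;  // projected 900 / 1024 > 0.75, so we must grow

        uint64_t shift = 0;
        while (num_filled + to_fill > (num_buckets << shift) * MAX_FILL_FACTOR) {
            ++shift;            // same doubling loop as check_and_resize()
        }
        printf("resize to %ld buckets\n", (long)(num_buckets << shift)); // 2048
        return 0;
    }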
" << num_buckets << " " << _num_filled_buckets; + Bucket* dst_bucket = &new_buckets[bucket_idx]; + *dst_bucket = *bucket_to_copy; + } + + _num_buckets = num_buckets; + free(_buckets); + _buckets = new_buckets; + _state->block_mgr2()->release_memory(_block_mgr_client, old_size); + return true; +} + +bool PartitionedHashTable::grow_node_array() { + int64_t page_size = 0; + page_size = _state->block_mgr2()->max_block_size(); + if (_data_pages.size() < NUM_SMALL_DATA_PAGES) { + page_size = std::min(page_size, INITIAL_DATA_PAGE_SIZES[_data_pages.size()]); + } + BufferedBlockMgr2::Block* block = NULL; + Status status = _state->block_mgr2()->get_new_block( + _block_mgr_client, NULL, &block, page_size); + DCHECK(status.ok() || block == NULL); + if (block == NULL) { + return false; + } + _data_pages.push_back(block); + _next_node = block->allocate(page_size); +#if 0 + if (DorisMetrics::hash_table_total_bytes() != NULL) { + DorisMetrics::hash_table_total_bytes()->increment(page_size); + } +#endif + _node_remaining_current_page = page_size / sizeof(DuplicateNode); + _total_data_page_size += page_size; + return true; +} + +void PartitionedHashTable::debug_string_tuple( + stringstream& ss, HtData& htdata, const RowDescriptor* desc) { + if (_stores_tuples) { + ss << "(" << htdata.tuple << ")"; + } else { + ss << "(" << htdata.idx.block() << ", " << htdata.idx.idx() + << ", " << htdata.idx.offset() << ")"; + } + if (desc != NULL) { + Tuple* row[_num_build_tuples]; + ss << " " << get_row(htdata, reinterpret_cast(row))->to_string(*desc); + } +} + +string PartitionedHashTable::debug_string( + bool skip_empty, bool show_match, const RowDescriptor* desc) { + stringstream ss; + ss << endl; + for (int i = 0; i < _num_buckets; ++i) { + if (skip_empty && !_buckets[i].filled) { + continue; + } + ss << i << ": "; + if (show_match) { + if (_buckets[i].matched) { + ss << " [M]"; + } else { + ss << " [U]"; + } + } + if (_buckets[i].hasDuplicates) { + DuplicateNode* node = _buckets[i].bucketData.duplicates; + bool first = true; + ss << " [D] "; + while (node != NULL) { + if (!first) { + ss << ","; + } + debug_string_tuple(ss, node->htdata, desc); + node = node->next; + first = false; + } + } else { + ss << " [B] "; + if (_buckets[i].filled) { + debug_string_tuple(ss, _buckets[i].bucketData.htdata, desc); + } else { + ss << " - "; + } + } + ss << endl; + } + return ss.str(); +} + +string PartitionedHashTable::print_stats() const { + double curr_fill_factor = (double)_num_filled_buckets / (double)_num_buckets; + double avg_travel = (double)_travel_length / (double)_num_probes; + double avg_collisions = (double)_num_hash_collisions / (double)_num_filled_buckets; + stringstream ss; + ss << "Buckets: " << _num_buckets << " " << _num_filled_buckets << " " + << curr_fill_factor << endl; + ss << "Duplicates: " << _num_buckets_with_duplicates << " buckets " + << _num_duplicate_nodes << " nodes" << endl; + ss << "Probes: " << _num_probes << endl; + ss << "FailedProbes: " << _num_failed_probes << endl; + ss << "Travel: " << _travel_length << " " << avg_travel << endl; + ss << "HashCollisions: " << _num_hash_collisions << " " << avg_collisions << endl; + ss << "Resizes: " << _num_resizes << endl; + return ss.str(); +} + +} // namespace doris + diff --git a/be/src/exec/partitioned_hash_table.h b/be/src/exec/partitioned_hash_table.h new file mode 100644 index 0000000000..94f6c9847f --- /dev/null +++ b/be/src/exec/partitioned_hash_table.h @@ -0,0 +1,673 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more 
diff --git a/be/src/exec/partitioned_hash_table.h b/be/src/exec/partitioned_hash_table.h
new file mode 100644
index 0000000000..94f6c9847f
--- /dev/null
+++ b/be/src/exec/partitioned_hash_table.h
@@ -0,0 +1,673 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef DORIS_BE_SRC_EXEC_PARTITIONED_HASH_TABLE_H
+#define DORIS_BE_SRC_EXEC_PARTITIONED_HASH_TABLE_H
+
+#include <sstream>
+#include <string>
+#include <vector>
+
+#include "codegen/doris_ir.h"
+#include "util/logging.h"
+#include "runtime/buffered_block_mgr2.h"
+#include "runtime/buffered_tuple_stream2.h"
+#include "runtime/buffered_tuple_stream2.inline.h"
+#include "runtime/mem_tracker.h"
+#include "runtime/tuple_row.h"
+#include "util/hash_util.hpp"
+#include "util/bit_util.h"
+
+namespace doris {
+
+class Expr;
+class ExprContext;
+class MemTracker;
+class RowDescriptor;
+class RuntimeState;
+class Tuple;
+class TupleRow;
+class PartitionedHashTable;
+
+// Linear or quadratic probing hash table implementation tailored to the usage pattern
+// for partitioned hash aggregation and hash joins. The hash table stores TupleRows and
+// allows for different exprs for insertions and finds. This is the pattern we use for
+// joins and aggregation where the input/build tuple row descriptor is different from the
+// find/probe descriptor. The implementation is designed to allow codegen for some paths.
+//
+// In addition to the hash table there is also an accompanying hash table context that
+// is used for insertions and probes. For example, the hash table context stores
+// evaluated expr results for the current row being processed when possible into a
+// contiguous memory buffer. This allows for efficient hash computation.
+//
+// The hash table does not support removes. The hash table is not thread safe.
+// The table is optimized for the partition hash aggregation and hash joins and is not
+// intended to be a generic hash table implementation. The API loosely mimics the
+// std::hashset API.
+//
+// The data (rows) are stored in a BufferedTupleStream2. The basic data structure of this
+// hash table is a vector of buckets. The buckets (indexed by the mod of the hash)
+// contain a pointer to either the slot in the tuple-stream or in case of duplicate
+// values, to the head of a linked list of nodes that in turn contain a pointer to
+// tuple-stream slots. When inserting an entry we start at the bucket at position
+// (hash % size) and search for either a bucket with the same hash or for an empty
+// bucket. If a bucket with the same hash is found, we then compare for row equality and
+// either insert a duplicate node if the equality is true, or continue the search if the
+// row equality is false. Similarly, when probing we start from the bucket at position
+// (hash % size) and search for an entry with the same hash or for an empty bucket.
+// In the former case, we then check for row equality and continue the search if the row
+// equality is false. In the latter case, the probe is not successful. When growing the
+// hash table, the number of buckets is doubled. We trigger a resize when the fill
+// factor is approx 75%. Due to the doubling nature of the buckets, we require that the
+// number of buckets is a power of 2. This allows us to perform a modulo of the hash
+// using a bitmask.
+//
+// We choose to use linear or quadratic probing because they exhibit good (predictable)
+// cache behavior.
+//
+// The first NUM_SMALL_BLOCKS of _nodes are made of blocks less than the IO size (of 8MB)
+// to reduce the memory footprint of small queries.
+//
+// TODO: Compare linear and quadratic probing and remove the loser.
+// TODO: We currently use 32-bit hashes. There is room in the bucket structure for at
+// least 48-bits. We should exploit this space.
+// TODO: Consider capping the probes with a threshold value. If an insert reaches
+// that threshold it is inserted to another linked list of overflow entries.
+// TODO: Smarter resizes, and perhaps avoid using powers of 2 as the hash table size.
+// TODO: this is not a fancy hash table in terms of memory access patterns
+// (cuckoo-hashing or something that spills to disk). We will likely want to invest
+// more time into this.
+// TODO: hash-join and aggregation have very different access patterns. Joins insert
+// all the rows and then calls scan to find them. Aggregation interleaves find() and
+// Inserts(). We may want to optimize joins more heavily for Inserts() (in particular
+// growing).
+// TODO: Batched interface for inserts and finds.
+// TODO: Do we need to check mem limit exceeded so often. Check once per batch?
+
+// Control block for a hash table. This class contains the logic as well as the variables
+// needed by a thread to operate on a hash table.
+class PartitionedHashTableCtx {
+public:
+    // Create a hash table context.
+    //  - build_exprs are the exprs that should be used to evaluate rows during insert().
+    //  - probe_exprs are used during find()
+    //  - stores_nulls: if false, TupleRows with nulls are ignored during insert
+    //  - finds_nulls: if false, find() returns End() for TupleRows with nulls
+    //    even if stores_nulls is true
+    //  - initial_seed: Initial seed value to use when computing hashes for rows with
+    //    level 0. Other levels have their seeds derived from this seed.
+    //  - max_levels: the max levels we will hash with.
+    PartitionedHashTableCtx(const std::vector<ExprContext*>& build_expr_ctxs,
+            const std::vector<ExprContext*>& probe_expr_ctxs, bool stores_nulls,
+            bool finds_nulls, int32_t initial_seed, int max_levels,
+            int num_build_tuples);
+
+    // Call to cleanup any resources.
+    void close();
+
+    void set_level(int level);
+    int level() const { return _level; }
+    uint32_t seed(int level) { return _seeds.at(level); }
+
+    TupleRow* row() const { return _row; }
+
+    // Returns the results of the exprs at 'expr_idx' evaluated over the last row
+    // processed.
+    // This value is invalid if the expr evaluated to NULL.
+    // TODO: this is an awkward abstraction but aggregation node can take advantage of
+    // it and save some expr evaluation calls.
+    void* last_expr_value(int expr_idx) const {
+        return _expr_values_buffer + _expr_values_buffer_offsets[expr_idx];
+    }
+
+    // Returns if the expr at 'expr_idx' evaluated to NULL for the last row.
+    bool last_expr_value_null(int expr_idx) const {
+        return _expr_value_null_bits[expr_idx];
+    }
+
+    // Evaluate and hash the build/probe row, returning in *hash. Returns false if this
+    // row should be rejected (doesn't need to be processed further) because it
+    // contains NULL.
+    // These need to be inlined in the IR module so we can find and replace the calls to
+    // EvalBuildRow()/EvalProbeRow().
+    bool IR_ALWAYS_INLINE eval_and_hash_build(TupleRow* row, uint32_t* hash);
+    bool IR_ALWAYS_INLINE eval_and_hash_probe(TupleRow* row, uint32_t* hash);
+
+    int results_buffer_size() const { return _results_buffer_size; }
+
+private:
+    friend class PartitionedHashTable;
+    friend class PartitionedHashTableTest_HashEmpty_Test;
+
+    // Compute the hash of the values in _expr_values_buffer.
+    // This will be replaced by codegen. We don't want this inlined for replacing
+    // with codegen'd functions so the function name does not change.
+    uint32_t IR_NO_INLINE HashCurrentRow() {
+        DCHECK_LT(_level, _seeds.size());
+        if (_var_result_begin == -1) {
+            // This handles NULLs implicitly since a constant seed value was put
+            // into results buffer for nulls.
+            // TODO: figure out which hash function to use. We need to generate uncorrelated
+            // hashes by changing just the seed. CRC does not have this property and FNV is
+            // okay. We should switch to something else.
+            return hash_help(_expr_values_buffer, _results_buffer_size, _seeds[_level]);
+        } else {
+            return PartitionedHashTableCtx::hash_variable_len_row();
+        }
+    }
+
+    // Wrapper function for calling correct HashUtil function in non-codegen'd case.
+    uint32_t inline hash_help(const void* input, int len, int32_t hash) {
+        // Use CRC hash at first level for better performance. Switch to murmur hash at
+        // subsequent levels since CRC doesn't randomize well with different seed inputs.
+        if (_level == 0) {
+            return HashUtil::hash(input, len, hash);
+        }
+        return HashUtil::murmur_hash2_64(input, len, hash);
+    }
+
+    // Evaluate 'row' over build exprs caching the results in '_expr_values_buffer'. This
+    // will be replaced by codegen. We do not want this function inlined when cross
+    // compiled because we need to be able to differentiate between EvalBuildRow and
+    // EvalProbeRow by name and the build/probe exprs are baked into the codegen'd
+    // function.
+    bool IR_NO_INLINE EvalBuildRow(TupleRow* row) {
+        return eval_row(row, _build_expr_ctxs);
+    }
+
+    // Evaluate 'row' over probe exprs caching the results in '_expr_values_buffer'.
+    // This will be replaced by codegen.
+    bool IR_NO_INLINE EvalProbeRow(TupleRow* row) {
+        return eval_row(row, _probe_expr_ctxs);
+    }
+
+    // Compute the hash of the values in _expr_values_buffer for rows with variable length
+    // fields (e.g. strings).
+    uint32_t hash_variable_len_row();
+
+    // Evaluate the exprs over row and cache the results in '_expr_values_buffer'.
+    // Returns whether any expr evaluated to NULL.
+    // This will be replaced by codegen.
+    bool eval_row(TupleRow* row, const std::vector<ExprContext*>& ctxs);
+
+    // Returns true if the values of build_exprs evaluated over 'build_row' equal
+    // the values cached in _expr_values_buffer.
+    // This will be replaced by codegen.
+    bool IR_NO_INLINE equals(TupleRow* build_row);
+
+    const std::vector<ExprContext*>& _build_expr_ctxs;
+    const std::vector<ExprContext*>& _probe_expr_ctxs;
+
+    // Constants on how the hash table should behave. Joins and aggs have slightly
+    // different behavior.
+    // TODO: these constants are an ideal candidate to be removed with codegen.
+    // TODO: ..or with template-ization
+    const bool _stores_nulls;
+    const bool _finds_nulls;
+
+    // The current level this context is working on. Each level needs to use a
+    // different seed.
+    int _level;
+
+    // The seeds to use for hashing. Indexed by the level.
+    std::vector<uint32_t> _seeds;
+
+    // Cache of exprs values for the current row being evaluated. This can either
+    // be a build row (during insert()) or probe row (during find()).
+    std::vector<int> _expr_values_buffer_offsets;
+
+    // Byte offset into '_expr_values_buffer' that begins the variable length results.
+    // If -1, there are no variable length slots. Never changes once set, can be removed
+    // with codegen.
+    int _var_result_begin;
+
+    // Byte size of '_expr_values_buffer'. Never changes once set, can be removed with
+    // codegen.
+    int _results_buffer_size;
+
+    // Buffer to store evaluated expr results. This address must not change once
+    // allocated since the address is baked into the codegen.
+    uint8_t* _expr_values_buffer;
+
+    // Use bytes instead of bools to be compatible with llvm. This address must
+    // not change once allocated.
+    uint8_t* _expr_value_null_bits;
+
+    // Scratch buffer to generate rows on the fly.
+    TupleRow* _row;
+
+    // Cross-compiled functions to access member variables used in codegen_hash_current_row().
+    uint32_t get_hash_seed() const;
+};
+
+// The hash table consists of a contiguous array of buckets that contain a pointer to the
+// data, the hash value and three flags: whether this bucket is filled, whether this
+// entry has been matched (used in right and full joins) and whether this entry has
+// duplicates. If there are duplicates, then the data is pointing to the head of a
+// linked list of duplicate nodes that point to the actual data. Note that the duplicate
+// nodes do not contain the hash value, because all the linked nodes have the same hash
+// value, the one in the bucket. The data is either a tuple stream index or a Tuple*.
+// This array of buckets is sparse, we are shooting for up to 3/4 fill factor (75%). The
+// data allocated by the hash table comes from the BufferedBlockMgr2.
+class PartitionedHashTable {
+private:
+
+    // Either the row in the tuple stream or a pointer to the single tuple of this row.
+    union HtData {
+        BufferedTupleStream2::RowIdx idx;
+        Tuple* tuple;
+    };
+
+    // Linked list of entries used for duplicates.
+    struct DuplicateNode {
+        // Used for full outer and right {outer, anti, semi} joins. Indicates whether the
+        // row in the DuplicateNode has been matched.
+        // From an abstraction point of view, this is an awkward place to store this
+        // information.
+        // TODO: Fold this flag in the next pointer below.
+        bool matched;
+
+        // Chain to next duplicate node, NULL when end of list.
+        DuplicateNode* next;
+        HtData htdata;
+    };
+
+    struct Bucket {
+        // Whether this bucket contains a valid entry, or it is empty.
+        bool filled;
+
+        // Used for full outer and right {outer, anti, semi} joins. Indicates whether the
+        // row in the bucket has been matched.
+        // From an abstraction point of view, this is an awkward place to store this
+        // information but it is efficient. This space is otherwise unused.
+        bool matched;
+
+        // Used in case of duplicates. If true, then the bucketData union should be used as
+        // 'duplicates'.
+        bool hasDuplicates;
+
+        // Cache of the hash for data.
+        // TODO: Do we even have to cache the hash value?
+        uint32_t hash;
+
+        // Either the data for this bucket or the linked list of duplicates.
+        union {
+            HtData htdata;
+            DuplicateNode* duplicates;
+        } bucketData;
+    };
+
+public:
+    class Iterator;
+
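current_mem_size() in the .cc charges _num_buckets * sizeof(Bucket) plus _num_duplicate_nodes * sizeof(DuplicateNode). A standalone sizeof sketch using look-alike structs; padding, and therefore the exact sizes, are compiler-dependent:

    #include <cstdint>
    #include <cstdio>

    struct DuplicateNode { bool matched; DuplicateNode* next; union { void* t; uint64_t i; } d; };
    struct Bucket {
        bool filled, matched, hasDuplicates;
        uint32_t hash;
        union { void* htdata; DuplicateNode* duplicates; } bucketData;
    };

    int main() {
        // Mirrors current_mem_size(): bucket array plus duplicate nodes.
        int64_t num_buckets = 1 << 20, num_dup_nodes = 5000;
        printf("sizeof(Bucket)=%zu sizeof(DuplicateNode)=%zu\n",
               sizeof(Bucket), sizeof(DuplicateNode));
        printf("mem=%ld bytes\n",
               (long)(num_buckets * sizeof(Bucket) + num_dup_nodes * sizeof(DuplicateNode)));
        return 0;
    }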
+    // Returns a newly allocated PartitionedHashTable. The probing algorithm is set by
+    // FLAGS_enable_quadratic_probing.
+    //  - client: block mgr client to allocate data pages from.
+    //  - num_build_tuples: number of Tuples in the build tuple row.
+    //  - tuple_stream: the tuple stream which contains the tuple rows indexed by the
+    //    hash table. Can be NULL if the rows contain only a single tuple, in which
+    //    case the 'tuple_stream' is unused.
+    //  - max_num_buckets: the maximum number of buckets that can be stored. If we
+    //    try to grow the number of buckets to a larger number, the inserts will fail.
+    //    -1 if unlimited.
+    //  - initial_num_buckets: number of buckets that the hash table should be initialized
+    //    with.
+    static PartitionedHashTable* create(RuntimeState* state, BufferedBlockMgr2::Client* client,
+            int num_build_tuples, BufferedTupleStream2* tuple_stream, int64_t max_num_buckets,
+            int64_t initial_num_buckets);
+
+    // Allocates the initial bucket structure. Returns false if OOM.
+    bool init();
+
+    // Call to cleanup any resources. Must be called once.
+    void close();
+
+    // Inserts the row to the hash table. Returns true if the insertion was successful.
+    // Always returns true if the table has free buckets and the key is not a duplicate.
+    // The caller is responsible for ensuring that the table has free buckets.
+    // 'idx' is the index into _tuple_stream for this row. If the row contains more than
+    // one tuple, the 'idx' is stored instead of the 'row'. The 'row' is not copied by the
+    // hash table and the caller must guarantee it stays in memory. This will not grow the
+    // hash table. In the case that there is a need to insert a duplicate node, instead of
+    // filling a new bucket, and there is not enough memory to insert a duplicate node,
+    // the insert fails and this function returns false.
+    // Used during the build phase of hash joins.
+    bool IR_ALWAYS_INLINE insert(PartitionedHashTableCtx* ht_ctx,
+            const BufferedTupleStream2::RowIdx& idx, TupleRow* row, uint32_t hash);
+
+    // Same as insert() but for inserting a single Tuple. The 'tuple' is not copied by
+    // the hash table and the caller must guarantee it stays in memory.
+    bool IR_ALWAYS_INLINE insert(PartitionedHashTableCtx* ht_ctx, Tuple* tuple, uint32_t hash);
+
+    // Returns an iterator to the bucket matching the last row evaluated in 'ht_ctx'.
+    // Returns PartitionedHashTable::End() if no match is found. The iterator can be
+    // iterated until PartitionedHashTable::End() to find all the matching rows.
+    // Advancing the returned iterator will go to the next matching row. The matching
+    // rows do not need to be evaluated since all the nodes of a bucket are duplicates.
+    // One scan can be in progress for each 'ht_ctx'.
+    // Used during the probe phase of hash joins.
+    Iterator IR_ALWAYS_INLINE find(PartitionedHashTableCtx* ht_ctx, uint32_t hash);
+
+    // If a match is found in the table, return an iterator as in find(). If a match was
+    // not present, return an iterator pointing to the empty bucket where the key should
+    // be inserted. Returns End() if the table is full. The caller can set the data in
+    // the bucket using a Set*() method on the iterator.
+    Iterator IR_ALWAYS_INLINE find_bucket(PartitionedHashTableCtx* ht_ctx, uint32_t hash,
+            bool* found);
+
+    // Returns number of elements inserted in the hash table.
+    int64_t size() const {
+        return _num_filled_buckets - _num_buckets_with_duplicates + _num_duplicate_nodes;
+    }
+
+    // Returns the number of empty buckets.
+    int64_t empty_buckets() const { return _num_buckets - _num_filled_buckets; }
+
+    // Returns the number of buckets.
+    int64_t num_buckets() const { return _num_buckets; }
+
+    // Returns the load factor (the number of non-empty buckets).
+    double load_factor() const {
+        return static_cast<double>(_num_filled_buckets) / _num_buckets;
+    }
+
+    // Returns an estimate of the number of bytes needed to build the hash table
+    // structure for 'num_rows'. To do that, it estimates the number of buckets,
+    // rounded up to a power of two, and also assumes that there are no duplicates.
+    static int64_t EstimateNumBuckets(int64_t num_rows) {
+        // Assume max 66% fill factor and no duplicates.
+        return BitUtil::next_power_of_two(3 * num_rows / 2);
+    }
+    static int64_t EstimateSize(int64_t num_rows) {
+        int64_t num_buckets = EstimateNumBuckets(num_rows);
+        return num_buckets * sizeof(Bucket);
+    }
+
+    // Returns the memory occupied by the hash table, takes into account the number of
+    // duplicates.
+    int64_t current_mem_size() const;
+
+    // Calculates the fill factor if 'buckets_to_fill' additional buckets were to be
+    // filled and resizes the hash table so that the projected fill factor is below the
+    // max fill factor.
+    // If it returns true, then it is guaranteed at least 'buckets_to_fill' more buckets
+    // can be filled without the need to resize.
+    bool check_and_resize(uint64_t buckets_to_fill, PartitionedHashTableCtx* ht_ctx);
+
+    // Returns the number of bytes allocated to the hash table.
+    int64_t byte_size() const { return _total_data_page_size; }
+
+    // Returns an iterator at the beginning of the hash table. Advancing this iterator
+    // will traverse all elements.
+    Iterator begin(PartitionedHashTableCtx* ht_ctx);
+
+    // Return an iterator pointing to the first element (Bucket or DuplicateNode, if the
+    // bucket has duplicates) in the hash table that does not have its matched flag set.
+    // Used in right joins and full-outer joins.
+    Iterator first_unmatched(PartitionedHashTableCtx* ctx);
+
+    // Return true if there was at least one match.
+    bool HasMatches() const { return _has_matches; }
+
+    // Return end marker.
+    Iterator End() { return Iterator(); }
+
+    // Dump out the entire hash table to string. If 'skip_empty', empty buckets are
+    // skipped. If 'show_match', it also prints the matched flag of each node. If
+    // 'build_desc' is non-null, the build rows will be printed. Otherwise, only
+    // the addresses of the build rows will be printed.
+    std::string debug_string(bool skip_empty, bool show_match,
+            const RowDescriptor* build_desc);
+
+    // Print the content of a bucket or node.
+    void debug_string_tuple(std::stringstream& ss, HtData& htdata, const RowDescriptor* desc);
+
+    // Update and print some statistics that can be used for performance debugging.
+    std::string print_stats() const;
+
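EstimateNumBuckets()/EstimateSize() size the table for a worst-case fill of about 66%: buckets = next_power_of_two(3 * num_rows / 2), so num_rows / buckets never exceeds 2/3. A runnable check using a local stand-in for BitUtil::next_power_of_two:

    #include <cstdint>
    #include <cstdio>

    // Stand-in for BitUtil::next_power_of_two.
    static int64_t next_power_of_two(int64_t v) {
        int64_t p = 1;
        while (p < v) p <<= 1;
        return p;
    }

    int main() {
        int64_t num_rows = 1000000;
        int64_t buckets = next_power_of_two(3 * num_rows / 2); // 1.5M -> 2^21
        printf("rows=%ld buckets=%ld fill=%.2f\n", (long)num_rows, (long)buckets,
               (double)num_rows / buckets); // at most ~0.66 by construction
        return 0;
    }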
+    // stl-like iterator interface.
+    class Iterator {
+    private:
+        // Bucket index value when probe is not successful.
+        static const int64_t BUCKET_NOT_FOUND = -1;
+
+    public:
+        Iterator() : _table(NULL), _row(NULL), _bucket_idx(BUCKET_NOT_FOUND), _node(NULL) { }
+
+        // Iterates to the next element. It should be called only if !at_end().
+        void IR_ALWAYS_INLINE next();
+
+        // Iterates to the next duplicate node. If the bucket does not have duplicates or
+        // when it reaches the last duplicate node, then it moves the Iterator to at_end().
+        // Used when we want to iterate over all the duplicate nodes bypassing the next()
+        // interface (e.g. in semi/outer joins without other_join_conjuncts, in order to
+        // iterate over all nodes of an unmatched bucket).
+        void IR_ALWAYS_INLINE next_duplicate();
+
+        // Iterates to the next element that does not have its matched flag set. Used in
+        // right-outer and full-outer joins.
+        void next_unmatched();
+
+        // Return the current row or tuple. Callers must check the iterator is not at_end()
+        // before calling them. The returned row is owned by the iterator and valid until
+        // the next call to get_row(). It is safe to advance the iterator.
+        TupleRow* get_row() const;
+        Tuple* get_tuple() const;
+
+        // Set the current tuple for an empty bucket. Designed to be used with the
+        // iterator returned from find_bucket() in the case when the value is not found.
+        // It is not valid to call this function if the bucket already has an entry.
+        void set_tuple(Tuple* tuple, uint32_t hash);
+
+        // Sets as matched the Bucket or DuplicateNode currently pointed by the iterator,
+        // depending on whether the bucket has duplicates or not. The iterator cannot be
+        // at_end().
+        void set_matched();
+
+        // Returns the 'matched' flag of the current Bucket or DuplicateNode, depending on
+        // whether the bucket has duplicates or not. It should be called only if !at_end().
+        bool is_matched() const;
+
+        // Resets everything but the pointer to the hash table.
+        void set_at_end();
+
+        // Returns true if this iterator is at the end, i.e. get_row() cannot be called.
+        bool at_end() const { return _bucket_idx == BUCKET_NOT_FOUND; }
+
+    private:
+        friend class PartitionedHashTable;
+
+        Iterator(PartitionedHashTable* table, TupleRow* row, int bucket_idx, DuplicateNode* node)
+            : _table(table),
+              _row(row),
+              _bucket_idx(bucket_idx),
+              _node(node) {
+        }
+
+        PartitionedHashTable* _table;
+        TupleRow* _row;
+
+        // Current bucket idx.
+        // TODO: Use uint32_t?
+        int64_t _bucket_idx;
+
+        // Pointer to the current duplicate node.
+        DuplicateNode* _node;
+    };
+
+private:
+    friend class Iterator;
+    friend class PartitionedHashTableTest;
+
+    // Hash table constructor. Private because create() should be used, instead
+    // of calling this constructor directly.
+    //  - quadratic_probing: set to true when the probing algorithm is quadratic, as
+    //    opposed to linear.
+    PartitionedHashTable(bool quadratic_probing, RuntimeState* state,
+            BufferedBlockMgr2::Client* client, int num_build_tuples,
+            BufferedTupleStream2* tuple_stream, int64_t max_num_buckets,
+            int64_t initial_num_buckets);
+
+    // Performs the probing operation according to the probing algorithm (linear or
+    // quadratic). Returns one of the following:
+    // (a) the index of the bucket that contains the entry that matches with the last row
+    //     evaluated in 'ht_ctx'. If 'ht_ctx' is NULL then it does not check for row
+    //     equality and returns the index of the first empty bucket.
+    // (b) the index of the first empty bucket according to the probing algorithm (linear
+    //     or quadratic), if the entry is not in the hash table or 'ht_ctx' is NULL.
+    // (c) Iterator::BUCKET_NOT_FOUND if the probe was not successful, i.e. the maximum
+    //     distance was traveled without finding either an empty or a matching bucket.
+    // Using the returned index value, the caller can create an iterator that can be
+    // iterated until End() to find all the matching rows.
+    // eval_and_hash_build() or eval_and_hash_probe() must have been called before calling
+    // this. 'hash' must be the hash returned by these functions.
+    // 'found' indicates that a bucket that contains an equal row is found.
+    //
+    // There are wrappers of this function that perform the find and insert logic.
+    int64_t IR_ALWAYS_INLINE probe(Bucket* buckets, int64_t num_buckets,
+            PartitionedHashTableCtx* ht_ctx, uint32_t hash, bool* found);
+
+    // Performs the insert logic. Returns the HtData* of the bucket or duplicate node
+    // where the data should be inserted. Returns NULL if the insert was not successful.
+    HtData* IR_ALWAYS_INLINE insert_internal(PartitionedHashTableCtx* ht_ctx, uint32_t hash);
+
+    // Updates 'bucket_idx' to the index of the next non-empty bucket. If the bucket has
+    // duplicates, 'node' will be pointing to the head of the linked list of duplicates.
+    // Otherwise, 'node' should not be used. If there are no more buckets, sets
+    // 'bucket_idx' to BUCKET_NOT_FOUND.
+    void next_filled_bucket(int64_t* bucket_idx, DuplicateNode** node);
+
+    // Resize the hash table to 'num_buckets'. Returns false on OOM.
+    bool resize_buckets(int64_t num_buckets, PartitionedHashTableCtx* ht_ctx);
+
+    // Appends the DuplicateNode pointed by _next_node to 'bucket' and moves the _next_node
+    // pointer to the next DuplicateNode in the page, updating the remaining node counter.
+    DuplicateNode* IR_ALWAYS_INLINE append_next_node(Bucket* bucket);
+
+    // Creates a new DuplicateNode for an entry and chains it to the bucket with index
+    // 'bucket_idx'. The duplicate nodes of a bucket are chained as a linked list.
+    // This places the new duplicate node at the beginning of the list. If this is the
+    // first duplicate entry inserted in this bucket, then the entry already contained by
+    // the bucket is converted to a DuplicateNode. That is, the contents of 'data' of the
+    // bucket are copied to a DuplicateNode and 'data' is updated to point to a
+    // DuplicateNode.
+    // Returns NULL if the node array could not grow, i.e. there was not enough memory to
+    // allocate a new DuplicateNode.
+    DuplicateNode* IR_ALWAYS_INLINE insert_duplicate_node(int64_t bucket_idx);
+
+    // Resets the contents of the empty bucket with index 'bucket_idx', in preparation for
+    // an insert. Sets all the fields of the bucket other than 'data'.
+    void IR_ALWAYS_INLINE prepare_bucket_for_insert(int64_t bucket_idx, uint32_t hash);
+
+    // Return the TupleRow pointed by 'htdata'.
+    TupleRow* get_row(HtData& htdata, TupleRow* row) const;
+
+    // Returns the TupleRow of the pointed 'bucket'. In case of duplicates, it
+    // returns the content of the first chained duplicate node of the bucket.
+    TupleRow* get_row(Bucket* bucket, TupleRow* row) const;
+
+    // Grow the node array. Returns false on OOM.
+    bool grow_node_array();
+
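insert_duplicate_node() converts a bucket's single entry into the first node of a linked list when the first duplicate key arrives, then pushes each new duplicate at the head. A toy, heap-allocating version of just that conversion; types are simplified, and the real code draws nodes from pinned data pages rather than new:

    #include <cstdio>

    struct Node { int value; bool matched; Node* next; };

    struct Bucket {
        bool filled = false, hasDuplicates = false;
        int value = 0;          // stands in for HtData when there are no duplicates
        Node* duplicates = nullptr;
    };

    // Mirrors insert_duplicate_node: first duplicate moves the bucket's own entry
    // into a node, then every insert links a new node at the head of the list.
    // (Nodes are leaked here; this is only a sketch.)
    void insert_duplicate(Bucket* b, int v) {
        if (!b->hasDuplicates) {
            b->duplicates = new Node{b->value, false, nullptr}; // convert existing entry
            b->hasDuplicates = true;
        }
        b->duplicates = new Node{v, false, b->duplicates};      // push new head
    }

    int main() {
        Bucket b;
        b.filled = true; b.value = 7;   // bucket already holds one row
        insert_duplicate(&b, 8);        // same key, row equality succeeded
        insert_duplicate(&b, 9);
        for (Node* n = b.duplicates; n != nullptr; n = n->next) printf("%d ", n->value);
        printf("\n");                   // prints "9 8 7": newest first, like the patch
        return 0;
    }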
+    // Load factor that will trigger growing the hash table on insert. This is
+    // defined as the number of non-empty buckets / total_buckets.
+    static const double MAX_FILL_FACTOR;
+
+    RuntimeState* _state;
+
+    // Client to allocate data pages with.
+    BufferedBlockMgr2::Client* _block_mgr_client;
+
+    // Stream contains the rows referenced by the hash table. Can be NULL if the
+    // row only contains a single tuple, in which case the TupleRow indirection
+    // is removed by the hash table.
+    BufferedTupleStream2* _tuple_stream;
+
+    // Constants on how the hash table should behave. Joins and aggs have slightly
+    // different behavior.
+    // TODO: these constants are an ideal candidate to be removed with codegen.
+    // TODO: ..or with template-ization
+    const bool _stores_tuples;
+
+    // Quadratic probing enabled (as opposed to linear).
+    const bool _quadratic_probing;
+
+    // Data pages for all nodes. These are always pinned.
+    std::vector<BufferedBlockMgr2::Block*> _data_pages;
+
+    // Byte size of all buffers in _data_pages.
+    int64_t _total_data_page_size;
+
+    // Next duplicate node to insert. Valid when _node_remaining_current_page > 0.
+    DuplicateNode* _next_node;
+
+    // Number of nodes left in the current page.
+    int _node_remaining_current_page;
+
+    // Number of duplicate nodes.
+    int64_t _num_duplicate_nodes;
+
+    const int64_t _max_num_buckets;
+
+    // Array of all buckets. Owned by this node. Using c-style array to control
+    // memory footprint.
+    Bucket* _buckets;
+
+    // Total number of buckets (filled and empty).
+    int64_t _num_buckets;
+
+    // Number of non-empty buckets. Used to determine when to resize.
+    int64_t _num_filled_buckets;
+
+    // Number of (non-empty) buckets with duplicates. These buckets do not point to slots
+    // in the tuple stream, but rather to a linked list of DuplicateNodes.
+    int64_t _num_buckets_with_duplicates;
+
+    // Number of build tuples, used for constructing temp row* for probes.
+    // TODO: We should remove it.
+    const int _num_build_tuples;
+
+    // Flag used to disable spilling hash tables that already had matches in case of
+    // right joins (IMPALA-1488).
+    // TODO: Not fail when spilling hash tables with matches in right joins
+    bool _has_matches;
+
+    // The stats below can be used for debugging perf.
+    // TODO: Should we make these statistics atomic?
+    // Number of find(), insert(), or find_bucket() calls that probe the hash table.
+    int64_t _num_probes;
+
+    // Number of probes that failed and had to fall back to linear probing without cap.
+    int64_t _num_failed_probes;
+
+    // Total distance traveled for each probe. That is the sum of the diff between the end
+    // position of a probe (find/insert) and its start position
+    // (hash & (_num_buckets - 1)).
+    int64_t _travel_length;
+
+    // The number of cases where we had to compare buckets with the same hash value, but
+    // the row equality failed.
+    int64_t _num_hash_collisions;
+
+    // How many times this table has resized so far.
+    int64_t _num_resizes;
+};
+
+} // end namespace doris
+
+#endif // DORIS_BE_SRC_EXEC_PARTITIONED_HASH_TABLE_H
diff --git a/be/src/exec/partitioned_hash_table.inline.h b/be/src/exec/partitioned_hash_table.inline.h
new file mode 100644
index 0000000000..09abc3de13
--- /dev/null
+++ b/be/src/exec/partitioned_hash_table.inline.h
@@ -0,0 +1,379 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
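probe() in the .inline.h below advances by an increasing step, effectively visiting idx = (hash + step * (step + 1) / 2) mod num_buckets; for a power-of-two bucket count this triangular sequence touches every bucket exactly once within num_buckets steps, which is why the loop can safely give up at step == num_buckets. A standalone verification:

    #include <cstdio>
    #include <vector>

    int main() {
        const long N = 16;                  // power of two, as the table requires
        std::vector<bool> seen(N, false);
        long idx = 5 & (N - 1);             // start = hash & (num_buckets - 1)
        long visited = 0;
        for (long step = 0; step < N; ++step) {
            if (!seen[idx]) { seen[idx] = true; ++visited; }
            idx = (idx + step + 1) & (N - 1); // same update as probe(): bucket_idx += step
        }
        printf("visited %ld of %ld buckets\n", visited, N); // prints "visited 16 of 16"
        return 0;
    }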
+ +#ifndef DORIS_BE_SRC_EXEC_PARTITIONED_HASH_TABLE_INLINE_H +#define DORIS_BE_SRC_EXEC_PARTITIONED_HASH_TABLE_INLINE_H + +#include "exec/partitioned_hash_table.h" + +namespace doris { + +inline bool PartitionedHashTableCtx::eval_and_hash_build(TupleRow* row, uint32_t* hash) { + bool has_null = EvalBuildRow(row); + if (!_stores_nulls && has_null) { + return false; + } + *hash = HashCurrentRow(); + return true; +} + +inline bool PartitionedHashTableCtx::eval_and_hash_probe(TupleRow* row, uint32_t* hash) { + bool has_null = EvalProbeRow(row); + if ((!_stores_nulls || !_finds_nulls) && has_null) { + return false; + } + *hash = HashCurrentRow(); + return true; +} + +inline int64_t PartitionedHashTable::probe(Bucket* buckets, int64_t num_buckets, + PartitionedHashTableCtx* ht_ctx, uint32_t hash, bool* found) { + DCHECK(buckets != NULL); + DCHECK_GT(num_buckets, 0); + *found = false; + int64_t bucket_idx = hash & (num_buckets - 1); + + // In case of linear probing it counts the total number of steps for statistics and + // for knowing when to exit the loop (e.g. by capping the total travel length). In case + // of quadratic probing it is also used for calculating the length of the next jump. + int64_t step = 0; + do { + Bucket* bucket = &buckets[bucket_idx]; + if (!bucket->filled) { + return bucket_idx; + } + if (hash == bucket->hash) { + if (ht_ctx != NULL && ht_ctx->equals(get_row(bucket, ht_ctx->_row))) { + *found = true; + return bucket_idx; + } + // Row equality failed, or not performed. This is a hash collision. Continue + // searching. + ++_num_hash_collisions; + } + // Move to the next bucket. + ++step; + ++_travel_length; + if (_quadratic_probing) { + // The i-th probe location is idx = (hash + (step * (step + 1)) / 2) mod num_buckets. + // This gives num_buckets unique idxs (between 0 and N-1) when num_buckets is a power + // of 2. + bucket_idx = (bucket_idx + step) & (num_buckets - 1); + } else { + bucket_idx = (bucket_idx + 1) & (num_buckets - 1); + } + } while (LIKELY(step < num_buckets)); + DCHECK_EQ(_num_filled_buckets, num_buckets) << "Probing of a non-full table " + << "failed: " << _quadratic_probing << " " << hash; + return Iterator::BUCKET_NOT_FOUND; +} + +inline PartitionedHashTable::HtData* PartitionedHashTable::insert_internal( + PartitionedHashTableCtx* ht_ctx, uint32_t hash) { + ++_num_probes; + bool found = false; + int64_t bucket_idx = probe(_buckets, _num_buckets, ht_ctx, hash, &found); + DCHECK_NE(bucket_idx, Iterator::BUCKET_NOT_FOUND); + if (found) { + // We need to insert a duplicate node, note that this may fail to allocate memory. + DuplicateNode* new_node = insert_duplicate_node(bucket_idx); + if (UNLIKELY(new_node == NULL)) { + return NULL; + } + return &new_node->htdata; + } else { + prepare_bucket_for_insert(bucket_idx, hash); + return &_buckets[bucket_idx].bucketData.htdata; + } +} + +inline bool PartitionedHashTable::insert(PartitionedHashTableCtx* ht_ctx, + const BufferedTupleStream2::RowIdx& idx, TupleRow* row, uint32_t hash) { + if (_stores_tuples) { + return insert(ht_ctx, row->get_tuple(0), hash); + } + HtData* htdata = insert_internal(ht_ctx, hash); + // If successful insert, update the contents of the newly inserted entry with 'idx'. 
+ if (LIKELY(htdata != NULL)) { + htdata->idx = idx; + return true; + } + return false; +} + +inline bool PartitionedHashTable::insert( + PartitionedHashTableCtx* ht_ctx, Tuple* tuple, uint32_t hash) { + DCHECK(_stores_tuples); + HtData* htdata = insert_internal(ht_ctx, hash); + // If successful insert, update the contents of the newly inserted entry with 'tuple'. + if (LIKELY(htdata != NULL)) { + htdata->tuple = tuple; + return true; + } + return false; +} + +inline PartitionedHashTable::Iterator PartitionedHashTable::find( + PartitionedHashTableCtx* ht_ctx, uint32_t hash) { + ++_num_probes; + bool found = false; + int64_t bucket_idx = probe(_buckets, _num_buckets, ht_ctx, hash, &found); + if (found) { + return Iterator(this, ht_ctx->row(), bucket_idx, + _buckets[bucket_idx].bucketData.duplicates); + } + return End(); +} + +inline PartitionedHashTable::Iterator PartitionedHashTable::find_bucket( + PartitionedHashTableCtx* ht_ctx, uint32_t hash, + bool* found) { + ++_num_probes; + int64_t bucket_idx = probe(_buckets, _num_buckets, ht_ctx, hash, found); + DuplicateNode* duplicates = LIKELY(bucket_idx != Iterator::BUCKET_NOT_FOUND) ? + _buckets[bucket_idx].bucketData.duplicates : NULL; + return Iterator(this, ht_ctx->row(), bucket_idx, duplicates); +} + +inline PartitionedHashTable::Iterator PartitionedHashTable::begin(PartitionedHashTableCtx* ctx) { + int64_t bucket_idx = Iterator::BUCKET_NOT_FOUND; + DuplicateNode* node = NULL; + next_filled_bucket(&bucket_idx, &node); + return Iterator(this, ctx->row(), bucket_idx, node); +} + +inline PartitionedHashTable::Iterator PartitionedHashTable::first_unmatched( + PartitionedHashTableCtx* ctx) { + int64_t bucket_idx = Iterator::BUCKET_NOT_FOUND; + DuplicateNode* node = NULL; + next_filled_bucket(&bucket_idx, &node); + Iterator it(this, ctx->row(), bucket_idx, node); + // Check whether the bucket, or its first duplicate node, is matched. If it is not + // matched, then return. Otherwise, move to the first unmatched entry (node or bucket). + Bucket* bucket = &_buckets[bucket_idx]; + if ((!bucket->hasDuplicates && bucket->matched) || + (bucket->hasDuplicates && node->matched)) { + it.next_unmatched(); + } + return it; +} + +inline void PartitionedHashTable::next_filled_bucket(int64_t* bucket_idx, DuplicateNode** node) { + ++*bucket_idx; + for (; *bucket_idx < _num_buckets; ++*bucket_idx) { + if (_buckets[*bucket_idx].filled) { + *node = _buckets[*bucket_idx].bucketData.duplicates; + return; + } + } + // Reached the end of the hash table. 
+    *bucket_idx = Iterator::BUCKET_NOT_FOUND;
+    *node = NULL;
+}
+
+inline void PartitionedHashTable::prepare_bucket_for_insert(int64_t bucket_idx, uint32_t hash) {
+    DCHECK_GE(bucket_idx, 0);
+    DCHECK_LT(bucket_idx, _num_buckets);
+    Bucket* bucket = &_buckets[bucket_idx];
+    DCHECK(!bucket->filled);
+    ++_num_filled_buckets;
+    bucket->filled = true;
+    bucket->matched = false;
+    bucket->hasDuplicates = false;
+    bucket->hash = hash;
+}
+
+inline PartitionedHashTable::DuplicateNode* PartitionedHashTable::append_next_node(
+        Bucket* bucket) {
+    DCHECK_GT(_node_remaining_current_page, 0);
+    bucket->bucketData.duplicates = _next_node;
+    ++_num_duplicate_nodes;
+    --_node_remaining_current_page;
+    return _next_node++;
+}
+
+inline PartitionedHashTable::DuplicateNode* PartitionedHashTable::insert_duplicate_node(
+        int64_t bucket_idx) {
+    DCHECK_GE(bucket_idx, 0);
+    DCHECK_LT(bucket_idx, _num_buckets);
+    Bucket* bucket = &_buckets[bucket_idx];
+    DCHECK(bucket->filled);
+    // Allocate one duplicate node for the new data and one for the preexisting data,
+    // if needed.
+    while (_node_remaining_current_page < 1 + !bucket->hasDuplicates) {
+        if (UNLIKELY(!grow_node_array())) {
+            return NULL;
+        }
+    }
+    if (!bucket->hasDuplicates) {
+        // This is the first duplicate in this bucket. It means that we need to convert
+        // the current entry in the bucket to a node and link it from the bucket.
+        _next_node->htdata.idx = bucket->bucketData.htdata.idx;
+        DCHECK(!bucket->matched);
+        _next_node->matched = false;
+        _next_node->next = NULL;
+        append_next_node(bucket);
+        bucket->hasDuplicates = true;
+        ++_num_buckets_with_duplicates;
+    }
+    // Link a new node.
+    _next_node->next = bucket->bucketData.duplicates;
+    _next_node->matched = false;
+    return append_next_node(bucket);
+}
+
+inline TupleRow* PartitionedHashTable::get_row(HtData& htdata, TupleRow* row) const {
+    if (_stores_tuples) {
+        return reinterpret_cast<TupleRow*>(&htdata.tuple);
+    } else {
+        _tuple_stream->get_tuple_row(htdata.idx, row);
+        return row;
+    }
+}
+
+inline TupleRow* PartitionedHashTable::get_row(Bucket* bucket, TupleRow* row) const {
+    DCHECK(bucket != NULL);
+    if (UNLIKELY(bucket->hasDuplicates)) {
+        DuplicateNode* duplicate = bucket->bucketData.duplicates;
+        DCHECK(duplicate != NULL);
+        return get_row(duplicate->htdata, row);
+    } else {
+        return get_row(bucket->bucketData.htdata, row);
+    }
+}
+
+inline TupleRow* PartitionedHashTable::Iterator::get_row() const {
+    DCHECK(!at_end());
+    DCHECK(_table != NULL);
+    DCHECK(_row != NULL);
+    Bucket* bucket = &_table->_buckets[_bucket_idx];
+    if (UNLIKELY(bucket->hasDuplicates)) {
+        DCHECK(_node != NULL);
+        return _table->get_row(_node->htdata, _row);
+    } else {
+        return _table->get_row(bucket->bucketData.htdata, _row);
+    }
+}
+
+inline Tuple* PartitionedHashTable::Iterator::get_tuple() const {
+    DCHECK(!at_end());
+    DCHECK(_table->_stores_tuples);
+    Bucket* bucket = &_table->_buckets[_bucket_idx];
+    // TODO: To avoid the hasDuplicates check, store the HtData* in the Iterator.
+ if (UNLIKELY(bucket->hasDuplicates)) { + DCHECK(_node != NULL); + return _node->htdata.tuple; + } else { + return bucket->bucketData.htdata.tuple; + } +} + +inline void PartitionedHashTable::Iterator::set_tuple(Tuple* tuple, uint32_t hash) { + DCHECK(!at_end()); + DCHECK(_table->_stores_tuples); + _table->prepare_bucket_for_insert(_bucket_idx, hash); + _table->_buckets[_bucket_idx].bucketData.htdata.tuple = tuple; +} + +inline void PartitionedHashTable::Iterator::set_matched() { + DCHECK(!at_end()); + Bucket* bucket = &_table->_buckets[_bucket_idx]; + if (bucket->hasDuplicates) { + _node->matched = true; + } else { + bucket->matched = true; + } + // Used for disabling spilling of hash tables in right and full-outer joins with + // matches. See IMPALA-1488. + _table->_has_matches = true; +} + +inline bool PartitionedHashTable::Iterator::is_matched() const { + DCHECK(!at_end()); + Bucket* bucket = &_table->_buckets[_bucket_idx]; + if (bucket->hasDuplicates) { + return _node->matched; + } + return bucket->matched; +} + +inline void PartitionedHashTable::Iterator::set_at_end() { + _bucket_idx = BUCKET_NOT_FOUND; + _node = NULL; +} + +inline void PartitionedHashTable::Iterator::next() { + DCHECK(!at_end()); + if (_table->_buckets[_bucket_idx].hasDuplicates && _node->next != NULL) { + _node = _node->next; + } else { + _table->next_filled_bucket(&_bucket_idx, &_node); + } +} + +inline void PartitionedHashTable::Iterator::next_duplicate() { + DCHECK(!at_end()); + if (_table->_buckets[_bucket_idx].hasDuplicates && _node->next != NULL) { + _node = _node->next; + } else { + _bucket_idx = BUCKET_NOT_FOUND; + _node = NULL; + } +} + +inline void PartitionedHashTable::Iterator::next_unmatched() { + DCHECK(!at_end()); + Bucket* bucket = &_table->_buckets[_bucket_idx]; + // Check if there is any remaining unmatched duplicate node in the current bucket. + if (bucket->hasDuplicates) { + while (_node->next != NULL) { + _node = _node->next; + if (!_node->matched) { + return; + } + } + } + // Move to the next filled bucket and return if this bucket is not matched or + // iterate to the first not matched duplicate node. + _table->next_filled_bucket(&_bucket_idx, &_node); + while (_bucket_idx != Iterator::BUCKET_NOT_FOUND) { + bucket = &_table->_buckets[_bucket_idx]; + if (!bucket->hasDuplicates) { + if (!bucket->matched) { + return; + } + } else { + while (_node->matched && _node->next != NULL) { + _node = _node->next; + } + if (!_node->matched) { + return; + } + } + _table->next_filled_bucket(&_bucket_idx, &_node); + } +} + +inline void PartitionedHashTableCtx::set_level(int level) { + DCHECK_GE(level, 0); + DCHECK_LT(level, _seeds.size()); + _level = level; +} + +} // end namespace doris + +#endif // DORIS_BE_SRC_EXEC_PARTITIONED_HASH_TABLE_INLINE_H
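first_unmatched()/next_unmatched() above give right and full-outer joins a scan over build rows whose matched flag never got set during the probe phase. A toy flat-table version of that scan; duplicate chains are omitted for brevity, and -1 plays the role of BUCKET_NOT_FOUND:

    #include <cstdio>
    #include <vector>

    struct Bucket { bool filled; bool matched; int row; };

    // Mirrors next_filled_bucket plus the matched-flag check in next_unmatched,
    // ignoring duplicate chains.
    int next_unmatched(const std::vector<Bucket>& buckets, int from) {
        for (int i = from + 1; i < (int)buckets.size(); ++i) {
            if (buckets[i].filled && !buckets[i].matched) return i;
        }
        return -1; // BUCKET_NOT_FOUND
    }

    int main() {
        std::vector<Bucket> t = {
            {true, true, 10}, {false, false, 0}, {true, false, 11}, {true, false, 12}};
        // Emit every build row the probe phase never matched (right/full outer join).
        for (int i = next_unmatched(t, -1); i != -1; i = next_unmatched(t, i)) {
            printf("unmatched row %d\n", t[i].row); // rows 11 and 12
        }
        return 0;
    }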