diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 20e00ef968..3c3fd7128a 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -83,8 +83,6 @@ set(EXEC_FILES
     schema_scanner/schema_charsets_scanner.cpp
     schema_scanner/schema_collations_scanner.cpp
     schema_scanner/schema_helper.cpp
-    partitioned_hash_table.cc
-    partitioned_hash_table_ir.cc
     new_partitioned_hash_table.cc
     new_partitioned_hash_table_ir.cc
     partitioned_aggregation_node.cc
diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp
index d03dd3e49c..72db1be8bf 100644
--- a/be/src/exec/hash_join_node.cpp
+++ b/be/src/exec/hash_join_node.cpp
@@ -46,7 +46,7 @@ HashJoinNode::HashJoinNode(
     _is_push_down = tnode.hash_join_node.is_push_down;
     _build_unique = _join_op == TJoinOp::LEFT_ANTI_JOIN || _join_op == TJoinOp::RIGHT_ANTI_JOIN
         || _join_op == TJoinOp::RIGHT_SEMI_JOIN || _join_op == TJoinOp::LEFT_SEMI_JOIN
-        || _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN
+        || _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
 }
 
 HashJoinNode::~HashJoinNode() {
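(Reviewer note, not part of the patch: the hash_join_node.cpp hunk only appends the semicolon missing from the end of the multi-line boolean assignment to _build_unique. A hypothetical refactor like the helper below would make the statement boundary harder to miss; the function name is invented for illustration.)

// Sketch only -- this helper does not exist in the patch. It names the join
// types whose build side only needs to keep unique rows.
static bool is_build_unique_join_op(TJoinOp::type op) {
    return op == TJoinOp::LEFT_ANTI_JOIN || op == TJoinOp::RIGHT_ANTI_JOIN ||
           op == TJoinOp::RIGHT_SEMI_JOIN || op == TJoinOp::LEFT_SEMI_JOIN ||
           op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
}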
diff --git a/be/src/exec/partitioned_hash_table.cc b/be/src/exec/partitioned_hash_table.cc
deleted file mode 100644
index 61d9d2e6e7..0000000000
--- a/be/src/exec/partitioned_hash_table.cc
+++ /dev/null
@@ -1,440 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "exec/partitioned_hash_table.inline.h"
-
-#include "exprs/expr.h"
-#include "exprs/expr_context.h"
-#include "exprs/slot_ref.h"
-#include "runtime/buffered_block_mgr.h"
-#include "runtime/mem_tracker.h"
-#include "runtime/raw_value.h"
-#include "runtime/runtime_state.h"
-#include "runtime/string_value.hpp"
-#include "util/doris_metrics.h"
-
-// DEFINE_bool(enable_quadratic_probing, true, "Enable quadratic probing hash table");
-
-using std::string;
-using std::stringstream;
-using std::vector;
-using std::endl;
-
-namespace doris {
-
-// Random primes to multiply the seed with.
-static uint32_t SEED_PRIMES[] = {
-    1, // First seed must be 1, level 0 is used by other operators in the fragment.
-    1431655781,
-    1183186591,
-    622729787,
-    472882027,
-    338294347,
-    275604541,
-    41161739,
-    29999999,
-    27475109,
-    611603,
-    16313357,
-    11380003,
-    21261403,
-    33393119,
-    101,
-    71043403
-};
-
-// Put a non-zero constant in the result location for NULL.
-// We don't want (NULL, 1) to hash to the same as (0, 1).
-// This needs to be as big as the biggest primitive type since the bytes
-// get copied directly.
-// TODO: find a better approach, since primitives like CHAR(N) can be up to 128 bytes
-static int64_t NULL_VALUE[] = { HashUtil::FNV_SEED, HashUtil::FNV_SEED,
-                                HashUtil::FNV_SEED, HashUtil::FNV_SEED,
-                                HashUtil::FNV_SEED, HashUtil::FNV_SEED,
-                                HashUtil::FNV_SEED, HashUtil::FNV_SEED,
-                                HashUtil::FNV_SEED, HashUtil::FNV_SEED,
-                                HashUtil::FNV_SEED, HashUtil::FNV_SEED,
-                                HashUtil::FNV_SEED, HashUtil::FNV_SEED,
-                                HashUtil::FNV_SEED, HashUtil::FNV_SEED };
-
-// The first NUM_SMALL_BLOCKS of _nodes are made of blocks less than the IO size (of 8MB)
-// to reduce the memory footprint of small queries. In particular, we always first use a
-// 64KB and a 512KB block before starting using IO-sized blocks.
-static const int64_t INITIAL_DATA_PAGE_SIZES[] = { 64 * 1024, 512 * 1024 };
-static const int NUM_SMALL_DATA_PAGES = sizeof(INITIAL_DATA_PAGE_SIZES) / sizeof(int64_t);
-
-PartitionedHashTableCtx::PartitionedHashTableCtx(
-        const vector<ExprContext*>& build_expr_ctxs,
-        const vector<ExprContext*>& probe_expr_ctxs,
-        bool stores_nulls, bool finds_nulls,
-        int32_t initial_seed, int max_levels, int num_build_tuples) :
-    _build_expr_ctxs(build_expr_ctxs),
-    _probe_expr_ctxs(probe_expr_ctxs),
-    _stores_nulls(stores_nulls),
-    _finds_nulls(finds_nulls),
-    _level(0),
-    _row(reinterpret_cast<TupleRow*>(malloc(sizeof(Tuple*) * num_build_tuples))) {
-    // Compute the layout and buffer size to store the evaluated expr results
-    DCHECK_EQ(_build_expr_ctxs.size(), _probe_expr_ctxs.size());
-    DCHECK(!_build_expr_ctxs.empty());
-    _results_buffer_size = Expr::compute_results_layout(_build_expr_ctxs,
-            &_expr_values_buffer_offsets, &_var_result_begin);
-    _expr_values_buffer = new uint8_t[_results_buffer_size];
-    memset(_expr_values_buffer, 0, sizeof(uint8_t) * _results_buffer_size);
-    _expr_value_null_bits = new uint8_t[build_expr_ctxs.size()];
-
-    // Populate the seeds to use for all the levels. TODO: revisit how we generate these.
-    DCHECK_GE(max_levels, 0);
-    DCHECK_LT(max_levels, sizeof(SEED_PRIMES) / sizeof(SEED_PRIMES[0]));
-    DCHECK_NE(initial_seed, 0);
-    _seeds.resize(max_levels + 1);
-    _seeds[0] = initial_seed;
-    for (int i = 1; i <= max_levels; ++i) {
-        _seeds[i] = _seeds[i - 1] * SEED_PRIMES[i];
-    }
-}
-
-void PartitionedHashTableCtx::close() {
-    // TODO: use tr1::array?
-    DCHECK(_expr_values_buffer != NULL);
-    delete[] _expr_values_buffer;
-    _expr_values_buffer = NULL;
-    DCHECK(_expr_value_null_bits != NULL);
-    delete[] _expr_value_null_bits;
-    _expr_value_null_bits = NULL;
-    free(_row);
-    _row = NULL;
-}
-
-bool PartitionedHashTableCtx::eval_row(TupleRow* row, const vector<ExprContext*>& ctxs) {
-    bool has_null = false;
-    for (int i = 0; i < ctxs.size(); ++i) {
-        void* loc = _expr_values_buffer + _expr_values_buffer_offsets[i];
-        void* val = ctxs[i]->get_value(row);
-        if (val == NULL) {
-            // If the table doesn't store nulls, no reason to keep evaluating
-            if (!_stores_nulls) {
-                return true;
-            }
-            _expr_value_null_bits[i] = true;
-            val = reinterpret_cast<void*>(&NULL_VALUE);
-            has_null = true;
-        } else {
-            _expr_value_null_bits[i] = false;
-        }
-        DCHECK_LE(_build_expr_ctxs[i]->root()->type().get_slot_size(),
-                sizeof(NULL_VALUE));
-        RawValue::write(val, loc, _build_expr_ctxs[i]->root()->type(), NULL);
-    }
-    return has_null;
-}
-
-uint32_t PartitionedHashTableCtx::hash_variable_len_row() {
-    uint32_t hash_val = _seeds[_level];
-    // Hash the non-var length portions (if there are any)
-    if (_var_result_begin != 0) {
-        hash_val = hash_help(_expr_values_buffer, _var_result_begin, hash_val);
-    }
-
-    for (int i = 0; i < _build_expr_ctxs.size(); ++i) {
-        // non-string and null slots are already part of expr_values_buffer
-        // if (_build_expr_ctxs[i]->root()->type().type != TYPE_STRING &&
-        if (_build_expr_ctxs[i]->root()->type().type != TYPE_VARCHAR) {
-            continue;
-        }
-        void* loc = _expr_values_buffer + _expr_values_buffer_offsets[i];
-        if (_expr_value_null_bits[i]) {
-            // Hash the null random seed values at 'loc'
-            hash_val = hash_help(loc, sizeof(StringValue), hash_val);
-        } else {
-            // Hash the string
-            // TODO: when using CRC hash on empty string, this only swaps bytes.
-            StringValue* str = reinterpret_cast<StringValue*>(loc);
-            hash_val = hash_help(str->ptr, str->len, hash_val);
-        }
-    }
-    return hash_val;
-}
-
-bool PartitionedHashTableCtx::equals(TupleRow* build_row) {
-    for (int i = 0; i < _build_expr_ctxs.size(); ++i) {
-        void* val = _build_expr_ctxs[i]->get_value(build_row);
-        if (val == NULL) {
-            if (!_stores_nulls) {
-                return false;
-            }
-            if (!_expr_value_null_bits[i]) {
-                return false;
-            }
-            continue;
-        } else {
-            if (_expr_value_null_bits[i]) {
-                return false;
-            }
-        }
-
-        void* loc = _expr_values_buffer + _expr_values_buffer_offsets[i];
-        if (!RawValue::eq(loc, val, _build_expr_ctxs[i]->root()->type())) {
-            return false;
-        }
-    }
-    return true;
-}
-
-const double PartitionedHashTable::MAX_FILL_FACTOR = 0.75f;
-
-PartitionedHashTable* PartitionedHashTable::create(RuntimeState* state,
-        BufferedBlockMgr2::Client* client, int num_build_tuples,
-        BufferedTupleStream2* tuple_stream, int64_t max_num_buckets,
-        int64_t initial_num_buckets) {
-    // return new PartitionedHashTable(FLAGS_enable_quadratic_probing, state, client,
-    //         num_build_tuples, tuple_stream, max_num_buckets, initial_num_buckets);
-    return new PartitionedHashTable(config::enable_quadratic_probing, state, client,
-            num_build_tuples, tuple_stream, max_num_buckets, initial_num_buckets);
-}
-
-PartitionedHashTable::PartitionedHashTable(bool quadratic_probing, RuntimeState* state,
-        BufferedBlockMgr2::Client* client, int num_build_tuples, BufferedTupleStream2* stream,
-        int64_t max_num_buckets, int64_t num_buckets) :
-    _state(state),
-    _block_mgr_client(client),
-    _tuple_stream(stream),
-    _stores_tuples(num_build_tuples == 1),
-    _quadratic_probing(quadratic_probing),
-    _total_data_page_size(0),
-    _next_node(NULL),
-    _node_remaining_current_page(0),
-    _num_duplicate_nodes(0),
-    _max_num_buckets(max_num_buckets),
-    _buckets(NULL),
-    _num_buckets(num_buckets),
-    _num_filled_buckets(0),
-    _num_buckets_with_duplicates(0),
-    _num_build_tuples(num_build_tuples),
-    _has_matches(false),
-    _num_probes(0),
-    _num_failed_probes(0),
-    _travel_length(0),
-    _num_hash_collisions(0),
-    _num_resizes(0) {
-    DCHECK_EQ((num_buckets & (num_buckets - 1)), 0) << "num_buckets must be a power of 2";
-    DCHECK_GT(num_buckets, 0) << "num_buckets must be larger than 0";
-    DCHECK(_stores_tuples || stream != NULL);
-    DCHECK(client != NULL);
-}
-
-bool PartitionedHashTable::init() {
-    int64_t buckets_byte_size = _num_buckets * sizeof(Bucket);
-    if (!_state->block_mgr2()->consume_memory(_block_mgr_client, buckets_byte_size)) {
-        _num_buckets = 0;
-        return false;
-    }
-    _buckets = reinterpret_cast<Bucket*>(malloc(buckets_byte_size));
-    memset(_buckets, 0, buckets_byte_size);
-    return true;
-}
-
-void PartitionedHashTable::close() {
-    // Print statistics only for the large or heavily used hash tables.
-    // TODO: Tweak these numbers/conditions, or print them always?
-    const int64_t LARGE_HT = 128 * 1024;
-    const int64_t HEAVILY_USED = 1024 * 1024;
-    // TODO: These statistics should go to the runtime profile as well.
-    if ((_num_buckets > LARGE_HT) || (_num_probes > HEAVILY_USED)) {
-        VLOG(2) << print_stats();
-    }
-    for (int i = 0; i < _data_pages.size(); ++i) {
-        _data_pages[i]->del();
-    }
-#if 0
-    if (DorisMetrics::hash_table_total_bytes() != NULL) {
-        DorisMetrics::hash_table_total_bytes()->increment(-_total_data_page_size);
-    }
-#endif
-    _data_pages.clear();
-    if (_buckets != NULL) {
-        free(_buckets);
-    }
-    _state->block_mgr2()->release_memory(_block_mgr_client, _num_buckets * sizeof(Bucket));
-}
-
-int64_t PartitionedHashTable::current_mem_size() const {
-    return _num_buckets * sizeof(Bucket) + _num_duplicate_nodes * sizeof(DuplicateNode);
-}
-
-bool PartitionedHashTable::check_and_resize(
-        uint64_t buckets_to_fill, PartitionedHashTableCtx* ht_ctx) {
-    uint64_t shift = 0;
-    while (_num_filled_buckets + buckets_to_fill >
-            (_num_buckets << shift) * MAX_FILL_FACTOR) {
-        // TODO: next prime instead of double?
-        ++shift;
-    }
-    if (shift > 0) {
-        return resize_buckets(_num_buckets << shift, ht_ctx);
-    }
-    return true;
-}
-
-bool PartitionedHashTable::resize_buckets(int64_t num_buckets, PartitionedHashTableCtx* ht_ctx) {
-    DCHECK_EQ((num_buckets & (num_buckets - 1)), 0)
-            << "num_buckets=" << num_buckets << " must be a power of 2";
-    DCHECK_GT(num_buckets, _num_filled_buckets) << "Cannot shrink the hash table to "
-            "smaller number of buckets than the number of filled buckets.";
-    VLOG(2) << "Resizing hash table from "
-            << _num_buckets << " to " << num_buckets << " buckets.";
-    if (_max_num_buckets != -1 && num_buckets > _max_num_buckets) {
-        return false;
-    }
-    ++_num_resizes;
-
-    // All memory that can grow proportional to the input should come from the block mgrs
-    // mem tracker.
-    // Note that while we copying over the contents of the old hash table, we need to have
-    // allocated both the old and the new hash table. Once we finish, we return the memory
-    // of the old hash table.
-    int64_t old_size = _num_buckets * sizeof(Bucket);
-    int64_t new_size = num_buckets * sizeof(Bucket);
-    if (!_state->block_mgr2()->consume_memory(_block_mgr_client, new_size)) {
-        return false;
-    }
-    Bucket* new_buckets = reinterpret_cast<Bucket*>(malloc(new_size));
-    DCHECK(new_buckets != NULL);
-    memset(new_buckets, 0, new_size);
-
-    // Walk the old table and copy all the filled buckets to the new (resized) table.
-    // We do not have to do anything with the duplicate nodes. This operation is expected
-    // to succeed.
-    for (PartitionedHashTable::Iterator iter = begin(ht_ctx); !iter.at_end();
-            next_filled_bucket(&iter._bucket_idx, &iter._node)) {
-        Bucket* bucket_to_copy = &_buckets[iter._bucket_idx];
-        bool found = false;
-        int64_t bucket_idx = probe(new_buckets, num_buckets, NULL, bucket_to_copy->hash, &found);
-        DCHECK(!found);
-        DCHECK_NE(bucket_idx, Iterator::BUCKET_NOT_FOUND) << " Probe failed even though "
-                " there are free buckets. " << num_buckets << " " << _num_filled_buckets;
-        Bucket* dst_bucket = &new_buckets[bucket_idx];
-        *dst_bucket = *bucket_to_copy;
-    }
-
-    _num_buckets = num_buckets;
-    free(_buckets);
-    _buckets = new_buckets;
-    _state->block_mgr2()->release_memory(_block_mgr_client, old_size);
-    return true;
-}
-
-bool PartitionedHashTable::grow_node_array() {
-    int64_t page_size = 0;
-    page_size = _state->block_mgr2()->max_block_size();
-    if (_data_pages.size() < NUM_SMALL_DATA_PAGES) {
-        page_size = std::min(page_size, INITIAL_DATA_PAGE_SIZES[_data_pages.size()]);
-    }
-    BufferedBlockMgr2::Block* block = NULL;
-    Status status = _state->block_mgr2()->get_new_block(
-            _block_mgr_client, NULL, &block, page_size);
-    DCHECK(status.ok() || block == NULL);
-    if (block == NULL) {
-        return false;
-    }
-    _data_pages.push_back(block);
-    _next_node = block->allocate<DuplicateNode>(page_size);
-#if 0
-    if (DorisMetrics::hash_table_total_bytes() != NULL) {
-        DorisMetrics::hash_table_total_bytes()->increment(page_size);
-    }
-#endif
-    _node_remaining_current_page = page_size / sizeof(DuplicateNode);
-    _total_data_page_size += page_size;
-    return true;
-}
-
-void PartitionedHashTable::debug_string_tuple(
-        stringstream& ss, HtData& htdata, const RowDescriptor* desc) {
-    if (_stores_tuples) {
-        ss << "(" << htdata.tuple << ")";
-    } else {
-        ss << "(" << htdata.idx.block() << ", " << htdata.idx.idx()
-           << ", " << htdata.idx.offset() << ")";
-    }
-    if (desc != NULL) {
-        Tuple* row[_num_build_tuples];
-        ss << " " << get_row(htdata, reinterpret_cast<TupleRow*>(row))->to_string(*desc);
-    }
-}
-
-string PartitionedHashTable::debug_string(
-        bool skip_empty, bool show_match, const RowDescriptor* desc) {
-    stringstream ss;
-    ss << endl;
-    for (int i = 0; i < _num_buckets; ++i) {
-        if (skip_empty && !_buckets[i].filled) {
-            continue;
-        }
-        ss << i << ": ";
-        if (show_match) {
-            if (_buckets[i].matched) {
-                ss << " [M]";
-            } else {
-                ss << " [U]";
-            }
-        }
-        if (_buckets[i].hasDuplicates) {
-            DuplicateNode* node = _buckets[i].bucketData.duplicates;
-            bool first = true;
-            ss << " [D] ";
-            while (node != NULL) {
-                if (!first) {
-                    ss << ",";
-                }
-                debug_string_tuple(ss, node->htdata, desc);
-                node = node->next;
-                first = false;
-            }
-        } else {
-            ss << " [B] ";
-            if (_buckets[i].filled) {
-                debug_string_tuple(ss, _buckets[i].bucketData.htdata, desc);
-            } else {
-                ss << " - ";
-            }
-        }
-        ss << endl;
-    }
-    return ss.str();
-}
-
-string PartitionedHashTable::print_stats() const {
-    double curr_fill_factor = (double)_num_filled_buckets / (double)_num_buckets;
-    double avg_travel = (double)_travel_length / (double)_num_probes;
-    double avg_collisions = (double)_num_hash_collisions / (double)_num_filled_buckets;
-    stringstream ss;
-    ss << "Buckets: " << _num_buckets << " " << _num_filled_buckets << " "
-       << curr_fill_factor << endl;
-    ss << "Duplicates: " << _num_buckets_with_duplicates << " buckets "
-       << _num_duplicate_nodes << " nodes" << endl;
-    ss << "Probes: " << _num_probes << endl;
-    ss << "FailedProbes: " << _num_failed_probes << endl;
-    ss << "Travel: " << _travel_length << " " << avg_travel << endl;
-    ss << "HashCollisions: " << _num_hash_collisions << " " << avg_collisions << endl;
-    ss << "Resizes: " << _num_resizes << endl;
    return ss.str();
-}
-
-} // namespace doris
-
diff --git a/be/src/exec/partitioned_hash_table.h b/be/src/exec/partitioned_hash_table.h
deleted file mode 100644
index 94f6c9847f..0000000000
--- a/be/src/exec/partitioned_hash_table.h
+++ /dev/null
@@ -1,673 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef DORIS_BE_SRC_EXEC_PARTITIONED_HASH_TABLE_H
-#define DORIS_BE_SRC_EXEC_PARTITIONED_HASH_TABLE_H
-
-#include <sstream>
-#include <string>
-#include <vector>
-
-#include "codegen/doris_ir.h"
-#include "util/logging.h"
-#include "runtime/buffered_block_mgr2.h"
-#include "runtime/buffered_tuple_stream2.h"
-#include "runtime/buffered_tuple_stream2.inline.h"
-#include "runtime/mem_tracker.h"
-#include "runtime/tuple_row.h"
-#include "util/hash_util.hpp"
-#include "util/bit_util.h"
-
-namespace doris {
-
-class Expr;
-class ExprContext;
-class MemTracker;
-class RowDescriptor;
-class RuntimeState;
-class Tuple;
-class TupleRow;
-class PartitionedHashTable;
-
-// Linear or quadratic probing hash table implementation tailored to the usage pattern
-// for partitioned hash aggregation and hash joins. The hash table stores TupleRows and
-// allows for different exprs for insertions and finds. This is the pattern we use for
-// joins and aggregation where the input/build tuple row descriptor is different from the
-// find/probe descriptor. The implementation is designed to allow codegen for some paths.
-//
-// In addition to the hash table there is also an accompanying hash table context that
-// is used for insertions and probes. For example, the hash table context stores
-// evaluated expr results for the current row being processed when possible into a
-// contiguous memory buffer. This allows for efficient hash computation.
-//
-// The hash table does not support removes. The hash table is not thread safe.
-// The table is optimized for the partition hash aggregation and hash joins and is not
-// intended to be a generic hash table implementation. The API loosely mimics the
-// std::hashset API.
-//
-// The data (rows) are stored in a BufferedTupleStream2. The basic data structure of this
-// hash table is a vector of buckets. The buckets (indexed by the mod of the hash)
-// contain a pointer to either the slot in the tuple-stream or, in case of duplicate
-// values, to the head of a linked list of nodes that in turn contain a pointer to
-// tuple-stream slots. When inserting an entry we start at the bucket at position
-// (hash % size) and search for either a bucket with the same hash or for an empty
-// bucket. If a bucket with the same hash is found, we then compare for row equality and
-// either insert a duplicate node if the equality is true, or continue the search if the
-// row equality is false. Similarly, when probing we start from the bucket at position
-// (hash % size) and search for an entry with the same hash or for an empty bucket.
-// In the former case, we then check for row equality and continue the search if the row
-// equality is false. In the latter case, the probe is not successful. When growing the
-// hash table, the number of buckets is doubled. We trigger a resize when the fill
-// factor is approx 75%. Due to the doubling nature of the buckets, we require that the
-// number of buckets is a power of 2. This allows us to perform a modulo of the hash
-// using a bitmask.
-//
-// We choose to use linear or quadratic probing because they exhibit good (predictable)
-// cache behavior.
-//
-// The first NUM_SMALL_BLOCKS of _nodes are made of blocks less than the IO size (of 8MB)
-// to reduce the memory footprint of small queries.
-//
-// TODO: Compare linear and quadratic probing and remove the loser.
-// TODO: We currently use 32-bit hashes. There is room in the bucket structure for at
-// least 48-bits. We should exploit this space.
-// TODO: Consider capping the probes with a threshold value. If an insert reaches
-// that threshold it is inserted to another linked list of overflow entries.
-// TODO: Smarter resizes, and perhaps avoid using powers of 2 as the hash table size.
-// TODO: this is not a fancy hash table in terms of memory access patterns
-// (cuckoo-hashing or something that spills to disk). We will likely want to invest
-// more time into this.
-// TODO: hash-join and aggregation have very different access patterns. Joins insert
-// all the rows and then calls scan to find them. Aggregation interleaves find() and
-// Inserts(). We may want to optimize joins more heavily for Inserts() (in particular
-// growing).
-// TODO: Batched interface for inserts and finds.
-// TODO: Do we need to check mem limit exceeded so often? Check once per batch?
-
-// Control block for a hash table. This class contains the logic as well as the variables
-// needed by a thread to operate on a hash table.
-class PartitionedHashTableCtx {
-public:
-    // Create a hash table context.
-    //  - build_exprs are the exprs that should be used to evaluate rows during insert().
-    //  - probe_exprs are used during find()
-    //  - stores_nulls: if false, TupleRows with nulls are ignored during insert
-    //  - finds_nulls: if false, find() returns End() for TupleRows with nulls
-    //    even if stores_nulls is true
-    //  - initial_seed: Initial seed value to use when computing hashes for rows with
-    //    level 0. Other levels have their seeds derived from this seed.
-    //  - max_levels: the max levels we will hash with.
-    PartitionedHashTableCtx(const std::vector<ExprContext*>& build_expr_ctxs,
-            const std::vector<ExprContext*>& probe_expr_ctxs, bool stores_nulls,
-            bool finds_nulls, int32_t initial_seed, int max_levels,
-            int num_build_tuples);
-
-    // Call to cleanup any resources.
-    void close();
-
-    void set_level(int level);
-    int level() const { return _level; }
-    uint32_t seed(int level) { return _seeds.at(level); }
-
-    TupleRow* row() const { return _row; }
-
-    // Returns the results of the exprs at 'expr_idx' evaluated over the last row
-    // processed.
-    // This value is invalid if the expr evaluated to NULL.
-    // TODO: this is an awkward abstraction but aggregation node can take advantage of
-    // it and save some expr evaluation calls.
-    void* last_expr_value(int expr_idx) const {
-        return _expr_values_buffer + _expr_values_buffer_offsets[expr_idx];
-    }
-
-    // Returns if the expr at 'expr_idx' evaluated to NULL for the last row.
-    bool last_expr_value_null(int expr_idx) const {
-        return _expr_value_null_bits[expr_idx];
-    }
-
-    // Evaluate and hash the build/probe row, returning in *hash. Returns false if this
-    // row should be rejected (doesn't need to be processed further) because it
-    // contains NULL.
-    // These need to be inlined in the IR module so we can find and replace the calls to
-    // EvalBuildRow()/EvalProbeRow().
-    bool IR_ALWAYS_INLINE eval_and_hash_build(TupleRow* row, uint32_t* hash);
-    bool IR_ALWAYS_INLINE eval_and_hash_probe(TupleRow* row, uint32_t* hash);
-
-    int results_buffer_size() const { return _results_buffer_size; }
-
-private:
-    friend class PartitionedHashTable;
-    friend class PartitionedHashTableTest_HashEmpty_Test;
-
-    // Compute the hash of the values in _expr_values_buffer.
-    // This will be replaced by codegen. We don't want this inlined for replacing
-    // with codegen'd functions so the function name does not change.
-    uint32_t IR_NO_INLINE HashCurrentRow() {
-        DCHECK_LT(_level, _seeds.size());
-        if (_var_result_begin == -1) {
-            // This handles NULLs implicitly since a constant seed value was put
-            // into results buffer for nulls.
-            // TODO: figure out which hash function to use. We need to generate uncorrelated
-            // hashes by changing just the seed. CRC does not have this property and FNV is
-            // okay. We should switch to something else.
-            return hash_help(_expr_values_buffer, _results_buffer_size, _seeds[_level]);
-        } else {
-            return PartitionedHashTableCtx::hash_variable_len_row();
-        }
-    }
-
-    // Wrapper function for calling correct HashUtil function in non-codegen'd case.
-    uint32_t inline hash_help(const void* input, int len, int32_t hash) {
-        // Use CRC hash at first level for better performance. Switch to murmur hash at
-        // subsequent levels since CRC doesn't randomize well with different seed inputs.
-        if (_level == 0) {
-            return HashUtil::hash(input, len, hash);
-        }
-        return HashUtil::murmur_hash2_64(input, len, hash);
-    }
-
-    // Evaluate 'row' over build exprs caching the results in '_expr_values_buffer'. This
-    // will be replaced by codegen. We do not want this function inlined when cross
-    // compiled because we need to be able to differentiate between EvalBuildRow and
-    // EvalProbeRow by name and the build/probe exprs are baked into the codegen'd
-    // function.
-    bool IR_NO_INLINE EvalBuildRow(TupleRow* row) {
-        return eval_row(row, _build_expr_ctxs);
-    }
-
-    // Evaluate 'row' over probe exprs caching the results in '_expr_values_buffer'.
-    // This will be replaced by codegen.
-    bool IR_NO_INLINE EvalProbeRow(TupleRow* row) {
-        return eval_row(row, _probe_expr_ctxs);
-    }
-
-    // Compute the hash of the values in _expr_values_buffer for rows with variable length
-    // fields (e.g. strings).
-    uint32_t hash_variable_len_row();
-
-    // Evaluate the exprs over row and cache the results in '_expr_values_buffer'.
-    // Returns whether any expr evaluated to NULL.
-    // This will be replaced by codegen.
-    bool eval_row(TupleRow* row, const std::vector<ExprContext*>& ctxs);
-
-    // Returns true if the values of build_exprs evaluated over 'build_row' equal
-    // the values cached in _expr_values_buffer.
-    // This will be replaced by codegen.
-    bool IR_NO_INLINE equals(TupleRow* build_row);
-
-    const std::vector<ExprContext*>& _build_expr_ctxs;
-    const std::vector<ExprContext*>& _probe_expr_ctxs;
-
-    // Constants on how the hash table should behave. Joins and aggs have slightly
-    // different behavior.
-    // TODO: these constants are an ideal candidate to be removed with codegen.
-    // TODO: ..or with template-ization
-    const bool _stores_nulls;
-    const bool _finds_nulls;
-
-    // The current level this context is working on. Each level needs to use a
-    // different seed.
-    int _level;
-
-    // The seeds to use for hashing. Indexed by the level.
-    std::vector<uint32_t> _seeds;
-
-    // Cache of exprs values for the current row being evaluated. This can either
-    // be a build row (during insert()) or probe row (during find()).
-    std::vector<int> _expr_values_buffer_offsets;
-
-    // Byte offset into '_expr_values_buffer' that begins the variable length results.
-    // If -1, there are no variable length slots. Never changes once set, can be removed
-    // with codegen.
-    int _var_result_begin;
-
-    // Byte size of '_expr_values_buffer'. Never changes once set, can be removed with
-    // codegen.
-    int _results_buffer_size;
-
-    // Buffer to store evaluated expr results. This address must not change once
-    // allocated since the address is baked into the codegen.
-    uint8_t* _expr_values_buffer;
-
-    // Use bytes instead of bools to be compatible with llvm. This address must
-    // not change once allocated.
-    uint8_t* _expr_value_null_bits;
-
-    // Scratch buffer to generate rows on the fly.
-    TupleRow* _row;
-
-    // Cross-compiled functions to access member variables used in codegen_hash_current_row().
-    uint32_t get_hash_seed() const;
-};
-
-// The hash table consists of a contiguous array of buckets that contain a pointer to the
-// data, the hash value and three flags: whether this bucket is filled, whether this
-// entry has been matched (used in right and full joins) and whether this entry has
-// duplicates. If there are duplicates, then the data is pointing to the head of a
-// linked list of duplicate nodes that point to the actual data. Note that the duplicate
-// nodes do not contain the hash value, because all the linked nodes have the same hash
-// value, the one in the bucket. The data is either a tuple stream index or a Tuple*.
-// This array of buckets is sparse, we are shooting for up to 3/4 fill factor (75%). The
-// data allocated by the hash table comes from the BufferedBlockMgr2.
-class PartitionedHashTable {
-private:
-    // Either the row in the tuple stream or a pointer to the single tuple of this row.
-    union HtData {
-        BufferedTupleStream2::RowIdx idx;
-        Tuple* tuple;
-    };
-
-    // Linked list of entries used for duplicates.
-    struct DuplicateNode {
-        // Used for full outer and right {outer, anti, semi} joins. Indicates whether the
-        // row in the DuplicateNode has been matched.
-        // From an abstraction point of view, this is an awkward place to store this
-        // information.
-        // TODO: Fold this flag in the next pointer below.
-        bool matched;
-
-        // Chain to next duplicate node, NULL when end of list.
-        DuplicateNode* next;
-        HtData htdata;
-    };
-
-    struct Bucket {
-        // Whether this bucket contains a valid entry, or it is empty.
-        bool filled;
-
-        // Used for full outer and right {outer, anti, semi} joins. Indicates whether the
-        // row in the bucket has been matched.
-        // From an abstraction point of view, this is an awkward place to store this
-        // information but it is efficient. This space is otherwise unused.
-        bool matched;
-
-        // Used in case of duplicates. If true, then the bucketData union should be used as
-        // 'duplicates'.
-        bool hasDuplicates;
-
-        // Cache of the hash for data.
-        // TODO: Do we even have to cache the hash value?
-        uint32_t hash;
-
-        // Either the data for this bucket or the linked list of duplicates.
-        union {
-            HtData htdata;
-            DuplicateNode* duplicates;
-        } bucketData;
-    };
-
-public:
-    class Iterator;
-
-    // Returns a newly allocated PartitionedHashTable. The probing algorithm is set by
-    // config::enable_quadratic_probing.
-    //  - client: block mgr client to allocate data pages from.
-    //  - num_build_tuples: number of Tuples in the build tuple row.
-    //  - tuple_stream: the tuple stream which contains the tuple rows index by the
-    //    hash table. Can be NULL if the rows contain only a single tuple, in which
-    //    case the 'tuple_stream' is unused.
-    //  - max_num_buckets: the maximum number of buckets that can be stored. If we
-    //    try to grow the number of buckets to a larger number, the inserts will fail.
-    //    -1 if it is unlimited.
-    //  - initial_num_buckets: number of buckets that the hash table should be initialized
-    //    with.
-    static PartitionedHashTable* create(RuntimeState* state, BufferedBlockMgr2::Client* client,
-            int num_build_tuples, BufferedTupleStream2* tuple_stream, int64_t max_num_buckets,
-            int64_t initial_num_buckets);
-
-    // Allocates the initial bucket structure. Returns false if OOM.
-    bool init();
-
-    // Call to cleanup any resources. Must be called once.
-    void close();
-
-    // Inserts the row to the hash table. Returns true if the insertion was successful.
-    // Always returns true if the table has free buckets and the key is not a duplicate.
-    // The caller is responsible for ensuring that the table has free buckets.
-    // 'idx' is the index into _tuple_stream for this row. If the row contains more than
-    // one tuple, the 'idx' is stored instead of the 'row'. The 'row' is not copied by the
-    // hash table and the caller must guarantee it stays in memory. This will not grow the
-    // hash table. In the case that there is a need to insert a duplicate node, instead of
-    // filling a new bucket, and there is not enough memory to insert a duplicate node,
-    // the insert fails and this function returns false.
-    // Used during the build phase of hash joins.
-    bool IR_ALWAYS_INLINE insert(PartitionedHashTableCtx* ht_ctx,
-            const BufferedTupleStream2::RowIdx& idx, TupleRow* row, uint32_t hash);
-
-    // Same as insert() but for inserting a single Tuple. The 'tuple' is not copied by
-    // the hash table and the caller must guarantee it stays in memory.
-    bool IR_ALWAYS_INLINE insert(PartitionedHashTableCtx* ht_ctx, Tuple* tuple, uint32_t hash);
-
-    // Returns an iterator to the bucket matching the last row evaluated in 'ht_ctx'.
-    // Returns PartitionedHashTable::End() if no match is found. The iterator can be
-    // iterated until PartitionedHashTable::End() to find all the matching rows.
-    // Advancing the returned iterator will go to the next matching row. The matching
-    // rows do not need to be evaluated since all the nodes of a bucket are duplicates.
-    // One scan can be in progress for each 'ht_ctx'.
-    // Used during the probe phase of hash joins.
-    Iterator IR_ALWAYS_INLINE find(PartitionedHashTableCtx* ht_ctx, uint32_t hash);
-
-    // If a match is found in the table, return an iterator as in find(). If a match was
-    // not present, return an iterator pointing to the empty bucket where the key should
-    // be inserted. Returns End() if the table is full. The caller can set the data in
-    // the bucket using a Set*() method on the iterator.
-    Iterator IR_ALWAYS_INLINE find_bucket(PartitionedHashTableCtx* ht_ctx, uint32_t hash,
-            bool* found);
-
-    // Returns number of elements inserted in the hash table
-    int64_t size() const {
-        return _num_filled_buckets - _num_buckets_with_duplicates + _num_duplicate_nodes;
-    }
-
-    // Returns the number of empty buckets.
-    int64_t empty_buckets() const { return _num_buckets - _num_filled_buckets; }
-
-    // Returns the number of buckets
-    int64_t num_buckets() const { return _num_buckets; }
-
-    // Returns the load factor (the number of non-empty buckets)
-    double load_factor() const {
-        return static_cast<double>(_num_filled_buckets) / _num_buckets;
-    }
-
-    // Returns an estimate of the number of bytes needed to build the hash table
-    // structure for 'num_rows'. To do that, it estimates the number of buckets,
-    // rounded up to a power of two, and also assumes that there are no duplicates.
-    static int64_t EstimateNumBuckets(int64_t num_rows) {
-        // Assume max 66% fill factor and no duplicates.
-        return BitUtil::next_power_of_two(3 * num_rows / 2);
-    }
-    static int64_t EstimateSize(int64_t num_rows) {
-        int64_t num_buckets = EstimateNumBuckets(num_rows);
-        return num_buckets * sizeof(Bucket);
-    }
-
-    // Returns the memory occupied by the hash table, takes into account the number of
-    // duplicates.
-    int64_t current_mem_size() const;
-
-    // Calculates the fill factor if 'buckets_to_fill' additional buckets were to be
-    // filled and resizes the hash table so that the projected fill factor is below the
-    // max fill factor.
-    // If it returns true, then it is guaranteed at least 'rows_to_add' rows can be
-    // inserted without need to resize.
-    bool check_and_resize(uint64_t buckets_to_fill, PartitionedHashTableCtx* ht_ctx);
-
-    // Returns the number of bytes allocated to the hash table
-    int64_t byte_size() const { return _total_data_page_size; }
-
-    // Returns an iterator at the beginning of the hash table. Advancing this iterator
-    // will traverse all elements.
-    Iterator begin(PartitionedHashTableCtx* ht_ctx);
-
-    // Return an iterator pointing to the first element (Bucket or DuplicateNode, if the
-    // bucket has duplicates) in the hash table that does not have its matched flag set.
-    // Used in right joins and full-outer joins.
-    Iterator first_unmatched(PartitionedHashTableCtx* ctx);
-
-    // Return true if there was at least one match.
-    bool HasMatches() const { return _has_matches; }
-
-    // Return end marker.
-    Iterator End() { return Iterator(); }
-
-    // Dump out the entire hash table to string. If 'skip_empty', empty buckets are
-    // skipped. If 'show_match', it also prints the matched flag of each node. If
-    // 'build_desc' is non-null, the build rows will be printed. Otherwise, only
-    // the addresses of the build rows will be printed.
-    std::string debug_string(bool skip_empty, bool show_match,
-            const RowDescriptor* build_desc);
-
-    // Print the content of a bucket or node.
-    void debug_string_tuple(std::stringstream& ss, HtData& htdata, const RowDescriptor* desc);
-
-    // Update and print some statistics that can be used for performance debugging.
-    std::string print_stats() const;
-
-    // stl-like iterator interface.
-    class Iterator {
-    private:
-        // Bucket index value when probe is not successful.
-        static const int64_t BUCKET_NOT_FOUND = -1;
-
-    public:
-        Iterator() : _table(NULL), _row(NULL), _bucket_idx(BUCKET_NOT_FOUND), _node(NULL) { }
-
-        // Iterates to the next element. It should be called only if !AtEnd().
-        void IR_ALWAYS_INLINE next();
-
-        // Iterates to the next duplicate node. If the bucket does not have duplicates or
-        // when it reaches the last duplicate node, then it moves the Iterator to AtEnd().
-        // Used when we want to iterate over all the duplicate nodes bypassing the next()
-        // interface (e.g. in semi/outer joins without other_join_conjuncts, in order to
-        // iterate over all nodes of an unmatched bucket).
-        void IR_ALWAYS_INLINE next_duplicate();
-
-        // Iterates to the next element that does not have its matched flag set. Used in
-        // right-outer and full-outer joins.
-        void next_unmatched();
-
-        // Return the current row or tuple. Callers must check the iterator is not AtEnd()
-        // before calling them. The returned row is owned by the iterator and valid until
-        // the next call to get_row(). It is safe to advance the iterator.
-        TupleRow* get_row() const;
-        Tuple* get_tuple() const;
-
-        // Set the current tuple for an empty bucket. Designed to be used with the
-        // iterator returned from find_bucket() in the case when the value is not found.
-        // It is not valid to call this function if the bucket already has an entry.
-        void set_tuple(Tuple* tuple, uint32_t hash);
-
-        // Sets as matched the Bucket or DuplicateNode currently pointed by the iterator,
-        // depending on whether the bucket has duplicates or not. The iterator cannot be
-        // AtEnd().
-        void set_matched();
-
-        // Returns the 'matched' flag of the current Bucket or DuplicateNode, depending on
-        // whether the bucket has duplicates or not. It should be called only if !AtEnd().
-        bool is_matched() const;
-
-        // Resets everything but the pointer to the hash table.
-        void set_at_end();
-
-        // Returns true if this iterator is at the end, i.e. get_row() cannot be called.
-        bool at_end() const { return _bucket_idx == BUCKET_NOT_FOUND; }
-
-    private:
-        friend class PartitionedHashTable;
-
-        Iterator(PartitionedHashTable* table, TupleRow* row, int bucket_idx, DuplicateNode* node)
-            : _table(table),
-              _row(row),
-              _bucket_idx(bucket_idx),
-              _node(node) {
-        }
-
-        PartitionedHashTable* _table;
-        TupleRow* _row;
-
-        // Current bucket idx.
-        // TODO: Use uint32_t?
-        int64_t _bucket_idx;
-
-        // Pointer to the current duplicate node.
-        DuplicateNode* _node;
-    };
-
-private:
-    friend class Iterator;
-    friend class PartitionedHashTableTest;
-
-    // Hash table constructor. Private because Create() should be used, instead
-    // of calling this constructor directly.
-    //  - quadratic_probing: set to true when the probing algorithm is quadratic, as
-    //    opposed to linear.
-    PartitionedHashTable(bool quadratic_probing, RuntimeState* state,
-            BufferedBlockMgr2::Client* client, int num_build_tuples,
-            BufferedTupleStream2* tuple_stream, int64_t max_num_buckets,
-            int64_t initial_num_buckets);
-
-    // Performs the probing operation according to the probing algorithm (linear or
-    // quadratic). Returns one of the following:
-    // (a) the index of the bucket that contains the entry that matches with the last row
-    //     evaluated in 'ht_ctx'. If 'ht_ctx' is NULL then it does not check for row
-    //     equality and returns the index of the first empty bucket.
-    // (b) the index of the first empty bucket according to the probing algorithm (linear
-    //     or quadratic), if the entry is not in the hash table or 'ht_ctx' is NULL.
-    // (c) Iterator::BUCKET_NOT_FOUND if the probe was not successful, i.e. the maximum
-    //     distance was traveled without finding either an empty or a matching bucket.
-    // Using the returned index value, the caller can create an iterator that can be
-    // iterated until End() to find all the matching rows.
-    // EvalAndHashBuild() or EvalAndHashProbe() must have been called before calling this.
-    // 'hash' must be the hash returned by these functions.
-    // 'found' indicates that a bucket that contains an equal row is found.
-    //
-    // There are wrappers of this function that perform the find and insert logic.
-    int64_t IR_ALWAYS_INLINE probe(Bucket* buckets, int64_t num_buckets,
-            PartitionedHashTableCtx* ht_ctx, uint32_t hash, bool* found);
-
-    // Performs the insert logic. Returns the HtData* of the bucket or duplicate node
-    // where the data should be inserted. Returns NULL if the insert was not successful.
-    HtData* IR_ALWAYS_INLINE insert_internal(PartitionedHashTableCtx* ht_ctx, uint32_t hash);
-
-    // Updates 'bucket_idx' to the index of the next non-empty bucket. If the bucket has
-    // duplicates, 'node' will be pointing to the head of the linked list of duplicates.
-    // Otherwise, 'node' should not be used. If there are no more buckets, sets
-    // 'bucket_idx' to BUCKET_NOT_FOUND.
-    void next_filled_bucket(int64_t* bucket_idx, DuplicateNode** node);
-
-    // Resize the hash table to 'num_buckets'. Returns false on OOM.
-    bool resize_buckets(int64_t num_buckets, PartitionedHashTableCtx* ht_ctx);
-
-    // Appends the DuplicateNode pointed to by _next_node to 'bucket' and moves the
-    // _next_node pointer to the next DuplicateNode in the page, updating the remaining
-    // node counter.
-    DuplicateNode* IR_ALWAYS_INLINE append_next_node(Bucket* bucket);
-
-    // Creates a new DuplicateNode for an entry and chains it to the bucket with index
-    // 'bucket_idx'. The duplicate nodes of a bucket are chained as a linked list.
-    // This places the new duplicate node at the beginning of the list. If this is the
-    // first duplicate entry inserted in this bucket, then the entry already contained by
-    // the bucket is converted to a DuplicateNode. That is, the contents of 'data' of the
-    // bucket are copied to a DuplicateNode and 'data' is updated to point to a
-    // DuplicateNode.
-    // Returns NULL if the node array could not grow, i.e. there was not enough memory to
-    // allocate a new DuplicateNode.
-    DuplicateNode* IR_ALWAYS_INLINE insert_duplicate_node(int64_t bucket_idx);
-
-    // Resets the contents of the empty bucket with index 'bucket_idx', in preparation for
-    // an insert. Sets all the fields of the bucket other than 'data'.
-    void IR_ALWAYS_INLINE prepare_bucket_for_insert(int64_t bucket_idx, uint32_t hash);
-
-    // Return the TupleRow pointed by 'htdata'.
-    TupleRow* get_row(HtData& htdata, TupleRow* row) const;
-
-    // Returns the TupleRow of the pointed 'bucket'. In case of duplicates, it
-    // returns the content of the first chained duplicate node of the bucket.
-    TupleRow* get_row(Bucket* bucket, TupleRow* row) const;
-
-    // Grow the node array. Returns false on OOM.
-    bool grow_node_array();
-
-    // Load factor that will trigger growing the hash table on insert. This is
-    // defined as the number of non-empty buckets / total_buckets
-    static const double MAX_FILL_FACTOR;
-
-    RuntimeState* _state;
-
-    // Client to allocate data pages with.
-    BufferedBlockMgr2::Client* _block_mgr_client;
-
-    // Stream contains the rows referenced by the hash table. Can be NULL if the
-    // row only contains a single tuple, in which case the TupleRow indirection
-    // is removed by the hash table.
-    BufferedTupleStream2* _tuple_stream;
-
-    // Constants on how the hash table should behave. Joins and aggs have slightly
-    // different behavior.
-    // TODO: these constants are an ideal candidate to be removed with codegen.
-    // TODO: ..or with template-ization
-    const bool _stores_tuples;
-
-    // Quadratic probing enabled (as opposed to linear).
-    const bool _quadratic_probing;
-
-    // Data pages for all nodes. These are always pinned.
-    std::vector<BufferedBlockMgr2::Block*> _data_pages;
-
-    // Byte size of all buffers in _data_pages.
-    int64_t _total_data_page_size;
-
-    // Next duplicate node to insert. Valid when _node_remaining_current_page > 0.
-    DuplicateNode* _next_node;
-
-    // Number of nodes left in the current page.
-    int _node_remaining_current_page;
-
-    // Number of duplicate nodes.
-    int64_t _num_duplicate_nodes;
-
-    const int64_t _max_num_buckets;
-
-    // Array of all buckets. Owned by this node. Using c-style array to control
-    // memory footprint.
-    Bucket* _buckets;
-
-    // Total number of buckets (filled and empty).
-    int64_t _num_buckets;
-
-    // Number of non-empty buckets. Used to determine when to resize.
-    int64_t _num_filled_buckets;
-
-    // Number of (non-empty) buckets with duplicates. These buckets do not point to slots
-    // in the tuple stream, but rather to a linked list of Nodes.
-    int64_t _num_buckets_with_duplicates;
-
-    // Number of build tuples, used for constructing temp row* for probes.
-    // TODO: We should remove it.
-    const int _num_build_tuples;
-
-    // Flag used to disable spilling hash tables that already had matches in case of
-    // right joins (IMPALA-1488).
-    // TODO: Do not fail when spilling hash tables with matches in right joins
-    bool _has_matches;
-
-    // The stats below can be used for debugging perf.
-    // TODO: Should we make these statistics atomic?
-    // Number of find(), insert(), or find_bucket() calls that probe the hash table.
-    int64_t _num_probes;
-
-    // Number of probes that failed and had to fall back to linear probing without cap.
-    int64_t _num_failed_probes;
-
-    // Total distance traveled for each probe. That is the sum of the diff between the end
-    // position of a probe (find/insert) and its start position
-    // (hash & (_num_buckets - 1)).
-    int64_t _travel_length;
-
-    // The number of cases where we had to compare buckets with the same hash value, but
-    // the row equality failed.
-    int64_t _num_hash_collisions;
-
-    // How many times this table has resized so far.
-    int64_t _num_resizes;
-};
-
-} // end namespace doris
-
-#endif // DORIS_BE_SRC_EXEC_PARTITIONED_HASH_TABLE_H
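(Reviewer note, not part of the patch: EstimateNumBuckets() above plans for a ~66% fill factor -- 1.5 buckets per row, rounded up to a power of two -- and EstimateSize() multiplies by sizeof(Bucket). A standalone sketch of that arithmetic; the 16-byte Bucket size is an assumption (three bool flags plus the 32-bit hash and the 8-byte data union, after padding), and next_power_of_two() here is a plain stand-in for BitUtil::next_power_of_two:)

#include <cstdint>
#include <cstdio>

static int64_t next_power_of_two(int64_t v) {
    int64_t p = 1;
    while (p < v) p <<= 1;
    return p;
}

int main() {
    const int64_t num_rows = 1000000;
    const int64_t bucket_bytes = 16;   // assumed sizeof(Bucket)
    // Max ~66% fill factor and no duplicates: 1.5 buckets per row.
    int64_t num_buckets = next_power_of_two(3 * num_rows / 2);
    // Prints "1000000 rows -> 2097152 buckets, ~32 MB of bucket array".
    printf("%lld rows -> %lld buckets, ~%lld MB of bucket array\n",
           (long long)num_rows, (long long)num_buckets,
           (long long)(num_buckets * bucket_bytes >> 20));
    return 0;
}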
diff --git a/be/src/exec/partitioned_hash_table.inline.h b/be/src/exec/partitioned_hash_table.inline.h
deleted file mode 100644
index 09abc3de13..0000000000
--- a/be/src/exec/partitioned_hash_table.inline.h
+++ /dev/null
@@ -1,379 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef DORIS_BE_SRC_EXEC_PARTITIONED_HASH_TABLE_INLINE_H
-#define DORIS_BE_SRC_EXEC_PARTITIONED_HASH_TABLE_INLINE_H
-
-#include "exec/partitioned_hash_table.h"
-
-namespace doris {
-
-inline bool PartitionedHashTableCtx::eval_and_hash_build(TupleRow* row, uint32_t* hash) {
-    bool has_null = EvalBuildRow(row);
-    if (!_stores_nulls && has_null) {
-        return false;
-    }
-    *hash = HashCurrentRow();
-    return true;
-}
-
-inline bool PartitionedHashTableCtx::eval_and_hash_probe(TupleRow* row, uint32_t* hash) {
-    bool has_null = EvalProbeRow(row);
-    if ((!_stores_nulls || !_finds_nulls) && has_null) {
-        return false;
-    }
-    *hash = HashCurrentRow();
-    return true;
-}
-
-inline int64_t PartitionedHashTable::probe(Bucket* buckets, int64_t num_buckets,
-        PartitionedHashTableCtx* ht_ctx, uint32_t hash, bool* found) {
-    DCHECK(buckets != NULL);
-    DCHECK_GT(num_buckets, 0);
-    *found = false;
-    int64_t bucket_idx = hash & (num_buckets - 1);
-
-    // In case of linear probing it counts the total number of steps for statistics and
-    // for knowing when to exit the loop (e.g. by capping the total travel length). In case
-    // of quadratic probing it is also used for calculating the length of the next jump.
-    int64_t step = 0;
-    do {
-        Bucket* bucket = &buckets[bucket_idx];
-        if (!bucket->filled) {
-            return bucket_idx;
-        }
-        if (hash == bucket->hash) {
-            if (ht_ctx != NULL && ht_ctx->equals(get_row(bucket, ht_ctx->_row))) {
-                *found = true;
-                return bucket_idx;
-            }
-            // Row equality failed, or not performed. This is a hash collision. Continue
-            // searching.
-            ++_num_hash_collisions;
-        }
-        // Move to the next bucket.
-        ++step;
-        ++_travel_length;
-        if (_quadratic_probing) {
-            // The i-th probe location is idx = (hash + (step * (step + 1)) / 2) mod num_buckets.
-            // This gives num_buckets unique idxs (between 0 and N-1) when num_buckets is a power
-            // of 2.
-            bucket_idx = (bucket_idx + step) & (num_buckets - 1);
-        } else {
-            bucket_idx = (bucket_idx + 1) & (num_buckets - 1);
-        }
-    } while (LIKELY(step < num_buckets));
-    DCHECK_EQ(_num_filled_buckets, num_buckets) << "Probing of a non-full table "
-            << "failed: " << _quadratic_probing << " " << hash;
-    return Iterator::BUCKET_NOT_FOUND;
-}
-
-inline PartitionedHashTable::HtData* PartitionedHashTable::insert_internal(
-        PartitionedHashTableCtx* ht_ctx, uint32_t hash) {
-    ++_num_probes;
-    bool found = false;
-    int64_t bucket_idx = probe(_buckets, _num_buckets, ht_ctx, hash, &found);
-    DCHECK_NE(bucket_idx, Iterator::BUCKET_NOT_FOUND);
-    if (found) {
-        // We need to insert a duplicate node, note that this may fail to allocate memory.
-        DuplicateNode* new_node = insert_duplicate_node(bucket_idx);
-        if (UNLIKELY(new_node == NULL)) {
-            return NULL;
-        }
-        return &new_node->htdata;
-    } else {
-        prepare_bucket_for_insert(bucket_idx, hash);
-        return &_buckets[bucket_idx].bucketData.htdata;
-    }
-}
-
-inline bool PartitionedHashTable::insert(PartitionedHashTableCtx* ht_ctx,
-        const BufferedTupleStream2::RowIdx& idx, TupleRow* row, uint32_t hash) {
-    if (_stores_tuples) {
-        return insert(ht_ctx, row->get_tuple(0), hash);
-    }
-    HtData* htdata = insert_internal(ht_ctx, hash);
-    // If successful insert, update the contents of the newly inserted entry with 'idx'.
-    if (LIKELY(htdata != NULL)) {
-        htdata->idx = idx;
-        return true;
-    }
-    return false;
-}
-
-inline bool PartitionedHashTable::insert(
-        PartitionedHashTableCtx* ht_ctx, Tuple* tuple, uint32_t hash) {
-    DCHECK(_stores_tuples);
-    HtData* htdata = insert_internal(ht_ctx, hash);
-    // If successful insert, update the contents of the newly inserted entry with 'tuple'.
-    if (LIKELY(htdata != NULL)) {
-        htdata->tuple = tuple;
-        return true;
-    }
-    return false;
-}
-
-inline PartitionedHashTable::Iterator PartitionedHashTable::find(
-        PartitionedHashTableCtx* ht_ctx, uint32_t hash) {
-    ++_num_probes;
-    bool found = false;
-    int64_t bucket_idx = probe(_buckets, _num_buckets, ht_ctx, hash, &found);
-    if (found) {
-        return Iterator(this, ht_ctx->row(), bucket_idx,
-                _buckets[bucket_idx].bucketData.duplicates);
-    }
-    return End();
-}
-
-inline PartitionedHashTable::Iterator PartitionedHashTable::find_bucket(
-        PartitionedHashTableCtx* ht_ctx, uint32_t hash,
-        bool* found) {
-    ++_num_probes;
-    int64_t bucket_idx = probe(_buckets, _num_buckets, ht_ctx, hash, found);
-    DuplicateNode* duplicates = LIKELY(bucket_idx != Iterator::BUCKET_NOT_FOUND) ?
-            _buckets[bucket_idx].bucketData.duplicates : NULL;
-    return Iterator(this, ht_ctx->row(), bucket_idx, duplicates);
-}
-
-inline PartitionedHashTable::Iterator PartitionedHashTable::begin(PartitionedHashTableCtx* ctx) {
-    int64_t bucket_idx = Iterator::BUCKET_NOT_FOUND;
-    DuplicateNode* node = NULL;
-    next_filled_bucket(&bucket_idx, &node);
-    return Iterator(this, ctx->row(), bucket_idx, node);
-}
-
-inline PartitionedHashTable::Iterator PartitionedHashTable::first_unmatched(
-        PartitionedHashTableCtx* ctx) {
-    int64_t bucket_idx = Iterator::BUCKET_NOT_FOUND;
-    DuplicateNode* node = NULL;
-    next_filled_bucket(&bucket_idx, &node);
-    Iterator it(this, ctx->row(), bucket_idx, node);
-    // Check whether the bucket, or its first duplicate node, is matched. If it is not
-    // matched, then return. Otherwise, move to the first unmatched entry (node or bucket).
-    Bucket* bucket = &_buckets[bucket_idx];
-    if ((!bucket->hasDuplicates && bucket->matched) ||
-            (bucket->hasDuplicates && node->matched)) {
-        it.next_unmatched();
-    }
-    return it;
-}
-
-inline void PartitionedHashTable::next_filled_bucket(int64_t* bucket_idx, DuplicateNode** node) {
-    ++*bucket_idx;
-    for (; *bucket_idx < _num_buckets; ++*bucket_idx) {
-        if (_buckets[*bucket_idx].filled) {
-            *node = _buckets[*bucket_idx].bucketData.duplicates;
-            return;
-        }
-    }
-    // Reached the end of the hash table.
-    *bucket_idx = Iterator::BUCKET_NOT_FOUND;
-    *node = NULL;
-}
-
-inline void PartitionedHashTable::prepare_bucket_for_insert(int64_t bucket_idx, uint32_t hash) {
-    DCHECK_GE(bucket_idx, 0);
-    DCHECK_LT(bucket_idx, _num_buckets);
-    Bucket* bucket = &_buckets[bucket_idx];
-    DCHECK(!bucket->filled);
-    ++_num_filled_buckets;
-    bucket->filled = true;
-    bucket->matched = false;
-    bucket->hasDuplicates = false;
-    bucket->hash = hash;
-}
-
-inline PartitionedHashTable::DuplicateNode* PartitionedHashTable::append_next_node(
-        Bucket* bucket) {
-    DCHECK_GT(_node_remaining_current_page, 0);
-    bucket->bucketData.duplicates = _next_node;
-    ++_num_duplicate_nodes;
-    --_node_remaining_current_page;
-    return _next_node++;
-}
-
-inline PartitionedHashTable::DuplicateNode* PartitionedHashTable::insert_duplicate_node(
-        int64_t bucket_idx) {
-    DCHECK_GE(bucket_idx, 0);
-    DCHECK_LT(bucket_idx, _num_buckets);
-    Bucket* bucket = &_buckets[bucket_idx];
-    DCHECK(bucket->filled);
-    // Allocate one duplicate node for the new data and one for the preexisting data,
-    // if needed.
-    while (_node_remaining_current_page < 1 + !bucket->hasDuplicates) {
-        if (UNLIKELY(!grow_node_array())) {
-            return NULL;
-        }
-    }
-    if (!bucket->hasDuplicates) {
-        // This is the first duplicate in this bucket. It means that we need to convert
-        // the current entry in the bucket to a node and link it from the bucket.
-        _next_node->htdata.idx = bucket->bucketData.htdata.idx;
-        DCHECK(!bucket->matched);
-        _next_node->matched = false;
-        _next_node->next = NULL;
-        append_next_node(bucket);
-        bucket->hasDuplicates = true;
-        ++_num_buckets_with_duplicates;
-    }
-    // Link a new node.
-    _next_node->next = bucket->bucketData.duplicates;
-    _next_node->matched = false;
-    return append_next_node(bucket);
-}
-
-inline TupleRow* PartitionedHashTable::get_row(HtData& htdata, TupleRow* row) const {
-    if (_stores_tuples) {
-        return reinterpret_cast<TupleRow*>(&htdata.tuple);
-    } else {
-        _tuple_stream->get_tuple_row(htdata.idx, row);
-        return row;
-    }
-}
-
-inline TupleRow* PartitionedHashTable::get_row(Bucket* bucket, TupleRow* row) const {
-    DCHECK(bucket != NULL);
-    if (UNLIKELY(bucket->hasDuplicates)) {
-        DuplicateNode* duplicate = bucket->bucketData.duplicates;
-        DCHECK(duplicate != NULL);
-        return get_row(duplicate->htdata, row);
-    } else {
-        return get_row(bucket->bucketData.htdata, row);
-    }
-}
-
-inline TupleRow* PartitionedHashTable::Iterator::get_row() const {
-    DCHECK(!at_end());
-    DCHECK(_table != NULL);
-    DCHECK(_row != NULL);
-    Bucket* bucket = &_table->_buckets[_bucket_idx];
-    if (UNLIKELY(bucket->hasDuplicates)) {
-        DCHECK(_node != NULL);
-        return _table->get_row(_node->htdata, _row);
-    } else {
-        return _table->get_row(bucket->bucketData.htdata, _row);
-    }
-}
-
-inline Tuple* PartitionedHashTable::Iterator::get_tuple() const {
-    DCHECK(!at_end());
-    DCHECK(_table->_stores_tuples);
-    Bucket* bucket = &_table->_buckets[_bucket_idx];
-    // TODO: To avoid the hasDuplicates check, store the HtData* in the Iterator.
-    if (UNLIKELY(bucket->hasDuplicates)) {
-        DCHECK(_node != NULL);
-        return _node->htdata.tuple;
-    } else {
-        return bucket->bucketData.htdata.tuple;
-    }
-}
-
-inline void PartitionedHashTable::Iterator::set_tuple(Tuple* tuple, uint32_t hash) {
-    DCHECK(!at_end());
-    DCHECK(_table->_stores_tuples);
-    _table->prepare_bucket_for_insert(_bucket_idx, hash);
-    _table->_buckets[_bucket_idx].bucketData.htdata.tuple = tuple;
-}
-
-inline void PartitionedHashTable::Iterator::set_matched() {
-    DCHECK(!at_end());
-    Bucket* bucket = &_table->_buckets[_bucket_idx];
-    if (bucket->hasDuplicates) {
-        _node->matched = true;
-    } else {
-        bucket->matched = true;
-    }
-    // Used for disabling spilling of hash tables in right and full-outer joins with
-    // matches. See IMPALA-1488.
-    _table->_has_matches = true;
-}
-
-inline bool PartitionedHashTable::Iterator::is_matched() const {
-    DCHECK(!at_end());
-    Bucket* bucket = &_table->_buckets[_bucket_idx];
-    if (bucket->hasDuplicates) {
-        return _node->matched;
-    }
-    return bucket->matched;
-}
-
-inline void PartitionedHashTable::Iterator::set_at_end() {
-    _bucket_idx = BUCKET_NOT_FOUND;
-    _node = NULL;
-}
-
-inline void PartitionedHashTable::Iterator::next() {
-    DCHECK(!at_end());
-    if (_table->_buckets[_bucket_idx].hasDuplicates && _node->next != NULL) {
-        _node = _node->next;
-    } else {
-        _table->next_filled_bucket(&_bucket_idx, &_node);
-    }
-}
-
-inline void PartitionedHashTable::Iterator::next_duplicate() {
-    DCHECK(!at_end());
-    if (_table->_buckets[_bucket_idx].hasDuplicates && _node->next != NULL) {
-        _node = _node->next;
-    } else {
-        _bucket_idx = BUCKET_NOT_FOUND;
-        _node = NULL;
-    }
-}
-
-inline void PartitionedHashTable::Iterator::next_unmatched() {
-    DCHECK(!at_end());
-    Bucket* bucket = &_table->_buckets[_bucket_idx];
-    // Check if there is any remaining unmatched duplicate node in the current bucket.
-    if (bucket->hasDuplicates) {
-        while (_node->next != NULL) {
-            _node = _node->next;
-            if (!_node->matched) {
-                return;
-            }
-        }
-    }
-    // Move to the next filled bucket and return if this bucket is not matched or
-    // iterate to the first not matched duplicate node.
-    _table->next_filled_bucket(&_bucket_idx, &_node);
-    while (_bucket_idx != Iterator::BUCKET_NOT_FOUND) {
-        bucket = &_table->_buckets[_bucket_idx];
-        if (!bucket->hasDuplicates) {
-            if (!bucket->matched) {
-                return;
-            }
-        } else {
-            while (_node->matched && _node->next != NULL) {
-                _node = _node->next;
-            }
-            if (!_node->matched) {
-                return;
-            }
-        }
-        _table->next_filled_bucket(&_bucket_idx, &_node);
-    }
-}
-
-inline void PartitionedHashTableCtx::set_level(int level) {
-    DCHECK_GE(level, 0);
-    DCHECK_LT(level, _seeds.size());
-    _level = level;
-}
-
-} // end namespace doris
-
-#endif // DORIS_BE_SRC_EXEC_PARTITIONED_HASH_TABLE_INLINE_H
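(Reviewer note, not part of the patch: the resize policy being deleted lives in check_and_resize() in partitioned_hash_table.cc above -- keep doubling the bucket count until the projected fill factor drops back under MAX_FILL_FACTOR (0.75); resize_buckets() then re-probes every filled bucket into the new array. A standalone sketch of just the doubling decision, with made-up numbers:)

#include <cstdint>
#include <cstdio>

int main() {
    const double MAX_FILL_FACTOR = 0.75;
    const int64_t num_buckets = 1024;        // current size (power of 2)
    const int64_t num_filled_buckets = 700;  // currently occupied
    const uint64_t buckets_to_fill = 200;    // e.g. an incoming row batch
    // Mirror of the loop in check_and_resize(): shift until the projected
    // occupancy fits under the max fill factor.
    uint64_t shift = 0;
    while (num_filled_buckets + buckets_to_fill >
           (num_buckets << shift) * MAX_FILL_FACTOR) {
        ++shift;
    }
    // Prints "resize 1024 -> 2048 buckets (projected fill 0.44)".
    printf("resize %lld -> %lld buckets (projected fill %.2f)\n",
           (long long)num_buckets, (long long)(num_buckets << shift),
           (double)(num_filled_buckets + buckets_to_fill) / (num_buckets << shift));
    return 0;
}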