From 5a8fcd263fcf9b287976a3337d62bd301ae156c5 Mon Sep 17 00:00:00 2001 From: HappenLee Date: Fri, 20 Mar 2020 16:25:29 +0800 Subject: [PATCH] [CodeStyle] Delete obsolete code of partition_aggregation_node and partitioned_hash_table (#3162) --- be/src/exec/CMakeLists.txt | 4 - be/src/exec/exec_node.cpp | 5 +- be/src/exec/partitioned_aggregation_node.cc | 1093 ----------------- be/src/exec/partitioned_aggregation_node.h | 461 ------- .../exec/partitioned_aggregation_node_ir.cc | 143 --- be/src/exec/partitioned_hash_table.cc | 440 ------- be/src/exec/partitioned_hash_table.h | 673 ---------- be/src/exec/partitioned_hash_table.inline.h | 379 ------ be/src/exec/partitioned_hash_table_ir.cc | 29 - be/test/exec/partitioned_hash_table_test.cpp | 606 --------- 10 files changed, 1 insertion(+), 3832 deletions(-) delete mode 100644 be/src/exec/partitioned_aggregation_node.cc delete mode 100644 be/src/exec/partitioned_aggregation_node.h delete mode 100644 be/src/exec/partitioned_aggregation_node_ir.cc delete mode 100644 be/src/exec/partitioned_hash_table.cc delete mode 100644 be/src/exec/partitioned_hash_table.h delete mode 100644 be/src/exec/partitioned_hash_table.inline.h delete mode 100644 be/src/exec/partitioned_hash_table_ir.cc delete mode 100644 be/test/exec/partitioned_hash_table_test.cpp diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index 705c439b09..223e6c5129 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -83,10 +83,6 @@ set(EXEC_FILES schema_scanner/schema_charsets_scanner.cpp schema_scanner/schema_collations_scanner.cpp schema_scanner/schema_helper.cpp - partitioned_hash_table.cc - partitioned_hash_table_ir.cc - partitioned_aggregation_node.cc - partitioned_aggregation_node_ir.cc new_partitioned_hash_table.cc new_partitioned_hash_table_ir.cc new_partitioned_aggregation_node.cc diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 62df71552e..62d2438e27 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -25,7 +25,6 @@ #include "common/status.h" #include "exprs/expr_context.h" #include "exec/aggregation_node.h" -#include "exec/partitioned_aggregation_node.h" #include "exec/new_partitioned_aggregation_node.h" #include "exec/csv_scan_node.h" #include "exec/es_scan_node.h" @@ -381,9 +380,7 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN return Status::OK(); case TPlanNodeType::AGGREGATION_NODE: - if (config::enable_partitioned_aggregation) { - *node = pool->add(new PartitionedAggregationNode(pool, tnode, descs)); - } else if (config::enable_new_partitioned_aggregation) { + if (config::enable_new_partitioned_aggregation) { *node = pool->add(new NewPartitionedAggregationNode(pool, tnode, descs)); } else { *node = pool->add(new AggregationNode(pool, tnode, descs)); diff --git a/be/src/exec/partitioned_aggregation_node.cc b/be/src/exec/partitioned_aggregation_node.cc deleted file mode 100644 index 8d31a9074a..0000000000 --- a/be/src/exec/partitioned_aggregation_node.cc +++ /dev/null @@ -1,1093 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "exec/partitioned_aggregation_node.h" - -#include -#include -#include - -#include "exec/partitioned_hash_table.inline.h" -#include "exprs/agg_fn_evaluator.h" -#include "exprs/expr.h" -#include "exprs/expr_context.h" -#include "exprs/slot_ref.h" -#include "runtime/buffered_tuple_stream2.inline.h" -#include "runtime/descriptors.h" -#include "runtime/mem_pool.h" -#include "runtime/raw_value.h" -#include "runtime/row_batch.h" -#include "runtime/runtime_state.h" -#include "runtime/tuple.h" -#include "runtime/tuple_row.h" -#include "udf/udf_internal.h" -#include "util/runtime_profile.h" -#include "util/stack_util.h" - -#include "gen_cpp/Exprs_types.h" -#include "gen_cpp/PlanNodes_types.h" - -using std::list; - -namespace doris { - -PartitionedAggregationNode::PartitionedAggregationNode( - ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : - ExecNode(pool, tnode, descs), - _intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id), - _intermediate_tuple_desc(NULL), - _output_tuple_id(tnode.agg_node.output_tuple_id), - _output_tuple_desc(NULL), - _needs_finalize(tnode.agg_node.need_finalize), - _needs_serialize(false), - _block_mgr_client(NULL), - _output_partition(NULL), - _process_row_batch_fn(NULL), - _build_timer(NULL), - _ht_resize_timer(NULL), - _get_results_timer(NULL), - _num_hash_buckets(NULL), - _partitions_created(NULL), - // _max_partition_level(NULL), - _num_row_repartitioned(NULL), - _num_repartitions(NULL), - _singleton_output_tuple(NULL), - _singleton_output_tuple_returned(true), - _partition_pool(new ObjectPool()) { - DCHECK_EQ(PARTITION_FANOUT, 1 << NUM_PARTITIONING_BITS); -} - -Status PartitionedAggregationNode::init(const TPlanNode& tnode, RuntimeState* state) { - RETURN_IF_ERROR(ExecNode::init(tnode, state)); - RETURN_IF_ERROR( - Expr::create_expr_trees(_pool, tnode.agg_node.grouping_exprs, &_probe_expr_ctxs)); - for (int i = 0; i < tnode.agg_node.aggregate_functions.size(); ++i) { - AggFnEvaluator* evaluator = NULL; - RETURN_IF_ERROR(AggFnEvaluator::create( - _pool, tnode.agg_node.aggregate_functions[i], &evaluator)); - _aggregate_evaluators.push_back(evaluator); - } - return Status::OK(); -} - -Status PartitionedAggregationNode::prepare(RuntimeState* state) { - SCOPED_TIMER(_runtime_profile->total_time_counter()); - - RETURN_IF_ERROR(ExecNode::prepare(state)); - _state = state; - - _mem_pool.reset(new MemPool(mem_tracker())); - _agg_fn_pool.reset(new MemPool(expr_mem_tracker())); - - _build_timer = ADD_TIMER(runtime_profile(), "BuildTime"); - _ht_resize_timer = ADD_TIMER(runtime_profile(), "HTResizeTime"); - _get_results_timer = ADD_TIMER(runtime_profile(), "GetResultsTime"); - _num_hash_buckets = ADD_COUNTER(runtime_profile(), "HashBuckets", TUnit::UNIT); - _partitions_created = ADD_COUNTER(runtime_profile(), "PartitionsCreated", TUnit::UNIT); - // _max_partition_level = runtime_profile()->AddHighWaterMarkCounter( - // "MaxPartitionLevel", TUnit::UNIT); - _num_row_repartitioned = ADD_COUNTER( - runtime_profile(), "RowsRepartitioned", TUnit::UNIT); - _num_repartitions = ADD_COUNTER(runtime_profile(), "NumRepartitions", TUnit::UNIT); - _num_spilled_partitions = ADD_COUNTER( - runtime_profile(), "SpilledPartitions", TUnit::UNIT); - // _largest_partition_percent = runtime_profile()->AddHighWaterMarkCounter( - // "LargestPartitionPercent", TUnit::UNIT); - - _intermediate_tuple_desc = - state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id); - _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id); - DCHECK_EQ(_intermediate_tuple_desc->slots().size(), _output_tuple_desc->slots().size()); - - RETURN_IF_ERROR( - Expr::prepare(_probe_expr_ctxs, state, child(0)->row_desc(), expr_mem_tracker())); - // AddExprCtxsToFree(_probe_expr_ctxs); - - _contains_var_len_grouping_exprs = false; - // Construct build exprs from _intermediate_agg_tuple_desc - for (int i = 0; i < _probe_expr_ctxs.size(); ++i) { - SlotDescriptor* desc = _intermediate_tuple_desc->slots()[i]; - DCHECK(desc->type().type == TYPE_NULL || - desc->type().type == _probe_expr_ctxs[i]->root()->type().type); - // Hack to avoid TYPE_NULL SlotRefs. - Expr* expr = desc->type().type != TYPE_NULL ? - new SlotRef(desc) : new SlotRef(desc, TYPE_BOOLEAN); - state->obj_pool()->add(expr); - _build_expr_ctxs.push_back(new ExprContext(expr)); - state->obj_pool()->add(_build_expr_ctxs.back()); - _contains_var_len_grouping_exprs |= expr->type().is_string_type(); - } - // Construct a new row desc for preparing the build exprs because neither the child's - // nor this node's output row desc may contain the intermediate tuple, e.g., - // in a single-node plan with an intermediate tuple different from the output tuple. - _intermediate_row_desc.reset(new RowDescriptor(_intermediate_tuple_desc, false)); - RETURN_IF_ERROR( - Expr::prepare(_build_expr_ctxs, state, *_intermediate_row_desc, expr_mem_tracker())); - // AddExprCtxsToFree(_build_expr_ctxs); - - int j = _probe_expr_ctxs.size(); - for (int i = 0; i < _aggregate_evaluators.size(); ++i, ++j) { - // Skip non-materialized slots; we don't have evaluators instantiated for those. - while (!_intermediate_tuple_desc->slots()[j]->is_materialized()) { - DCHECK_LT(j, _intermediate_tuple_desc->slots().size() - 1) - << "#eval= " << _aggregate_evaluators.size() - << " #probe=" << _probe_expr_ctxs.size(); - ++j; - } - // SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[j]; - SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[j]; - FunctionContext* agg_fn_ctx = NULL; - // RETURN_IF_ERROR(_aggregate_evaluators[i]->prepare(state, child(0)->row_desc(), - // intermediate_slot_desc, output_slot_desc, _agg_fn_pool.get(), &agg_fn_ctx)); - RETURN_IF_ERROR(_aggregate_evaluators[i]->prepare(state, child(0)->row_desc(), - _agg_fn_pool.get(), output_slot_desc, output_slot_desc, - expr_mem_tracker(), &agg_fn_ctx)); - _agg_fn_ctxs.push_back(agg_fn_ctx); - state->obj_pool()->add(agg_fn_ctx); - _needs_serialize |= _aggregate_evaluators[i]->supports_serialize(); - } - - if (_probe_expr_ctxs.empty()) { - // Create single output tuple now; we need to output something - // even if our input is empty. - _singleton_output_tuple = - construct_intermediate_tuple(_agg_fn_ctxs, _mem_pool.get(), NULL, NULL); - // Check for failures during AggFnEvaluator::init(). - RETURN_IF_ERROR(_state->query_status()); - _singleton_output_tuple_returned = false; - } else { - _ht_ctx.reset(new PartitionedHashTableCtx(_build_expr_ctxs, _probe_expr_ctxs, true, true, - state->fragment_hash_seed(), MAX_PARTITION_DEPTH, 1)); - RETURN_IF_ERROR(_state->block_mgr2()->register_client( - min_required_buffers(), mem_tracker(), state, &_block_mgr_client)); - RETURN_IF_ERROR(create_hash_partitions(0)); - } - - // TODO: Is there a need to create the stream here? If memory reservations work we may - // be able to create this stream lazily and only whenever we need to spill. - if (_needs_serialize && _block_mgr_client != NULL) { - _serialize_stream.reset(new BufferedTupleStream2(state, *_intermediate_row_desc, - state->block_mgr2(), _block_mgr_client, false /* use_initial_small_buffers */, - false /* read_write */)); - RETURN_IF_ERROR(_serialize_stream->init(id(), runtime_profile(), false)); - DCHECK(_serialize_stream->has_write_block()); - } - - return Status::OK(); -} - -Status PartitionedAggregationNode::open(RuntimeState* state) { - SCOPED_TIMER(_runtime_profile->total_time_counter()); - RETURN_IF_ERROR(ExecNode::open(state)); - - RETURN_IF_ERROR(Expr::open(_probe_expr_ctxs, state)); - RETURN_IF_ERROR(Expr::open(_build_expr_ctxs, state)); - - DCHECK_EQ(_aggregate_evaluators.size(), _agg_fn_ctxs.size()); - for (int i = 0; i < _aggregate_evaluators.size(); ++i) { - RETURN_IF_ERROR(_aggregate_evaluators[i]->open(state, _agg_fn_ctxs[i])); - } - - // Read all the rows from the child and process them. - RETURN_IF_ERROR(_children[0]->open(state)); - RowBatch batch(_children[0]->row_desc(), state->batch_size(), mem_tracker()); - bool eos = false; - do { - RETURN_IF_CANCELLED(state); - RETURN_IF_ERROR(state->check_query_state("Partitioned aggregation, while getting next from child 0.")); - RETURN_IF_ERROR(_children[0]->get_next(state, &batch, &eos)); - - if (UNLIKELY(VLOG_ROW_IS_ON)) { - for (int i = 0; i < batch.num_rows(); ++i) { - TupleRow* row = batch.get_row(i); - VLOG_ROW << "partition-agg-node input row: " - << row->to_string(_children[0]->row_desc()); - } - } - - SCOPED_TIMER(_build_timer); - if (_process_row_batch_fn != NULL) { - RETURN_IF_ERROR(_process_row_batch_fn(this, &batch, _ht_ctx.get())); - } else if (_probe_expr_ctxs.empty()) { - RETURN_IF_ERROR(process_batch_no_grouping(&batch)); - } else { - // VLOG_ROW << "partition-agg-node batch: " << batch->to_string(); - // There is grouping, so we will do partitioned aggregation. - RETURN_IF_ERROR(process_batch(&batch, _ht_ctx.get())); - } - batch.reset(); - } while (!eos); - - // Unless we are inside a subplan expecting to call open()/get_next() on the child - // again, the child can be closed at this point. We have consumed all of the input - // from the child and transfered ownership of the resources we need. - // if (!IsInSubplan()) { - child(0)->close(state); - // } - - // Done consuming child(0)'s input. Move all the partitions in _hash_partitions - // to _spilled_partitions/_aggregated_partitions. We'll finish the processing in - // get_next(). - if (!_probe_expr_ctxs.empty()) { - RETURN_IF_ERROR(move_hash_partitions(child(0)->rows_returned())); - } - return Status::OK(); -} - -Status PartitionedAggregationNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) { - SCOPED_TIMER(_runtime_profile->total_time_counter()); - RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT)); - RETURN_IF_CANCELLED(state); - RETURN_IF_ERROR(state->check_query_state("Partitioned aggregation, before evaluating conjuncts.")); - - if (reached_limit()) { - *eos = true; - return Status::OK(); - } - - ExprContext** ctxs = &_conjunct_ctxs[0]; - int num_ctxs = _conjunct_ctxs.size(); - if (_probe_expr_ctxs.empty()) { - // There was grouping, so evaluate the conjuncts and return the single result row. - // We allow calling get_next() after eos, so don't return this row again. - if (!_singleton_output_tuple_returned) { - int row_idx = row_batch->add_row(); - TupleRow* row = row_batch->get_row(row_idx); - Tuple* output_tuple = get_output_tuple( - _agg_fn_ctxs, _singleton_output_tuple, row_batch->tuple_data_pool()); - row->set_tuple(0, output_tuple); - if (ExecNode::eval_conjuncts(ctxs, num_ctxs, row)) { - row_batch->commit_last_row(); - ++_num_rows_returned; - } - _singleton_output_tuple_returned = true; - } - // Keep the current chunk to amortize the memory allocation over a series - // of reset()/open()/get_next()* calls. - row_batch->tuple_data_pool()->acquire_data(_mem_pool.get(), true); - *eos = true; - COUNTER_SET(_rows_returned_counter, _num_rows_returned); - return Status::OK(); - } - - if (_output_iterator.at_end()) { - // Done with this partition, move onto the next one. - if (_output_partition != NULL) { - _output_partition->close(false); - _output_partition = NULL; - } - if (_aggregated_partitions.empty() && _spilled_partitions.empty()) { - // No more partitions, all done. - *eos = true; - return Status::OK(); - } - // Process next partition. - RETURN_IF_ERROR(next_partition()); - DCHECK(_output_partition != NULL); - } - - SCOPED_TIMER(_get_results_timer); - int count = 0; - const int N = BitUtil::next_power_of_two(state->batch_size()); - // Keeping returning rows from the current partition. - while (!_output_iterator.at_end() && !row_batch->at_capacity()) { - // This loop can go on for a long time if the conjuncts are very selective. Do query - // maintenance every N iterations. - if ((count++ & (N - 1)) == 0) { - RETURN_IF_CANCELLED(state); - RETURN_IF_ERROR(state->check_query_state("Partitioned aggregation, while evaluating conjuncts.")); - } - - int row_idx = row_batch->add_row(); - TupleRow* row = row_batch->get_row(row_idx); - Tuple* intermediate_tuple = _output_iterator.get_tuple(); - Tuple* output_tuple = get_output_tuple( - _output_partition->agg_fn_ctxs, intermediate_tuple, row_batch->tuple_data_pool()); - _output_iterator.next(); - row->set_tuple(0, output_tuple); - if (ExecNode::eval_conjuncts(ctxs, num_ctxs, row)) { - row_batch->commit_last_row(); - ++_num_rows_returned; - if (reached_limit()) { - break; // TODO: remove this check? is this expensive? - } - } - } - COUNTER_SET(_rows_returned_counter, _num_rows_returned); - *eos = reached_limit(); - if (_output_iterator.at_end()) { - row_batch->mark_need_to_return(); - } - return Status::OK(); -} - -void PartitionedAggregationNode::cleanup_hash_tbl( - const vector& agg_fn_ctxs, PartitionedHashTable::Iterator it) { - if (!_needs_finalize && !_needs_serialize) { - return; - } - - // Iterate through the remaining rows in the hash table and call serialize/finalize on - // them in order to free any memory allocated by UDAs. - if (_needs_finalize) { - // finalize() requires a dst tuple but we don't actually need the result, - // so allocate a single dummy tuple to avoid accumulating memory. - Tuple* dummy_dst = NULL; - dummy_dst = Tuple::create(_output_tuple_desc->byte_size(), _mem_pool.get()); - while (!it.at_end()) { - Tuple* tuple = it.get_tuple(); - AggFnEvaluator::finalize(_aggregate_evaluators, agg_fn_ctxs, tuple, dummy_dst); - it.next(); - } - } else { - while (!it.at_end()) { - Tuple* tuple = it.get_tuple(); - AggFnEvaluator::serialize(_aggregate_evaluators, agg_fn_ctxs, tuple); - it.next(); - } - } -} - -Status PartitionedAggregationNode::reset(RuntimeState* state) { - if (_probe_expr_ctxs.empty()) { - // Re-create the single output tuple for this non-grouping agg. - _singleton_output_tuple = - construct_intermediate_tuple(_agg_fn_ctxs, _mem_pool.get(), NULL, NULL); - // Check for failures during AggFnEvaluator::init(). - RETURN_IF_ERROR(_state->query_status()); - _singleton_output_tuple_returned = false; - } else { - // Reset the HT and the partitions for this grouping agg. - _ht_ctx->set_level(0); - close_partitions(); - create_hash_partitions(0); - } - // return ExecNode::reset(state); - return Status::OK(); -} - -Status PartitionedAggregationNode::close(RuntimeState* state) { - if (is_closed()) { - return Status::OK(); - } - - if (!_singleton_output_tuple_returned) { - DCHECK_EQ(_agg_fn_ctxs.size(), _aggregate_evaluators.size()); - get_output_tuple(_agg_fn_ctxs, _singleton_output_tuple, _mem_pool.get()); - } - - // Iterate through the remaining rows in the hash table and call serialize/finalize on - // them in order to free any memory allocated by UDAs - if (_output_partition != NULL) { - cleanup_hash_tbl(_output_partition->agg_fn_ctxs, _output_iterator); - _output_partition->close(false); - } - - close_partitions(); - - for (int i = 0; i < _aggregate_evaluators.size(); ++i) { - _aggregate_evaluators[i]->close(state); - } - for (int i = 0; i < _agg_fn_ctxs.size(); ++i) { - _agg_fn_ctxs[i]->impl()->close(); - } - if (_agg_fn_pool.get() != NULL) { - _agg_fn_pool->free_all(); - } - if (_mem_pool.get() != NULL) { - _mem_pool->free_all(); - } - if (_ht_ctx.get() != NULL) { - _ht_ctx->close(); - } - if (_serialize_stream.get() != NULL) { - _serialize_stream->close(); - } - - if (_block_mgr_client != NULL) { - state->block_mgr2()->clear_reservations(_block_mgr_client); - } - - Expr::close(_probe_expr_ctxs, state); - Expr::close(_build_expr_ctxs, state); - return ExecNode::close(state); -} - -Status PartitionedAggregationNode::Partition::init_streams() { - agg_fn_pool.reset(new MemPool(parent->expr_mem_tracker())); - DCHECK_EQ(agg_fn_ctxs.size(), 0); - for (int i = 0; i < parent->_agg_fn_ctxs.size(); ++i) { - agg_fn_ctxs.push_back(parent->_agg_fn_ctxs[i]->impl()->clone(agg_fn_pool.get())); - parent->_partition_pool->add(agg_fn_ctxs[i]); - } - - aggregated_row_stream.reset(new BufferedTupleStream2(parent->_state, - *parent->_intermediate_row_desc, parent->_state->block_mgr2(), - parent->_block_mgr_client, true /* use_initial_small_buffers */, - false /* read_write */)); - RETURN_IF_ERROR(aggregated_row_stream->init(parent->id(), parent->runtime_profile(), true)); - - unaggregated_row_stream.reset(new BufferedTupleStream2(parent->_state, - parent->child(0)->row_desc(), parent->_state->block_mgr2(), - parent->_block_mgr_client, true /* use_initial_small_buffers */, - false /* read_write */)); - // This stream is only used to spill, no need to ever have this pinned. - RETURN_IF_ERROR(unaggregated_row_stream->init(parent->id(), parent->runtime_profile(), false)); - DCHECK(unaggregated_row_stream->has_write_block()); - return Status::OK(); -} - -bool PartitionedAggregationNode::Partition::init_hash_table() { - DCHECK(hash_tbl.get() == NULL); - // We use the upper PARTITION_FANOUT num bits to pick the partition so only the - // remaining bits can be used for the hash table. - // TODO: we could switch to 64 bit hashes and then we don't need a max size. - // It might be reasonable to limit individual hash table size for other reasons - // though. Always start with small buffers. - // TODO: How many buckets? We currently use a default value, 1024. - static const int64_t PAGG_DEFAULT_HASH_TABLE_SZ = 1024; - hash_tbl.reset(PartitionedHashTable::create(parent->_state, parent->_block_mgr_client, 1, - NULL, 1 << (32 - NUM_PARTITIONING_BITS), PAGG_DEFAULT_HASH_TABLE_SZ)); - return hash_tbl->init(); -} - -Status PartitionedAggregationNode::Partition::clean_up() { - if (parent->_needs_serialize && aggregated_row_stream->num_rows() != 0) { - // We need to do a lot more work in this case. This step effectively does a merge - // aggregation in this node. We need to serialize the intermediates, spill the - // intermediates and then feed them into the aggregate function's merge step. - // This is often used when the intermediate is a string type, meaning the current - // (before serialization) in-memory layout is not the on-disk block layout. - // The disk layout does not support mutable rows. We need to rewrite the stream - // into the on disk format. - // TODO: if it happens to not be a string, we could serialize in place. This is - // a future optimization since it is very unlikely to have a serialize phase - // for those UDAs. - DCHECK(parent->_serialize_stream.get() != NULL); - DCHECK(!parent->_serialize_stream->is_pinned()); - DCHECK(parent->_serialize_stream->has_write_block()); - - const vector& evaluators = parent->_aggregate_evaluators; - - // serialize and copy the spilled partition's stream into the new stream. - Status status = Status::OK(); - bool failed_to_add = false; - BufferedTupleStream2* new_stream = parent->_serialize_stream.get(); - PartitionedHashTable::Iterator it = hash_tbl->begin(parent->_ht_ctx.get()); - while (!it.at_end()) { - Tuple* tuple = it.get_tuple(); - it.next(); - AggFnEvaluator::serialize(evaluators, agg_fn_ctxs, tuple); - if (UNLIKELY(!new_stream->add_row(reinterpret_cast(&tuple), &status))) { - failed_to_add = true; - break; - } - } - - // Even if we can't add to new_stream, finish up processing this agg stream to make - // clean up easier (someone has to finalize this stream and we don't want to remember - // where we are). - if (failed_to_add) { - parent->cleanup_hash_tbl(agg_fn_ctxs, it); - hash_tbl->close(); - hash_tbl.reset(); - aggregated_row_stream->close(); - RETURN_IF_ERROR(status); - return parent->_state->block_mgr2()->mem_limit_too_low_error(parent->_block_mgr_client, - parent->id()); - } - DCHECK(status.ok()); - - aggregated_row_stream->close(); - aggregated_row_stream.swap(parent->_serialize_stream); - // Recreate the serialize_stream (and reserve 1 buffer) now in preparation for - // when we need to spill again. We need to have this available before we need - // to spill to make sure it is available. This should be acquirable since we just - // freed at least one buffer from this partition's (old) aggregated_row_stream. - parent->_serialize_stream.reset(new BufferedTupleStream2(parent->_state, - *parent->_intermediate_row_desc, parent->_state->block_mgr2(), - parent->_block_mgr_client, false /* use_initial_small_buffers */, - false /* read_write */)); - status = parent->_serialize_stream->init(parent->id(), parent->runtime_profile(), false); - if (!status.ok()) { - hash_tbl->close(); - hash_tbl.reset(); - return status; - } - DCHECK(parent->_serialize_stream->has_write_block()); - } - return Status::OK(); -} - -Status PartitionedAggregationNode::Partition::spill() { - DCHECK(!is_closed); - DCHECK(!is_spilled()); - - RETURN_IF_ERROR(clean_up()); - - // Free the in-memory result data. - for (int i = 0; i < agg_fn_ctxs.size(); ++i) { - agg_fn_ctxs[i]->impl()->close(); - } - - if (agg_fn_pool.get() != NULL) { - agg_fn_pool->free_all(); - agg_fn_pool.reset(); - } - - hash_tbl->close(); - hash_tbl.reset(); - - // Try to switch both streams to IO-sized buffers to avoid allocating small buffers - // for spilled partition. - bool got_buffer = true; - if (aggregated_row_stream->using_small_buffers()) { - RETURN_IF_ERROR(aggregated_row_stream->switch_to_io_buffers(&got_buffer)); - } - // Unpin the stream as soon as possible to increase the changes that the - // switch_to_io_buffers() call below will succeed. - DCHECK(!got_buffer || aggregated_row_stream->has_write_block()) - << aggregated_row_stream->debug_string(); - RETURN_IF_ERROR(aggregated_row_stream->unpin_stream(false)); - - if (got_buffer && unaggregated_row_stream->using_small_buffers()) { - RETURN_IF_ERROR(unaggregated_row_stream->switch_to_io_buffers(&got_buffer)); - } - if (!got_buffer) { - // We'll try again to get the buffers when the stream fills up the small buffers. - VLOG_QUERY << "Not enough memory to switch to IO-sized buffer for partition " - << this << " of agg=" << parent->_id << " agg small buffers=" - << aggregated_row_stream->using_small_buffers() - << " unagg small buffers=" - << unaggregated_row_stream->using_small_buffers(); - VLOG_FILE << get_stack_trace(); - } - - COUNTER_UPDATE(parent->_num_spilled_partitions, 1); - if (parent->_num_spilled_partitions->value() == 1) { - parent->add_runtime_exec_option("Spilled"); - } - return Status::OK(); -} - -void PartitionedAggregationNode::Partition::close(bool finalize_rows) { - if (is_closed) { - return; - } - is_closed = true; - if (aggregated_row_stream.get() != NULL) { - if (finalize_rows && hash_tbl.get() != NULL) { - // We need to walk all the rows and finalize them here so the UDA gets a chance - // to cleanup. If the hash table is gone (meaning this was spilled), the rows - // should have been finalized/serialized in spill(). - parent->cleanup_hash_tbl(agg_fn_ctxs, hash_tbl->begin(parent->_ht_ctx.get())); - } - aggregated_row_stream->close(); - } - if (hash_tbl.get() != NULL) { - hash_tbl->close(); - } - if (unaggregated_row_stream.get() != NULL) { - unaggregated_row_stream->close(); - } - - for (int i = 0; i < agg_fn_ctxs.size(); ++i) { - agg_fn_ctxs[i]->impl()->close(); - } - if (agg_fn_pool.get() != NULL) { - agg_fn_pool->free_all(); - } -} - -Tuple* PartitionedAggregationNode::construct_intermediate_tuple( - const vector& agg_fn_ctxs, MemPool* pool, - BufferedTupleStream2* stream, Status* status) { - Tuple* intermediate_tuple = NULL; - uint8_t* buffer = NULL; - if (pool != NULL) { - DCHECK(stream == NULL && status == NULL); - intermediate_tuple = Tuple::create(_intermediate_tuple_desc->byte_size(), pool); - } else { - DCHECK(stream != NULL && status != NULL); - // Figure out how big it will be to copy the entire tuple. We need the tuple to end - // up in one block in the stream. - int size = _intermediate_tuple_desc->byte_size(); - if (_contains_var_len_grouping_exprs) { - // TODO: This is likely to be too slow. The hash table could maintain this as - // it hashes. - for (int i = 0; i < _probe_expr_ctxs.size(); ++i) { - if (!_probe_expr_ctxs[i]->root()->type().is_string_type()) { - continue; - } - if (_ht_ctx->last_expr_value_null(i)) { - continue; - } - StringValue* sv = reinterpret_cast(_ht_ctx->last_expr_value(i)); - size += sv->len; - } - } - - // Now that we know the size of the row, allocate space for it in the stream. - buffer = stream->allocate_row(size, status); - if (buffer == NULL) { - if (!status->ok() || !stream->using_small_buffers()) { - return NULL; - } - // IMPALA-2352: Make a best effort to switch to IO buffers and re-allocate. - // If switch_to_io_buffers() fails the caller of this function can try to free - // some space, e.g. through spilling, and re-attempt to allocate space for - // this row. - bool got_buffer = false; - *status = stream->switch_to_io_buffers(&got_buffer); - if (!status->ok() || !got_buffer) { - return NULL; - } - buffer = stream->allocate_row(size, status); - if (buffer == NULL) { - return NULL; - } - } - intermediate_tuple = reinterpret_cast(buffer); - // TODO: remove this. we shouldn't need to zero the entire tuple. - intermediate_tuple->init(size); - buffer += _intermediate_tuple_desc->byte_size(); - } - - // Copy grouping values. - vector::const_iterator slot_desc = _intermediate_tuple_desc->slots().begin(); - for (int i = 0; i < _probe_expr_ctxs.size(); ++i, ++slot_desc) { - if (_ht_ctx->last_expr_value_null(i)) { - intermediate_tuple->set_null((*slot_desc)->null_indicator_offset()); - } else { - void* src = _ht_ctx->last_expr_value(i); - void* dst = intermediate_tuple->get_slot((*slot_desc)->tuple_offset()); - if (stream == NULL) { - RawValue::write(src, dst, (*slot_desc)->type(), pool); - } else { - RawValue::write(src, (*slot_desc)->type(), dst, &buffer); - } - } - } - - // Initialize aggregate output. - for (int i = 0; i < _aggregate_evaluators.size(); ++i, ++slot_desc) { - while (!(*slot_desc)->is_materialized()) { - ++slot_desc; - } - AggFnEvaluator* evaluator = _aggregate_evaluators[i]; - evaluator->init(agg_fn_ctxs[i], intermediate_tuple); - // Codegen specific path for min/max. - // To minimize branching on the update_tuple path, initialize the result value - // so that update_tuple doesn't have to check if the aggregation - // dst slot is null. - // TODO: remove when we don't use the irbuilder for codegen here. This optimization - // will no longer be necessary when all aggregates are implemented with the UDA - // interface. - // if ((*slot_desc)->type().type != TYPE_STRING && - // (*slot_desc)->type().type != TYPE_VARCHAR && - // (*slot_desc)->type().type != TYPE_TIMESTAMP && - // (*slot_desc)->type().type != TYPE_CHAR && - // (*slot_desc)->type().type != TYPE_DECIMAL) { - if (!(*slot_desc)->type().is_string_type() - && !(*slot_desc)->type().is_date_type()) { - ExprValue default_value; - void* default_value_ptr = NULL; - switch (evaluator->agg_op()) { - case AggFnEvaluator::MIN: - default_value_ptr = default_value.set_to_max((*slot_desc)->type()); - RawValue::write(default_value_ptr, intermediate_tuple, *slot_desc, NULL); - break; - case AggFnEvaluator::MAX: - default_value_ptr = default_value.set_to_min((*slot_desc)->type()); - RawValue::write(default_value_ptr, intermediate_tuple, *slot_desc, NULL); - break; - default: - break; - } - } - } - return intermediate_tuple; -} - -void PartitionedAggregationNode::update_tuple(FunctionContext** agg_fn_ctxs, - Tuple* tuple, TupleRow* row, bool is_merge) { - DCHECK(tuple != NULL || _aggregate_evaluators.empty()); - for (int i = 0; i < _aggregate_evaluators.size(); ++i) { - if (is_merge) { - _aggregate_evaluators[i]->merge(agg_fn_ctxs[i], row->get_tuple(0), tuple); - } else { - _aggregate_evaluators[i]->add(agg_fn_ctxs[i], row, tuple); - } - } -} - -Tuple* PartitionedAggregationNode::get_output_tuple( - const vector& agg_fn_ctxs, Tuple* tuple, MemPool* pool) { - DCHECK(tuple != NULL || _aggregate_evaluators.empty()) << tuple; - Tuple* dst = tuple; - // if (_needs_finalize && _intermediate_tuple_id != _output_tuple_id) { - if (_needs_finalize) { - dst = Tuple::create(_output_tuple_desc->byte_size(), pool); - } - if (_needs_finalize) { - AggFnEvaluator::finalize(_aggregate_evaluators, agg_fn_ctxs, tuple, dst); - } else { - AggFnEvaluator::serialize(_aggregate_evaluators, agg_fn_ctxs, tuple); - } - // Copy grouping values from tuple to dst. - // TODO: Codegen this. - if (dst != tuple) { - int num_grouping_slots = _probe_expr_ctxs.size(); - for (int i = 0; i < num_grouping_slots; ++i) { - SlotDescriptor* src_slot_desc = _intermediate_tuple_desc->slots()[i]; - SlotDescriptor* dst_slot_desc = _output_tuple_desc->slots()[i]; - bool src_slot_null = tuple->is_null(src_slot_desc->null_indicator_offset()); - void* src_slot = NULL; - if (!src_slot_null) { - src_slot = tuple->get_slot(src_slot_desc->tuple_offset()); - } - RawValue::write(src_slot, dst, dst_slot_desc, NULL); - } - } - return dst; -} - -Status PartitionedAggregationNode::append_spilled_row(BufferedTupleStream2* stream, TupleRow* row) { - DCHECK(stream != NULL); - DCHECK(!stream->is_pinned()); - DCHECK(stream->has_write_block()); - if (LIKELY(stream->add_row(row, &_process_batch_status))) { - return Status::OK(); - } - - // Adding fails iff either we hit an error or haven't switched to I/O buffers. - RETURN_IF_ERROR(_process_batch_status); - while (true) { - bool got_buffer = false; - RETURN_IF_ERROR(stream->switch_to_io_buffers(&got_buffer)); - if (got_buffer) { - break; - } - RETURN_IF_ERROR(spill_partition()); - } - - // Adding the row should succeed after the I/O buffer switch. - if (stream->add_row(row, &_process_batch_status)) { - return Status::OK(); - } - DCHECK(!_process_batch_status.ok()); - return _process_batch_status; -} - -void PartitionedAggregationNode::debug_string(int indentation_level, stringstream* out) const { - *out << string(indentation_level * 2, ' '); - *out << "PartitionedAggregationNode(" - << "intermediate_tuple_id=" << _intermediate_tuple_id - << " output_tuple_id=" << _output_tuple_id - << " needs_finalize=" << _needs_finalize - << " probe_exprs=" << Expr::debug_string(_probe_expr_ctxs) - << " agg_exprs=" << AggFnEvaluator::debug_string(_aggregate_evaluators); - ExecNode::debug_string(indentation_level, out); - *out << ")"; -} - -Status PartitionedAggregationNode::create_hash_partitions(int level) { - if (level >= MAX_PARTITION_DEPTH) { - stringstream error_msg; - error_msg << "Cannot perform aggregation at hash aggregation node with id " - << _id << '.' - << " The input data was partitioned the maximum number of " - << MAX_PARTITION_DEPTH << " times." - << " This could mean there is significant skew in the data or the memory limit is" - << " set too low."; - return _state->set_mem_limit_exceeded(error_msg.str()); - } - _ht_ctx->set_level(level); - - DCHECK(_hash_partitions.empty()); - for (int i = 0; i < PARTITION_FANOUT; ++i) { - Partition* new_partition = new Partition(this, level); - DCHECK(new_partition != NULL); - _hash_partitions.push_back(_partition_pool->add(new_partition)); - RETURN_IF_ERROR(new_partition->init_streams()); - } - DCHECK_GT(_state->block_mgr2()->num_reserved_buffers_remaining(_block_mgr_client), 0); - - // Now that all the streams are reserved (meaning we have enough memory to execute - // the algorithm), allocate the hash tables. These can fail and we can still continue. - for (int i = 0; i < PARTITION_FANOUT; ++i) { - if (!_hash_partitions[i]->init_hash_table()) { - RETURN_IF_ERROR(_hash_partitions[i]->spill()); - } - } - COUNTER_UPDATE(_partitions_created, PARTITION_FANOUT); - // COUNTER_SET(_max_partition_level, level); - return Status::OK(); -} - -Status PartitionedAggregationNode::check_and_resize_hash_partitions(int num_rows, - PartitionedHashTableCtx* ht_ctx) { - for (int i = 0; i < PARTITION_FANOUT; ++i) { - Partition* partition = _hash_partitions[i]; - while (!partition->is_spilled()) { - { - SCOPED_TIMER(_ht_resize_timer); - if (partition->hash_tbl->check_and_resize(num_rows, ht_ctx)) { - break; - } - } - // There was not enough memory for the resize. Spill a partition and retry. - RETURN_IF_ERROR(spill_partition()); - } - } - return Status::OK(); -} - -int64_t PartitionedAggregationNode::largest_spilled_partition() const { - int64_t max_rows = 0; - for (int i = 0; i < _hash_partitions.size(); ++i) { - Partition* partition = _hash_partitions[i]; - if (partition->is_closed || !partition->is_spilled()) { - continue; - } - int64_t rows = partition->aggregated_row_stream->num_rows() + - partition->unaggregated_row_stream->num_rows(); - if (rows > max_rows) { - max_rows = rows; - } - } - return max_rows; -} - -Status PartitionedAggregationNode::next_partition() { - DCHECK(_output_partition == NULL); - - // Keep looping until we get to a partition that fits in memory. - Partition* partition = NULL; - while (true) { - partition = NULL; - // First return partitions that are fully aggregated (and in memory). - if (!_aggregated_partitions.empty()) { - partition = _aggregated_partitions.front(); - DCHECK(!partition->is_spilled()); - _aggregated_partitions.pop_front(); - break; - } - - if (partition == NULL) { - DCHECK(!_spilled_partitions.empty()); - DCHECK_EQ(_state->block_mgr2()->num_pinned_buffers(_block_mgr_client), - _needs_serialize ? 1 : 0); - - // TODO: we can probably do better than just picking the first partition. We - // can base this on the amount written to disk, etc. - partition = _spilled_partitions.front(); - DCHECK(partition->is_spilled()); - - // Create the new hash partitions to repartition into. - // TODO: we don't need to repartition here. We are now working on 1 / FANOUT - // of the input so it's reasonably likely it can fit. We should look at this - // partitions size and just do the aggregation if it fits in memory. - RETURN_IF_ERROR(create_hash_partitions(partition->level + 1)); - COUNTER_UPDATE(_num_repartitions, 1); - - // Rows in this partition could have been spilled into two streams, depending - // on if it is an aggregated intermediate, or an unaggregated row. - // Note: we must process the aggregated rows first to save a hash table lookup - // in process_batch(). - RETURN_IF_ERROR(process_stream(partition->aggregated_row_stream.get())); - RETURN_IF_ERROR(process_stream(partition->unaggregated_row_stream.get())); - - COUNTER_UPDATE(_num_row_repartitioned, partition->aggregated_row_stream->num_rows()); - COUNTER_UPDATE(_num_row_repartitioned, partition->unaggregated_row_stream->num_rows()); - - partition->close(false); - _spilled_partitions.pop_front(); - - // Done processing this partition. Move the new partitions into - // _spilled_partitions/_aggregated_partitions. - int64_t num_input_rows = partition->aggregated_row_stream->num_rows() + - partition->unaggregated_row_stream->num_rows(); - - // Check if there was any reduction in the size of partitions after repartitioning. - int64_t largest_partition = largest_spilled_partition(); - DCHECK_GE(num_input_rows, largest_partition) << "Cannot have a partition with " - "more rows than the input"; - if (num_input_rows == largest_partition) { - // Status status = Status::MemTrackerExceeded(); - // status.AddDetail(Substitute("Cannot perform aggregation at node with id $0. " - // "Repartitioning did not reduce the size of a spilled partition. " - // "Repartitioning level $1. Number of rows $2.", - // _id, partition->level + 1, num_input_rows)); - // _state->SetMemTrackerExceeded(); - stringstream error_msg; - error_msg << "Cannot perform aggregation at node with id " << _id << ". " - << "Repartitioning did not reduce the size of a spilled partition. " - << "Repartitioning level " << partition->level + 1 - << ". Number of rows " << num_input_rows << " ."; - return Status::MemoryLimitExceeded(error_msg.str()); - } - RETURN_IF_ERROR(move_hash_partitions(num_input_rows)); - } - } - - DCHECK(partition->hash_tbl.get() != NULL); - DCHECK(partition->aggregated_row_stream->is_pinned()); - - _output_partition = partition; - _output_iterator = _output_partition->hash_tbl->begin(_ht_ctx.get()); - COUNTER_UPDATE(_num_hash_buckets, _output_partition->hash_tbl->num_buckets()); - return Status::OK(); -} - -template -Status PartitionedAggregationNode::process_stream(BufferedTupleStream2* input_stream) { - if (input_stream->num_rows() > 0) { - while (true) { - bool got_buffer = false; - RETURN_IF_ERROR(input_stream->prepare_for_read(true, &got_buffer)); - if (got_buffer) { - break; - } - // Did not have a buffer to read the input stream. Spill and try again. - RETURN_IF_ERROR(spill_partition()); - } - - bool eos = false; - RowBatch batch(AGGREGATED_ROWS ? *_intermediate_row_desc : _children[0]->row_desc(), - _state->batch_size(), mem_tracker()); - do { - RETURN_IF_ERROR(input_stream->get_next(&batch, &eos)); - RETURN_IF_ERROR(process_batch(&batch, _ht_ctx.get())); - RETURN_IF_ERROR(_state->query_status()); - // free_local_allocations(); - batch.reset(); - } while (!eos); - } - input_stream->close(); - return Status::OK(); -} - -Status PartitionedAggregationNode::spill_partition() { - int64_t max_freed_mem = 0; - int partition_idx = -1; - - // Iterate over the partitions and pick the largest partition that is not spilled. - for (int i = 0; i < _hash_partitions.size(); ++i) { - if (_hash_partitions[i]->is_closed) { - continue; - } - if (_hash_partitions[i]->is_spilled()) { - continue; - } - // TODO: In PHJ the bytes_in_mem() call also calculates the mem used by the - // _write_block, why do we ignore it here? - int64_t mem = _hash_partitions[i]->aggregated_row_stream->bytes_in_mem(true); - mem += _hash_partitions[i]->hash_tbl->byte_size(); - mem += _hash_partitions[i]->agg_fn_pool->total_reserved_bytes(); - if (mem > max_freed_mem) { - max_freed_mem = mem; - partition_idx = i; - } - } - if (partition_idx == -1) { - // Could not find a partition to spill. This means the mem limit was just too low. - return _state->block_mgr2()->mem_limit_too_low_error(_block_mgr_client, id()); - } - - return _hash_partitions[partition_idx]->spill(); -} - -Status PartitionedAggregationNode::move_hash_partitions(int64_t num_input_rows) { - DCHECK(!_hash_partitions.empty()); - stringstream ss; - ss << "PA(node_id=" << id() << ") partitioned(level=" - << _hash_partitions[0]->level << ") " - << num_input_rows << " rows into:" << std::endl; - for (int i = 0; i < _hash_partitions.size(); ++i) { - Partition* partition = _hash_partitions[i]; - int64_t aggregated_rows = partition->aggregated_row_stream->num_rows(); - int64_t unaggregated_rows = partition->unaggregated_row_stream->num_rows(); - int64_t total_rows = aggregated_rows + unaggregated_rows; - double percent = static_cast(total_rows * 100) / num_input_rows; - ss << " " << i << " " << (partition->is_spilled() ? "spilled" : "not spilled") - << " (fraction=" << std::fixed << std::setprecision(2) << percent << "%)" << std::endl - << " #aggregated rows:" << aggregated_rows << std::endl - << " #unaggregated rows: " << unaggregated_rows << std::endl; - - // TODO: update counters to support doubles. - // COUNTER_SET(_largest_partition_percent, static_cast(percent)); - - if (total_rows == 0) { - partition->close(false); - } else if (partition->is_spilled()) { - DCHECK(partition->hash_tbl.get() == NULL); - // We need to unpin all the spilled partitions to make room to allocate new - // _hash_partitions when we repartition the spilled partitions. - // TODO: we only need to do this when we have memory pressure. This might be - // okay though since the block mgr should only write these to disk if there - // is memory pressure. - RETURN_IF_ERROR(partition->aggregated_row_stream->unpin_stream(true)); - RETURN_IF_ERROR(partition->unaggregated_row_stream->unpin_stream(true)); - - // Push new created partitions at the front. This means a depth first walk - // (more finely partitioned partitions are processed first). This allows us - // to delete blocks earlier and bottom out the recursion earlier. - _spilled_partitions.push_front(partition); - } else { - _aggregated_partitions.push_back(partition); - } - - } - VLOG(2) << ss.str(); - _hash_partitions.clear(); - return Status::OK(); -} - -void PartitionedAggregationNode::close_partitions() { - for (int i = 0; i < _hash_partitions.size(); ++i) { - _hash_partitions[i]->close(true); - } - for (list::iterator it = _aggregated_partitions.begin(); - it != _aggregated_partitions.end(); ++it) { - (*it)->close(true); - } - for (list::iterator it = _spilled_partitions.begin(); - it != _spilled_partitions.end(); ++it) { - (*it)->close(true); - } - _aggregated_partitions.clear(); - _spilled_partitions.clear(); - _hash_partitions.clear(); - _partition_pool->clear(); -} - -#if 0 -// Status PartitionedAggregationNode::QueryMaintenance(RuntimeState* state) { -// for (int i = 0; i < _aggregate_evaluators.size(); ++i) { -// ExprContext::free_local_allocations(_aggregate_evaluators[i]->input_expr_ctxs()); -// } -// ExprContext::free_local_allocations(_agg_fn_ctxs); -// for (int i = 0; i < _hash_partitions.size(); ++i) { -// ExprContext::free_local_allocations(_hash_partitions[i]->agg_fn_ctxs); -// } -// return ExecNode::QueryMaintenance(state); -// } -// -#endif - -} diff --git a/be/src/exec/partitioned_aggregation_node.h b/be/src/exec/partitioned_aggregation_node.h deleted file mode 100644 index bbcc3e0e76..0000000000 --- a/be/src/exec/partitioned_aggregation_node.h +++ /dev/null @@ -1,461 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef DORIS_BE_SRC_EXEC_PARTITIONED_AGGREGATION_NODE_H -#define DORIS_BE_SRC_EXEC_PARTITIONED_AGGREGATION_NODE_H - -#include -#include - -#include "exec/exec_node.h" -#include "exec/partitioned_hash_table.inline.h" -#include "runtime/buffered_block_mgr2.h" -#include "runtime/buffered_tuple_stream2.h" -#include "runtime/descriptors.h" // for TupleId -#include "runtime/mem_pool.h" -#include "runtime/string_value.h" - -namespace doris { - -class AggFnEvaluator; -class RowBatch; -class RuntimeState; -struct StringValue; -class Tuple; -class TupleDescriptor; -class SlotDescriptor; - -// Node for doing partitioned hash aggregation. -// This node consumes the input (which can be from the child(0) or a spilled partition). -// 1. Each row is hashed and we pick a dst partition (_hash_partitions). -// 2. If the dst partition is not spilled, we probe into the partitions hash table -// to aggregate/insert the row. -// 3. If the partition is already spilled, the input row is spilled. -// 4. When all the input is consumed, we walk _hash_partitions, put the spilled ones -// into _spilled_partitions and the non-spilled ones into _aggregated_partitions. -// _aggregated_partitions contain partitions that are fully processed and the result -// can just be returned. Partitions in _spilled_partitions need to be repartitioned -// and we just repeat these steps. -// -// Each partition contains these structures: -// 1) Hash Table for aggregated rows. This contains just the hash table directory -// structure but not the rows themselves. This is NULL for spilled partitions when -// we stop maintaining the hash table. -// 2) MemPool for var-len result data for rows in the hash table. If the aggregate -// function returns a string, we cannot append it to the tuple stream as that -// structure is immutable. Instead, when we need to spill, we sweep and copy the -// rows into a tuple stream. -// 3) Aggregated tuple stream for rows that are/were in the hash table. This stream -// contains rows that are aggregated. When the partition is not spilled, this stream -// is pinned and contains the memory referenced by the hash table. -// In the case where the aggregate function does not return a string (meaning the -// size of all the slots is known when the row is constructed), this stream contains -// all the memory for the result rows and the MemPool (2) is not used. -// 4) Unaggregated tuple stream. Stream to spill unaggregated rows. -// Rows in this stream always have child(0)'s layout. -// -// Buffering: Each stream and hash table needs to maintain at least one buffer for -// some duration of the processing. To minimize the memory requirements of small queries -// (i.e. memory usage is less than one IO-buffer per partition), the streams and hash -// tables of each partition start using small (less than IO-sized) buffers, regardless -// of the level. -// -// TODO: Buffer rows before probing into the hash table? -// TODO: After spilling, we can still maintain a very small hash table just to remove -// some number of rows (from likely going to disk). -// TODO: Consider allowing to spill the hash table structure in addition to the rows. -// TODO: Do we want to insert a buffer before probing into the partition's hash table? -// TODO: Use a prefetch/batched probe interface. -// TODO: Return rows from the aggregated_row_stream rather than the HT. -// TODO: Think about spilling heuristic. -// TODO: When processing a spilled partition, we have a lot more information and can -// size the partitions/hash tables better. -// TODO: Start with unpartitioned (single partition) and switch to partitioning and -// spilling only if the size gets large, say larger than the LLC. -// TODO: Simplify or cleanup the various uses of agg_fn_ctx, _agg_fn_ctx, and ctx. -// There are so many contexts in use that a plain "ctx" variable should never be used. -// Likewise, it's easy to mixup the agg fn ctxs, there should be a way to simplify this. -class PartitionedAggregationNode : public ExecNode { -public: - PartitionedAggregationNode(ObjectPool* pool, - const TPlanNode& tnode, const DescriptorTbl& descs); - // a null dtor to pass codestyle check - virtual ~PartitionedAggregationNode() {} - - virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr); - virtual Status prepare(RuntimeState* state); - virtual Status open(RuntimeState* state); - virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos); - virtual Status reset(RuntimeState* state); - // virtual void close(RuntimeState* state); - virtual Status close(RuntimeState* state); - -protected: - // Frees local allocations from _aggregate_evaluators and agg_fn_ctxs - // virtual Status QueryMaintenance(RuntimeState* state); - - virtual void debug_string(int indentation_level, std::stringstream* out) const; - -private: - struct Partition; - - // Number of initial partitions to create. Must be a power of 2. - static const int PARTITION_FANOUT = 16; - - // Needs to be the log(PARTITION_FANOUT). - // We use the upper bits to pick the partition and lower bits in the HT. - // TODO: different hash functions here too? We don't need that many bits to pick - // the partition so this might be okay. - static const int NUM_PARTITIONING_BITS = 4; - - // Maximum number of times we will repartition. The maximum build table we can process - // (if we have enough scratch disk space) in case there is no skew is: - // MEM_LIMIT * (PARTITION_FANOUT ^ MAX_PARTITION_DEPTH). - // In the case where there is skew, repartitioning is unlikely to help (assuming a - // reasonable hash function). - // Note that we need to have at least as many SEED_PRIMES in PartitionedHashTableCtx. - // TODO: we can revisit and try harder to explicitly detect skew. - static const int MAX_PARTITION_DEPTH = 16; - - // Codegen doesn't allow for automatic Status variables because then exception - // handling code is needed to destruct the Status, and our function call substitution - // doesn't know how to deal with the LLVM IR 'invoke' instruction. Workaround that by - // placing the Status here so exceptions won't need to destruct it. - // TODO: fix IMPALA-1948 and remove this. - Status _process_batch_status; - - // Tuple into which Update()/Merge()/Serialize() results are stored. - TupleId _intermediate_tuple_id; - TupleDescriptor* _intermediate_tuple_desc; - - // Row with the intermediate tuple as its only tuple. - boost::scoped_ptr _intermediate_row_desc; - - // Tuple into which Finalize() results are stored. Possibly the same as - // the intermediate tuple. - TupleId _output_tuple_id; - TupleDescriptor* _output_tuple_desc; - - // Certain aggregates require a finalize step, which is the final step of the - // aggregate after consuming all input rows. The finalize step converts the aggregate - // value into its final form. This is true if this node contains aggregate that - // requires a finalize step. - const bool _needs_finalize; - - // Contains any evaluators that require the serialize step. - bool _needs_serialize; - - std::vector _aggregate_evaluators; - - // FunctionContext for each aggregate function and backing MemPool. String data - // returned by the aggregate functions is allocated via these contexts. - // These contexts are only passed to the evaluators in the non-partitioned - // (non-grouping) case. Otherwise they are only used to clone FunctionContexts for the - // partitions. - // TODO: we really need to plumb through CHAR(N) for intermediate types. - std::vector _agg_fn_ctxs; - boost::scoped_ptr _agg_fn_pool; - - // Exprs used to evaluate input rows - std::vector _probe_expr_ctxs; - - // Exprs used to insert constructed aggregation tuple into the hash table. - // All the exprs are simply SlotRefs for the intermediate tuple. - std::vector _build_expr_ctxs; - - // True if the resulting tuple contains var-len agg/grouping values. This - // means we need to do more work when allocating and spilling these rows. - bool _contains_var_len_grouping_exprs; - - RuntimeState* _state; - BufferedBlockMgr2::Client* _block_mgr_client; - - // MemPool used to allocate memory for when we don't have grouping and don't initialize - // the partitioning structures, or during close() when creating new output tuples. - // For non-grouping aggregations, the ownership of the pool's memory is transferred - // to the output batch on eos. The pool should not be Reset() to allow amortizing - // memory allocation over a series of Reset()/open()/get_next()* calls. - boost::scoped_ptr _mem_pool; - - // The current partition and iterator to the next row in its hash table that we need - // to return in get_next() - Partition* _output_partition; - PartitionedHashTable::Iterator _output_iterator; - - typedef Status (*ProcessRowBatchFn)( - PartitionedAggregationNode*, RowBatch*, PartitionedHashTableCtx*); - // Jitted ProcessRowBatch function pointer. Null if codegen is disabled. - ProcessRowBatchFn _process_row_batch_fn; - - // Time spent processing the child rows - RuntimeProfile::Counter* _build_timer; - - // Total time spent resizing hash tables. - RuntimeProfile::Counter* _ht_resize_timer; - - // Time spent returning the aggregated rows - RuntimeProfile::Counter* _get_results_timer; - - // Total number of hash buckets across all partitions. - RuntimeProfile::Counter* _num_hash_buckets; - - // Total number of partitions created. - RuntimeProfile::Counter* _partitions_created; - - // Level of max partition (i.e. number of repartitioning steps). - // RuntimeProfile::HighWaterMarkCounter* _max_partition_level; - - // Number of rows that have been repartitioned. - RuntimeProfile::Counter* _num_row_repartitioned; - - // Number of partitions that have been repartitioned. - RuntimeProfile::Counter* _num_repartitions; - - // Number of partitions that have been spilled. - RuntimeProfile::Counter* _num_spilled_partitions; - - // The largest fraction after repartitioning. This is expected to be - // 1 / PARTITION_FANOUT. A value much larger indicates skew. - // RuntimeProfile::HighWaterMarkCounter* _largest_partition_percent; - - //////////////////////////// - // BEGIN: Members that must be Reset() - - // Result of aggregation w/o GROUP BY. - // Note: can be NULL even if there is no grouping if the result tuple is 0 width - // e.g. select 1 from table group by col. - Tuple* _singleton_output_tuple; - bool _singleton_output_tuple_returned; - - // Used for hash-related functionality, such as evaluating rows and calculating hashes. - // TODO: If we want to multi-thread then this context should be thread-local and not - // associated with the node. - boost::scoped_ptr _ht_ctx; - - // Object pool that holds the Partition objects in _hash_partitions. - boost::scoped_ptr _partition_pool; - - // Current partitions we are partitioning into. - std::vector _hash_partitions; - - // All partitions that have been spilled and need further processing. - std::list _spilled_partitions; - - // All partitions that are aggregated and can just return the results in get_next(). - // After consuming all the input, _hash_partitions is split into _spilled_partitions - // and _aggregated_partitions, depending on if it was spilled or not. - std::list _aggregated_partitions; - - // END: Members that must be Reset() - //////////////////////////// - - // The hash table and streams (aggregated and unaggregated) for an individual - // partition. The streams of each partition always (i.e. regardless of level) - // initially use small buffers. - struct Partition { - Partition(PartitionedAggregationNode* parent, int level) : - parent(parent), is_closed(false), level(level) {} - - // Initializes aggregated_row_stream and unaggregated_row_stream, reserving - // one buffer for each. The buffers backing these streams are reserved, so this - // function will not fail with a continuable OOM. If we fail to init these buffers, - // the mem limit is too low to run this algorithm. - Status init_streams(); - - // Initializes the hash table. Returns false on OOM. - bool init_hash_table(); - - // Called in case we need to serialize aggregated rows. This step effectively does - // a merge aggregation in this node. - Status clean_up(); - - // Closes this partition. If finalize_rows is true, this iterates over all rows - // in aggregated_row_stream and finalizes them (this is only used in the cancellation - // path). - void close(bool finalize_rows); - - // Spills this partition, unpinning streams and cleaning up hash tables as necessary. - Status spill(); - - bool is_spilled() const { - return hash_tbl.get() == NULL; - } - - PartitionedAggregationNode* parent; - - // If true, this partition is closed and there is nothing left to do. - bool is_closed; - - // How many times rows in this partition have been repartitioned. Partitions created - // from the node's children's input is level 0, 1 after the first repartitionining, - // etc. - const int level; - - // Hash table for this partition. - // Can be NULL if this partition is no longer maintaining a hash table (i.e. - // is spilled). - boost::scoped_ptr hash_tbl; - - // Clone of parent's _agg_fn_ctxs and backing MemPool. - std::vector agg_fn_ctxs; - boost::scoped_ptr agg_fn_pool; - - // Tuple stream used to store aggregated rows. When the partition is not spilled, - // (meaning the hash table is maintained), this stream is pinned and contains the - // memory referenced by the hash table. When it is spilled, aggregate rows are - // just appended to this stream. - boost::scoped_ptr aggregated_row_stream; - - // Unaggregated rows that are spilled. - boost::scoped_ptr unaggregated_row_stream; - }; - - // Stream used to store serialized spilled rows. Only used if _needs_serialize - // is set. This stream is never pinned and only used in Partition::spill as a - // a temporary buffer. - boost::scoped_ptr _serialize_stream; - - // Allocates a new aggregation intermediate tuple. - // Initialized to grouping values computed over '_current_row' using 'agg_fn_ctxs'. - // Aggregation expr slots are set to their initial values. - // Pool/Stream specify where the memory (tuple and var len slots) should be allocated - // from. Only one can be set. - // Returns NULL if there was not enough memory to allocate the tuple or an error - // occurred. When returning NULL, sets *status. If 'stream' is set and its small - // buffers get full, it will attempt to switch to IO-buffers. - Tuple* construct_intermediate_tuple( - const std::vector& agg_fn_ctxs, - MemPool* pool, BufferedTupleStream2* stream, Status* status); - - // Updates the given aggregation intermediate tuple with aggregation values computed - // over 'row' using 'agg_fn_ctxs'. Whether the agg fn evaluator calls Update() or - // Merge() is controlled by the evaluator itself, unless enforced explicitly by passing - // in is_merge == true. The override is needed to merge spilled and non-spilled rows - // belonging to the same partition independent of whether the agg fn evaluators have - // is_merge() == true. - // This function is replaced by codegen (which is why we don't use a vector argument - // for agg_fn_ctxs). - void update_tuple(doris_udf::FunctionContext** agg_fn_ctxs, Tuple* tuple, TupleRow* row, - bool is_merge = false); - - // Called on the intermediate tuple of each group after all input rows have been - // consumed and aggregated. Computes the final aggregate values to be returned in - // get_next() using the agg fn evaluators' Serialize() or Finalize(). - // For the Finalize() case if the output tuple is different from the intermediate - // tuple, then a new tuple is allocated from 'pool' to hold the final result. - // Grouping values are copied into the output tuple and the the output tuple holding - // the finalized/serialized aggregate values is returned. - // TODO: Coordinate the allocation of new tuples with the release of memory - // so as not to make memory consumption blow up. - Tuple* get_output_tuple(const std::vector& agg_fn_ctxs, - Tuple* tuple, MemPool* pool); - - // Do the aggregation for all tuple rows in the batch when there is no grouping. - // The PartitionedHashTableCtx argument is unused, but included so the signature matches that of - // process_batch() for codegen. This function is replaced by codegen. - Status process_batch_no_grouping(RowBatch* batch, PartitionedHashTableCtx* ht_ctx = NULL); - - // Processes a batch of rows. This is the core function of the algorithm. We partition - // the rows into _hash_partitions, spilling as necessary. - // If AGGREGATED_ROWS is true, it means that the rows in the batch are already - // pre-aggregated. - // - // This function is replaced by codegen. It's inlined into ProcessBatch_true/false in - // the IR module. We pass in _ht_ctx.get() as an argument for performance. - template - Status IR_ALWAYS_INLINE process_batch(RowBatch* batch, PartitionedHashTableCtx* ht_ctx); - - // This function processes each individual row in process_batch(). Must be inlined - // into process_batch for codegen to substitute function calls with codegen'd versions. - template - Status IR_ALWAYS_INLINE process_row(TupleRow* row, PartitionedHashTableCtx* ht_ctx); - - // Create a new intermediate tuple in partition, initialized with row. ht_ctx is - // the context for the partition's hash table and hash is the precomputed hash of - // the row. The row can be an unaggregated or aggregated row depending on - // AGGREGATED_ROWS. Spills partitions if necessary to append the new intermediate - // tuple to the partition's stream. Must be inlined into process_batch for codegen to - // substitute function calls with codegen'd versions. insert_it is an iterator for - // insertion returned from PartitionedHashTable::FindOrInsert(). - template - Status IR_ALWAYS_INLINE add_intermediate_tuple(Partition* partition, - PartitionedHashTableCtx* ht_ctx, TupleRow* row, uint32_t hash, PartitionedHashTable::Iterator insert_it); - - // Append a row to a spilled partition. May spill partitions if needed to switch to - // I/O buffers. Selects the correct stream according to the argument. Inlined into - // process_batch(). - template - Status IR_ALWAYS_INLINE append_spilled_row(Partition* partition, TupleRow* row); - - // Append a row to a stream of a spilled partition. May spill partitions if needed - // to append the row. - Status append_spilled_row(BufferedTupleStream2* stream, TupleRow* row); - - // Reads all the rows from input_stream and process them by calling process_batch(). - template - Status process_stream(BufferedTupleStream2* input_stream); - - // Initializes _hash_partitions. 'level' is the level for the partitions to create. - // Also sets _ht_ctx's level to 'level'. - Status create_hash_partitions(int level); - - // Ensure that hash tables for all in-memory partitions are large enough to fit - // num_rows additional rows. - Status check_and_resize_hash_partitions(int num_rows, PartitionedHashTableCtx* ht_ctx); - - // Iterates over all the partitions in _hash_partitions and returns the number of rows - // of the largest spilled partition (in terms of number of aggregated and unaggregated - // rows). - int64_t largest_spilled_partition() const; - - // Prepares the next partition to return results from. On return, this function - // initializes _output_iterator and _output_partition. This either removes - // a partition from _aggregated_partitions (and is done) or removes the next - // partition from _aggregated_partitions and repartitions it. - Status next_partition(); - - // Picks a partition from _hash_partitions to spill. - Status spill_partition(); - - // Moves the partitions in _hash_partitions to _aggregated_partitions or - // _spilled_partitions. Partitions moved to _spilled_partitions are unpinned. - // input_rows is the number of input rows that have been repartitioned. - // Used for diagnostics. - Status move_hash_partitions(int64_t input_rows); - - // Calls close() on every Partition in '_aggregated_partitions', - // '_spilled_partitions', and '_hash_partitions' and then resets the lists, - // the vector and the partition pool. - void close_partitions(); - - // Calls finalizes on all tuples starting at 'it'. - void cleanup_hash_tbl(const std::vector& agg_fn_ctxs, - PartitionedHashTable::Iterator it); - - // We need two buffers per partition, one for the aggregated stream and one - // for the unaggregated stream. We need an additional buffer to read the stream - // we are currently repartitioning. - // If we need to serialize, we need an additional buffer while spilling a partition - // as the partitions aggregate stream needs to be serialized and rewritten. - int min_required_buffers() const { - return 2 * PARTITION_FANOUT + 1 + (_needs_serialize ? 1 : 0); - } -}; - -} // end namespace doris - -#endif // DORIS_BE_SRC_EXEC_PARTITIONED_AGGREGATION_NODE_H diff --git a/be/src/exec/partitioned_aggregation_node_ir.cc b/be/src/exec/partitioned_aggregation_node_ir.cc deleted file mode 100644 index e9837d538f..0000000000 --- a/be/src/exec/partitioned_aggregation_node_ir.cc +++ /dev/null @@ -1,143 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "exec/partitioned_aggregation_node.h" - -#include "exec/partitioned_hash_table.inline.h" -#include "runtime/buffered_tuple_stream2.inline.h" -#include "runtime/row_batch.h" -#include "runtime/tuple_row.h" - -namespace doris { - -Status PartitionedAggregationNode::process_batch_no_grouping( - RowBatch* batch, PartitionedHashTableCtx* ht_ctx) { - for (int i = 0; i < batch->num_rows(); ++i) { - update_tuple(&_agg_fn_ctxs[0], _singleton_output_tuple, batch->get_row(i)); - } - return Status::OK(); -} - -template -Status PartitionedAggregationNode::process_batch(RowBatch* batch, PartitionedHashTableCtx* ht_ctx) { - DCHECK(!_hash_partitions.empty()); - - // Make sure that no resizes will happen when inserting individual rows to the hash - // table of each partition by pessimistically assuming that all the rows in each batch - // will end up to the same partition. - // TODO: Once we have a histogram with the number of rows per partition, we will have - // accurate resize calls. - int num_rows = batch->num_rows(); - RETURN_IF_ERROR(check_and_resize_hash_partitions(num_rows, ht_ctx)); - - for (int i = 0; i < num_rows; ++i) { - RETURN_IF_ERROR(process_row(batch->get_row(i), ht_ctx)); - } - - return Status::OK(); -} - -template -Status PartitionedAggregationNode::process_row(TupleRow* row, PartitionedHashTableCtx* ht_ctx) { - uint32_t hash = 0; - if (AGGREGATED_ROWS) { - if (!ht_ctx->eval_and_hash_build(row, &hash)) { - return Status::OK(); - } - } else { - if (!ht_ctx->eval_and_hash_probe(row, &hash)) { - return Status::OK(); - } - } - - // To process this row, we first see if it can be aggregated or inserted into this - // partition's hash table. If we need to insert it and that fails, due to OOM, we - // spill the partition. The partition to spill is not necessarily dst_partition, - // so we can try again to insert the row. - Partition* dst_partition = _hash_partitions[hash >> (32 - NUM_PARTITIONING_BITS)]; - if (dst_partition->is_spilled()) { - // This partition is already spilled, just append the row. - return append_spilled_row(dst_partition, row); - } - - PartitionedHashTable* ht = dst_partition->hash_tbl.get(); - DCHECK(ht != NULL); - DCHECK(dst_partition->aggregated_row_stream->is_pinned()); - bool found; - // Find the appropriate bucket in the hash table. There will always be a free - // bucket because we checked the size above. - PartitionedHashTable::Iterator it = ht->find_bucket(ht_ctx, hash, &found); - DCHECK(!it.at_end()) << "Hash table had no free buckets"; - if (AGGREGATED_ROWS) { - // If the row is already an aggregate row, it cannot match anything in the - // hash table since we process the aggregate rows first. These rows should - // have been aggregated in the initial pass. - DCHECK(!found); - } else if (found) { - // Row is already in hash table. Do the aggregation and we're done. - update_tuple(&dst_partition->agg_fn_ctxs[0], it.get_tuple(), row); - return Status::OK(); - } - - // If we are seeing this result row for the first time, we need to construct the - // result row and initialize it. - return add_intermediate_tuple(dst_partition, ht_ctx, row, hash, it); -} - -template -Status PartitionedAggregationNode::add_intermediate_tuple( - Partition* partition, - PartitionedHashTableCtx* ht_ctx, - TupleRow* row, - uint32_t hash, - PartitionedHashTable::Iterator insert_it) { - while (true) { - DCHECK(partition->aggregated_row_stream->is_pinned()); - Tuple* intermediate_tuple = construct_intermediate_tuple(partition->agg_fn_ctxs, - NULL, partition->aggregated_row_stream.get(), &_process_batch_status); - - if (LIKELY(intermediate_tuple != NULL)) { - update_tuple(&partition->agg_fn_ctxs[0], intermediate_tuple, row, AGGREGATED_ROWS); - // After copying and initializing the tuple, insert it into the hash table. - insert_it.set_tuple(intermediate_tuple, hash); - return Status::OK(); - } else if (!_process_batch_status.ok()) { - return _process_batch_status; - } - - // We did not have enough memory to add intermediate_tuple to the stream. - RETURN_IF_ERROR(spill_partition()); - if (partition->is_spilled()) { - return append_spilled_row(partition, row); - } - } -} - -template -Status PartitionedAggregationNode::append_spilled_row(Partition* partition, TupleRow* row) { - DCHECK(partition->is_spilled()); - BufferedTupleStream2* stream = AGGREGATED_ROWS ? - partition->aggregated_row_stream.get() : partition->unaggregated_row_stream.get(); - return append_spilled_row(stream, row); -} - -template Status PartitionedAggregationNode::process_batch( - RowBatch*, PartitionedHashTableCtx*); -template Status PartitionedAggregationNode::process_batch( - RowBatch*, PartitionedHashTableCtx*); - -} // end namespace doris diff --git a/be/src/exec/partitioned_hash_table.cc b/be/src/exec/partitioned_hash_table.cc deleted file mode 100644 index 61d9d2e6e7..0000000000 --- a/be/src/exec/partitioned_hash_table.cc +++ /dev/null @@ -1,440 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "exec/partitioned_hash_table.inline.h" - -#include "exprs/expr.h" -#include "exprs/expr_context.h" -#include "exprs/slot_ref.h" -#include "runtime/buffered_block_mgr.h" -#include "runtime/mem_tracker.h" -#include "runtime/raw_value.h" -#include "runtime/runtime_state.h" -#include "runtime/string_value.hpp" -#include "util/doris_metrics.h" - -// DEFINE_bool(enable_quadratic_probing, true, "Enable quadratic probing hash table"); - -using std::string; -using std::stringstream; -using std::vector; -using std::endl; - -namespace doris { - -// Random primes to multiply the seed with. -static uint32_t SEED_PRIMES[] = { - 1, // First seed must be 1, level 0 is used by other operators in the fragment. - 1431655781, - 1183186591, - 622729787, - 472882027, - 338294347, - 275604541, - 41161739, - 29999999, - 27475109, - 611603, - 16313357, - 11380003, - 21261403, - 33393119, - 101, - 71043403 -}; - -// Put a non-zero constant in the result location for NULL. -// We don't want(NULL, 1) to hash to the same as (0, 1). -// This needs to be as big as the biggest primitive type since the bytes -// get copied directly. -// TODO find a better approach, since primitives like CHAR(N) can be up to 128 bytes -static int64_t NULL_VALUE[] = { HashUtil::FNV_SEED, HashUtil::FNV_SEED, - HashUtil::FNV_SEED, HashUtil::FNV_SEED, - HashUtil::FNV_SEED, HashUtil::FNV_SEED, - HashUtil::FNV_SEED, HashUtil::FNV_SEED, - HashUtil::FNV_SEED, HashUtil::FNV_SEED, - HashUtil::FNV_SEED, HashUtil::FNV_SEED, - HashUtil::FNV_SEED, HashUtil::FNV_SEED, - HashUtil::FNV_SEED, HashUtil::FNV_SEED }; - -// The first NUM_SMALL_BLOCKS of _nodes are made of blocks less than the IO size (of 8MB) -// to reduce the memory footprint of small queries. In particular, we always first use a -// 64KB and a 512KB block before starting using IO-sized blocks. -static const int64_t INITIAL_DATA_PAGE_SIZES[] = { 64 * 1024, 512 * 1024 }; -static const int NUM_SMALL_DATA_PAGES = sizeof(INITIAL_DATA_PAGE_SIZES) / sizeof(int64_t); - -PartitionedHashTableCtx::PartitionedHashTableCtx( - const vector& build_expr_ctxs, const vector& probe_expr_ctxs, - bool stores_nulls, bool finds_nulls, - int32_t initial_seed, int max_levels, int num_build_tuples) : - _build_expr_ctxs(build_expr_ctxs), - _probe_expr_ctxs(probe_expr_ctxs), - _stores_nulls(stores_nulls), - _finds_nulls(finds_nulls), - _level(0), - _row(reinterpret_cast(malloc(sizeof(Tuple*) * num_build_tuples))) { - // Compute the layout and buffer size to store the evaluated expr results - DCHECK_EQ(_build_expr_ctxs.size(), _probe_expr_ctxs.size()); - DCHECK(!_build_expr_ctxs.empty()); - _results_buffer_size = Expr::compute_results_layout(_build_expr_ctxs, - &_expr_values_buffer_offsets, &_var_result_begin); - _expr_values_buffer = new uint8_t[_results_buffer_size]; - memset(_expr_values_buffer, 0, sizeof(uint8_t) * _results_buffer_size); - _expr_value_null_bits = new uint8_t[build_expr_ctxs.size()]; - - // Populate the seeds to use for all the levels. TODO: revisit how we generate these. - DCHECK_GE(max_levels, 0); - DCHECK_LT(max_levels, sizeof(SEED_PRIMES) / sizeof(SEED_PRIMES[0])); - DCHECK_NE(initial_seed, 0); - _seeds.resize(max_levels + 1); - _seeds[0] = initial_seed; - for (int i = 1; i <= max_levels; ++i) { - _seeds[i] = _seeds[i - 1] * SEED_PRIMES[i]; - } -} - -void PartitionedHashTableCtx::close() { - // TODO: use tr1::array? - DCHECK(_expr_values_buffer != NULL); - delete[] _expr_values_buffer; - _expr_values_buffer = NULL; - DCHECK(_expr_value_null_bits != NULL); - delete[] _expr_value_null_bits; - _expr_value_null_bits = NULL; - free(_row); - _row = NULL; -} - -bool PartitionedHashTableCtx::eval_row(TupleRow* row, const vector& ctxs) { - bool has_null = false; - for (int i = 0; i < ctxs.size(); ++i) { - void* loc = _expr_values_buffer + _expr_values_buffer_offsets[i]; - void* val = ctxs[i]->get_value(row); - if (val == NULL) { - // If the table doesn't store nulls, no reason to keep evaluating - if (!_stores_nulls) { - return true; - } - - _expr_value_null_bits[i] = true; - val = reinterpret_cast(&NULL_VALUE); - has_null = true; - } else { - _expr_value_null_bits[i] = false; - } - DCHECK_LE(_build_expr_ctxs[i]->root()->type().get_slot_size(), - sizeof(NULL_VALUE)); - RawValue::write(val, loc, _build_expr_ctxs[i]->root()->type(), NULL); - } - return has_null; -} - -uint32_t PartitionedHashTableCtx::hash_variable_len_row() { - uint32_t hash_val = _seeds[_level]; - // Hash the non-var length portions (if there are any) - if (_var_result_begin != 0) { - hash_val = hash_help(_expr_values_buffer, _var_result_begin, hash_val); - } - - for (int i = 0; i < _build_expr_ctxs.size(); ++i) { - // non-string and null slots are already part of expr_values_buffer - // if (_build_expr_ctxs[i]->root()->type().type != TYPE_STRING && - if (_build_expr_ctxs[i]->root()->type().type != TYPE_VARCHAR) { - continue; - } - - void* loc = _expr_values_buffer + _expr_values_buffer_offsets[i]; - if (_expr_value_null_bits[i]) { - // Hash the null random seed values at 'loc' - hash_val = hash_help(loc, sizeof(StringValue), hash_val); - } else { - // Hash the string - // TODO: when using CRC hash on empty string, this only swaps bytes. - StringValue* str = reinterpret_cast(loc); - hash_val = hash_help(str->ptr, str->len, hash_val); - } - } - return hash_val; -} - -bool PartitionedHashTableCtx::equals(TupleRow* build_row) { - for (int i = 0; i < _build_expr_ctxs.size(); ++i) { - void* val = _build_expr_ctxs[i]->get_value(build_row); - if (val == NULL) { - if (!_stores_nulls) { - return false; - } - if (!_expr_value_null_bits[i]) { - return false; - } - continue; - } else { - if (_expr_value_null_bits[i]) { - return false; - } - } - - void* loc = _expr_values_buffer + _expr_values_buffer_offsets[i]; - if (!RawValue::eq(loc, val, _build_expr_ctxs[i]->root()->type())) { - return false; - } - } - return true; -} - -const double PartitionedHashTable::MAX_FILL_FACTOR = 0.75f; - -PartitionedHashTable* PartitionedHashTable::create(RuntimeState* state, - BufferedBlockMgr2::Client* client, int num_build_tuples, - BufferedTupleStream2* tuple_stream, int64_t max_num_buckets, - int64_t initial_num_buckets) { - // return new PartitionedHashTable(FLAGS_enable_quadratic_probing, state, client, - // num_build_tuples, tuple_stream, max_num_buckets, initial_num_buckets); - return new PartitionedHashTable(config::enable_quadratic_probing, state, client, - num_build_tuples, tuple_stream, max_num_buckets, initial_num_buckets); -} - -PartitionedHashTable::PartitionedHashTable(bool quadratic_probing, RuntimeState* state, - BufferedBlockMgr2::Client* client, int num_build_tuples, BufferedTupleStream2* stream, - int64_t max_num_buckets, int64_t num_buckets) : - _state(state), - _block_mgr_client(client), - _tuple_stream(stream), - _stores_tuples(num_build_tuples == 1), - _quadratic_probing(quadratic_probing), - _total_data_page_size(0), - _next_node(NULL), - _node_remaining_current_page(0), - _num_duplicate_nodes(0), - _max_num_buckets(max_num_buckets), - _buckets(NULL), - _num_buckets(num_buckets), - _num_filled_buckets(0), - _num_buckets_with_duplicates(0), - _num_build_tuples(num_build_tuples), - _has_matches(false), - _num_probes(0), - _num_failed_probes(0), - _travel_length(0), - _num_hash_collisions(0), - _num_resizes(0) { - DCHECK_EQ((num_buckets & (num_buckets-1)), 0) << "num_buckets must be a power of 2"; - DCHECK_GT(num_buckets, 0) << "num_buckets must be larger than 0"; - DCHECK(_stores_tuples || stream != NULL); - DCHECK(client != NULL); -} - -bool PartitionedHashTable::init() { - int64_t buckets_byte_size = _num_buckets * sizeof(Bucket); - if (!_state->block_mgr2()->consume_memory(_block_mgr_client, buckets_byte_size)) { - _num_buckets = 0; - return false; - } - _buckets = reinterpret_cast(malloc(buckets_byte_size)); - memset(_buckets, 0, buckets_byte_size); - return true; -} - -void PartitionedHashTable::close() { - // Print statistics only for the large or heavily used hash tables. - // TODO: Tweak these numbers/conditions, or print them always? - const int64_t LARGE_HT = 128 * 1024; - const int64_t HEAVILY_USED = 1024 * 1024; - // TODO: These statistics should go to the runtime profile as well. - if ((_num_buckets > LARGE_HT) || (_num_probes > HEAVILY_USED)) { - VLOG(2) << print_stats(); - } - for (int i = 0; i < _data_pages.size(); ++i) { - _data_pages[i]->del(); - } -#if 0 - if (DorisMetrics::hash_table_total_bytes() != NULL) { - DorisMetrics::hash_table_total_bytes()->increment(-_total_data_page_size); - } -#endif - _data_pages.clear(); - if (_buckets != NULL) { - free(_buckets); - } - _state->block_mgr2()->release_memory(_block_mgr_client, _num_buckets * sizeof(Bucket)); -} - -int64_t PartitionedHashTable::current_mem_size() const { - return _num_buckets * sizeof(Bucket) + _num_duplicate_nodes * sizeof(DuplicateNode); -} - -bool PartitionedHashTable::check_and_resize( - uint64_t buckets_to_fill, PartitionedHashTableCtx* ht_ctx) { - uint64_t shift = 0; - while (_num_filled_buckets + buckets_to_fill > - (_num_buckets << shift) * MAX_FILL_FACTOR) { - // TODO: next prime instead of double? - ++shift; - } - if (shift > 0) { - return resize_buckets(_num_buckets << shift, ht_ctx); - } - return true; -} - -bool PartitionedHashTable::resize_buckets(int64_t num_buckets, PartitionedHashTableCtx* ht_ctx) { - DCHECK_EQ((num_buckets & (num_buckets-1)), 0) - << "num_buckets=" << num_buckets << " must be a power of 2"; - DCHECK_GT(num_buckets, _num_filled_buckets) << "Cannot shrink the hash table to " - "smaller number of buckets than the number of filled buckets."; - VLOG(2) << "Resizing hash table from " - << _num_buckets << " to " << num_buckets << " buckets."; - if (_max_num_buckets != -1 && num_buckets > _max_num_buckets) { - return false; - } - ++_num_resizes; - - // All memory that can grow proportional to the input should come from the block mgrs - // mem tracker. - // Note that while we copying over the contents of the old hash table, we need to have - // allocated both the old and the new hash table. Once we finish, we return the memory - // of the old hash table. - int64_t old_size = _num_buckets * sizeof(Bucket); - int64_t new_size = num_buckets * sizeof(Bucket); - if (!_state->block_mgr2()->consume_memory(_block_mgr_client, new_size)) { - return false; - } - Bucket* new_buckets = reinterpret_cast(malloc(new_size)); - DCHECK(new_buckets != NULL); - memset(new_buckets, 0, new_size); - - // Walk the old table and copy all the filled buckets to the new (resized) table. - // We do not have to do anything with the duplicate nodes. This operation is expected - // to succeed. - for (PartitionedHashTable::Iterator iter = begin(ht_ctx); !iter.at_end(); - next_filled_bucket(&iter._bucket_idx, &iter._node)) { - Bucket* bucket_to_copy = &_buckets[iter._bucket_idx]; - bool found = false; - int64_t bucket_idx = probe(new_buckets, num_buckets, NULL, bucket_to_copy->hash, &found); - DCHECK(!found); - DCHECK_NE(bucket_idx, Iterator::BUCKET_NOT_FOUND) << " Probe failed even though " - " there are free buckets. " << num_buckets << " " << _num_filled_buckets; - Bucket* dst_bucket = &new_buckets[bucket_idx]; - *dst_bucket = *bucket_to_copy; - } - - _num_buckets = num_buckets; - free(_buckets); - _buckets = new_buckets; - _state->block_mgr2()->release_memory(_block_mgr_client, old_size); - return true; -} - -bool PartitionedHashTable::grow_node_array() { - int64_t page_size = 0; - page_size = _state->block_mgr2()->max_block_size(); - if (_data_pages.size() < NUM_SMALL_DATA_PAGES) { - page_size = std::min(page_size, INITIAL_DATA_PAGE_SIZES[_data_pages.size()]); - } - BufferedBlockMgr2::Block* block = NULL; - Status status = _state->block_mgr2()->get_new_block( - _block_mgr_client, NULL, &block, page_size); - DCHECK(status.ok() || block == NULL); - if (block == NULL) { - return false; - } - _data_pages.push_back(block); - _next_node = block->allocate(page_size); -#if 0 - if (DorisMetrics::hash_table_total_bytes() != NULL) { - DorisMetrics::hash_table_total_bytes()->increment(page_size); - } -#endif - _node_remaining_current_page = page_size / sizeof(DuplicateNode); - _total_data_page_size += page_size; - return true; -} - -void PartitionedHashTable::debug_string_tuple( - stringstream& ss, HtData& htdata, const RowDescriptor* desc) { - if (_stores_tuples) { - ss << "(" << htdata.tuple << ")"; - } else { - ss << "(" << htdata.idx.block() << ", " << htdata.idx.idx() - << ", " << htdata.idx.offset() << ")"; - } - if (desc != NULL) { - Tuple* row[_num_build_tuples]; - ss << " " << get_row(htdata, reinterpret_cast(row))->to_string(*desc); - } -} - -string PartitionedHashTable::debug_string( - bool skip_empty, bool show_match, const RowDescriptor* desc) { - stringstream ss; - ss << endl; - for (int i = 0; i < _num_buckets; ++i) { - if (skip_empty && !_buckets[i].filled) { - continue; - } - ss << i << ": "; - if (show_match) { - if (_buckets[i].matched) { - ss << " [M]"; - } else { - ss << " [U]"; - } - } - if (_buckets[i].hasDuplicates) { - DuplicateNode* node = _buckets[i].bucketData.duplicates; - bool first = true; - ss << " [D] "; - while (node != NULL) { - if (!first) { - ss << ","; - } - debug_string_tuple(ss, node->htdata, desc); - node = node->next; - first = false; - } - } else { - ss << " [B] "; - if (_buckets[i].filled) { - debug_string_tuple(ss, _buckets[i].bucketData.htdata, desc); - } else { - ss << " - "; - } - } - ss << endl; - } - return ss.str(); -} - -string PartitionedHashTable::print_stats() const { - double curr_fill_factor = (double)_num_filled_buckets / (double)_num_buckets; - double avg_travel = (double)_travel_length / (double)_num_probes; - double avg_collisions = (double)_num_hash_collisions / (double)_num_filled_buckets; - stringstream ss; - ss << "Buckets: " << _num_buckets << " " << _num_filled_buckets << " " - << curr_fill_factor << endl; - ss << "Duplicates: " << _num_buckets_with_duplicates << " buckets " - << _num_duplicate_nodes << " nodes" << endl; - ss << "Probes: " << _num_probes << endl; - ss << "FailedProbes: " << _num_failed_probes << endl; - ss << "Travel: " << _travel_length << " " << avg_travel << endl; - ss << "HashCollisions: " << _num_hash_collisions << " " << avg_collisions << endl; - ss << "Resizes: " << _num_resizes << endl; - return ss.str(); -} - -} // namespace doris - diff --git a/be/src/exec/partitioned_hash_table.h b/be/src/exec/partitioned_hash_table.h deleted file mode 100644 index 94f6c9847f..0000000000 --- a/be/src/exec/partitioned_hash_table.h +++ /dev/null @@ -1,673 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef DORIS_BE_SRC_EXEC_PARTITIONED_HASH_TABLE_H -#define DORIS_BE_SRC_EXEC_PARTITIONED_HASH_TABLE_H - -#include -#include -#include - -#include "codegen/doris_ir.h" -#include "util/logging.h" -#include "runtime/buffered_block_mgr2.h" -#include "runtime/buffered_tuple_stream2.h" -#include "runtime/buffered_tuple_stream2.inline.h" -#include "runtime/mem_tracker.h" -#include "runtime/mem_tracker.h" -#include "runtime/tuple_row.h" -#include "util/hash_util.hpp" -#include "util/bit_util.h" - -namespace doris { - -class Expr; -class ExprContext; -class MemTracker; -class MemTracker; -class RowDescriptor; -class RuntimeState; -class Tuple; -class TupleRow; -class PartitionedHashTable; - -// Linear or quadratic probing hash table implementation tailored to the usage pattern -// for partitioned hash aggregation and hash joins. The hash table stores TupleRows and -// allows for different exprs for insertions and finds. This is the pattern we use for -// joins and aggregation where the input/build tuple row descriptor is different from the -// find/probe descriptor. The implementation is designed to allow codegen for some paths. -// -// In addition to the hash table there is also an accompanying hash table context that -// is used for insertions and probes. For example, the hash table context stores -// evaluated expr results for the current row being processed when possible into a -// contiguous memory buffer. This allows for efficient hash computation. -// -// The hash table does not support removes. The hash table is not thread safe. -// The table is optimized for the partition hash aggregation and hash joins and is not -// intended to be a generic hash table implementation. The API loosely mimics the -// std::hashset API. -// -// The data (rows) are stored in a BufferedTupleStream2. The basic data structure of this -// hash table is a vector of buckets. The buckets (indexed by the mod of the hash) -// contain a pointer to either the slot in the tuple-stream or in case of duplicate -// values, to the head of a linked list of nodes that in turn contain a pointer to -// tuple-stream slots. When inserting an entry we start at the bucket at position -// (hash % size) and search for either a bucket with the same hash or for an empty -// bucket. If a bucket with the same hash is found, we then compare for row equality and -// either insert a duplicate node if the equality is true, or continue the search if the -// row equality is false. Similarly, when probing we start from the bucket at position -// (hash % size) and search for an entry with the same hash or for an empty bucket. -// In the former case, we then check for row equality and continue the search if the row -// equality is false. In the latter case, the probe is not successful. When growing the -// hash table, the number of buckets is doubled. We trigger a resize when the fill -// factor is approx 75%. Due to the doubling nature of the buckets, we require that the -// number of buckets is a power of 2. This allows us to perform a modulo of the hash -// using a bitmask. -// -// We choose to use linear or quadratic probing because they exhibit good (predictable) -// cache behavior. -// -// The first NUM_SMALL_BLOCKS of _nodes are made of blocks less than the IO size (of 8MB) -// to reduce the memory footprint of small queries. -// -// TODO: Compare linear and quadratic probing and remove the loser. -// TODO: We currently use 32-bit hashes. There is room in the bucket structure for at -// least 48-bits. We should exploit this space. -// TODO: Consider capping the probes with a threshold value. If an insert reaches -// that threshold it is inserted to another linked list of overflow entries. -// TODO: Smarter resizes, and perhaps avoid using powers of 2 as the hash table size. -// TODO: this is not a fancy hash table in terms of memory access patterns -// (cuckoo-hashing or something that spills to disk). We will likely want to invest -// more time into this. -// TODO: hash-join and aggregation have very different access patterns. Joins insert -// all the rows and then calls scan to find them. Aggregation interleaves find() and -// Inserts(). We may want to optimize joins more heavily for Inserts() (in particular -// growing). -// TODO: Batched interface for inserts and finds. -// TODO: Do we need to check mem limit exceeded so often. Check once per batch? - -// Control block for a hash table. This class contains the logic as well as the variables -// needed by a thread to operate on a hash table. -class PartitionedHashTableCtx { -public: - // Create a hash table context. - // - build_exprs are the exprs that should be used to evaluate rows during insert(). - // - probe_exprs are used during find() - // - stores_nulls: if false, TupleRows with nulls are ignored during insert - // - finds_nulls: if false, find() returns End() for TupleRows with nulls - // even if stores_nulls is true - // - initial_seed: Initial seed value to use when computing hashes for rows with - // level 0. Other levels have their seeds derived from this seed. - // - The max levels we will hash with. - PartitionedHashTableCtx(const std::vector& build_expr_ctxs, - const std::vector& probe_expr_ctxs, bool stores_nulls, - bool finds_nulls, int32_t initial_seed, int max_levels, - int num_build_tuples); - - // Call to cleanup any resources. - void close(); - - void set_level(int level); - int level() const { return _level; } - uint32_t seed(int level) { return _seeds.at(level); } - - TupleRow* row() const { return _row; } - - // Returns the results of the exprs at 'expr_idx' evaluated over the last row - // processed. - // This value is invalid if the expr evaluated to NULL. - // TODO: this is an awkward abstraction but aggregation node can take advantage of - // it and save some expr evaluation calls. - void* last_expr_value(int expr_idx) const { - return _expr_values_buffer + _expr_values_buffer_offsets[expr_idx]; - } - - // Returns if the expr at 'expr_idx' evaluated to NULL for the last row. - bool last_expr_value_null(int expr_idx) const { - return _expr_value_null_bits[expr_idx]; - } - - // Evaluate and hash the build/probe row, returning in *hash. Returns false if this - // row should be rejected (doesn't need to be processed further) because it - // contains NULL. - // These need to be inlined in the IR module so we can find and replace the calls to - // EvalBuildRow()/EvalProbeRow(). - bool IR_ALWAYS_INLINE eval_and_hash_build(TupleRow* row, uint32_t* hash); - bool IR_ALWAYS_INLINE eval_and_hash_probe(TupleRow* row, uint32_t* hash); - - int results_buffer_size() const { return _results_buffer_size; } - -private: - friend class PartitionedHashTable; - friend class PartitionedHashTableTest_HashEmpty_Test; - - // Compute the hash of the values in _expr_values_buffer. - // This will be replaced by codegen. We don't want this inlined for replacing - // with codegen'd functions so the function name does not change. - uint32_t IR_NO_INLINE HashCurrentRow() { - DCHECK_LT(_level, _seeds.size()); - if (_var_result_begin == -1) { - // This handles NULLs implicitly since a constant seed value was put - // into results buffer for nulls. - // TODO: figure out which hash function to use. We need to generate uncorrelated - // hashes by changing just the seed. CRC does not have this property and FNV is - // okay. We should switch to something else. - return hash_help(_expr_values_buffer, _results_buffer_size, _seeds[_level]); - } else { - return PartitionedHashTableCtx::hash_variable_len_row(); - } - } - - // Wrapper function for calling correct HashUtil function in non-codegen'd case. - uint32_t inline hash_help(const void* input, int len, int32_t hash) { - // Use CRC hash at first level for better performance. Switch to murmur hash at - // subsequent levels since CRC doesn't randomize well with different seed inputs. - if (_level == 0) { - return HashUtil::hash(input, len, hash); - } - return HashUtil::murmur_hash2_64(input, len, hash); - } - - // Evaluate 'row' over build exprs caching the results in '_expr_values_buffer' This - // will be replaced by codegen. We do not want this function inlined when cross - // compiled because we need to be able to differentiate between EvalBuildRow and - // EvalProbeRow by name and the build/probe exprs are baked into the codegen'd - // function. - bool IR_NO_INLINE EvalBuildRow(TupleRow* row) { - return eval_row(row, _build_expr_ctxs); - } - - // Evaluate 'row' over probe exprs caching the results in '_expr_values_buffer' - // This will be replaced by codegen. - bool IR_NO_INLINE EvalProbeRow(TupleRow* row) { - return eval_row(row, _probe_expr_ctxs); - } - - // Compute the hash of the values in _expr_values_buffer for rows with variable length - // fields (e.g. strings). - uint32_t hash_variable_len_row(); - - // Evaluate the exprs over row and cache the results in '_expr_values_buffer'. - // Returns whether any expr evaluated to NULL. - // This will be replaced by codegen. - bool eval_row(TupleRow* row, const std::vector& ctxs); - - // Returns true if the values of build_exprs evaluated over 'build_row' equal - // the values cached in _expr_values_buffer. - // This will be replaced by codegen. - bool IR_NO_INLINE equals(TupleRow* build_row); - - const std::vector& _build_expr_ctxs; - const std::vector& _probe_expr_ctxs; - - // Constants on how the hash table should behave. Joins and aggs have slightly - // different behavior. - // TODO: these constants are an ideal candidate to be removed with codegen. - // TODO: ..or with template-ization - const bool _stores_nulls; - const bool _finds_nulls; - - // The current level this context is working on. Each level needs to use a - // different seed. - int _level; - - // The seeds to use for hashing. Indexed by the level. - std::vector _seeds; - - // Cache of exprs values for the current row being evaluated. This can either - // be a build row (during insert()) or probe row (during find()). - std::vector _expr_values_buffer_offsets; - - // Byte offset into '_expr_values_buffer' that begins the variable length results. - // If -1, there are no variable length slots. Never changes once set, can be removed - // with codegen. - int _var_result_begin; - - // Byte size of '_expr_values_buffer'. Never changes once set, can be removed with - // codegen. - int _results_buffer_size; - - // Buffer to store evaluated expr results. This address must not change once - // allocated since the address is baked into the codegen. - uint8_t* _expr_values_buffer; - - // Use bytes instead of bools to be compatible with llvm. This address must - // not change once allocated. - uint8_t* _expr_value_null_bits; - - // Scratch buffer to generate rows on the fly. - TupleRow* _row; - - // Cross-compiled functions to access member variables used in codegen_hash_current_row(). - uint32_t get_hash_seed() const; -}; - -// The hash table consists of a contiguous array of buckets that contain a pointer to the -// data, the hash value and three flags: whether this bucket is filled, whether this -// entry has been matched (used in right and full joins) and whether this entry has -// duplicates. If there are duplicates, then the data is pointing to the head of a -// linked list of duplicate nodes that point to the actual data. Note that the duplicate -// nodes do not contain the hash value, because all the linked nodes have the same hash -// value, the one in the bucket. The data is either a tuple stream index or a Tuple*. -// This array of buckets is sparse, we are shooting for up to 3/4 fill factor (75%). The -// data allocated by the hash table comes from the BufferedBlockMgr2. -class PartitionedHashTable { -private: - - // Either the row in the tuple stream or a pointer to the single tuple of this row. - union HtData { - BufferedTupleStream2::RowIdx idx; - Tuple* tuple; - }; - - // Linked list of entries used for duplicates. - struct DuplicateNode { - // Used for full outer and right {outer, anti, semi} joins. Indicates whether the - // row in the DuplicateNode has been matched. - // From an abstraction point of view, this is an awkward place to store this - // information. - // TODO: Fold this flag in the next pointer below. - bool matched; - - // Chain to next duplicate node, NULL when end of list. - DuplicateNode* next; - HtData htdata; - }; - - struct Bucket { - // Whether this bucket contains a vaild entry, or it is empty. - bool filled; - - // Used for full outer and right {outer, anti, semi} joins. Indicates whether the - // row in the bucket has been matched. - // From an abstraction point of view, this is an awkward place to store this - // information but it is efficient. This space is otherwise unused. - bool matched; - - // Used in case of duplicates. If true, then the bucketData union should be used as - // 'duplicates'. - bool hasDuplicates; - - // Cache of the hash for data. - // TODO: Do we even have to cache the hash value? - uint32_t hash; - - // Either the data for this bucket or the linked list of duplicates. - union { - HtData htdata; - DuplicateNode* duplicates; - } bucketData; - }; - -public: - class Iterator; - - // Returns a newly allocated PartitionedHashTable. The probing algorithm is set by the - // FLAG_enable_quadratic_probing. - // - client: block mgr client to allocate data pages from. - // - num_build_tuples: number of Tuples in the build tuple row. - // - tuple_stream: the tuple stream which contains the tuple rows index by the - // hash table. Can be NULL if the rows contain only a single tuple, in which - // case the 'tuple_stream' is unused. - // - max_num_buckets: the maximum number of buckets that can be stored. If we - // try to grow the number of buckets to a larger number, the inserts will fail. - // -1, if it unlimited. - // - initial_num_buckets: number of buckets that the hash table should be initialized - // with. - static PartitionedHashTable* create(RuntimeState* state, BufferedBlockMgr2::Client* client, - int num_build_tuples, BufferedTupleStream2* tuple_stream, int64_t max_num_buckets, - int64_t initial_num_buckets); - - // Allocates the initial bucket structure. Returns false if OOM. - bool init(); - - // Call to cleanup any resources. Must be called once. - void close(); - - // Inserts the row to the hash table. Returns true if the insertion was successful. - // Always returns true if the table has free buckets and the key is not a duplicate. - // The caller is responsible for ensuring that the table has free buckets - // 'idx' is the index into _tuple_stream for this row. If the row contains more than - // one tuple, the 'idx' is stored instead of the 'row'. The 'row' is not copied by the - // hash table and the caller must guarantee it stays in memory. This will not grow the - // hash table. In the case that there is a need to insert a duplicate node, instead of - // filling a new bucket, and there is not enough memory to insert a duplicate node, - // the insert fails and this function returns false. - // Used during the build phase of hash joins. - bool IR_ALWAYS_INLINE insert(PartitionedHashTableCtx* ht_ctx, - const BufferedTupleStream2::RowIdx& idx, TupleRow* row, uint32_t hash); - - // Same as insert() but for inserting a single Tuple. The 'tuple' is not copied by - // the hash table and the caller must guarantee it stays in memory. - bool IR_ALWAYS_INLINE insert(PartitionedHashTableCtx* ht_ctx, Tuple* tuple, uint32_t hash); - - // Returns an iterator to the bucket matching the last row evaluated in 'ht_ctx'. - // Returns PartitionedHashTable::End() if no match is found. The iterator can be iterated until - // PartitionedHashTable::End() to find all the matching rows. Advancing the returned iterator will - // go to the next matching row. The matching rows do not need to be evaluated since all - // the nodes of a bucket are duplicates. One scan can be in progress for each 'ht_ctx'. - // Used during the probe phase of hash joins. - Iterator IR_ALWAYS_INLINE find(PartitionedHashTableCtx* ht_ctx, uint32_t hash); - - // If a match is found in the table, return an iterator as in find(). If a match was - // not present, return an iterator pointing to the empty bucket where the key should - // be inserted. Returns End() if the table is full. The caller can set the data in - // the bucket using a Set*() method on the iterator. - Iterator IR_ALWAYS_INLINE find_bucket(PartitionedHashTableCtx* ht_ctx, uint32_t hash, - bool* found); - - // Returns number of elements inserted in the hash table - int64_t size() const { - return _num_filled_buckets - _num_buckets_with_duplicates + _num_duplicate_nodes; - } - - // Returns the number of empty buckets. - int64_t empty_buckets() const { return _num_buckets - _num_filled_buckets; } - - // Returns the number of buckets - int64_t num_buckets() const { return _num_buckets; } - - // Returns the load factor (the number of non-empty buckets) - double load_factor() const { - return static_cast(_num_filled_buckets) / _num_buckets; - } - - // Returns an estimate of the number of bytes needed to build the hash table - // structure for 'num_rows'. To do that, it estimates the number of buckets, - // rounded up to a power of two, and also assumes that there are no duplicates. - static int64_t EstimateNumBuckets(int64_t num_rows) { - // Assume max 66% fill factor and no duplicates. - return BitUtil::next_power_of_two(3 * num_rows / 2); - } - static int64_t EstimateSize(int64_t num_rows) { - int64_t num_buckets = EstimateNumBuckets(num_rows); - return num_buckets * sizeof(Bucket); - } - - // Returns the memory occupied by the hash table, takes into account the number of - // duplicates. - int64_t current_mem_size() const; - - // Calculates the fill factor if 'buckets_to_fill' additional buckets were to be - // filled and resizes the hash table so that the projected fill factor is below the - // max fill factor. - // If it returns true, then it is guaranteed at least 'rows_to_add' rows can be - // inserted without need to resize. - bool check_and_resize(uint64_t buckets_to_fill, PartitionedHashTableCtx* ht_ctx); - - // Returns the number of bytes allocated to the hash table - int64_t byte_size() const { return _total_data_page_size; } - - // Returns an iterator at the beginning of the hash table. Advancing this iterator - // will traverse all elements. - Iterator begin(PartitionedHashTableCtx* ht_ctx); - - // Return an iterator pointing to the first element (Bucket or DuplicateNode, if the - // bucket has duplicates) in the hash table that does not have its matched flag set. - // Used in right joins and full-outer joins. - Iterator first_unmatched(PartitionedHashTableCtx* ctx); - - // Return true if there was a least one match. - bool HasMatches() const { return _has_matches; } - - // Return end marker. - Iterator End() { return Iterator(); } - - // Dump out the entire hash table to string. If 'skip_empty', empty buckets are - // skipped. If 'show_match', it also prints the matched flag of each node. If - // 'build_desc' is non-null, the build rows will be printed. Otherwise, only the - // the addresses of the build rows will be printed. - std::string debug_string(bool skip_empty, bool show_match, - const RowDescriptor* build_desc); - - // Print the content of a bucket or node. - void debug_string_tuple(std::stringstream& ss, HtData& htdata, const RowDescriptor* desc); - - // Update and print some statistics that can be used for performance debugging. - std::string print_stats() const; - - // stl-like iterator interface. - class Iterator { - private: - // Bucket index value when probe is not successful. - static const int64_t BUCKET_NOT_FOUND = -1; - - public: - - Iterator() : _table(NULL), _row(NULL), _bucket_idx(BUCKET_NOT_FOUND), _node(NULL) { } - - // Iterates to the next element. It should be called only if !AtEnd(). - void IR_ALWAYS_INLINE next(); - - // Iterates to the next duplicate node. If the bucket does not have duplicates or - // when it reaches the last duplicate node, then it moves the Iterator to AtEnd(). - // Used when we want to iterate over all the duplicate nodes bypassing the next() - // interface (e.g. in semi/outer joins without other_join_conjuncts, in order to - // iterate over all nodes of an unmatched bucket). - void IR_ALWAYS_INLINE next_duplicate(); - - // Iterates to the next element that does not have its matched flag set. Used in - // right-outer and full-outer joins. - void next_unmatched(); - - // Return the current row or tuple. Callers must check the iterator is not AtEnd() - // before calling them. The returned row is owned by the iterator and valid until - // the next call to get_row(). It is safe to advance the iterator. - TupleRow* get_row() const; - Tuple* get_tuple() const; - - // Set the current tuple for an empty bucket. Designed to be used with the - // iterator returned from find_bucket() in the case when the value is not found. - // It is not valid to call this function if the bucket already has an entry. - void set_tuple(Tuple* tuple, uint32_t hash); - - // Sets as matched the Bucket or DuplicateNode currently pointed by the iterator, - // depending on whether the bucket has duplicates or not. The iterator cannot be - // AtEnd(). - void set_matched(); - - // Returns the 'matched' flag of the current Bucket or DuplicateNode, depending on - // whether the bucket has duplicates or not. It should be called only if !AtEnd(). - bool is_matched() const; - - // Resets everything but the pointer to the hash table. - void set_at_end(); - - // Returns true if this iterator is at the end, i.e. get_row() cannot be called. - bool at_end() const { return _bucket_idx == BUCKET_NOT_FOUND; } - - private: - friend class PartitionedHashTable; - - Iterator(PartitionedHashTable* table, TupleRow* row, int bucket_idx, DuplicateNode* node) - : _table(table), - _row(row), - _bucket_idx(bucket_idx), - _node(node) { - } - - PartitionedHashTable* _table; - TupleRow* _row; - - // Current bucket idx. - // TODO: Use uint32_t? - int64_t _bucket_idx; - - // Pointer to the current duplicate node. - DuplicateNode* _node; - }; - -private: - friend class Iterator; - friend class PartitionedHashTableTest; - - // Hash table constructor. Private because Create() should be used, instead - // of calling this constructor directly. - // - quadratic_probing: set to true when the probing algorithm is quadratic, as - // opposed to linear. - PartitionedHashTable(bool quadratic_probing, RuntimeState* state, BufferedBlockMgr2::Client* client, - int num_build_tuples, BufferedTupleStream2* tuple_stream, - int64_t max_num_buckets, int64_t initial_num_buckets); - - // Performs the probing operation according to the probing algorithm (linear or - // quadratic. Returns one of the following: - // (a) the index of the bucket that contains the entry that matches with the last row - // evaluated in 'ht_ctx'. If 'ht_ctx' is NULL then it does not check for row - // equality and returns the index of the first empty bucket. - // (b) the index of the first empty bucket according to the probing algorithm (linear - // or quadratic), if the entry is not in the hash table or 'ht_ctx' is NULL. - // (c) Iterator::BUCKET_NOT_FOUND if the probe was not successful, i.e. the maximum - // distance was traveled without finding either an empty or a matching bucket. - // Using the returned index value, the caller can create an iterator that can be - // iterated until End() to find all the matching rows. - // EvalAndHashBuild() or EvalAndHashProb(e) must have been called before calling this. - // 'hash' must be the hash returned by these functions. - // 'found' indicates that a bucket that contains an equal row is found. - // - // There are wrappers of this function that perform the find and insert logic. - int64_t IR_ALWAYS_INLINE probe(Bucket* buckets, int64_t num_buckets, - PartitionedHashTableCtx* ht_ctx, uint32_t hash, bool* found); - - // Performs the insert logic. Returns the HtData* of the bucket or duplicate node - // where the data should be inserted. Returns NULL if the insert was not successful. - HtData* IR_ALWAYS_INLINE insert_internal(PartitionedHashTableCtx* ht_ctx, uint32_t hash); - - // Updates 'bucket_idx' to the index of the next non-empty bucket. If the bucket has - // duplicates, 'node' will be pointing to the head of the linked list of duplicates. - // Otherwise, 'node' should not be used. If there are no more buckets, sets - // 'bucket_idx' to BUCKET_NOT_FOUND. - void next_filled_bucket(int64_t* bucket_idx, DuplicateNode** node); - - // Resize the hash table to 'num_buckets'. Returns false on OOM. - bool resize_buckets(int64_t num_buckets, PartitionedHashTableCtx* ht_ctx); - - // Appends the DuplicateNode pointed by _next_node to 'bucket' and moves the _next_node - // pointer to the next DuplicateNode in the page, updating the remaining node counter. - DuplicateNode* IR_ALWAYS_INLINE append_next_node(Bucket* bucket); - - // Creates a new DuplicateNode for a entry and chains it to the bucket with index - // 'bucket_idx'. The duplicate nodes of a bucket are chained as a linked list. - // This places the new duplicate node at the beginning of the list. If this is the - // first duplicate entry inserted in this bucket, then the entry already contained by - // the bucket is converted to a DuplicateNode. That is, the contents of 'data' of the - // bucket are copied to a DuplicateNode and 'data' is updated to pointing to a - // DuplicateNode. - // Returns NULL if the node array could not grow, i.e. there was not enough memory to - // allocate a new DuplicateNode. - DuplicateNode* IR_ALWAYS_INLINE insert_duplicate_node(int64_t bucket_idx); - - // Resets the contents of the empty bucket with index 'bucket_idx', in preparation for - // an insert. Sets all the fields of the bucket other than 'data'. - void IR_ALWAYS_INLINE prepare_bucket_for_insert(int64_t bucket_idx, uint32_t hash); - - // Return the TupleRow pointed by 'htdata'. - TupleRow* get_row(HtData& htdata, TupleRow* row) const; - - // Returns the TupleRow of the pointed 'bucket'. In case of duplicates, it - // returns the content of the first chained duplicate node of the bucket. - TupleRow* get_row(Bucket* bucket, TupleRow* row) const; - - // Grow the node array. Returns false on OOM. - bool grow_node_array(); - - // Load factor that will trigger growing the hash table on insert. This is - // defined as the number of non-empty buckets / total_buckets - static const double MAX_FILL_FACTOR; - - RuntimeState* _state; - - // Client to allocate data pages with. - BufferedBlockMgr2::Client* _block_mgr_client; - - // Stream contains the rows referenced by the hash table. Can be NULL if the - // row only contains a single tuple, in which case the TupleRow indirection - // is removed by the hash table. - BufferedTupleStream2* _tuple_stream; - - // Constants on how the hash table should behave. Joins and aggs have slightly - // different behavior. - // TODO: these constants are an ideal candidate to be removed with codegen. - // TODO: ..or with template-ization - const bool _stores_tuples; - - // Quadratic probing enabled (as opposed to linear). - const bool _quadratic_probing; - - // Data pages for all nodes. These are always pinned. - std::vector _data_pages; - - // Byte size of all buffers in _data_pages. - int64_t _total_data_page_size; - - // Next duplicate node to insert. Vaild when _node_remaining_current_page > 0. - DuplicateNode* _next_node; - - // Number of nodes left in the current page. - int _node_remaining_current_page; - - // Number of duplicate nodes. - int64_t _num_duplicate_nodes; - - const int64_t _max_num_buckets; - - // Array of all buckets. Owned by this node. Using c-style array to control - // control memory footprint. - Bucket* _buckets; - - // Total number of buckets (filled and empty). - int64_t _num_buckets; - - // Number of non-empty buckets. Used to determine when to resize. - int64_t _num_filled_buckets; - - // Number of (non-empty) buckets with duplicates. These buckets do not point to slots - // in the tuple stream, rather than to a linked list of Nodes. - int64_t _num_buckets_with_duplicates; - - // Number of build tuples, used for constructing temp row* for probes. - // TODO: We should remove it. - const int _num_build_tuples; - - // Flag used to disable spilling hash tables that already had matches in case of - // right joins (IMPALA-1488). - // TODO: Not fail when spilling hash tables with matches in right joins - bool _has_matches; - - // The stats below can be used for debugging perf. - // TODO: Should we make these statistics atomic? - // Number of find(), insert(), or find_bucket() calls that probe the hash table. - int64_t _num_probes; - - // Number of probes that failed and had to fall back to linear probing without cap. - int64_t _num_failed_probes; - - // Total distance traveled for each probe. That is the sum of the diff between the end - // position of a probe (find/insert) and its start position - // (hash & (_num_buckets - 1)). - int64_t _travel_length; - - // The number of cases where we had to compare buckets with the same hash value, but - // the row equality failed. - int64_t _num_hash_collisions; - - // How many times this table has resized so far. - int64_t _num_resizes; -}; - -} // end namespace doris - -#endif // DORIS_BE_SRC_EXEC_PARTITIONED_HASH_TABLE_H diff --git a/be/src/exec/partitioned_hash_table.inline.h b/be/src/exec/partitioned_hash_table.inline.h deleted file mode 100644 index 09abc3de13..0000000000 --- a/be/src/exec/partitioned_hash_table.inline.h +++ /dev/null @@ -1,379 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef DORIS_BE_SRC_EXEC_PARTITIONED_HASH_TABLE_INLINE_H -#define DORIS_BE_SRC_EXEC_PARTITIONED_HASH_TABLE_INLINE_H - -#include "exec/partitioned_hash_table.h" - -namespace doris { - -inline bool PartitionedHashTableCtx::eval_and_hash_build(TupleRow* row, uint32_t* hash) { - bool has_null = EvalBuildRow(row); - if (!_stores_nulls && has_null) { - return false; - } - *hash = HashCurrentRow(); - return true; -} - -inline bool PartitionedHashTableCtx::eval_and_hash_probe(TupleRow* row, uint32_t* hash) { - bool has_null = EvalProbeRow(row); - if ((!_stores_nulls || !_finds_nulls) && has_null) { - return false; - } - *hash = HashCurrentRow(); - return true; -} - -inline int64_t PartitionedHashTable::probe(Bucket* buckets, int64_t num_buckets, - PartitionedHashTableCtx* ht_ctx, uint32_t hash, bool* found) { - DCHECK(buckets != NULL); - DCHECK_GT(num_buckets, 0); - *found = false; - int64_t bucket_idx = hash & (num_buckets - 1); - - // In case of linear probing it counts the total number of steps for statistics and - // for knowing when to exit the loop (e.g. by capping the total travel length). In case - // of quadratic probing it is also used for calculating the length of the next jump. - int64_t step = 0; - do { - Bucket* bucket = &buckets[bucket_idx]; - if (!bucket->filled) { - return bucket_idx; - } - if (hash == bucket->hash) { - if (ht_ctx != NULL && ht_ctx->equals(get_row(bucket, ht_ctx->_row))) { - *found = true; - return bucket_idx; - } - // Row equality failed, or not performed. This is a hash collision. Continue - // searching. - ++_num_hash_collisions; - } - // Move to the next bucket. - ++step; - ++_travel_length; - if (_quadratic_probing) { - // The i-th probe location is idx = (hash + (step * (step + 1)) / 2) mod num_buckets. - // This gives num_buckets unique idxs (between 0 and N-1) when num_buckets is a power - // of 2. - bucket_idx = (bucket_idx + step) & (num_buckets - 1); - } else { - bucket_idx = (bucket_idx + 1) & (num_buckets - 1); - } - } while (LIKELY(step < num_buckets)); - DCHECK_EQ(_num_filled_buckets, num_buckets) << "Probing of a non-full table " - << "failed: " << _quadratic_probing << " " << hash; - return Iterator::BUCKET_NOT_FOUND; -} - -inline PartitionedHashTable::HtData* PartitionedHashTable::insert_internal( - PartitionedHashTableCtx* ht_ctx, uint32_t hash) { - ++_num_probes; - bool found = false; - int64_t bucket_idx = probe(_buckets, _num_buckets, ht_ctx, hash, &found); - DCHECK_NE(bucket_idx, Iterator::BUCKET_NOT_FOUND); - if (found) { - // We need to insert a duplicate node, note that this may fail to allocate memory. - DuplicateNode* new_node = insert_duplicate_node(bucket_idx); - if (UNLIKELY(new_node == NULL)) { - return NULL; - } - return &new_node->htdata; - } else { - prepare_bucket_for_insert(bucket_idx, hash); - return &_buckets[bucket_idx].bucketData.htdata; - } -} - -inline bool PartitionedHashTable::insert(PartitionedHashTableCtx* ht_ctx, - const BufferedTupleStream2::RowIdx& idx, TupleRow* row, uint32_t hash) { - if (_stores_tuples) { - return insert(ht_ctx, row->get_tuple(0), hash); - } - HtData* htdata = insert_internal(ht_ctx, hash); - // If successful insert, update the contents of the newly inserted entry with 'idx'. - if (LIKELY(htdata != NULL)) { - htdata->idx = idx; - return true; - } - return false; -} - -inline bool PartitionedHashTable::insert( - PartitionedHashTableCtx* ht_ctx, Tuple* tuple, uint32_t hash) { - DCHECK(_stores_tuples); - HtData* htdata = insert_internal(ht_ctx, hash); - // If successful insert, update the contents of the newly inserted entry with 'tuple'. - if (LIKELY(htdata != NULL)) { - htdata->tuple = tuple; - return true; - } - return false; -} - -inline PartitionedHashTable::Iterator PartitionedHashTable::find( - PartitionedHashTableCtx* ht_ctx, uint32_t hash) { - ++_num_probes; - bool found = false; - int64_t bucket_idx = probe(_buckets, _num_buckets, ht_ctx, hash, &found); - if (found) { - return Iterator(this, ht_ctx->row(), bucket_idx, - _buckets[bucket_idx].bucketData.duplicates); - } - return End(); -} - -inline PartitionedHashTable::Iterator PartitionedHashTable::find_bucket( - PartitionedHashTableCtx* ht_ctx, uint32_t hash, - bool* found) { - ++_num_probes; - int64_t bucket_idx = probe(_buckets, _num_buckets, ht_ctx, hash, found); - DuplicateNode* duplicates = LIKELY(bucket_idx != Iterator::BUCKET_NOT_FOUND) ? - _buckets[bucket_idx].bucketData.duplicates : NULL; - return Iterator(this, ht_ctx->row(), bucket_idx, duplicates); -} - -inline PartitionedHashTable::Iterator PartitionedHashTable::begin(PartitionedHashTableCtx* ctx) { - int64_t bucket_idx = Iterator::BUCKET_NOT_FOUND; - DuplicateNode* node = NULL; - next_filled_bucket(&bucket_idx, &node); - return Iterator(this, ctx->row(), bucket_idx, node); -} - -inline PartitionedHashTable::Iterator PartitionedHashTable::first_unmatched( - PartitionedHashTableCtx* ctx) { - int64_t bucket_idx = Iterator::BUCKET_NOT_FOUND; - DuplicateNode* node = NULL; - next_filled_bucket(&bucket_idx, &node); - Iterator it(this, ctx->row(), bucket_idx, node); - // Check whether the bucket, or its first duplicate node, is matched. If it is not - // matched, then return. Otherwise, move to the first unmatched entry (node or bucket). - Bucket* bucket = &_buckets[bucket_idx]; - if ((!bucket->hasDuplicates && bucket->matched) || - (bucket->hasDuplicates && node->matched)) { - it.next_unmatched(); - } - return it; -} - -inline void PartitionedHashTable::next_filled_bucket(int64_t* bucket_idx, DuplicateNode** node) { - ++*bucket_idx; - for (; *bucket_idx < _num_buckets; ++*bucket_idx) { - if (_buckets[*bucket_idx].filled) { - *node = _buckets[*bucket_idx].bucketData.duplicates; - return; - } - } - // Reached the end of the hash table. - *bucket_idx = Iterator::BUCKET_NOT_FOUND; - *node = NULL; -} - -inline void PartitionedHashTable::prepare_bucket_for_insert(int64_t bucket_idx, uint32_t hash) { - DCHECK_GE(bucket_idx, 0); - DCHECK_LT(bucket_idx, _num_buckets); - Bucket* bucket = &_buckets[bucket_idx]; - DCHECK(!bucket->filled); - ++_num_filled_buckets; - bucket->filled = true; - bucket->matched = false; - bucket->hasDuplicates = false; - bucket->hash = hash; -} - -inline PartitionedHashTable::DuplicateNode* PartitionedHashTable::append_next_node( - Bucket* bucket) { - DCHECK_GT(_node_remaining_current_page, 0); - bucket->bucketData.duplicates = _next_node; - ++_num_duplicate_nodes; - --_node_remaining_current_page; - return _next_node++; -} - -inline PartitionedHashTable::DuplicateNode* PartitionedHashTable::insert_duplicate_node( - int64_t bucket_idx) { - DCHECK_GE(bucket_idx, 0); - DCHECK_LT(bucket_idx, _num_buckets); - Bucket* bucket = &_buckets[bucket_idx]; - DCHECK(bucket->filled); - // Allocate one duplicate node for the new data and one for the preexisting data, - // if needed. - while (_node_remaining_current_page < 1 + !bucket->hasDuplicates) { - if (UNLIKELY(!grow_node_array())) { - return NULL; - } - } - if (!bucket->hasDuplicates) { - // This is the first duplicate in this bucket. It means that we need to convert - // the current entry in the bucket to a node and link it from the bucket. - _next_node->htdata.idx = bucket->bucketData.htdata.idx; - DCHECK(!bucket->matched); - _next_node->matched = false; - _next_node->next = NULL; - append_next_node(bucket); - bucket->hasDuplicates = true; - ++_num_buckets_with_duplicates; - } - // Link a new node. - _next_node->next = bucket->bucketData.duplicates; - _next_node->matched = false; - return append_next_node(bucket); -} - -inline TupleRow* PartitionedHashTable::get_row(HtData& htdata, TupleRow* row) const { - if (_stores_tuples) { - return reinterpret_cast(&htdata.tuple); - } else { - _tuple_stream->get_tuple_row(htdata.idx, row); - return row; - } -} - -inline TupleRow* PartitionedHashTable::get_row(Bucket* bucket, TupleRow* row) const { - DCHECK(bucket != NULL); - if (UNLIKELY(bucket->hasDuplicates)) { - DuplicateNode* duplicate = bucket->bucketData.duplicates; - DCHECK(duplicate != NULL); - return get_row(duplicate->htdata, row); - } else { - return get_row(bucket->bucketData.htdata, row); - } -} - -inline TupleRow* PartitionedHashTable::Iterator::get_row() const { - DCHECK(!at_end()); - DCHECK(_table != NULL); - DCHECK(_row != NULL); - Bucket* bucket = &_table->_buckets[_bucket_idx]; - if (UNLIKELY(bucket->hasDuplicates)) { - DCHECK(_node != NULL); - return _table->get_row(_node->htdata, _row); - } else { - return _table->get_row(bucket->bucketData.htdata, _row); - } -} - -inline Tuple* PartitionedHashTable::Iterator::get_tuple() const { - DCHECK(!at_end()); - DCHECK(_table->_stores_tuples); - Bucket* bucket = &_table->_buckets[_bucket_idx]; - // TODO: To avoid the hasDuplicates check, store the HtData* in the Iterator. - if (UNLIKELY(bucket->hasDuplicates)) { - DCHECK(_node != NULL); - return _node->htdata.tuple; - } else { - return bucket->bucketData.htdata.tuple; - } -} - -inline void PartitionedHashTable::Iterator::set_tuple(Tuple* tuple, uint32_t hash) { - DCHECK(!at_end()); - DCHECK(_table->_stores_tuples); - _table->prepare_bucket_for_insert(_bucket_idx, hash); - _table->_buckets[_bucket_idx].bucketData.htdata.tuple = tuple; -} - -inline void PartitionedHashTable::Iterator::set_matched() { - DCHECK(!at_end()); - Bucket* bucket = &_table->_buckets[_bucket_idx]; - if (bucket->hasDuplicates) { - _node->matched = true; - } else { - bucket->matched = true; - } - // Used for disabling spilling of hash tables in right and full-outer joins with - // matches. See IMPALA-1488. - _table->_has_matches = true; -} - -inline bool PartitionedHashTable::Iterator::is_matched() const { - DCHECK(!at_end()); - Bucket* bucket = &_table->_buckets[_bucket_idx]; - if (bucket->hasDuplicates) { - return _node->matched; - } - return bucket->matched; -} - -inline void PartitionedHashTable::Iterator::set_at_end() { - _bucket_idx = BUCKET_NOT_FOUND; - _node = NULL; -} - -inline void PartitionedHashTable::Iterator::next() { - DCHECK(!at_end()); - if (_table->_buckets[_bucket_idx].hasDuplicates && _node->next != NULL) { - _node = _node->next; - } else { - _table->next_filled_bucket(&_bucket_idx, &_node); - } -} - -inline void PartitionedHashTable::Iterator::next_duplicate() { - DCHECK(!at_end()); - if (_table->_buckets[_bucket_idx].hasDuplicates && _node->next != NULL) { - _node = _node->next; - } else { - _bucket_idx = BUCKET_NOT_FOUND; - _node = NULL; - } -} - -inline void PartitionedHashTable::Iterator::next_unmatched() { - DCHECK(!at_end()); - Bucket* bucket = &_table->_buckets[_bucket_idx]; - // Check if there is any remaining unmatched duplicate node in the current bucket. - if (bucket->hasDuplicates) { - while (_node->next != NULL) { - _node = _node->next; - if (!_node->matched) { - return; - } - } - } - // Move to the next filled bucket and return if this bucket is not matched or - // iterate to the first not matched duplicate node. - _table->next_filled_bucket(&_bucket_idx, &_node); - while (_bucket_idx != Iterator::BUCKET_NOT_FOUND) { - bucket = &_table->_buckets[_bucket_idx]; - if (!bucket->hasDuplicates) { - if (!bucket->matched) { - return; - } - } else { - while (_node->matched && _node->next != NULL) { - _node = _node->next; - } - if (!_node->matched) { - return; - } - } - _table->next_filled_bucket(&_bucket_idx, &_node); - } -} - -inline void PartitionedHashTableCtx::set_level(int level) { - DCHECK_GE(level, 0); - DCHECK_LT(level, _seeds.size()); - _level = level; -} - -} // end namespace doris - -#endif // DORIS_BE_SRC_EXEC_PARTITIONED_HASH_TABLE_INLINE_H diff --git a/be/src/exec/partitioned_hash_table_ir.cc b/be/src/exec/partitioned_hash_table_ir.cc deleted file mode 100644 index 69d98b5332..0000000000 --- a/be/src/exec/partitioned_hash_table_ir.cc +++ /dev/null @@ -1,29 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifdef IR_COMPILE -#include "exec/partitioned_hash_table.h" - -namespace doris { - -uint32_t PartitionedHashTableCtx::get_hash_seed() const { - return _seeds[_level]; -} - -} // end namespace doris - -#endif diff --git a/be/test/exec/partitioned_hash_table_test.cpp b/be/test/exec/partitioned_hash_table_test.cpp deleted file mode 100644 index 7ef0d2f9dd..0000000000 --- a/be/test/exec/partitioned_hash_table_test.cpp +++ /dev/null @@ -1,606 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "exec/partitioned_hash_table.inline.h" - -#include - -#include -#include -#include -#include - -#include - -#include "common/compiler_util.h" -#include "exprs/expr.h" -#include "exprs/expr_context.h" -#include "exprs/slot_ref.h" -#include "runtime/mem_pool.h" -#include "runtime/mem_tracker.h" -#include "runtime/string_value.h" -#include "runtime/test_env.h" -#include "util/cpu_info.h" -#include "util/disk_info.h" -#include "util/runtime_profile.h" - -using std::vector; -using std::map; - -using boost::scoped_ptr; - -namespace doris { - -class PartitionedHashTableTest : public testing::Test { -public: - PartitionedHashTableTest() : _limit(-1), _mem_pool(&_limit) {} - ~PartitionedHashTableTest() {} - -protected: - scoped_ptr _test_env; - RuntimeState* _runtime_state; - ObjectPool _pool; - MemTracker _tracker; - MemTracker _limit; - MemPool _mem_pool; - vector _build_expr_ctxs; - vector _probe_expr_ctxs; - - virtual void SetUp() { - _test_env.reset(new TestEnv()); - - RowDescriptor desc; - Status status; - - // Not very easy to test complex tuple layouts so this test will use the - // simplest. The purpose of these tests is to exercise the hash map - // internals so a simple build/probe expr is fine. - Expr* expr = _pool.add(new SlotRef(TYPE_INT, 0)); - _build_expr_ctxs.push_back(_pool.add(new ExprContext(expr))); - status = Expr::prepare(_build_expr_ctxs, NULL, desc, &_tracker); - EXPECT_TRUE(status.ok()); - status = Expr::open(_build_expr_ctxs, NULL); - EXPECT_TRUE(status.ok()); - - expr = _pool.add(new SlotRef(TYPE_INT, 0)); - _probe_expr_ctxs.push_back(_pool.add(new ExprContext(expr))); - status = Expr::prepare(_probe_expr_ctxs, NULL, desc, &_tracker); - EXPECT_TRUE(status.ok()); - status = Expr::open(_probe_expr_ctxs, NULL); - EXPECT_TRUE(status.ok()); - } - - virtual void TearDown() { - Expr::close(_build_expr_ctxs, NULL); - Expr::close(_probe_expr_ctxs, NULL); - _runtime_state = NULL; - _test_env.reset(); - _mem_pool.free_all(); - } - - TupleRow* CreateTupleRow(int32_t val) { - uint8_t* tuple_row_mem = _mem_pool.allocate(sizeof(int32_t*)); - Tuple* tuple_mem = Tuple::create(sizeof(int32_t), &_mem_pool); - *reinterpret_cast(tuple_mem) = val; - TupleRow* row = reinterpret_cast(tuple_row_mem); - row->set_tuple(0, tuple_mem); - return row; - } - - // Wrapper to call private methods on PartitionedHashTable - // TODO: understand google testing, there must be a more natural way to do this - void ResizeTable( - PartitionedHashTable* table, int64_t new_size, PartitionedHashTableCtx* ht_ctx) { - table->resize_buckets(new_size, ht_ctx); - } - - // Do a full table scan on table. All values should be between [min,max). If - // all_unique, then each key(int value) should only appear once. Results are - // stored in results, indexed by the key. Results must have been preallocated to - // be at least max size. - void FullScan(PartitionedHashTable* table, PartitionedHashTableCtx* ht_ctx, int min, int max, - bool all_unique, TupleRow** results, TupleRow** expected) { - PartitionedHashTable::Iterator iter = table->begin(ht_ctx); - while (!iter.at_end()) { - TupleRow* row = iter.get_row(); - int32_t val = *reinterpret_cast(_build_expr_ctxs[0]->get_value(row)); - EXPECT_GE(val, min); - EXPECT_LT(val, max); - if (all_unique) { - EXPECT_TRUE(results[val] == NULL); - } - EXPECT_EQ(row->get_tuple(0), expected[val]->get_tuple(0)); - results[val] = row; - iter.next(); - } - } - - // Validate that probe_row evaluates overs probe_exprs is equal to build_row - // evaluated over build_exprs - void ValidateMatch(TupleRow* probe_row, TupleRow* build_row) { - EXPECT_TRUE(probe_row != build_row); - int32_t build_val = - *reinterpret_cast(_build_expr_ctxs[0]->get_value(probe_row)); - int32_t probe_val = - *reinterpret_cast(_probe_expr_ctxs[0]->get_value(build_row)); - EXPECT_EQ(build_val, probe_val); - } - - struct ProbeTestData { - TupleRow* probe_row; - vector expected_build_rows; - }; - - void ProbeTest(PartitionedHashTable* table, PartitionedHashTableCtx* ht_ctx, - ProbeTestData* data, int num_data, bool scan) { - uint32_t hash = 0; - for (int i = 0; i < num_data; ++i) { - TupleRow* row = data[i].probe_row; - - PartitionedHashTable::Iterator iter; - if (ht_ctx->eval_and_hash_probe(row, &hash)) { - continue; - } - iter = table->find(ht_ctx, hash); - - if (data[i].expected_build_rows.size() == 0) { - EXPECT_TRUE(iter.at_end()); - } else { - if (scan) { - map matched; - while (!iter.at_end()) { - EXPECT_EQ(matched.find(iter.get_row()), matched.end()); - matched[iter.get_row()] = true; - iter.next(); - } - EXPECT_EQ(matched.size(), data[i].expected_build_rows.size()); - for (int j = 0; i < data[j].expected_build_rows.size(); ++j) { - EXPECT_TRUE(matched[data[i].expected_build_rows[j]]); - } - } else { - EXPECT_EQ(data[i].expected_build_rows.size(), 1); - EXPECT_EQ(data[i].expected_build_rows[0]->get_tuple(0), - iter.get_row()->get_tuple(0)); - ValidateMatch(row, iter.get_row()); - } - } - } - } - - // Construct hash table with custom block manager. Returns result of PartitionedHashTable::init() - bool CreateHashTable(bool quadratic, int64_t initial_num_buckets, - scoped_ptr* table, int block_size = 8 * 1024 * 1024, - int max_num_blocks = 100, int reserved_blocks = 10) { - EXPECT_TRUE(_test_env->create_query_state(0, max_num_blocks, block_size, - &_runtime_state).ok()); - _runtime_state->init_mem_trackers(TUniqueId()); - - BufferedBlockMgr2::Client* client; - EXPECT_TRUE(_runtime_state->block_mgr2()->register_client(reserved_blocks, &_limit, - _runtime_state, &client).ok()); - - // Initial_num_buckets must be a power of two. - EXPECT_EQ(initial_num_buckets, BitUtil::next_power_of_two(initial_num_buckets)); - int64_t max_num_buckets = 1L << 31; - table->reset(new PartitionedHashTable(quadratic, _runtime_state, client, 1, NULL, - max_num_buckets, initial_num_buckets)); - return (*table)->init(); - } - - // Constructs and closes a hash table. - void SetupTest(bool quadratic, int64_t initial_num_buckets, bool too_big) { - TupleRow* build_row1 = CreateTupleRow(1); - TupleRow* build_row2 = CreateTupleRow(2); - TupleRow* probe_row3 = CreateTupleRow(3); - TupleRow* probe_row4 = CreateTupleRow(4); - - int32_t* val_row1 = - reinterpret_cast(_build_expr_ctxs[0]->get_value(build_row1)); - EXPECT_EQ(*val_row1, 1); - int32_t* val_row2 = - reinterpret_cast(_build_expr_ctxs[0]->get_value(build_row2)); - EXPECT_EQ(*val_row2, 2); - int32_t* val_row3 = - reinterpret_cast(_probe_expr_ctxs[0]->get_value(probe_row3)); - EXPECT_EQ(*val_row3, 3); - int32_t* val_row4 = - reinterpret_cast(_probe_expr_ctxs[0]->get_value(probe_row4)); - EXPECT_EQ(*val_row4, 4); - - // Create and close the hash table. - scoped_ptr hash_table; - bool initialized = CreateHashTable(quadratic, initial_num_buckets, &hash_table); - EXPECT_EQ(too_big, !initialized); - - hash_table->close(); - } - - // This test inserts the build rows [0->5) to hash table. It validates that they - // are all there using a full table scan. It also validates that find() is correct - // testing for probe rows that are both there and not. - // The hash table is resized a few times and the scans/finds are tested again. - void BasicTest(bool quadratic, int initial_num_buckets) { - TupleRow* build_rows[5]; - TupleRow* scan_rows[5] = {0}; - for (int i = 0; i < 5; ++i) { - build_rows[i] = CreateTupleRow(i); - } - - ProbeTestData probe_rows[10]; - for (int i = 0; i < 10; ++i) { - probe_rows[i].probe_row = CreateTupleRow(i); - if (i < 5) { - probe_rows[i].expected_build_rows.push_back(build_rows[i]); - } - } - - // Create the hash table and Insert the build rows - scoped_ptr hash_table; - ASSERT_TRUE(CreateHashTable(quadratic, initial_num_buckets, &hash_table)); - PartitionedHashTableCtx ht_ctx(_build_expr_ctxs, _probe_expr_ctxs, false, false, 1, 0, 1); - - uint32_t hash = 0; - bool success = hash_table->check_and_resize(5, &ht_ctx); - ASSERT_TRUE(success); - for (int i = 0; i < 5; ++i) { - if (!ht_ctx.eval_and_hash_build(build_rows[i], &hash)) { - continue; - } - bool inserted = hash_table->insert(&ht_ctx, build_rows[i]->get_tuple(0), hash); - EXPECT_TRUE(inserted); - } - EXPECT_EQ(hash_table->size(), 5); - - // Do a full table scan and validate returned pointers - FullScan(hash_table.get(), &ht_ctx, 0, 5, true, scan_rows, build_rows); - ProbeTest(hash_table.get(), &ht_ctx, probe_rows, 10, false); - - // Double the size of the hash table and scan again. - ResizeTable(hash_table.get(), 2048, &ht_ctx); - EXPECT_EQ(hash_table->num_buckets(), 2048); - EXPECT_EQ(hash_table->size(), 5); - memset(scan_rows, 0, sizeof(scan_rows)); - FullScan(hash_table.get(), &ht_ctx, 0, 5, true, scan_rows, build_rows); - ProbeTest(hash_table.get(), &ht_ctx, probe_rows, 10, false); - - // Try to shrink and scan again. - ResizeTable(hash_table.get(), 64, &ht_ctx); - EXPECT_EQ(hash_table->num_buckets(), 64); - EXPECT_EQ(hash_table->size(), 5); - memset(scan_rows, 0, sizeof(scan_rows)); - FullScan(hash_table.get(), &ht_ctx, 0, 5, true, scan_rows, build_rows); - ProbeTest(hash_table.get(), &ht_ctx, probe_rows, 10, false); - - // Resize to 8, which is the smallest value to fit the number of filled buckets. - ResizeTable(hash_table.get(), 8, &ht_ctx); - EXPECT_EQ(hash_table->num_buckets(), 8); - EXPECT_EQ(hash_table->size(), 5); - memset(scan_rows, 0, sizeof(scan_rows)); - FullScan(hash_table.get(), &ht_ctx, 0, 5, true, scan_rows, build_rows); - ProbeTest(hash_table.get(), &ht_ctx, probe_rows, 10, false); - - hash_table->close(); - ht_ctx.close(); - } - - void ScanTest(bool quadratic, int initial_size, int rows_to_insert, - int additional_rows) { - scoped_ptr hash_table; - ASSERT_TRUE(CreateHashTable(quadratic, initial_size, &hash_table)); - - int total_rows = rows_to_insert + additional_rows; - PartitionedHashTableCtx ht_ctx(_build_expr_ctxs, _probe_expr_ctxs, false, false, 1, 0, 1); - - // Add 1 row with val 1, 2 with val 2, etc. - vector build_rows; - ProbeTestData* probe_rows = new ProbeTestData[total_rows]; - probe_rows[0].probe_row = CreateTupleRow(0); - uint32_t hash = 0; - for (int val = 1; val <= rows_to_insert; ++val) { - bool success = hash_table->check_and_resize(val, &ht_ctx); - EXPECT_TRUE(success) << " failed to resize: " << val; - probe_rows[val].probe_row = CreateTupleRow(val); - for (int i = 0; i < val; ++i) { - TupleRow* row = CreateTupleRow(val); - if (!ht_ctx.eval_and_hash_build(row, &hash)) { - continue; - } - hash_table->insert(&ht_ctx, row->get_tuple(0), hash); - build_rows.push_back(row); - probe_rows[val].expected_build_rows.push_back(row); - } - } - - // Add some more probe rows that aren't there. - for (int val = rows_to_insert; val < rows_to_insert + additional_rows; ++val) { - probe_rows[val].probe_row = CreateTupleRow(val); - } - - // Test that all the builds were found. - ProbeTest(hash_table.get(), &ht_ctx, probe_rows, total_rows, true); - - // Resize and try again. - int target_size = BitUtil::next_power_of_two(2 * total_rows); - ResizeTable(hash_table.get(), target_size, &ht_ctx); - EXPECT_EQ(hash_table->num_buckets(), target_size); - ProbeTest(hash_table.get(), &ht_ctx, probe_rows, total_rows, true); - - target_size = BitUtil::next_power_of_two(total_rows + 1); - ResizeTable(hash_table.get(), target_size, &ht_ctx); - EXPECT_EQ(hash_table->num_buckets(), target_size); - ProbeTest(hash_table.get(), &ht_ctx, probe_rows, total_rows, true); - - delete [] probe_rows; - hash_table->close(); - ht_ctx.close(); - } - - // This test continues adding tuples to the hash table and exercises the resize code - // paths. - void GrowTableTest(bool quadratic) { - uint64_t num_to_add = 4; - int expected_size = 0; - - // MemTracker tracker(100 * 1024 * 1024); - MemTracker tracker(100 * 1024 * 1024); - scoped_ptr hash_table; - ASSERT_TRUE(CreateHashTable(quadratic, num_to_add, &hash_table)); - PartitionedHashTableCtx ht_ctx(_build_expr_ctxs, _probe_expr_ctxs, false, false, 1, 0, 1); - - // Inserts num_to_add + (num_to_add^2) + (num_to_add^4) + ... + (num_to_add^20) - // entries. When num_to_add == 4, then the total number of inserts is 4194300. - int build_row_val = 0; - uint32_t hash = 0; - bool done_inserting = false; - for (int i = 0; i < 20; ++i) { - // Currently the mem used for the bucket is not being tracked by the mem tracker. - // Thus the resize is expected to be successful. - // TODO: Keep track of the mem used for the buckets and test cases where we actually - // hit OOM. - // TODO: Insert duplicates to also hit OOM. - bool success = hash_table->check_and_resize(num_to_add, &ht_ctx); - EXPECT_TRUE(success) << " failed to resize: " << num_to_add; - for (int j = 0; j < num_to_add; ++build_row_val, ++j) { - TupleRow* row = CreateTupleRow(build_row_val); - if (!ht_ctx.eval_and_hash_build(row, &hash)) { - continue; - } - bool inserted = hash_table->insert(&ht_ctx, row->get_tuple(0), hash); - if (!inserted) { - done_inserting = true; - break; - } - } - if (done_inserting) { - break; - } - expected_size += num_to_add; - num_to_add *= 2; - } - EXPECT_FALSE(tracker.limit_exceeded()); - EXPECT_EQ(hash_table->size(), 4194300); - // Validate that we can find the entries before we went over the limit - for (int i = 0; i < expected_size * 5; i += 100000) { - TupleRow* probe_row = CreateTupleRow(i); - if (!ht_ctx.eval_and_hash_probe(probe_row, &hash)) { - continue; - } - PartitionedHashTable::Iterator iter = hash_table->find(&ht_ctx, hash); - if (i < hash_table->size()) { - EXPECT_TRUE(!iter.at_end()) << " i: " << i; - ValidateMatch(probe_row, iter.get_row()); - } else { - EXPECT_TRUE(iter.at_end()) << " i: " << i; - } - } - hash_table->close(); - ht_ctx.close(); - } - - // This test inserts and probes as many elements as the size of the hash table without - // calling resize. All the inserts and probes are expected to succeed, because there is - // enough space in the hash table (it is also expected to be slow). It also expects that - // a probe for a N+1 element will return BUCKET_NOT_FOUND. - void InsertFullTest(bool quadratic, int table_size) { - scoped_ptr hash_table; - ASSERT_TRUE(CreateHashTable(quadratic, table_size, &hash_table)); - PartitionedHashTableCtx ht_ctx(_build_expr_ctxs, _probe_expr_ctxs, false, false, 1, 0, 1); - EXPECT_EQ(hash_table->empty_buckets(), table_size); - - // Insert and probe table_size different tuples. All of them are expected to be - // successfully inserted and probed. - uint32_t hash = 0; - PartitionedHashTable::Iterator iter; - bool found = false; - for (int build_row_val = 0; build_row_val < table_size; ++build_row_val) { - TupleRow* row = CreateTupleRow(build_row_val); - bool passes = ht_ctx.eval_and_hash_build(row, &hash); - EXPECT_TRUE(passes); - - // Insert using both insert() and find_bucket() methods. - if (build_row_val % 2 == 0) { - bool inserted = hash_table->insert(&ht_ctx, row->get_tuple(0), hash); - EXPECT_TRUE(inserted); - } else { - iter = hash_table->find_bucket(&ht_ctx, hash, &found); - EXPECT_FALSE(iter.at_end()); - EXPECT_FALSE(found); - iter.set_tuple(row->get_tuple(0), hash); - } - EXPECT_EQ(hash_table->empty_buckets(), table_size - build_row_val - 1); - - passes = ht_ctx.eval_and_hash_probe(row, &hash); - EXPECT_TRUE(passes); - iter = hash_table->find(&ht_ctx, hash); - EXPECT_FALSE(iter.at_end()); - EXPECT_EQ(row->get_tuple(0), iter.get_tuple()); - - iter = hash_table->find_bucket(&ht_ctx, hash, &found); - EXPECT_FALSE(iter.at_end()); - EXPECT_TRUE(found); - EXPECT_EQ(row->get_tuple(0), iter.get_tuple()); - } - - // Probe for a tuple that does not exist. This should exercise the probe of a full - // hash table code path. - EXPECT_EQ(hash_table->empty_buckets(), 0); - TupleRow* probe_row = CreateTupleRow(table_size); - bool passes = ht_ctx.eval_and_hash_probe(probe_row, &hash); - EXPECT_TRUE(passes); - iter = hash_table->find(&ht_ctx, hash); - EXPECT_TRUE(iter.at_end()); - - // Since hash_table is full, find_bucket cannot find an empty bucket, so returns End(). - iter = hash_table->find_bucket(&ht_ctx, hash, &found); - EXPECT_TRUE(iter.at_end()); - EXPECT_FALSE(found); - - hash_table->close(); - ht_ctx.close(); - } - - // This test makes sure we can tolerate the low memory case where we do not have enough - // memory to allocate the array of buckets for the hash table. - void VeryLowMemTest(bool quadratic) { - const int block_size = 2 * 1024; - const int max_num_blocks = 1; - const int reserved_blocks = 0; - const int table_size = 1024; - scoped_ptr hash_table; - ASSERT_FALSE(CreateHashTable(quadratic, table_size, &hash_table, block_size, - max_num_blocks, reserved_blocks)); - PartitionedHashTableCtx ht_ctx(_build_expr_ctxs, _probe_expr_ctxs, false, false, 1, 0, 1); - PartitionedHashTable::Iterator iter = hash_table->begin(&ht_ctx); - EXPECT_TRUE(iter.at_end()); - - hash_table->close(); - } -}; - -TEST_F(PartitionedHashTableTest, LinearSetupTest) { - SetupTest(false, 1, false); - SetupTest(false, 1024, false); - SetupTest(false, 65536, false); - - // Regression test for IMPALA-2065. Trying to init a hash table with large (>2^31) - // number of buckets. - SetupTest(false, 4294967296, true); // 2^32 -} - -TEST_F(PartitionedHashTableTest, QuadraticSetupTest) { - SetupTest(true, 1, false); - SetupTest(true, 1024, false); - SetupTest(true, 65536, false); - - // Regression test for IMPALA-2065. Trying to init a hash table with large (>2^31) - // number of buckets. - SetupTest(true, 4294967296, true); // 2^32 -} - -TEST_F(PartitionedHashTableTest, LinearBasicTest) { - BasicTest(false, 1); - BasicTest(false, 1024); - BasicTest(false, 65536); -} - -TEST_F(PartitionedHashTableTest, QuadraticBasicTest) { - BasicTest(true, 1); - BasicTest(true, 1024); - BasicTest(true, 65536); -} - -// This test makes sure we can scan ranges of buckets. -TEST_F(PartitionedHashTableTest, LinearScanTest) { - ScanTest(false, 1, 10, 5); - ScanTest(false, 1024, 1000, 5); - ScanTest(false, 1024, 1000, 500); -} - -TEST_F(PartitionedHashTableTest, QuadraticScanTest) { - ScanTest(true, 1, 10, 5); - ScanTest(true, 1024, 1000, 5); - ScanTest(true, 1024, 1000, 500); -} - -TEST_F(PartitionedHashTableTest, LinearGrowTableTest) { - GrowTableTest(false); -} - -TEST_F(PartitionedHashTableTest, QuadraticGrowTableTest) { - GrowTableTest(true); -} - -TEST_F(PartitionedHashTableTest, LinearInsertFullTest) { - InsertFullTest(false, 1); - InsertFullTest(false, 4); - InsertFullTest(false, 64); - InsertFullTest(false, 1024); - InsertFullTest(false, 65536); -} - -TEST_F(PartitionedHashTableTest, QuadraticInsertFullTest) { - InsertFullTest(true, 1); - InsertFullTest(true, 4); - InsertFullTest(true, 64); - InsertFullTest(true, 1024); - InsertFullTest(true, 65536); -} - -// Test that hashing empty string updates hash value. -TEST_F(PartitionedHashTableTest, HashEmpty) { - PartitionedHashTableCtx ht_ctx(_build_expr_ctxs, _probe_expr_ctxs, false, false, 1, 2, 1); - uint32_t seed = 9999; - ht_ctx.set_level(0); - EXPECT_NE(seed, ht_ctx.hash_help(NULL, 0, seed)); - // TODO: level 0 uses CRC hash, which only swaps bytes around on empty input. - // EXPECT_NE(seed, ht_ctx.hash_help(NULL, 0, ht_ctx.hash_help(NULL, 0, seed))); - ht_ctx.set_level(1); - EXPECT_NE(seed, ht_ctx.hash_help(NULL, 0, seed)); - EXPECT_NE(seed, ht_ctx.hash_help(NULL, 0, ht_ctx.hash_help(NULL, 0, seed))); - ht_ctx.close(); -} - -TEST_F(PartitionedHashTableTest, VeryLowMemTest) { - VeryLowMemTest(true); - VeryLowMemTest(false); -} -#if 0 -#endif - -} // end namespace doris - -int main(int argc, char** argv) { - // std::string conffile = std::string(getenv("DORIS_HOME")) + "/conf/be.conf"; - // if (!doris::config::init(conffile.c_str(), false)) { - // fprintf(stderr, "error read config file. \n"); - // return -1; - // } - doris::config::query_scratch_dirs = "/tmp"; - // doris::config::max_free_io_buffers = 128; - doris::config::read_size = 8388608; - doris::config::min_buffer_size = 1024; - - doris::config::disable_mem_pools = false; - - doris::init_glog("be-test"); - ::testing::InitGoogleTest(&argc, argv); - - doris::CpuInfo::init(); - doris::DiskInfo::init(); - - return RUN_ALL_TESTS(); -}