// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "exec/partitioned_aggregation_node.h"

#include <math.h>

#include <algorithm>
#include <set>
#include <sstream>

#include "exec/partitioned_hash_table.h"
#include "exec/partitioned_hash_table.inline.h"
#include "exprs/anyval_util.h"
#include "exprs/expr_context.h"
#include "exprs/new_agg_fn_evaluator.h"
// #include "exprs/scalar_expr_evaluator.h"
#include "exprs/slot_ref.h"
#include "gen_cpp/Exprs_types.h"
#include "gen_cpp/PlanNodes_types.h"
#include "gutil/strings/substitute.h"
#include "runtime/buffered_tuple_stream3.inline.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/mem_pool.h"
#include "runtime/mem_tracker.h"
#include "runtime/raw_value.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "runtime/string_value.h"
#include "runtime/tuple.h"
#include "runtime/tuple_row.h"
#include "udf/udf_internal.h"

using namespace strings;

namespace doris {

/// The minimum reduction factor (input rows divided by output rows) to grow hash tables
/// in a streaming preaggregation, given that the hash tables are currently the given
/// size or above. The sizes roughly correspond to hash table sizes where the bucket
/// arrays will fit in a cache level. Intuitively, we don't want the working set of the
/// aggregation to expand to the next level of cache unless we're reducing the input
/// enough to outweigh the increased memory latency we'll incur for each hash table
/// lookup.
///
/// Note that the current reduction achieved is not always a good estimate of the
/// final reduction. It may be biased either way depending on the ordering of the
/// input. If the input order is random, we will underestimate the final reduction
/// factor because the probability of a row having the same key as a previous row
/// increases as more input is processed. If the input order is correlated with the
/// key, skew may bias the estimate. If high cardinality keys appear first, we
/// may overestimate and if low cardinality keys appear first, we underestimate.
/// To estimate the eventual reduction achieved, we estimate the final reduction
/// using the planner's estimated input cardinality and the assumption that input
/// is in a random order. This means that we assume that the reduction factor will
/// increase over time.
struct StreamingHtMinReductionEntry {
    // Use 'streaming_ht_min_reduction' if the total size of hash table bucket directories
    // in bytes is greater than this threshold.
    int min_ht_mem;
    // The minimum reduction factor to expand the hash tables.
    double streaming_ht_min_reduction;
};

// TODO: experimentally tune these values and also programmatically get the cache size
// of the machine that we're running on.
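// Rough worked example of how the table below is consulted (see
// ShouldExpandPreaggHashTables()): with ~1MB of bucket directories we have passed the
// 256KB entry but not the 2MB entry, so the tables keep growing only while the observed
// reduction factor (aggregated input rows / hash table rows) exceeds 1.1; past 2MB the
// bar rises to 2.0.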
static const StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = { // Expand up to L2 cache always. {0, 0.0}, // Expand into L3 cache if we look like we're getting some reduction. {256 * 1024, 1.1}, // Expand into main memory if we're getting a significant reduction. {2 * 1024 * 1024, 2.0}, }; static const int STREAMING_HT_MIN_REDUCTION_SIZE = sizeof(STREAMING_HT_MIN_REDUCTION) / sizeof(STREAMING_HT_MIN_REDUCTION[0]); PartitionedAggregationNode::PartitionedAggregationNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : ExecNode(pool, tnode, descs), intermediate_tuple_id_(tnode.agg_node.intermediate_tuple_id), intermediate_tuple_desc_(descs.get_tuple_descriptor(intermediate_tuple_id_)), intermediate_row_desc_(intermediate_tuple_desc_, false), output_tuple_id_(tnode.agg_node.output_tuple_id), output_tuple_desc_(descs.get_tuple_descriptor(output_tuple_id_)), needs_finalize_(tnode.agg_node.need_finalize), needs_serialize_(false), output_partition_(NULL), process_batch_no_grouping_fn_(NULL), process_batch_fn_(NULL), process_batch_streaming_fn_(NULL), build_timer_(NULL), ht_resize_timer_(NULL), ht_resize_counter_(NULL), get_results_timer_(NULL), num_hash_buckets_(NULL), num_hash_filled_buckets_(NULL), num_hash_probe_(NULL), num_hash_failed_probe_(NULL), num_hash_travel_length_(NULL), num_hash_collisions_(NULL), partitions_created_(NULL), max_partition_level_(NULL), num_row_repartitioned_(NULL), num_repartitions_(NULL), num_spilled_partitions_(NULL), largest_partition_percent_(NULL), streaming_timer_(NULL), num_processed_rows_(NULL), num_passthrough_rows_(NULL), preagg_estimated_reduction_(NULL), preagg_streaming_ht_min_reduction_(NULL), // estimated_input_cardinality_(tnode.agg_node.estimated_input_cardinality), singleton_output_tuple_(NULL), singleton_output_tuple_returned_(true), partition_eos_(false), child_eos_(false), partition_pool_(new ObjectPool()) { DCHECK_EQ(PARTITION_FANOUT, 1 << NUM_PARTITIONING_BITS); if (tnode.agg_node.__isset.use_streaming_preaggregation) { is_streaming_preagg_ = tnode.agg_node.use_streaming_preaggregation; if (is_streaming_preagg_) { DCHECK(_conjunct_ctxs.empty()) << "Preaggs have no conjuncts"; DCHECK(!tnode.agg_node.grouping_exprs.empty()) << "Streaming preaggs do grouping"; DCHECK(_limit == -1) << "Preaggs have no limits"; } } else { is_streaming_preagg_ = false; } } Status PartitionedAggregationNode::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(ExecNode::init(tnode)); DCHECK(intermediate_tuple_desc_ != nullptr); DCHECK(output_tuple_desc_ != nullptr); DCHECK_EQ(intermediate_tuple_desc_->slots().size(), output_tuple_desc_->slots().size()); const RowDescriptor& row_desc = child(0)->row_desc(); RETURN_IF_ERROR(Expr::create(tnode.agg_node.grouping_exprs, row_desc, state, &grouping_exprs_, mem_tracker())); // Construct build exprs from intermediate_row_desc_ for (int i = 0; i < grouping_exprs_.size(); ++i) { SlotDescriptor* desc = intermediate_tuple_desc_->slots()[i]; //DCHECK(desc->type().type == TYPE_NULL || desc->type() == grouping_exprs_[i]->type()); // Hack to avoid TYPE_NULL SlotRefs. SlotRef* build_expr = _pool->add(desc->type().type != TYPE_NULL ? 
new SlotRef(desc) : new SlotRef(desc, TYPE_BOOLEAN)); build_exprs_.push_back(build_expr); // TODO chenhao RETURN_IF_ERROR(build_expr->prepare(state, intermediate_row_desc_, nullptr)); if (build_expr->type().is_var_len_string_type()) string_grouping_exprs_.push_back(i); } int j = grouping_exprs_.size(); for (int i = 0; i < tnode.agg_node.aggregate_functions.size(); ++i, ++j) { SlotDescriptor* intermediate_slot_desc = intermediate_tuple_desc_->slots()[j]; SlotDescriptor* output_slot_desc = output_tuple_desc_->slots()[j]; AggFn* agg_fn; RETURN_IF_ERROR(AggFn::Create(tnode.agg_node.aggregate_functions[i], row_desc, *intermediate_slot_desc, *output_slot_desc, state, &agg_fn)); agg_fns_.push_back(agg_fn); needs_serialize_ |= agg_fn->SupportsSerialize(); } return Status::OK(); } Status PartitionedAggregationNode::prepare(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::prepare(state)); state_ = state; mem_pool_.reset(new MemPool(mem_tracker().get())); agg_fn_pool_.reset(new MemPool(expr_mem_tracker().get())); ht_resize_timer_ = ADD_TIMER(runtime_profile(), "HTResizeTime"); get_results_timer_ = ADD_TIMER(runtime_profile(), "GetResultsTime"); num_processed_rows_ = ADD_COUNTER(runtime_profile(), "RowsProcessed", TUnit::UNIT); num_hash_buckets_ = ADD_COUNTER(runtime_profile(), "HashBuckets", TUnit::UNIT); num_hash_filled_buckets_ = ADD_COUNTER(runtime_profile(), "HashFilledBuckets", TUnit::UNIT); num_hash_probe_ = ADD_COUNTER(runtime_profile(), "HashProbe", TUnit::UNIT); num_hash_failed_probe_ = ADD_COUNTER(runtime_profile(), "HashFailedProbe", TUnit::UNIT); num_hash_travel_length_ = ADD_COUNTER(runtime_profile(), "HashTravelLength", TUnit::UNIT); num_hash_collisions_ = ADD_COUNTER(runtime_profile(), "HashCollisions", TUnit::UNIT); ht_resize_counter_ = ADD_COUNTER(runtime_profile(), "HTResize", TUnit::UNIT); partitions_created_ = ADD_COUNTER(runtime_profile(), "PartitionsCreated", TUnit::UNIT); largest_partition_percent_ = runtime_profile()->AddHighWaterMarkCounter("LargestPartitionPercent", TUnit::UNIT); if (config::enable_quadratic_probing) { runtime_profile()->add_info_string("Probe Method", "HashTable Quadratic Probing"); } else { runtime_profile()->add_info_string("Probe Method", "HashTable Linear Probing"); } if (is_streaming_preagg_) { runtime_profile()->append_exec_option("Streaming Preaggregation"); streaming_timer_ = ADD_TIMER(runtime_profile(), "StreamingTime"); num_passthrough_rows_ = ADD_COUNTER(runtime_profile(), "RowsPassedThrough", TUnit::UNIT); preagg_estimated_reduction_ = ADD_COUNTER(runtime_profile(), "ReductionFactorEstimate", TUnit::DOUBLE_VALUE); preagg_streaming_ht_min_reduction_ = ADD_COUNTER( runtime_profile(), "ReductionFactorThresholdToExpand", TUnit::DOUBLE_VALUE); } else { build_timer_ = ADD_TIMER(runtime_profile(), "BuildTime"); num_row_repartitioned_ = ADD_COUNTER(runtime_profile(), "RowsRepartitioned", TUnit::UNIT); num_repartitions_ = ADD_COUNTER(runtime_profile(), "NumRepartitions", TUnit::UNIT); num_spilled_partitions_ = ADD_COUNTER(runtime_profile(), "SpilledPartitions", TUnit::UNIT); max_partition_level_ = runtime_profile()->AddHighWaterMarkCounter("MaxPartitionLevel", TUnit::UNIT); } // TODO chenhao const RowDescriptor& row_desc = child(0)->row_desc(); RETURN_IF_ERROR(NewAggFnEvaluator::Create(agg_fns_, state, _pool, agg_fn_pool_.get(), &agg_fn_evals_, expr_mem_tracker(), row_desc)); expr_results_pool_.reset(new MemPool(expr_mem_tracker().get())); if (!grouping_exprs_.empty()) { RowDescriptor 
build_row_desc(intermediate_tuple_desc_, false); RETURN_IF_ERROR(PartitionedHashTableCtx::Create( _pool, state, build_exprs_, grouping_exprs_, true, vector(build_exprs_.size(), true), state->fragment_hash_seed(), MAX_PARTITION_DEPTH, 1, expr_mem_pool(), expr_results_pool_.get(), expr_mem_tracker(), build_row_desc, row_desc, &ht_ctx_)); } // AddCodegenDisabledMessage(state); return Status::OK(); } Status PartitionedAggregationNode::open(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); // Open the child before consuming resources in this node. RETURN_IF_ERROR(child(0)->open(state)); RETURN_IF_ERROR(ExecNode::open(state)); // Claim reservation after the child has been opened to reduce the peak reservation // requirement. if (!_buffer_pool_client.is_registered() && !grouping_exprs_.empty()) { DCHECK_GE(_resource_profile.min_reservation, MinReservation()); RETURN_IF_ERROR(claim_buffer_reservation(state)); } if (ht_ctx_.get() != nullptr) RETURN_IF_ERROR(ht_ctx_->Open(state)); RETURN_IF_ERROR(NewAggFnEvaluator::Open(agg_fn_evals_, state)); if (grouping_exprs_.empty()) { // Create the single output tuple for this non-grouping agg. This must happen after // opening the aggregate evaluators. singleton_output_tuple_ = ConstructSingletonOutputTuple(agg_fn_evals_, mem_pool_.get()); // Check for failures during NewAggFnEvaluator::Init(). RETURN_IF_ERROR(state_->query_status()); singleton_output_tuple_returned_ = false; } else { if (ht_allocator_ == nullptr) { // Allocate 'serialize_stream_' and 'ht_allocator_' on the first Open() call. ht_allocator_.reset(new Suballocator(state_->exec_env()->buffer_pool(), &_buffer_pool_client, _resource_profile.spillable_buffer_size)); if (!is_streaming_preagg_ && needs_serialize_) { serialize_stream_.reset(new BufferedTupleStream3( state, &intermediate_row_desc_, &_buffer_pool_client, _resource_profile.spillable_buffer_size, _resource_profile.max_row_buffer_size)); RETURN_IF_ERROR(serialize_stream_->Init(id(), false)); bool got_buffer; // Reserve the memory for 'serialize_stream_' so we don't need to scrounge up // another buffer during spilling. RETURN_IF_ERROR(serialize_stream_->PrepareForWrite(&got_buffer)); DCHECK(got_buffer) << "Accounted in min reservation" << _buffer_pool_client.DebugString(); DCHECK(serialize_stream_->has_write_iterator()); } } RETURN_IF_ERROR(CreateHashPartitions(0)); } // Streaming preaggregations do all processing in GetNext(). if (is_streaming_preagg_) return Status::OK(); RowBatch batch(child(0)->row_desc(), state->batch_size(), mem_tracker().get()); // Read all the rows from the child and process them. bool eos = false; do { RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(state->check_query_state( "New partitioned aggregation, while getting next from child 0.")); RETURN_IF_ERROR(_children[0]->get_next(state, &batch, &eos)); if (UNLIKELY(VLOG_ROW_IS_ON)) { for (int i = 0; i < batch.num_rows(); ++i) { TupleRow* row = batch.get_row(i); VLOG_ROW << "input row: " << row->to_string(_children[0]->row_desc()); } } SCOPED_TIMER(build_timer_); if (grouping_exprs_.empty()) { if (process_batch_no_grouping_fn_ != NULL) { RETURN_IF_ERROR(process_batch_no_grouping_fn_(this, &batch)); } else { RETURN_IF_ERROR(ProcessBatchNoGrouping(&batch)); } } else { // There is grouping, so we will do partitioned aggregation. 
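            // 'process_batch_fn_' appears to be reserved for a code-generated variant of
            // ProcessBatch(); it is initialized to NULL in the constructor, so unless it
            // is set elsewhere we fall back to the interpreted ProcessBatch() below.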
if (process_batch_fn_ != NULL) { RETURN_IF_ERROR(process_batch_fn_(this, &batch, ht_ctx_.get())); } else { RETURN_IF_ERROR(ProcessBatch(&batch, ht_ctx_.get())); } } batch.reset(); } while (!eos); // The child can be closed at this point in most cases because we have consumed all of // the input from the child and transfered ownership of the resources we need. The // exception is if we are inside a subplan expecting to call Open()/GetNext() on the // child again, if (!is_in_subplan()) child(0)->close(state); child_eos_ = true; // Done consuming child(0)'s input. Move all the partitions in hash_partitions_ // to spilled_partitions_ or aggregated_partitions_. We'll finish the processing in // GetNext(). if (!grouping_exprs_.empty()) { RETURN_IF_ERROR(MoveHashPartitions(child(0)->rows_returned())); } return Status::OK(); } Status PartitionedAggregationNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) { int first_row_idx = row_batch->num_rows(); RETURN_IF_ERROR(GetNextInternal(state, row_batch, eos)); RETURN_IF_ERROR(HandleOutputStrings(row_batch, first_row_idx)); return Status::OK(); } Status PartitionedAggregationNode::HandleOutputStrings(RowBatch* row_batch, int first_row_idx) { if (!needs_finalize_ && !needs_serialize_) return Status::OK(); // String data returned by Serialize() or Finalize() is from local expr allocations in // the agg function contexts, and will be freed on the next GetNext() call by // FreeLocalAllocations(). The data either needs to be copied out now or sent up the // plan and copied out by a blocking ancestor. (See IMPALA-3311) for (const AggFn* agg_fn : agg_fns_) { const SlotDescriptor& slot_desc = agg_fn->output_slot_desc(); DCHECK(!slot_desc.type().is_collection_type()) << "producing collections NYI"; if (!slot_desc.type().is_var_len_string_type()) continue; if (is_in_subplan()) { // Copy string data to the row batch's pool. This is more efficient than // MarkNeedsDeepCopy() in a subplan since we are likely producing many small // batches. RETURN_IF_ERROR(CopyStringData(slot_desc, row_batch, first_row_idx, row_batch->tuple_data_pool())); } else { row_batch->mark_needs_deep_copy(); break; } } return Status::OK(); } Status PartitionedAggregationNode::CopyStringData(const SlotDescriptor& slot_desc, RowBatch* row_batch, int first_row_idx, MemPool* pool) { DCHECK(slot_desc.type().is_var_len_string_type()); DCHECK_EQ(row_batch->row_desc().tuple_descriptors().size(), 1); FOREACH_ROW(row_batch, first_row_idx, batch_iter) { Tuple* tuple = batch_iter.get()->get_tuple(0); StringValue* sv = reinterpret_cast(tuple->get_slot(slot_desc.tuple_offset())); if (sv == NULL || sv->len == 0) continue; char* new_ptr = reinterpret_cast(pool->try_allocate(sv->len)); if (UNLIKELY(new_ptr == NULL)) { string details = Substitute( "Cannot perform aggregation at node with id $0." 
" Failed to allocate $1 output bytes.", _id, sv->len); return pool->mem_tracker()->MemLimitExceeded(state_, details, sv->len); } memcpy(new_ptr, sv->ptr, sv->len); sv->ptr = new_ptr; } return Status::OK(); } Status PartitionedAggregationNode::GetNextInternal(RuntimeState* state, RowBatch* row_batch, bool* eos) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT)); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(state->check_query_state("New partitioned aggregation, while getting next.")); // clear tmp expr result alocations expr_results_pool_->clear(); if (reached_limit()) { *eos = true; return Status::OK(); } if (grouping_exprs_.empty()) { // There was no grouping, so evaluate the conjuncts and return the single result row. // We allow calling GetNext() after eos, so don't return this row again. if (!singleton_output_tuple_returned_) GetSingletonOutput(row_batch); singleton_output_tuple_returned_ = true; *eos = true; return Status::OK(); } if (!child_eos_) { // For streaming preaggregations, we process rows from the child as we go. DCHECK(is_streaming_preagg_); RETURN_IF_ERROR(GetRowsStreaming(state, row_batch)); } else if (!partition_eos_) { RETURN_IF_ERROR(GetRowsFromPartition(state, row_batch)); } *eos = partition_eos_ && child_eos_; COUNTER_SET(_rows_returned_counter, _num_rows_returned); return Status::OK(); } void PartitionedAggregationNode::GetSingletonOutput(RowBatch* row_batch) { DCHECK(grouping_exprs_.empty()); int row_idx = row_batch->add_row(); TupleRow* row = row_batch->get_row(row_idx); Tuple* output_tuple = GetOutputTuple(agg_fn_evals_, singleton_output_tuple_, row_batch->tuple_data_pool()); row->set_tuple(0, output_tuple); if (ExecNode::eval_conjuncts(_conjunct_ctxs.data(), _conjunct_ctxs.size(), row)) { row_batch->commit_last_row(); ++_num_rows_returned; COUNTER_SET(_rows_returned_counter, _num_rows_returned); } // Keep the current chunk to amortize the memory allocation over a series // of Reset()/Open()/GetNext()* calls. row_batch->tuple_data_pool()->acquire_data(mem_pool_.get(), true); // This node no longer owns the memory for singleton_output_tuple_. singleton_output_tuple_ = NULL; } Status PartitionedAggregationNode::GetRowsFromPartition(RuntimeState* state, RowBatch* row_batch) { DCHECK(!row_batch->at_capacity()); if (output_iterator_.AtEnd()) { // Done with this partition, move onto the next one. if (output_partition_ != NULL) { output_partition_->Close(false); output_partition_ = NULL; } if (aggregated_partitions_.empty() && spilled_partitions_.empty()) { // No more partitions, all done. partition_eos_ = true; return Status::OK(); } // Process next partition. RETURN_IF_ERROR(NextPartition()); DCHECK(output_partition_ != NULL); } SCOPED_TIMER(get_results_timer_); int count = 0; const int N = BitUtil::next_power_of_two(state->batch_size()); // Keeping returning rows from the current partition. while (!output_iterator_.AtEnd()) { // This loop can go on for a long time if the conjuncts are very selective. Do query // maintenance every N iterations. 
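        // N is the batch size rounded up to a power of two, so '(count & (N - 1)) == 0'
        // checks "every N-th iteration" with a mask instead of a modulo.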
if ((count++ & (N - 1)) == 0) { RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(state->check_query_state( "New partitioned aggregation, while getting rows from partition.")); } int row_idx = row_batch->add_row(); TupleRow* row = row_batch->get_row(row_idx); Tuple* intermediate_tuple = output_iterator_.GetTuple(); Tuple* output_tuple = GetOutputTuple(output_partition_->agg_fn_evals, intermediate_tuple, row_batch->tuple_data_pool()); output_iterator_.Next(); row->set_tuple(0, output_tuple); // TODO chenhao // DCHECK_EQ(_conjunct_ctxs.size(), _conjuncts.size()); if (ExecNode::eval_conjuncts(_conjunct_ctxs.data(), _conjunct_ctxs.size(), row)) { row_batch->commit_last_row(); ++_num_rows_returned; if (reached_limit() || row_batch->at_capacity()) { break; } } } COUNTER_SET(num_processed_rows_, num_hash_probe_->value()); COUNTER_SET(_rows_returned_counter, _num_rows_returned); partition_eos_ = reached_limit(); if (output_iterator_.AtEnd()) row_batch->mark_needs_deep_copy(); return Status::OK(); } Status PartitionedAggregationNode::GetRowsStreaming(RuntimeState* state, RowBatch* out_batch) { DCHECK(!child_eos_); DCHECK(is_streaming_preagg_); if (child_batch_ == NULL) { child_batch_.reset( new RowBatch(child(0)->row_desc(), state->batch_size(), mem_tracker().get())); } do { DCHECK_EQ(out_batch->num_rows(), 0); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(state->check_query_state( "New partitioned aggregation, while getting rows in streaming.")); RETURN_IF_ERROR(child(0)->get_next(state, child_batch_.get(), &child_eos_)); SCOPED_TIMER(streaming_timer_); int remaining_capacity[PARTITION_FANOUT]; bool ht_needs_expansion = false; for (int i = 0; i < PARTITION_FANOUT; ++i) { PartitionedHashTable* hash_tbl = GetHashTable(i); remaining_capacity[i] = hash_tbl->NumInsertsBeforeResize(); ht_needs_expansion |= remaining_capacity[i] < child_batch_->num_rows(); } // Stop expanding hash tables if we're not reducing the input sufficiently. As our // hash tables expand out of each level of cache hierarchy, every hash table lookup // will take longer. We also may not be able to expand hash tables because of memory // pressure. In this case HashTable::CheckAndResize() will fail. In either case we // should always use the remaining space in the hash table to avoid wasting memory. if (ht_needs_expansion && ShouldExpandPreaggHashTables()) { for (int i = 0; i < PARTITION_FANOUT; ++i) { PartitionedHashTable* ht = GetHashTable(i); if (remaining_capacity[i] < child_batch_->num_rows()) { SCOPED_TIMER(ht_resize_timer_); bool resized; RETURN_IF_ERROR( ht->CheckAndResize(child_batch_->num_rows(), ht_ctx_.get(), &resized)); if (resized) { remaining_capacity[i] = ht->NumInsertsBeforeResize(); } } } } if (process_batch_streaming_fn_ != NULL) { RETURN_IF_ERROR(process_batch_streaming_fn_(this, needs_serialize_, child_batch_.get(), out_batch, ht_ctx_.get(), remaining_capacity)); } else { RETURN_IF_ERROR(ProcessBatchStreaming(needs_serialize_, child_batch_.get(), out_batch, ht_ctx_.get(), remaining_capacity)); } child_batch_->reset(); // All rows from child_batch_ were processed. 
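        // Loop until the streaming pass has emitted some rows into 'out_batch' (rows that
        // could not be aggregated into the preagg hash tables are passed through) or the
        // child is exhausted.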
} while (out_batch->num_rows() == 0 && !child_eos_); if (child_eos_) { child(0)->close(state); child_batch_.reset(); RETURN_IF_ERROR(MoveHashPartitions(child(0)->rows_returned())); } _num_rows_returned += out_batch->num_rows(); COUNTER_SET(num_passthrough_rows_, _num_rows_returned); return Status::OK(); } bool PartitionedAggregationNode::ShouldExpandPreaggHashTables() const { int64_t ht_mem = 0; int64_t ht_rows = 0; for (int i = 0; i < PARTITION_FANOUT; ++i) { PartitionedHashTable* ht = hash_partitions_[i]->hash_tbl.get(); ht_mem += ht->CurrentMemSize(); ht_rows += ht->size(); } // Need some rows in tables to have valid statistics. if (ht_rows == 0) return true; // Find the appropriate reduction factor in our table for the current hash table sizes. int cache_level = 0; while (cache_level + 1 < STREAMING_HT_MIN_REDUCTION_SIZE && ht_mem >= STREAMING_HT_MIN_REDUCTION[cache_level + 1].min_ht_mem) { ++cache_level; } // Compare the number of rows in the hash table with the number of input rows that // were aggregated into it. Exclude passed through rows from this calculation since // they were not in hash tables. const int64_t input_rows = _children[0]->rows_returned(); const int64_t aggregated_input_rows = input_rows - _num_rows_returned; // TODO chenhao // const int64_t expected_input_rows = estimated_input_cardinality_ - num_rows_returned_; double current_reduction = static_cast(aggregated_input_rows) / ht_rows; // TODO: workaround for IMPALA-2490: subplan node rows_returned counter may be // inaccurate, which could lead to a divide by zero below. if (aggregated_input_rows <= 0) return true; // Extrapolate the current reduction factor (r) using the formula // R = 1 + (N / n) * (r - 1), where R is the reduction factor over the full input data // set, N is the number of input rows, excluding passed-through rows, and n is the // number of rows inserted or merged into the hash tables. This is a very rough // approximation but is good enough to be useful. // TODO: consider collecting more statistics to better estimate reduction. // double estimated_reduction = aggregated_input_rows >= expected_input_rows // ? current_reduction // : 1 + (expected_input_rows / aggregated_input_rows) * (current_reduction - 1); double min_reduction = STREAMING_HT_MIN_REDUCTION[cache_level].streaming_ht_min_reduction; // COUNTER_SET(preagg_estimated_reduction_, estimated_reduction); COUNTER_SET(preagg_streaming_ht_min_reduction_, min_reduction); // return estimated_reduction > min_reduction; return current_reduction > min_reduction; } void PartitionedAggregationNode::CleanupHashTbl(const vector& agg_fn_evals, PartitionedHashTable::Iterator it) { if (!needs_finalize_ && !needs_serialize_) return; // Iterate through the remaining rows in the hash table and call Serialize/Finalize on // them in order to free any memory allocated by UDAs. if (needs_finalize_) { // Finalize() requires a dst tuple but we don't actually need the result, // so allocate a single dummy tuple to avoid accumulating memory. 
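        // Only the side effect matters here: Finalize()/Serialize() give each UDA the
        // chance to free memory it allocated for its intermediate state. The finalized
        // value written to 'dummy_dst' is discarded.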
Tuple* dummy_dst = NULL; dummy_dst = Tuple::create(output_tuple_desc_->byte_size(), mem_pool_.get()); while (!it.AtEnd()) { Tuple* tuple = it.GetTuple(); NewAggFnEvaluator::Finalize(agg_fn_evals, tuple, dummy_dst); it.Next(); } } else { while (!it.AtEnd()) { Tuple* tuple = it.GetTuple(); NewAggFnEvaluator::Serialize(agg_fn_evals, tuple); it.Next(); } } } Status PartitionedAggregationNode::reset(RuntimeState* state) { DCHECK(!is_streaming_preagg_) << "Cannot reset preaggregation"; if (!grouping_exprs_.empty()) { child_eos_ = false; partition_eos_ = false; // Reset the HT and the partitions for this grouping agg. ht_ctx_->set_level(0); ClosePartitions(); } return ExecNode::reset(state); } Status PartitionedAggregationNode::close(RuntimeState* state) { if (is_closed()) return Status::OK(); if (!singleton_output_tuple_returned_) { GetOutputTuple(agg_fn_evals_, singleton_output_tuple_, mem_pool_.get()); } // Iterate through the remaining rows in the hash table and call Serialize/Finalize on // them in order to free any memory allocated by UDAs if (output_partition_ != NULL) { CleanupHashTbl(output_partition_->agg_fn_evals, output_iterator_); output_partition_->Close(false); } ClosePartitions(); child_batch_.reset(); // Close all the agg-fn-evaluators NewAggFnEvaluator::Close(agg_fn_evals_, state); if (expr_results_pool_.get() != nullptr) { expr_results_pool_->free_all(); } if (agg_fn_pool_.get() != nullptr) agg_fn_pool_->free_all(); if (mem_pool_.get() != nullptr) mem_pool_->free_all(); if (ht_ctx_.get() != nullptr) ht_ctx_->Close(state); ht_ctx_.reset(); if (serialize_stream_.get() != nullptr) { serialize_stream_->Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES); } Expr::close(grouping_exprs_); Expr::close(build_exprs_); AggFn::Close(agg_fns_); return ExecNode::close(state); } PartitionedAggregationNode::Partition::~Partition() { DCHECK(is_closed); } Status PartitionedAggregationNode::Partition::InitStreams() { agg_fn_pool.reset(new MemPool(parent->expr_mem_tracker().get())); DCHECK_EQ(agg_fn_evals.size(), 0); NewAggFnEvaluator::ShallowClone(parent->partition_pool_.get(), agg_fn_pool.get(), parent->agg_fn_evals_, &agg_fn_evals); // Varlen aggregate function results are stored outside of aggregated_row_stream because // BufferedTupleStream3 doesn't support relocating varlen data stored in the stream. 
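    // Slots after the grouping slots in the intermediate tuple belong to the aggregate
    // functions; collect the ids of the var-len string ones so the stream treats them as
    // externally stored.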
auto agg_slot = parent->intermediate_tuple_desc_->slots().begin() + parent->grouping_exprs_.size(); std::set external_varlen_slots; for (; agg_slot != parent->intermediate_tuple_desc_->slots().end(); ++agg_slot) { if ((*agg_slot)->type().is_var_len_string_type()) { external_varlen_slots.insert((*agg_slot)->id()); } } aggregated_row_stream.reset(new BufferedTupleStream3( parent->state_, &parent->intermediate_row_desc_, &parent->_buffer_pool_client, parent->_resource_profile.spillable_buffer_size, parent->_resource_profile.max_row_buffer_size, external_varlen_slots)); RETURN_IF_ERROR(aggregated_row_stream->Init(parent->id(), true)); bool got_buffer; RETURN_IF_ERROR(aggregated_row_stream->PrepareForWrite(&got_buffer)); DCHECK(got_buffer) << "Buffer included in reservation " << parent->_id << "\n" << parent->_buffer_pool_client.DebugString() << "\n" << parent->DebugString(2); if (!parent->is_streaming_preagg_) { unaggregated_row_stream.reset(new BufferedTupleStream3( parent->state_, &(parent->child(0)->row_desc()), &parent->_buffer_pool_client, parent->_resource_profile.spillable_buffer_size, parent->_resource_profile.max_row_buffer_size)); // This stream is only used to spill, no need to ever have this pinned. RETURN_IF_ERROR(unaggregated_row_stream->Init(parent->id(), false)); // Save memory by waiting until we spill to allocate the write buffer for the // unaggregated row stream. DCHECK(!unaggregated_row_stream->has_write_iterator()); } return Status::OK(); } Status PartitionedAggregationNode::Partition::InitHashTable(bool* got_memory) { DCHECK(aggregated_row_stream != nullptr); DCHECK(hash_tbl == nullptr); // We use the upper PARTITION_FANOUT num bits to pick the partition so only the // remaining bits can be used for the hash table. // TODO: we could switch to 64 bit hashes and then we don't need a max size. // It might be reasonable to limit individual hash table size for other reasons // though. Always start with small buffers. hash_tbl.reset(PartitionedHashTable::Create(parent->ht_allocator_.get(), false, 1, nullptr, 1L << (32 - NUM_PARTITIONING_BITS), PAGG_DEFAULT_HASH_TABLE_SZ)); // Please update the error message in CreateHashPartitions() if initial size of // hash table changes. return hash_tbl->Init(got_memory); } Status PartitionedAggregationNode::Partition::SerializeStreamForSpilling() { DCHECK(!parent->is_streaming_preagg_); if (parent->needs_serialize_) { // We need to do a lot more work in this case. This step effectively does a merge // aggregation in this node. We need to serialize the intermediates, spill the // intermediates and then feed them into the aggregate function's merge step. // This is often used when the intermediate is a string type, meaning the current // (before serialization) in-memory layout is not the on-disk block layout. // The disk layout does not support mutable rows. We need to rewrite the stream // into the on disk format. // TODO: if it happens to not be a string, we could serialize in place. This is // a future optimization since it is very unlikely to have a serialize phase // for those UDAs. DCHECK(parent->serialize_stream_.get() != NULL); DCHECK(!parent->serialize_stream_->is_pinned()); // Serialize and copy the spilled partition's stream into the new stream. 
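        // The loop below walks the hash table, runs Serialize() on each intermediate tuple
        // in place and appends it to 'serialize_stream_'; the serialized stream is then
        // swapped in as this partition's aggregated_row_stream.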
Status status = Status::OK(); BufferedTupleStream3* new_stream = parent->serialize_stream_.get(); PartitionedHashTable::Iterator it = hash_tbl->Begin(parent->ht_ctx_.get()); while (!it.AtEnd()) { Tuple* tuple = it.GetTuple(); it.Next(); NewAggFnEvaluator::Serialize(agg_fn_evals, tuple); if (UNLIKELY(!new_stream->AddRow(reinterpret_cast(&tuple), &status))) { DCHECK(!status.ok()) << "Stream was unpinned - AddRow() only fails on error"; // Even if we can't add to new_stream, finish up processing this agg stream to make // clean up easier (someone has to finalize this stream and we don't want to remember // where we are). parent->CleanupHashTbl(agg_fn_evals, it); hash_tbl->Close(); hash_tbl.reset(); aggregated_row_stream->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES); return status; } } aggregated_row_stream->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES); aggregated_row_stream.swap(parent->serialize_stream_); // Recreate the serialize_stream (and reserve 1 buffer) now in preparation for // when we need to spill again. We need to have this available before we need // to spill to make sure it is available. This should be acquirable since we just // freed at least one buffer from this partition's (old) aggregated_row_stream. parent->serialize_stream_.reset(new BufferedTupleStream3( parent->state_, &parent->intermediate_row_desc_, &parent->_buffer_pool_client, parent->_resource_profile.spillable_buffer_size, parent->_resource_profile.max_row_buffer_size)); status = parent->serialize_stream_->Init(parent->id(), false); if (status.ok()) { bool got_buffer; status = parent->serialize_stream_->PrepareForWrite(&got_buffer); DCHECK(!status.ok() || got_buffer) << "Accounted in min reservation"; } if (!status.ok()) { hash_tbl->Close(); hash_tbl.reset(); return status; } DCHECK(parent->serialize_stream_->has_write_iterator()); } return Status::OK(); } Status PartitionedAggregationNode::Partition::Spill(bool more_aggregate_rows) { DCHECK(!parent->is_streaming_preagg_); DCHECK(!is_closed); DCHECK(!is_spilled()); // TODO(ml): enable spill std::stringstream msg; msg << "New partitioned Aggregation in spill"; LIMIT_EXCEEDED(parent->mem_tracker(), parent->state_, msg.str()); // RETURN_IF_ERROR(parent->state_->StartSpilling(parent->mem_tracker())); RETURN_IF_ERROR(SerializeStreamForSpilling()); // Free the in-memory result data. NewAggFnEvaluator::Close(agg_fn_evals, parent->state_); agg_fn_evals.clear(); if (agg_fn_pool.get() != NULL) { agg_fn_pool->free_all(); agg_fn_pool.reset(); } hash_tbl->Close(); hash_tbl.reset(); // Unpin the stream to free memory, but leave a write buffer in place so we can // continue appending rows to one of the streams in the partition. 
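    // Which stream keeps its write buffer depends on what we expect to append next:
    // more aggregated rows (e.g. while repartitioning aggregated input) or unaggregated
    // rows that no longer fit in a hash table.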
DCHECK(aggregated_row_stream->has_write_iterator()); DCHECK(!unaggregated_row_stream->has_write_iterator()); if (more_aggregate_rows) { // aggregated_row_stream->UnpinStream(BufferedTupleStream3::UNPIN_ALL_EXCEPT_CURRENT); } else { // aggregated_row_stream->UnpinStream(BufferedTupleStream3::UNPIN_ALL); bool got_buffer; RETURN_IF_ERROR(unaggregated_row_stream->PrepareForWrite(&got_buffer)); DCHECK(got_buffer) << "Accounted in min reservation" << parent->_buffer_pool_client.DebugString(); } COUNTER_UPDATE(parent->num_spilled_partitions_, 1); if (parent->num_spilled_partitions_->value() == 1) { parent->add_runtime_exec_option("Spilled"); } return Status::OK(); } void PartitionedAggregationNode::Partition::Close(bool finalize_rows) { if (is_closed) return; is_closed = true; if (aggregated_row_stream.get() != NULL) { if (finalize_rows && hash_tbl.get() != NULL) { // We need to walk all the rows and Finalize them here so the UDA gets a chance // to cleanup. If the hash table is gone (meaning this was spilled), the rows // should have been finalized/serialized in Spill(). parent->CleanupHashTbl(agg_fn_evals, hash_tbl->Begin(parent->ht_ctx_.get())); } aggregated_row_stream->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES); } if (hash_tbl.get() != NULL) hash_tbl->Close(); if (unaggregated_row_stream.get() != NULL) { unaggregated_row_stream->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES); } for (NewAggFnEvaluator* eval : agg_fn_evals) eval->Close(parent->state_); if (agg_fn_pool.get() != NULL) agg_fn_pool->free_all(); } Tuple* PartitionedAggregationNode::ConstructSingletonOutputTuple( const vector& agg_fn_evals, MemPool* pool) { DCHECK(grouping_exprs_.empty()); Tuple* output_tuple = Tuple::create(intermediate_tuple_desc_->byte_size(), pool); InitAggSlots(agg_fn_evals, output_tuple); return output_tuple; } Tuple* PartitionedAggregationNode::ConstructIntermediateTuple( const vector& agg_fn_evals, MemPool* pool, Status* status) { const int fixed_size = intermediate_tuple_desc_->byte_size(); const int varlen_size = GroupingExprsVarlenSize(); const int tuple_data_size = fixed_size + varlen_size; uint8_t* tuple_data = pool->try_allocate(tuple_data_size); if (UNLIKELY(tuple_data == NULL)) { string details = Substitute( "Cannot perform aggregation at node with id $0. Failed " "to allocate $1 bytes for intermediate tuple.", _id, tuple_data_size); *status = pool->mem_tracker()->MemLimitExceeded(state_, details, tuple_data_size); return NULL; } memset(tuple_data, 0, fixed_size); Tuple* intermediate_tuple = reinterpret_cast(tuple_data); uint8_t* varlen_data = tuple_data + fixed_size; CopyGroupingValues(intermediate_tuple, varlen_data, varlen_size); InitAggSlots(agg_fn_evals, intermediate_tuple); return intermediate_tuple; } Tuple* PartitionedAggregationNode::ConstructIntermediateTuple( const vector& agg_fn_evals, BufferedTupleStream3* stream, Status* status) { DCHECK(stream != NULL && status != NULL); // Allocate space for the entire tuple in the stream. const int fixed_size = intermediate_tuple_desc_->byte_size(); const int varlen_size = GroupingExprsVarlenSize(); const int tuple_size = fixed_size + varlen_size; uint8_t* tuple_data = stream->AddRowCustomBegin(tuple_size, status); if (UNLIKELY(tuple_data == nullptr)) { // If we failed to allocate and did not hit an error (indicated by a non-ok status), // the caller of this function can try to free some space, e.g. through spilling, and // re-attempt to allocate space for this row. 
return nullptr; } Tuple* tuple = reinterpret_cast(tuple_data); tuple->init(fixed_size); uint8_t* varlen_buffer = tuple_data + fixed_size; CopyGroupingValues(tuple, varlen_buffer, varlen_size); InitAggSlots(agg_fn_evals, tuple); stream->AddRowCustomEnd(tuple_size); return tuple; } int PartitionedAggregationNode::GroupingExprsVarlenSize() { int varlen_size = 0; // TODO: The hash table could compute this as it hashes. for (int expr_idx : string_grouping_exprs_) { StringValue* sv = reinterpret_cast(ht_ctx_->ExprValue(expr_idx)); // Avoid branching by multiplying length by null bit. varlen_size += sv->len * !ht_ctx_->ExprValueNull(expr_idx); } return varlen_size; } // TODO: codegen this function. void PartitionedAggregationNode::CopyGroupingValues(Tuple* intermediate_tuple, uint8_t* buffer, int varlen_size) { // Copy over all grouping slots (the variable length data is copied below). for (int i = 0; i < grouping_exprs_.size(); ++i) { SlotDescriptor* slot_desc = intermediate_tuple_desc_->slots()[i]; if (ht_ctx_->ExprValueNull(i)) { intermediate_tuple->set_null(slot_desc->null_indicator_offset()); } else { void* src = ht_ctx_->ExprValue(i); void* dst = intermediate_tuple->get_slot(slot_desc->tuple_offset()); memcpy(dst, src, slot_desc->slot_size()); } } for (int expr_idx : string_grouping_exprs_) { if (ht_ctx_->ExprValueNull(expr_idx)) continue; SlotDescriptor* slot_desc = intermediate_tuple_desc_->slots()[expr_idx]; // ptr and len were already copied to the fixed-len part of string value StringValue* sv = reinterpret_cast( intermediate_tuple->get_slot(slot_desc->tuple_offset())); memcpy(buffer, sv->ptr, sv->len); sv->ptr = reinterpret_cast(buffer); buffer += sv->len; } } // TODO: codegen this function. void PartitionedAggregationNode::InitAggSlots(const vector& agg_fn_evals, Tuple* intermediate_tuple) { vector::const_iterator slot_desc = intermediate_tuple_desc_->slots().begin() + grouping_exprs_.size(); for (int i = 0; i < agg_fn_evals.size(); ++i, ++slot_desc) { // To minimize branching on the UpdateTuple path, initialize the result value so that // the Add() UDA function can ignore the NULL bit of its destination value. E.g. for // SUM(), if we initialize the destination value to 0 (with the NULL bit set), we can // just start adding to the destination value (rather than repeatedly checking the // destination NULL bit. The codegen'd version of UpdateSlot() exploits this to // eliminate a branch per value. // // For boolean and numeric types, the default values are false/0, so the nullable // aggregate functions SUM() and AVG() produce the correct result. For MIN()/MAX(), // initialize the value to max/min possible value for the same effect. 
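        // For example, the value slot of a MIN() over an INT column is preset to the
        // maximum int, so the first Add() can update it directly without first checking
        // whether the destination is still NULL.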
        NewAggFnEvaluator* eval = agg_fn_evals[i];
        eval->Init(intermediate_tuple);
        DCHECK(agg_fns_[i] == &(eval->agg_fn()));
        const AggFn* agg_fn = agg_fns_[i];
        const AggFn::AggregationOp agg_op = agg_fn->agg_op();
        if ((agg_op == AggFn::MIN || agg_op == AggFn::MAX) &&
            !agg_fn->intermediate_type().is_string_type() &&
            !agg_fn->intermediate_type().is_date_type()) {
            ExprValue default_value;
            void* default_value_ptr = NULL;
            if (agg_op == AggFn::MIN) {
                default_value_ptr = default_value.set_to_max((*slot_desc)->type());
            } else {
                DCHECK_EQ(agg_op, AggFn::MAX);
                default_value_ptr = default_value.set_to_min((*slot_desc)->type());
            }
            RawValue::write(default_value_ptr, intermediate_tuple, *slot_desc, NULL);
        }
    }
}

void PartitionedAggregationNode::UpdateTuple(NewAggFnEvaluator** agg_fn_evals, Tuple* tuple,
                                             TupleRow* row, bool is_merge) {
    DCHECK(tuple != NULL || agg_fns_.empty());
    for (int i = 0; i < agg_fns_.size(); ++i) {
        if (is_merge) {
            agg_fn_evals[i]->Merge(row->get_tuple(0), tuple);
        } else {
            agg_fn_evals[i]->Add(row, tuple);
        }
    }
}

Tuple* PartitionedAggregationNode::GetOutputTuple(const vector<NewAggFnEvaluator*>& agg_fn_evals,
                                                  Tuple* tuple, MemPool* pool) {
    DCHECK(tuple != NULL || agg_fn_evals.empty()) << tuple;
    Tuple* dst = tuple;
    if (needs_finalize_ && intermediate_tuple_id_ != output_tuple_id_) {
        dst = Tuple::create(output_tuple_desc_->byte_size(), pool);
    }
    if (needs_finalize_) {
        NewAggFnEvaluator::Finalize(agg_fn_evals, tuple, dst);
    } else {
        NewAggFnEvaluator::Serialize(agg_fn_evals, tuple);
    }
    // Copy grouping values from tuple to dst.
    // TODO: Codegen this.
    if (dst != tuple) {
        int num_grouping_slots = grouping_exprs_.size();
        for (int i = 0; i < num_grouping_slots; ++i) {
            SlotDescriptor* src_slot_desc = intermediate_tuple_desc_->slots()[i];
            SlotDescriptor* dst_slot_desc = output_tuple_desc_->slots()[i];
            bool src_slot_null = tuple->is_null(src_slot_desc->null_indicator_offset());
            void* src_slot = NULL;
            if (!src_slot_null) src_slot = tuple->get_slot(src_slot_desc->tuple_offset());
            RawValue::write(src_slot, dst, dst_slot_desc, NULL);
        }
    }
    return dst;
}

template <bool AGGREGATED_ROWS>
Status PartitionedAggregationNode::AppendSpilledRow(Partition* partition, TupleRow* row) {
    DCHECK(!is_streaming_preagg_);
    DCHECK(partition->is_spilled());
    BufferedTupleStream3* stream = AGGREGATED_ROWS ? partition->aggregated_row_stream.get()
                                                   : partition->unaggregated_row_stream.get();
    DCHECK(!stream->is_pinned());
    Status status;
    if (LIKELY(stream->AddRow(row, &status))) return Status::OK();
    RETURN_IF_ERROR(status);

    // Keep trying to free memory by spilling until we succeed or hit an error.
    // Running out of partitions to spill is treated as an error by SpillPartition().
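    // Each SpillPartition() call below spills the in-memory partition that frees the most
    // memory, then AddRow() is retried once that reservation has been released.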
while (true) { RETURN_IF_ERROR(SpillPartition(AGGREGATED_ROWS)); if (stream->AddRow(row, &status)) return Status::OK(); RETURN_IF_ERROR(status); } } string PartitionedAggregationNode::DebugString(int indentation_level) const { stringstream ss; DebugString(indentation_level, &ss); return ss.str(); } void PartitionedAggregationNode::DebugString(int indentation_level, stringstream* out) const { *out << string(indentation_level * 2, ' '); *out << "PartitionedAggregationNode(" << "intermediate_tuple_id=" << intermediate_tuple_id_ << " output_tuple_id=" << output_tuple_id_ << " needs_finalize=" << needs_finalize_ << " grouping_exprs=" << Expr::debug_string(grouping_exprs_) << " agg_exprs=" << AggFn::DebugString(agg_fns_); ExecNode::debug_string(indentation_level, out); *out << ")"; } Status PartitionedAggregationNode::CreateHashPartitions(int level, int single_partition_idx) { if (is_streaming_preagg_) DCHECK_EQ(level, 0); if (UNLIKELY(level >= MAX_PARTITION_DEPTH)) { stringstream error_msg; error_msg << "Cannot perform aggregation at hash aggregation node with id " << _id << '.' << " The input data was partitioned the maximum number of " << MAX_PARTITION_DEPTH << " times." << " This could mean there is significant skew in the data or the memory limit is" << " set too low."; return state_->set_mem_limit_exceeded(error_msg.str()); } ht_ctx_->set_level(level); DCHECK(hash_partitions_.empty()); int num_partitions_created = 0; for (int i = 0; i < PARTITION_FANOUT; ++i) { hash_tbls_[i] = nullptr; if (single_partition_idx == -1 || i == single_partition_idx) { Partition* new_partition = partition_pool_->add(new Partition(this, level, i)); ++num_partitions_created; hash_partitions_.push_back(new_partition); RETURN_IF_ERROR(new_partition->InitStreams()); } else { hash_partitions_.push_back(nullptr); } } // Now that all the streams are reserved (meaning we have enough memory to execute // the algorithm), allocate the hash tables. These can fail and we can still continue. for (int i = 0; i < PARTITION_FANOUT; ++i) { Partition* partition = hash_partitions_[i]; if (partition == nullptr) continue; if (partition->aggregated_row_stream == nullptr) { // Failed to create the aggregated row stream - cannot create a hash table. // Just continue with a NULL hash table so rows will be passed through. DCHECK(is_streaming_preagg_); } else { bool got_memory; RETURN_IF_ERROR(partition->InitHashTable(&got_memory)); // Spill the partition if we cannot create a hash table for a merge aggregation. if (UNLIKELY(!got_memory)) { DCHECK(!is_streaming_preagg_) << "Preagg reserves enough memory for hash tables"; // If we're repartitioning, we will be writing aggregated rows first. RETURN_IF_ERROR(partition->Spill(level > 0)); } } hash_tbls_[i] = partition->hash_tbl.get(); } // In this case we did not have to repartition, so ensure that while building the hash // table all rows will be inserted into the partition at 'single_partition_idx' in case // a non deterministic grouping expression causes a row to hash to a different // partition index. 
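    // Aliasing every entry of hash_partitions_ and hash_tbls_ to the single partition
    // guarantees that whatever index a row hashes to, it still lands in that partition.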
if (single_partition_idx != -1) { Partition* partition = hash_partitions_[single_partition_idx]; for (int i = 0; i < PARTITION_FANOUT; ++i) { hash_partitions_[i] = partition; hash_tbls_[i] = partition->hash_tbl.get(); } } COUNTER_UPDATE(partitions_created_, num_partitions_created); if (!is_streaming_preagg_) { COUNTER_SET(max_partition_level_, level); } return Status::OK(); } Status PartitionedAggregationNode::CheckAndResizeHashPartitions( bool partitioning_aggregated_rows, int num_rows, const PartitionedHashTableCtx* ht_ctx) { DCHECK(!is_streaming_preagg_); for (int i = 0; i < PARTITION_FANOUT; ++i) { Partition* partition = hash_partitions_[i]; if (partition == nullptr) continue; while (!partition->is_spilled()) { { SCOPED_TIMER(ht_resize_timer_); bool resized; RETURN_IF_ERROR(partition->hash_tbl->CheckAndResize(num_rows, ht_ctx, &resized)); if (resized) break; } RETURN_IF_ERROR(SpillPartition(partitioning_aggregated_rows)); } } return Status::OK(); } Status PartitionedAggregationNode::NextPartition() { DCHECK(output_partition_ == nullptr); if (!is_in_subplan() && spilled_partitions_.empty()) { // All partitions are in memory. Release reservation that was used for previous // partitions that is no longer needed. If we have spilled partitions, we want to // hold onto all reservation in case it is needed to process the spilled partitions. DCHECK(!_buffer_pool_client.has_unpinned_pages()); Status status = release_unused_reservation(); DCHECK(status.ok()) << "Should not fail - all partitions are in memory so there are " << "no unpinned pages. " << status.get_error_msg(); } // Keep looping until we get to a partition that fits in memory. Partition* partition = nullptr; while (true) { // First return partitions that are fully aggregated (and in memory). if (!aggregated_partitions_.empty()) { partition = aggregated_partitions_.front(); DCHECK(!partition->is_spilled()); aggregated_partitions_.pop_front(); break; } // No aggregated partitions in memory - we should not be using any reservation aside // from 'serialize_stream_'. DCHECK_EQ(serialize_stream_ != nullptr ? serialize_stream_->BytesPinned(false) : 0, _buffer_pool_client.GetUsedReservation()) << _buffer_pool_client.DebugString(); // Try to fit a single spilled partition in memory. We can often do this because // we only need to fit 1/PARTITION_FANOUT of the data in memory. // TODO: in some cases when the partition probably won't fit in memory it could // be better to skip directly to repartitioning. RETURN_IF_ERROR(BuildSpilledPartition(&partition)); if (partition != nullptr) break; // If we can't fit the partition in memory, repartition it. 
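        // Repartitioning moves the resulting child partitions back onto
        // spilled_partitions_ / aggregated_partitions_, so the next loop iteration picks
        // them up.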
RETURN_IF_ERROR(RepartitionSpilledPartition()); } DCHECK(!partition->is_spilled()); DCHECK(partition->hash_tbl.get() != nullptr); DCHECK(partition->aggregated_row_stream->is_pinned()); output_partition_ = partition; output_iterator_ = output_partition_->hash_tbl->Begin(ht_ctx_.get()); COUNTER_UPDATE(num_hash_buckets_, output_partition_->hash_tbl->num_buckets()); COUNTER_UPDATE(ht_resize_counter_, output_partition_->hash_tbl->num_resize()); COUNTER_UPDATE(num_hash_filled_buckets_, output_partition_->hash_tbl->num_filled_buckets()); COUNTER_UPDATE(num_hash_probe_, output_partition_->hash_tbl->num_probe()); COUNTER_UPDATE(num_hash_failed_probe_, output_partition_->hash_tbl->num_failed_probe()); COUNTER_UPDATE(num_hash_travel_length_, output_partition_->hash_tbl->travel_length()); COUNTER_UPDATE(num_hash_collisions_, output_partition_->hash_tbl->NumHashCollisions()); return Status::OK(); } Status PartitionedAggregationNode::BuildSpilledPartition(Partition** built_partition) { DCHECK(!spilled_partitions_.empty()); DCHECK(!is_streaming_preagg_); // Leave the partition in 'spilled_partitions_' to be closed if we hit an error. Partition* src_partition = spilled_partitions_.front(); DCHECK(src_partition->is_spilled()); // Create a new hash partition from the rows of the spilled partition. This is simpler // than trying to finish building a partially-built partition in place. We only // initialise one hash partition that all rows in 'src_partition' will hash to. RETURN_IF_ERROR(CreateHashPartitions(src_partition->level, src_partition->idx)); Partition* dst_partition = hash_partitions_[src_partition->idx]; DCHECK(dst_partition != nullptr); // Rebuild the hash table over spilled aggregate rows then start adding unaggregated // rows to the hash table. It's possible the partition will spill at either stage. // In that case we need to finish processing 'src_partition' so that all rows are // appended to 'dst_partition'. // TODO: if the partition spills again but the aggregation reduces the input // significantly, we could do better here by keeping the incomplete hash table in // memory and only spilling unaggregated rows that didn't fit in the hash table // (somewhat similar to the passthrough pre-aggregation). RETURN_IF_ERROR(ProcessStream(src_partition->aggregated_row_stream.get())); RETURN_IF_ERROR(ProcessStream(src_partition->unaggregated_row_stream.get())); src_partition->Close(false); spilled_partitions_.pop_front(); hash_partitions_.clear(); if (dst_partition->is_spilled()) { PushSpilledPartition(dst_partition); *built_partition = nullptr; // Spilled the partition - we should not be using any reservation except from // 'serialize_stream_'. DCHECK_EQ(serialize_stream_ != nullptr ? serialize_stream_->BytesPinned(false) : 0, _buffer_pool_client.GetUsedReservation()) << _buffer_pool_client.DebugString(); } else { *built_partition = dst_partition; } return Status::OK(); } Status PartitionedAggregationNode::RepartitionSpilledPartition() { DCHECK(!spilled_partitions_.empty()); DCHECK(!is_streaming_preagg_); // Leave the partition in 'spilled_partitions_' to be closed if we hit an error. Partition* partition = spilled_partitions_.front(); DCHECK(partition->is_spilled()); // Create the new hash partitions to repartition into. This will allocate a // write buffer for each partition's aggregated row stream. 
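    // Using 'partition->level + 1' presumably selects a different per-level hash seed in
    // the hash table context (it was created with MAX_PARTITION_DEPTH levels), so rows
    // that collided into this partition spread across the new fanout;
    // CreateHashPartitions() rejects levels that reach MAX_PARTITION_DEPTH.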
RETURN_IF_ERROR(CreateHashPartitions(partition->level + 1)); COUNTER_UPDATE(num_repartitions_, 1); // Rows in this partition could have been spilled into two streams, depending // on if it is an aggregated intermediate, or an unaggregated row. Aggregated // rows are processed first to save a hash table lookup in ProcessBatch(). RETURN_IF_ERROR(ProcessStream(partition->aggregated_row_stream.get())); // Prepare write buffers so we can append spilled rows to unaggregated partitions. for (Partition* hash_partition : hash_partitions_) { if (!hash_partition->is_spilled()) continue; // The aggregated rows have been repartitioned. Free up at least a buffer's worth of // reservation and use it to pin the unaggregated write buffer. // hash_partition->aggregated_row_stream->UnpinStream(BufferedTupleStream3::UNPIN_ALL); bool got_buffer; RETURN_IF_ERROR(hash_partition->unaggregated_row_stream->PrepareForWrite(&got_buffer)); DCHECK(got_buffer) << "Accounted in min reservation" << _buffer_pool_client.DebugString(); } RETURN_IF_ERROR(ProcessStream(partition->unaggregated_row_stream.get())); COUNTER_UPDATE(num_row_repartitioned_, partition->aggregated_row_stream->num_rows()); COUNTER_UPDATE(num_row_repartitioned_, partition->unaggregated_row_stream->num_rows()); partition->Close(false); spilled_partitions_.pop_front(); // Done processing this partition. Move the new partitions into // spilled_partitions_/aggregated_partitions_. int64_t num_input_rows = partition->aggregated_row_stream->num_rows() + partition->unaggregated_row_stream->num_rows(); RETURN_IF_ERROR(MoveHashPartitions(num_input_rows)); return Status::OK(); } template Status PartitionedAggregationNode::ProcessStream(BufferedTupleStream3* input_stream) { DCHECK(!is_streaming_preagg_); if (input_stream->num_rows() > 0) { while (true) { bool got_buffer = false; RETURN_IF_ERROR(input_stream->PrepareForRead(true, &got_buffer)); if (got_buffer) break; // Did not have a buffer to read the input stream. Spill and try again. RETURN_IF_ERROR(SpillPartition(AGGREGATED_ROWS)); } bool eos = false; const RowDescriptor* desc = AGGREGATED_ROWS ? &intermediate_row_desc_ : &(_children[0]->row_desc()); RowBatch batch(*desc, state_->batch_size(), mem_tracker().get()); do { RETURN_IF_ERROR(input_stream->GetNext(&batch, &eos)); RETURN_IF_ERROR(ProcessBatch(&batch, ht_ctx_.get())); RETURN_IF_ERROR(state_->check_query_state( "New partitioned aggregation, while processing stream.")); batch.reset(); } while (!eos); } input_stream->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES); return Status::OK(); } Status PartitionedAggregationNode::SpillPartition(bool more_aggregate_rows) { int64_t max_freed_mem = 0; int partition_idx = -1; // Iterate over the partitions and pick the largest partition that is not spilled. for (int i = 0; i < hash_partitions_.size(); ++i) { if (hash_partitions_[i] == nullptr) continue; if (hash_partitions_[i]->is_closed) continue; if (hash_partitions_[i]->is_spilled()) continue; // Pass 'true' because we need to keep the write block pinned. See Partition::Spill(). int64_t mem = hash_partitions_[i]->aggregated_row_stream->BytesPinned(true); mem += hash_partitions_[i]->hash_tbl->ByteSize(); mem += hash_partitions_[i]->agg_fn_pool->total_reserved_bytes(); DCHECK_GT(mem, 0); // At least the hash table buckets should occupy memory. 
if (mem > max_freed_mem) { max_freed_mem = mem; partition_idx = i; } } DCHECK_NE(partition_idx, -1) << "Should have been able to spill a partition to " << "reclaim memory: " << _buffer_pool_client.DebugString(); // Remove references to the destroyed hash table from 'hash_tbls_'. // Additionally, we might be dealing with a rebuilt spilled partition, where all // partitions point to a single in-memory partition. This also ensures that 'hash_tbls_' // remains consistent in that case. for (int i = 0; i < PARTITION_FANOUT; ++i) { if (hash_partitions_[i] == hash_partitions_[partition_idx]) hash_tbls_[i] = nullptr; } return hash_partitions_[partition_idx]->Spill(more_aggregate_rows); } Status PartitionedAggregationNode::MoveHashPartitions(int64_t num_input_rows) { DCHECK(!hash_partitions_.empty()); std::stringstream ss; ss << "PA(node_id=" << id() << ") partitioned(level=" << hash_partitions_[0]->level << ") " << num_input_rows << " rows into:" << std::endl; for (int i = 0; i < hash_partitions_.size(); ++i) { Partition* partition = hash_partitions_[i]; if (partition == nullptr) continue; // We might be dealing with a rebuilt spilled partition, where all partitions are // pointing to a single in-memory partition, so make sure we only proceed for the // right partition. if (i != partition->idx) continue; int64_t aggregated_rows = 0; if (partition->aggregated_row_stream != nullptr) { aggregated_rows = partition->aggregated_row_stream->num_rows(); } int64_t unaggregated_rows = 0; if (partition->unaggregated_row_stream != nullptr) { unaggregated_rows = partition->unaggregated_row_stream->num_rows(); } double total_rows = aggregated_rows + unaggregated_rows; double percent = total_rows * 100 / num_input_rows; ss << " " << i << " " << (partition->is_spilled() ? "spilled" : "not spilled") << " (fraction=" << std::fixed << std::setprecision(2) << percent << "%)" << std::endl << " #aggregated rows:" << aggregated_rows << std::endl << " #unaggregated rows: " << unaggregated_rows << std::endl; // TODO: update counters to support doubles. COUNTER_SET(largest_partition_percent_, static_cast(percent)); if (total_rows == 0) { partition->Close(false); } else if (partition->is_spilled()) { PushSpilledPartition(partition); } else { aggregated_partitions_.push_back(partition); } } VLOG(2) << ss.str(); hash_partitions_.clear(); return Status::OK(); } void PartitionedAggregationNode::PushSpilledPartition(Partition* partition) { DCHECK(partition->is_spilled()); DCHECK(partition->hash_tbl == nullptr); // Ensure all pages in the spilled partition's streams are unpinned by invalidating // the streams' read and write iterators. We may need all the memory to process the // next spilled partitions. 
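    // The UnpinStream() calls below are commented out (spilling is disabled in this
    // version, see the TODO in Partition::Spill()); the partition is still pushed to the
    // front of 'spilled_partitions_' so it is the next one processed.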
    // partition->aggregated_row_stream->UnpinStream(BufferedTupleStream3::UNPIN_ALL);
    // partition->unaggregated_row_stream->UnpinStream(BufferedTupleStream3::UNPIN_ALL);
    spilled_partitions_.push_front(partition);
}

void PartitionedAggregationNode::ClosePartitions() {
    for (Partition* partition : hash_partitions_) {
        if (partition != nullptr) partition->Close(true);
    }
    hash_partitions_.clear();
    for (Partition* partition : aggregated_partitions_) partition->Close(true);
    aggregated_partitions_.clear();
    for (Partition* partition : spilled_partitions_) partition->Close(true);
    spilled_partitions_.clear();
    memset(hash_tbls_, 0, sizeof(hash_tbls_));
    partition_pool_->clear();
}

//Status PartitionedAggregationNode::QueryMaintenance(RuntimeState* state) {
//    NewAggFnEvaluator::FreeLocalAllocations(agg_fn_evals_);
//    for (Partition* partition : hash_partitions_) {
//        if (partition != nullptr) {
//            NewAggFnEvaluator::FreeLocalAllocations(partition->agg_fn_evals);
//        }
//    }
//    if (ht_ctx_.get() != nullptr) ht_ctx_->FreeLocalAllocations();
//    return ExecNode::QueryMaintenance(state);
//}

// Instantiate required templates.
template Status PartitionedAggregationNode::AppendSpilledRow<false>(Partition*, TupleRow*);
template Status PartitionedAggregationNode::AppendSpilledRow<true>(Partition*, TupleRow*);

} // namespace doris