From 2dc995df7b09a046d224cf5e773612f703334e5d Mon Sep 17 00:00:00 2001 From: HappenLee Date: Fri, 20 Mar 2020 19:59:01 +0800 Subject: [PATCH] [CodeStyle] Rename new_partition_aggregation_node and new_partitioned_hash_table (#3166) --- be/src/common/config.h | 3 +- be/src/exec/CMakeLists.txt | 8 +- be/src/exec/exec_node.cpp | 34 +++--- ...ode.cc => partitioned_aggregation_node.cc} | 114 +++++++++--------- ..._node.h => partitioned_aggregation_node.h} | 52 ++++---- ....cc => partitioned_aggregation_node_ir.cc} | 60 +++++---- ...ash_table.cc => partitioned_hash_table.cc} | 92 +++++++------- ..._hash_table.h => partitioned_hash_table.h} | 44 +++---- ...line.h => partitioned_hash_table.inline.h} | 74 ++++++------ ...ble_ir.cc => partitioned_hash_table_ir.cc} | 8 +- be/src/runtime/row_batch.cpp | 12 +- 11 files changed, 249 insertions(+), 252 deletions(-) rename be/src/exec/{new_partitioned_aggregation_node.cc => partitioned_aggregation_node.cc} (93%) rename be/src/exec/{new_partitioned_aggregation_node.h => partitioned_aggregation_node.h} (95%) rename be/src/exec/{new_partitioned_aggregation_node_ir.cc => partitioned_aggregation_node_ir.cc} (82%) rename be/src/exec/{new_partitioned_hash_table.cc => partitioned_hash_table.cc} (86%) rename be/src/exec/{new_partitioned_hash_table.h => partitioned_hash_table.h} (96%) rename be/src/exec/{new_partitioned_hash_table.inline.h => partitioned_hash_table.inline.h} (82%) rename be/src/exec/{new_partitioned_hash_table_ir.cc => partitioned_hash_table_ir.cc} (76%) diff --git a/be/src/common/config.h b/be/src/common/config.h index d1faee8340..f722ff8582 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -337,8 +337,7 @@ namespace config { // for partition CONF_Bool(enable_partitioned_hash_join, "false") - CONF_Bool(enable_partitioned_aggregation, "false") - CONF_Bool(enable_new_partitioned_aggregation, "true") + CONF_Bool(enable_partitioned_aggregation, "true") // for kudu // "The maximum size of the row batch queue, for Kudu scanners." 
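Context for the config.h hunk above: the patch folds the experimental enable_new_partitioned_aggregation switch into the pre-existing enable_partitioned_aggregation flag and flips its default to true, so the renamed partitioned (spillable) aggregation becomes the default operator. A minimal sketch of how such a flag gates operator selection, paraphrasing the exec_node.cpp hunk further below (the plain bool stands in for the real CONF_Bool machinery, and selected_agg_node() is a hypothetical helper, not Doris code):

#include <iostream>

// Stand-in for the variable generated by Doris's CONF_Bool macro: a bool in
// namespace config, loaded from be.conf at backend startup. After this patch
// the flag defaults to true.
namespace config {
bool enable_partitioned_aggregation = true;
}

// Mirrors the dispatch in ExecNode::create_node() for AGGREGATION_NODE: one
// flag now chooses between the legacy AggregationNode and the renamed
// PartitionedAggregationNode.
const char* selected_agg_node() {
    return config::enable_partitioned_aggregation ? "PartitionedAggregationNode"
                                                  : "AggregationNode";
}

int main() {
    std::cout << selected_agg_node() << "\n"; // prints PartitionedAggregationNode
    return 0;
}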
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index 223e6c5129..04575157d4 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -83,10 +83,10 @@ set(EXEC_FILES schema_scanner/schema_charsets_scanner.cpp schema_scanner/schema_collations_scanner.cpp schema_scanner/schema_helper.cpp - new_partitioned_hash_table.cc - new_partitioned_hash_table_ir.cc - new_partitioned_aggregation_node.cc - new_partitioned_aggregation_node_ir.cc + partitioned_hash_table.cc + partitioned_hash_table_ir.cc + partitioned_aggregation_node.cc + partitioned_aggregation_node_ir.cc local_file_writer.cpp broker_writer.cpp parquet_scanner.cpp diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 62d2438e27..7bdc3ff073 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -23,33 +23,33 @@ #include "common/object_pool.h" #include "common/status.h" -#include "exprs/expr_context.h" #include "exec/aggregation_node.h" -#include "exec/new_partitioned_aggregation_node.h" -#include "exec/csv_scan_node.h" -#include "exec/es_scan_node.h" -#include "exec/es_http_scan_node.h" -#include "exec/hash_join_node.h" +#include "exec/analytic_eval_node.h" +#include "exec/assert_num_rows_node.h" #include "exec/broker_scan_node.h" #include "exec/cross_join_node.h" +#include "exec/csv_scan_node.h" #include "exec/empty_set_node.h" -#include "exec/mysql_scan_node.h" -#include "exec/schema_scan_node.h" +#include "exec/es_http_scan_node.h" +#include "exec/es_scan_node.h" +#include "exec/except_node.h" #include "exec/exchange_node.h" +#include "exec/hash_join_node.h" +#include "exec/intersect_node.h" #include "exec/merge_join_node.h" #include "exec/merge_node.h" +#include "exec/mysql_scan_node.h" #include "exec/olap_rewrite_node.h" #include "exec/olap_scan_node.h" -#include "exec/topn_node.h" +#include "exec/partitioned_aggregation_node.h" +#include "exec/repeat_node.h" +#include "exec/schema_scan_node.h" +#include "exec/select_node.h" #include "exec/sort_node.h" #include "exec/spill_sort_node.h" -#include "exec/analytic_eval_node.h" -#include "exec/select_node.h" +#include "exec/topn_node.h" #include "exec/union_node.h" -#include "exec/intersect_node.h" -#include "exec/except_node.h" -#include "exec/repeat_node.h" -#include "exec/assert_num_rows_node.h" +#include "exprs/expr_context.h" #include "runtime/exec_env.h" #include "runtime/descriptors.h" #include "runtime/initial_reservations.h" @@ -380,8 +380,8 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN return Status::OK(); case TPlanNodeType::AGGREGATION_NODE: - if (config::enable_new_partitioned_aggregation) { - *node = pool->add(new NewPartitionedAggregationNode(pool, tnode, descs)); + if (config::enable_partitioned_aggregation) { + *node = pool->add(new PartitionedAggregationNode(pool, tnode, descs)); } else { *node = pool->add(new AggregationNode(pool, tnode, descs)); } diff --git a/be/src/exec/new_partitioned_aggregation_node.cc b/be/src/exec/partitioned_aggregation_node.cc similarity index 93% rename from be/src/exec/new_partitioned_aggregation_node.cc rename to be/src/exec/partitioned_aggregation_node.cc index 24688031b3..3c5a5737b0 100644 --- a/be/src/exec/new_partitioned_aggregation_node.cc +++ b/be/src/exec/partitioned_aggregation_node.cc @@ -15,15 +15,15 @@ // specific language governing permissions and limitations // under the License. 
-#include "exec/new_partitioned_aggregation_node.h" +#include "exec/partitioned_aggregation_node.h" #include #include #include #include -#include "exec/new_partitioned_hash_table.h" -#include "exec/new_partitioned_hash_table.inline.h" +#include "exec/partitioned_hash_table.h" +#include "exec/partitioned_hash_table.inline.h" #include "exprs/new_agg_fn_evaluator.h" #include "exprs/anyval_util.h" #include "exprs/expr_context.h" @@ -93,7 +93,7 @@ static const StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = { static const int STREAMING_HT_MIN_REDUCTION_SIZE = sizeof(STREAMING_HT_MIN_REDUCTION) / sizeof(STREAMING_HT_MIN_REDUCTION[0]); -NewPartitionedAggregationNode::NewPartitionedAggregationNode( +PartitionedAggregationNode::PartitionedAggregationNode( ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : ExecNode(pool, tnode, descs), intermediate_tuple_id_(tnode.agg_node.intermediate_tuple_id), @@ -142,7 +142,7 @@ NewPartitionedAggregationNode::NewPartitionedAggregationNode( } } -Status NewPartitionedAggregationNode::init(const TPlanNode& tnode, RuntimeState* state) { +Status PartitionedAggregationNode::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(ExecNode::init(tnode)); DCHECK(intermediate_tuple_desc_ != nullptr); DCHECK(output_tuple_desc_ != nullptr); @@ -177,7 +177,7 @@ Status NewPartitionedAggregationNode::init(const TPlanNode& tnode, RuntimeState* return Status::OK(); } -Status NewPartitionedAggregationNode::prepare(RuntimeState* state) { +Status PartitionedAggregationNode::prepare(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::prepare(state)); @@ -222,7 +222,7 @@ Status NewPartitionedAggregationNode::prepare(RuntimeState* state) { expr_results_pool_.reset(new MemPool(_expr_mem_tracker.get())); if (!grouping_exprs_.empty()) { RowDescriptor build_row_desc(intermediate_tuple_desc_, false); - RETURN_IF_ERROR(NewPartitionedHashTableCtx::Create(_pool, state, build_exprs_, + RETURN_IF_ERROR(PartitionedHashTableCtx::Create(_pool, state, build_exprs_, grouping_exprs_, true, vector(build_exprs_.size(), true), state->fragment_hash_seed(), MAX_PARTITION_DEPTH, 1, expr_mem_pool(), expr_results_pool_.get(), expr_mem_tracker(), build_row_desc, row_desc, &ht_ctx_)); @@ -231,7 +231,7 @@ Status NewPartitionedAggregationNode::prepare(RuntimeState* state) { return Status::OK(); } -Status NewPartitionedAggregationNode::open(RuntimeState* state) { +Status PartitionedAggregationNode::open(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); // Open the child before consuming resources in this node. 
RETURN_IF_ERROR(child(0)->open(state)); @@ -329,7 +329,7 @@ Status NewPartitionedAggregationNode::open(RuntimeState* state) { return Status::OK(); } -Status NewPartitionedAggregationNode::get_next(RuntimeState* state, RowBatch* row_batch, +Status PartitionedAggregationNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) { int first_row_idx = row_batch->num_rows(); RETURN_IF_ERROR(GetNextInternal(state, row_batch, eos)); @@ -337,7 +337,7 @@ Status NewPartitionedAggregationNode::get_next(RuntimeState* state, RowBatch* ro return Status::OK(); } -Status NewPartitionedAggregationNode::HandleOutputStrings(RowBatch* row_batch, +Status PartitionedAggregationNode::HandleOutputStrings(RowBatch* row_batch, int first_row_idx) { if (!needs_finalize_ && !needs_serialize_) return Status::OK(); // String data returned by Serialize() or Finalize() is from local expr allocations in @@ -362,7 +362,7 @@ Status NewPartitionedAggregationNode::HandleOutputStrings(RowBatch* row_batch, return Status::OK(); } -Status NewPartitionedAggregationNode::CopyStringData(const SlotDescriptor& slot_desc, +Status PartitionedAggregationNode::CopyStringData(const SlotDescriptor& slot_desc, RowBatch* row_batch, int first_row_idx, MemPool* pool) { DCHECK(slot_desc.type().is_var_len_string_type()); DCHECK_EQ(row_batch->row_desc().tuple_descriptors().size(), 1); @@ -383,7 +383,7 @@ Status NewPartitionedAggregationNode::CopyStringData(const SlotDescriptor& slot_ return Status::OK(); } -Status NewPartitionedAggregationNode::GetNextInternal(RuntimeState* state, +Status PartitionedAggregationNode::GetNextInternal(RuntimeState* state, RowBatch* row_batch, bool* eos) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT)); @@ -419,7 +419,7 @@ Status NewPartitionedAggregationNode::GetNextInternal(RuntimeState* state, return Status::OK(); } -void NewPartitionedAggregationNode::GetSingletonOutput(RowBatch* row_batch) { +void PartitionedAggregationNode::GetSingletonOutput(RowBatch* row_batch) { DCHECK(grouping_exprs_.empty()); int row_idx = row_batch->add_row(); TupleRow* row = row_batch->get_row(row_idx); @@ -439,7 +439,7 @@ void NewPartitionedAggregationNode::GetSingletonOutput(RowBatch* row_batch) { singleton_output_tuple_ = NULL; } -Status NewPartitionedAggregationNode::GetRowsFromPartition(RuntimeState* state, +Status PartitionedAggregationNode::GetRowsFromPartition(RuntimeState* state, RowBatch* row_batch) { DCHECK(!row_batch->at_capacity()); if (output_iterator_.AtEnd()) { @@ -496,7 +496,7 @@ Status NewPartitionedAggregationNode::GetRowsFromPartition(RuntimeState* state, return Status::OK(); } -Status NewPartitionedAggregationNode::GetRowsStreaming(RuntimeState* state, +Status PartitionedAggregationNode::GetRowsStreaming(RuntimeState* state, RowBatch* out_batch) { DCHECK(!child_eos_); DCHECK(is_streaming_preagg_); @@ -518,7 +518,7 @@ Status NewPartitionedAggregationNode::GetRowsStreaming(RuntimeState* state, int remaining_capacity[PARTITION_FANOUT]; bool ht_needs_expansion = false; for (int i = 0; i < PARTITION_FANOUT; ++i) { - NewPartitionedHashTable* hash_tbl = GetHashTable(i); + PartitionedHashTable* hash_tbl = GetHashTable(i); remaining_capacity[i] = hash_tbl->NumInsertsBeforeResize(); ht_needs_expansion |= remaining_capacity[i] < child_batch_->num_rows(); } @@ -530,7 +530,7 @@ Status NewPartitionedAggregationNode::GetRowsStreaming(RuntimeState* state, // should always use the remaining space in the hash table to avoid wasting memory. 
if (ht_needs_expansion && ShouldExpandPreaggHashTables()) { for (int i = 0; i < PARTITION_FANOUT; ++i) { - NewPartitionedHashTable* ht = GetHashTable(i); + PartitionedHashTable* ht = GetHashTable(i); if (remaining_capacity[i] < child_batch_->num_rows()) { SCOPED_TIMER(ht_resize_timer_); bool resized; @@ -565,11 +565,11 @@ Status NewPartitionedAggregationNode::GetRowsStreaming(RuntimeState* state, return Status::OK(); } -bool NewPartitionedAggregationNode::ShouldExpandPreaggHashTables() const { +bool PartitionedAggregationNode::ShouldExpandPreaggHashTables() const { int64_t ht_mem = 0; int64_t ht_rows = 0; for (int i = 0; i < PARTITION_FANOUT; ++i) { - NewPartitionedHashTable* ht = hash_partitions_[i]->hash_tbl.get(); + PartitionedHashTable* ht = hash_partitions_[i]->hash_tbl.get(); ht_mem += ht->CurrentMemSize(); ht_rows += ht->size(); } @@ -615,8 +615,8 @@ bool NewPartitionedAggregationNode::ShouldExpandPreaggHashTables() const { return current_reduction > min_reduction; } -void NewPartitionedAggregationNode::CleanupHashTbl( - const vector<NewAggFnEvaluator*>& agg_fn_evals, NewPartitionedHashTable::Iterator it) { +void PartitionedAggregationNode::CleanupHashTbl( + const vector<NewAggFnEvaluator*>& agg_fn_evals, PartitionedHashTable::Iterator it) { if (!needs_finalize_ && !needs_serialize_) return; // Iterate through the remaining rows in the hash table and call Serialize/Finalize on @@ -640,7 +640,7 @@ void NewPartitionedAggregationNode::CleanupHashTbl( } } -Status NewPartitionedAggregationNode::reset(RuntimeState* state) { +Status PartitionedAggregationNode::reset(RuntimeState* state) { DCHECK(!is_streaming_preagg_) << "Cannot reset preaggregation"; if (!grouping_exprs_.empty()) { child_eos_ = false; @@ -652,7 +652,7 @@ Status NewPartitionedAggregationNode::reset(RuntimeState* state) { return ExecNode::reset(state); } -Status NewPartitionedAggregationNode::close(RuntimeState* state) { +Status PartitionedAggregationNode::close(RuntimeState* state) { if (is_closed()) return Status::OK(); if (!singleton_output_tuple_returned_) { @@ -688,11 +688,11 @@ Status NewPartitionedAggregationNode::close(RuntimeState* state) { return ExecNode::close(state); } -NewPartitionedAggregationNode::Partition::~Partition() { +PartitionedAggregationNode::Partition::~Partition() { DCHECK(is_closed); } -Status NewPartitionedAggregationNode::Partition::InitStreams() { +Status PartitionedAggregationNode::Partition::InitStreams() { agg_fn_pool.reset(new MemPool(parent->expr_mem_tracker())); DCHECK_EQ(agg_fn_evals.size(), 0); NewAggFnEvaluator::ShallowClone(parent->partition_pool_.get(), agg_fn_pool.get(), @@ -735,7 +735,7 @@ Status NewPartitionedAggregationNode::Partition::InitStreams() { return Status::OK(); } -Status NewPartitionedAggregationNode::Partition::InitHashTable(bool* got_memory) { +Status PartitionedAggregationNode::Partition::InitHashTable(bool* got_memory) { DCHECK(aggregated_row_stream != nullptr); DCHECK(hash_tbl == nullptr); // We use the upper PARTITION_FANOUT num bits to pick the partition so only the @@ -743,14 +743,14 @@ Status NewPartitionedAggregationNode::Partition::InitHashTable(bool* got_memory) // TODO: we could switch to 64 bit hashes and then we don't need a max size. // It might be reasonable to limit individual hash table size for other reasons // though. Always start with small buffers.
- hash_tbl.reset(NewPartitionedHashTable::Create(parent->ht_allocator_.get(), false, 1, nullptr, + hash_tbl.reset(PartitionedHashTable::Create(parent->ht_allocator_.get(), false, 1, nullptr, 1L << (32 - NUM_PARTITIONING_BITS), PAGG_DEFAULT_HASH_TABLE_SZ)); // Please update the error message in CreateHashPartitions() if initial size of // hash table changes. return hash_tbl->Init(got_memory); } -Status NewPartitionedAggregationNode::Partition::SerializeStreamForSpilling() { +Status PartitionedAggregationNode::Partition::SerializeStreamForSpilling() { DCHECK(!parent->is_streaming_preagg_); if (parent->needs_serialize_) { // We need to do a lot more work in this case. This step effectively does a merge @@ -769,7 +769,7 @@ Status NewPartitionedAggregationNode::Partition::SerializeStreamForSpilling() { // Serialize and copy the spilled partition's stream into the new stream. Status status = Status::OK(); BufferedTupleStream3* new_stream = parent->serialize_stream_.get(); - NewPartitionedHashTable::Iterator it = hash_tbl->Begin(parent->ht_ctx_.get()); + PartitionedHashTable::Iterator it = hash_tbl->Begin(parent->ht_ctx_.get()); while (!it.AtEnd()) { Tuple* tuple = it.GetTuple(); it.Next(); @@ -813,7 +813,7 @@ Status NewPartitionedAggregationNode::Partition::SerializeStreamForSpilling() { return Status::OK(); } -Status NewPartitionedAggregationNode::Partition::Spill(bool more_aggregate_rows) { +Status PartitionedAggregationNode::Partition::Spill(bool more_aggregate_rows) { DCHECK(!parent->is_streaming_preagg_); DCHECK(!is_closed); DCHECK(!is_spilled()); @@ -858,7 +858,7 @@ Status NewPartitionedAggregationNode::Partition::Spill(bool more_aggregate_rows) return Status::OK(); } -void NewPartitionedAggregationNode::Partition::Close(bool finalize_rows) { +void PartitionedAggregationNode::Partition::Close(bool finalize_rows) { if (is_closed) return; is_closed = true; if (aggregated_row_stream.get() != NULL) { @@ -879,7 +879,7 @@ void NewPartitionedAggregationNode::Partition::Close(bool finalize_rows) { if (agg_fn_pool.get() != NULL) agg_fn_pool->free_all(); } -Tuple* NewPartitionedAggregationNode::ConstructSingletonOutputTuple( +Tuple* PartitionedAggregationNode::ConstructSingletonOutputTuple( const vector<NewAggFnEvaluator*>& agg_fn_evals, MemPool* pool) { DCHECK(grouping_exprs_.empty()); Tuple* output_tuple = Tuple::create(intermediate_tuple_desc_->byte_size(), pool); @@ -887,7 +887,7 @@ Tuple* NewPartitionedAggregationNode::ConstructSingletonOutputTuple( return output_tuple; } -Tuple* NewPartitionedAggregationNode::ConstructIntermediateTuple( +Tuple* PartitionedAggregationNode::ConstructIntermediateTuple( const vector<NewAggFnEvaluator*>& agg_fn_evals, MemPool* pool, Status* status) { const int fixed_size = intermediate_tuple_desc_->byte_size(); const int varlen_size = GroupingExprsVarlenSize(); @@ -907,7 +907,7 @@ Tuple* NewPartitionedAggregationNode::ConstructIntermediateTuple( return intermediate_tuple; } -Tuple* NewPartitionedAggregationNode::ConstructIntermediateTuple( +Tuple* PartitionedAggregationNode::ConstructIntermediateTuple( const vector<NewAggFnEvaluator*>& agg_fn_evals, BufferedTupleStream3* stream, Status* status) { DCHECK(stream != NULL && status != NULL); @@ -931,7 +931,7 @@ Tuple* NewPartitionedAggregationNode::ConstructIntermediateTuple( return tuple; } -int NewPartitionedAggregationNode::GroupingExprsVarlenSize() { +int PartitionedAggregationNode::GroupingExprsVarlenSize() { int varlen_size = 0; // TODO: The hash table could compute this as it hashes.
for (int expr_idx: string_grouping_exprs_) { @@ -943,7 +943,7 @@ } // TODO: codegen this function. -void NewPartitionedAggregationNode::CopyGroupingValues(Tuple* intermediate_tuple, +void PartitionedAggregationNode::CopyGroupingValues(Tuple* intermediate_tuple, uint8_t* buffer, int varlen_size) { // Copy over all grouping slots (the variable length data is copied below). for (int i = 0; i < grouping_exprs_.size(); ++i) { @@ -971,7 +971,7 @@ void NewPartitionedAggregationNode::CopyGroupingValues(Tuple* intermediate_tuple } // TODO: codegen this function. -void NewPartitionedAggregationNode::InitAggSlots( +void PartitionedAggregationNode::InitAggSlots( const vector<NewAggFnEvaluator*>& agg_fn_evals, Tuple* intermediate_tuple) { vector<SlotDescriptor*>::const_iterator slot_desc = intermediate_tuple_desc_->slots().begin() + grouping_exprs_.size(); @@ -1008,7 +1008,7 @@ void NewPartitionedAggregationNode::InitAggSlots( } } -void NewPartitionedAggregationNode::UpdateTuple(NewAggFnEvaluator** agg_fn_evals, +void PartitionedAggregationNode::UpdateTuple(NewAggFnEvaluator** agg_fn_evals, Tuple* tuple, TupleRow* row, bool is_merge) { DCHECK(tuple != NULL || agg_fns_.empty()); for (int i = 0; i < agg_fns_.size(); ++i) { @@ -1020,7 +1020,7 @@ void NewPartitionedAggregationNode::UpdateTuple(NewAggFnEvaluator** agg_fn_evals } } -Tuple* NewPartitionedAggregationNode::GetOutputTuple( +Tuple* PartitionedAggregationNode::GetOutputTuple( const vector<NewAggFnEvaluator*>& agg_fn_evals, Tuple* tuple, MemPool* pool) { DCHECK(tuple != NULL || agg_fn_evals.empty()) << tuple; Tuple* dst = tuple; @@ -1049,7 +1049,7 @@ } template <bool AGGREGATED_ROWS> -Status NewPartitionedAggregationNode::AppendSpilledRow( +Status PartitionedAggregationNode::AppendSpilledRow( Partition* partition, TupleRow* row) { DCHECK(!is_streaming_preagg_); DCHECK(partition->is_spilled()); @@ -1070,16 +1070,16 @@ Status NewPartitionedAggregationNode::AppendSpilledRow( } } -string NewPartitionedAggregationNode::DebugString(int indentation_level) const { +string PartitionedAggregationNode::DebugString(int indentation_level) const { stringstream ss; DebugString(indentation_level, &ss); return ss.str(); } -void NewPartitionedAggregationNode::DebugString(int indentation_level, +void PartitionedAggregationNode::DebugString(int indentation_level, stringstream* out) const { *out << string(indentation_level * 2, ' '); - *out << "NewPartitionedAggregationNode(" + *out << "PartitionedAggregationNode(" << "intermediate_tuple_id=" << intermediate_tuple_id_ << " output_tuple_id=" << output_tuple_id_ << " needs_finalize=" << needs_finalize_ @@ -1089,7 +1089,7 @@ void NewPartitionedAggregationNode::DebugString(int indentation_level, *out << ")"; } -Status NewPartitionedAggregationNode::CreateHashPartitions( +Status PartitionedAggregationNode::CreateHashPartitions( int level, int single_partition_idx) { if (is_streaming_preagg_) DCHECK_EQ(level, 0); if (UNLIKELY(level >= MAX_PARTITION_DEPTH)) { @@ -1158,8 +1158,8 @@ Status NewPartitionedAggregationNode::CreateHashPartitions( return Status::OK(); } -Status NewPartitionedAggregationNode::CheckAndResizeHashPartitions( - bool partitioning_aggregated_rows, int num_rows, const NewPartitionedHashTableCtx* ht_ctx) { +Status PartitionedAggregationNode::CheckAndResizeHashPartitions( + bool partitioning_aggregated_rows, int num_rows, const PartitionedHashTableCtx* ht_ctx) { DCHECK(!is_streaming_preagg_); for (int i = 0; i < PARTITION_FANOUT; ++i) { Partition* partition =
hash_partitions_[i]; @@ -1177,7 +1177,7 @@ Status NewPartitionedAggregationNode::CheckAndResizeHashPartitions( return Status::OK(); } -Status NewPartitionedAggregationNode::NextPartition() { +Status PartitionedAggregationNode::NextPartition() { DCHECK(output_partition_ == nullptr); if (!is_in_subplan() && spilled_partitions_.empty()) { @@ -1226,7 +1226,7 @@ Status NewPartitionedAggregationNode::NextPartition() { return Status::OK(); } -Status NewPartitionedAggregationNode::BuildSpilledPartition(Partition** built_partition) { +Status PartitionedAggregationNode::BuildSpilledPartition(Partition** built_partition) { DCHECK(!spilled_partitions_.empty()); DCHECK(!is_streaming_preagg_); // Leave the partition in 'spilled_partitions_' to be closed if we hit an error. @@ -1267,7 +1267,7 @@ Status NewPartitionedAggregationNode::BuildSpilledPartition(Partition** built_pa return Status::OK(); } -Status NewPartitionedAggregationNode::RepartitionSpilledPartition() { +Status PartitionedAggregationNode::RepartitionSpilledPartition() { DCHECK(!spilled_partitions_.empty()); DCHECK(!is_streaming_preagg_); // Leave the partition in 'spilled_partitions_' to be closed if we hit an error. @@ -1313,7 +1313,7 @@ } template <bool AGGREGATED_ROWS> -Status NewPartitionedAggregationNode::ProcessStream(BufferedTupleStream3* input_stream) { +Status PartitionedAggregationNode::ProcessStream(BufferedTupleStream3* input_stream) { DCHECK(!is_streaming_preagg_); if (input_stream->num_rows() > 0) { while (true) { @@ -1340,7 +1340,7 @@ Status NewPartitionedAggregationNode::ProcessStream(BufferedTupleStream3* input_ return Status::OK(); } -Status NewPartitionedAggregationNode::SpillPartition(bool more_aggregate_rows) { +Status PartitionedAggregationNode::SpillPartition(bool more_aggregate_rows) { int64_t max_freed_mem = 0; int partition_idx = -1; @@ -1371,7 +1371,7 @@ Status NewPartitionedAggregationNode::SpillPartition(bool more_aggregate_rows) { return hash_partitions_[partition_idx]->Spill(more_aggregate_rows); } -Status NewPartitionedAggregationNode::MoveHashPartitions(int64_t num_input_rows) { +Status PartitionedAggregationNode::MoveHashPartitions(int64_t num_input_rows) { DCHECK(!hash_partitions_.empty()); std::stringstream ss; ss << "PA(node_id=" << id() << ") partitioned(level=" << hash_partitions_[0]->level @@ -1415,7 +1415,7 @@ Status NewPartitionedAggregationNode::MoveHashPartitions(int64_t num_input_rows) return Status::OK(); } -void NewPartitionedAggregationNode::PushSpilledPartition(Partition* partition) { +void PartitionedAggregationNode::PushSpilledPartition(Partition* partition) { DCHECK(partition->is_spilled()); DCHECK(partition->hash_tbl == nullptr); // Ensure all pages in the spilled partition's streams are unpinned by invalidating @@ -1426,7 +1426,7 @@ void NewPartitionedAggregationNode::PushSpilledPartition(Partition* partition) { spilled_partitions_.push_front(partition); } -void NewPartitionedAggregationNode::ClosePartitions() { +void PartitionedAggregationNode::ClosePartitions() { for (Partition* partition : hash_partitions_) { if (partition != nullptr) partition->Close(true); } @@ -1439,7 +1439,7 @@ void NewPartitionedAggregationNode::ClosePartitions() { partition_pool_->clear(); } -//Status NewPartitionedAggregationNode::QueryMaintenance(RuntimeState* state) { +//Status PartitionedAggregationNode::QueryMaintenance(RuntimeState* state) { // NewAggFnEvaluator::FreeLocalAllocations(agg_fn_evals_); // for (Partition* partition : hash_partitions_) { // if
(partition != nullptr) { @@ -1451,9 +1451,9 @@ void NewPartitionedAggregationNode::ClosePartitions() { //} // Instantiate required templates. -template Status NewPartitionedAggregationNode::AppendSpilledRow<false>( +template Status PartitionedAggregationNode::AppendSpilledRow<false>( Partition*, TupleRow*); -template Status NewPartitionedAggregationNode::AppendSpilledRow<true>(Partition*, TupleRow*); +template Status PartitionedAggregationNode::AppendSpilledRow<true>(Partition*, TupleRow*); } diff --git a/be/src/exec/new_partitioned_aggregation_node.h b/be/src/exec/partitioned_aggregation_node.h similarity index 95% rename from be/src/exec/new_partitioned_aggregation_node.h rename to be/src/exec/partitioned_aggregation_node.h index 62a3da441e..ea8bab5e29 100644 --- a/be/src/exec/new_partitioned_aggregation_node.h +++ b/be/src/exec/partitioned_aggregation_node.h @@ -23,7 +23,7 @@ #include #include "exec/exec_node.h" -#include "exec/new_partitioned_hash_table.h" +#include "exec/partitioned_hash_table.h" #include "runtime/buffered_tuple_stream3.h" #include "runtime/bufferpool/suballocator.h" #include "runtime/descriptors.h" // for TupleId @@ -117,10 +117,10 @@ class SlotDescriptor; /// There are so many contexts in use that a plain "ctx" variable should never be used. /// Likewise, it's easy to mixup the agg fn ctxs, there should be a way to simplify this. /// TODO: support an Init() method with an initial value in the UDAF interface. -class NewPartitionedAggregationNode : public ExecNode { +class PartitionedAggregationNode : public ExecNode { public: - NewPartitionedAggregationNode(ObjectPool* pool, + PartitionedAggregationNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); virtual Status init(const TPlanNode& tnode, RuntimeState* state); @@ -154,7 +154,7 @@ class NewPartitionedAggregationNode : public ExecNode { /// MEM_LIMIT * (PARTITION_FANOUT ^ MAX_PARTITION_DEPTH). /// In the case where there is skew, repartitioning is unlikely to help (assuming a /// reasonable hash function). - /// Note that we need to have at least as many SEED_PRIMES in NewPartitionedHashTableCtx. + /// Note that we need to have at least as many SEED_PRIMES in PartitionedHashTableCtx. /// TODO: we can revisit and try harder to explicitly detect skew. static const int MAX_PARTITION_DEPTH = 16; @@ -243,19 +243,19 @@ class NewPartitionedAggregationNode : public ExecNode { /// The current partition and iterator to the next row in its hash table that we need /// to return in GetNext() Partition* output_partition_; - NewPartitionedHashTable::Iterator output_iterator_; + PartitionedHashTable::Iterator output_iterator_; - typedef Status (*ProcessBatchNoGroupingFn)(NewPartitionedAggregationNode*, RowBatch*); + typedef Status (*ProcessBatchNoGroupingFn)(PartitionedAggregationNode*, RowBatch*); /// Jitted ProcessBatchNoGrouping function pointer. Null if codegen is disabled. ProcessBatchNoGroupingFn process_batch_no_grouping_fn_; typedef Status (*ProcessBatchFn)( - NewPartitionedAggregationNode*, RowBatch*, NewPartitionedHashTableCtx*); + PartitionedAggregationNode*, RowBatch*, PartitionedHashTableCtx*); /// Jitted ProcessBatch function pointer. Null if codegen is disabled.
ProcessBatchFn process_batch_fn_; - typedef Status (*ProcessBatchStreamingFn)(NewPartitionedAggregationNode*, bool, - RowBatch*, RowBatch*, NewPartitionedHashTableCtx*, int[PARTITION_FANOUT]); + typedef Status (*ProcessBatchStreamingFn)(PartitionedAggregationNode*, bool, + RowBatch*, RowBatch*, PartitionedHashTableCtx*, int[PARTITION_FANOUT]); /// Jitted ProcessBatchStreaming function pointer. Null if codegen is disabled. ProcessBatchStreamingFn process_batch_streaming_fn_; @@ -327,7 +327,7 @@ class NewPartitionedAggregationNode : public ExecNode { /// Used for hash-related functionality, such as evaluating rows and calculating hashes. /// It also owns the evaluators for the grouping and build expressions used during hash /// table insertion and probing. - boost::scoped_ptr<NewPartitionedHashTableCtx> ht_ctx_; + boost::scoped_ptr<PartitionedHashTableCtx> ht_ctx_; /// Object pool that holds the Partition objects in hash_partitions_. boost::scoped_ptr<ObjectPool> partition_pool_; @@ -340,7 +340,7 @@ class NewPartitionedAggregationNode : public ExecNode { /// Cache for hash tables in 'hash_partitions_'. IMPALA-5788: For the case where we /// rebuild a spilled partition that fits in memory, all pointers in this array will /// point to the hash table that is a part of a single in-memory partition. - NewPartitionedHashTable* hash_tbls_[PARTITION_FANOUT]; + PartitionedHashTable* hash_tbls_[PARTITION_FANOUT]; /// All partitions that have been spilled and need further processing. std::deque<Partition*> spilled_partitions_; @@ -358,7 +358,7 @@ class NewPartitionedAggregationNode : public ExecNode { /// initially use small buffers. Streaming pre-aggregations do not spill and do not /// require an unaggregated stream. struct Partition { - Partition(NewPartitionedAggregationNode* parent, int level, int idx) + Partition(PartitionedAggregationNode* parent, int level, int idx) : parent(parent), is_closed(false), level(level), idx(idx) {} ~Partition(); @@ -392,7 +392,7 @@ class NewPartitionedAggregationNode : public ExecNode { bool is_spilled() const { return hash_tbl.get() == NULL; } - NewPartitionedAggregationNode* parent; + PartitionedAggregationNode* parent; /// If true, this partition is closed and there is nothing left to do. bool is_closed; @@ -408,7 +408,7 @@ class NewPartitionedAggregationNode : public ExecNode { /// Hash table for this partition. /// Can be NULL if this partition is no longer maintaining a hash table (i.e. /// is spilled or we are passing through all rows for this partition). - boost::scoped_ptr<NewPartitionedHashTable> hash_tbl; + boost::scoped_ptr<PartitionedHashTable> hash_tbl; /// Clone of parent's agg_fn_evals_. Permanent allocations come from /// 'agg_fn_perm_pool' and result allocations come from the ExecNode's @@ -438,8 +438,8 @@ class NewPartitionedAggregationNode : public ExecNode { boost::scoped_ptr<BufferedTupleStream3> serialize_stream_; /// Accessor for 'hash_tbls_' that verifies consistency with the partitions. - NewPartitionedHashTable* ALWAYS_INLINE GetHashTable(int partition_idx) { - NewPartitionedHashTable* ht = hash_tbls_[partition_idx]; + PartitionedHashTable* ALWAYS_INLINE GetHashTable(int partition_idx) { + PartitionedHashTable* ht = hash_tbls_[partition_idx]; DCHECK_EQ(ht, hash_partitions_[partition_idx]->hash_tbl.get()); return ht; } @@ -530,7 +530,7 @@ class NewPartitionedAggregationNode : public ExecNode { /// This function is replaced by codegen. We pass in ht_ctx_.get() as an argument for /// performance.
template <bool AGGREGATED_ROWS> - Status IR_ALWAYS_INLINE ProcessBatch(RowBatch* batch, NewPartitionedHashTableCtx* ht_ctx); + Status IR_ALWAYS_INLINE ProcessBatch(RowBatch* batch, PartitionedHashTableCtx* ht_ctx); /// Evaluates the rows in 'batch' starting at 'start_row_idx' and stores the results in /// the expression values cache in 'ht_ctx'. The number of rows evaluated depends on /// If it's not PREFETCH_NONE, hash table buckets for the computed hashes will be /// prefetched. Note that codegen replaces 'prefetch_mode' with a constant. template <bool AGGREGATED_ROWS> - void EvalAndHashPrefetchGroup(RowBatch* batch, int start_row_idx, NewPartitionedHashTableCtx* ht_ctx); + void EvalAndHashPrefetchGroup(RowBatch* batch, int start_row_idx, PartitionedHashTableCtx* ht_ctx); /// This function processes each individual row in ProcessBatch(). Must be inlined into /// ProcessBatch for codegen to substitute function calls with codegen'd versions. /// May spill partitions if not enough memory is available. template <bool AGGREGATED_ROWS> - Status IR_ALWAYS_INLINE ProcessRow(TupleRow* row, NewPartitionedHashTableCtx* ht_ctx); + Status IR_ALWAYS_INLINE ProcessRow(TupleRow* row, PartitionedHashTableCtx* ht_ctx); /// Create a new intermediate tuple in partition, initialized with row. ht_ctx is /// the context for the partition's hash table and hash is the precomputed hash of /// AGGREGATED_ROWS. Spills partitions if necessary to append the new intermediate /// tuple to the partition's stream. Must be inlined into ProcessBatch for codegen /// to substitute function calls with codegen'd versions. insert_it is an iterator - /// for insertion returned from NewPartitionedHashTable::FindBuildRowBucket(). + /// for insertion returned from PartitionedHashTable::FindBuildRowBucket(). template <bool AGGREGATED_ROWS> Status IR_ALWAYS_INLINE AddIntermediateTuple(Partition* partition, - TupleRow* row, uint32_t hash, NewPartitionedHashTable::Iterator insert_it); + TupleRow* row, uint32_t hash, PartitionedHashTable::Iterator insert_it); /// Append a row to a spilled partition. May spill partitions if needed to switch to /// I/O buffers. Selects the correct stream according to the argument. Inlined into @@ -597,7 +597,7 @@ class NewPartitionedAggregationNode : public ExecNode { /// by ProcessBatchStreaming() when it inserts new rows. /// 'ht_ctx' is passed in as a way to avoid aliasing of 'this' confusing the optimiser. Status ProcessBatchStreaming(bool needs_serialize, - RowBatch* in_batch, RowBatch* out_batch, NewPartitionedHashTableCtx* ht_ctx, + RowBatch* in_batch, RowBatch* out_batch, PartitionedHashTableCtx* ht_ctx, int remaining_capacity[PARTITION_FANOUT]); /// Tries to add intermediate to the hash table 'hash_tbl' of 'partition' for streaming /// keeps track of how many more entries can be added to the hash table so we can avoid /// retrying inserts. It is decremented if an insert succeeds and set to zero if an /// insert fails. If an error occurs, returns false and sets 'status'.
- bool IR_ALWAYS_INLINE TryAddToHashTable(NewPartitionedHashTableCtx* ht_ctx, - Partition* partition, NewPartitionedHashTable* hash_tbl, TupleRow* in_row, uint32_t hash, + bool IR_ALWAYS_INLINE TryAddToHashTable(PartitionedHashTableCtx* ht_ctx, + Partition* partition, PartitionedHashTable* hash_tbl, TupleRow* in_row, uint32_t hash, int* remaining_capacity, Status* status); /// Initializes hash_partitions_. 'level' is the level for the partitions to create. @@ -622,7 +622,7 @@ class NewPartitionedAggregationNode : public ExecNode { /// 'num_rows' additional hash table entries. If there is not enough memory to /// resize the hash tables, may spill partitions. 'aggregated_rows' is true if /// we're currently partitioning aggregated rows. - Status CheckAndResizeHashPartitions(bool aggregated_rows, int num_rows, const NewPartitionedHashTableCtx* ht_ctx); + Status CheckAndResizeHashPartitions(bool aggregated_rows, int num_rows, const PartitionedHashTableCtx* ht_ctx); /// Prepares the next partition to return results from. On return, this function /// initializes output_iterator_ and output_partition_. This either removes @@ -669,7 +669,7 @@ class NewPartitionedAggregationNode : public ExecNode { /// Calls finalizes on all tuples starting at 'it'. void CleanupHashTbl(const std::vector<NewAggFnEvaluator*>& agg_fn_evals, - NewPartitionedHashTable::Iterator it); + PartitionedHashTable::Iterator it); /// Compute minimum buffer reservation for grouping aggregations. /// We need one buffer per partition, which is used either as the write buffer for the diff --git a/be/src/exec/new_partitioned_aggregation_node_ir.cc b/be/src/exec/partitioned_aggregation_node_ir.cc similarity index 82% rename from be/src/exec/new_partitioned_aggregation_node_ir.cc rename to be/src/exec/partitioned_aggregation_node_ir.cc index 6674bbdd2f..b706bfe69e 100644 --- a/be/src/exec/new_partitioned_aggregation_node_ir.cc +++ b/be/src/exec/partitioned_aggregation_node_ir.cc @@ -14,12 +14,10 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. - -#include "exec/new_partitioned_aggregation_node.h" - -#include "exec/new_partitioned_hash_table.inline.h" -#include "exprs/new_agg_fn_evaluator.h" +#include "exec/partitioned_aggregation_node.h" +#include "exec/partitioned_hash_table.inline.h" #include "exprs/expr_context.h" +#include "exprs/new_agg_fn_evaluator.h" #include "runtime/buffered_tuple_stream3.inline.h" #include "runtime/row_batch.h" #include "runtime/tuple_row.h" @@ -27,7 +25,7 @@ using namespace doris; -Status NewPartitionedAggregationNode::ProcessBatchNoGrouping(RowBatch* batch) { +Status PartitionedAggregationNode::ProcessBatchNoGrouping(RowBatch* batch) { Tuple* output_tuple = singleton_output_tuple_; FOREACH_ROW(batch, 0, batch_iter) { UpdateTuple(agg_fn_evals_.data(), output_tuple, batch_iter.get()); @@ -36,8 +34,8 @@ Status NewPartitionedAggregationNode::ProcessBatchNoGrouping(RowBatch* batch) { } template <bool AGGREGATED_ROWS> -Status NewPartitionedAggregationNode::ProcessBatch(RowBatch* batch, - NewPartitionedHashTableCtx* ht_ctx) { +Status PartitionedAggregationNode::ProcessBatch(RowBatch* batch, + PartitionedHashTableCtx* ht_ctx) { DCHECK(!hash_partitions_.empty()); DCHECK(!is_streaming_preagg_); @@ -48,7 +46,7 @@ Status NewPartitionedAggregationNode::ProcessBatch(RowBatch* batch, // accurate resize calls.
RETURN_IF_ERROR(CheckAndResizeHashPartitions(AGGREGATED_ROWS, batch->num_rows(), ht_ctx)); - NewPartitionedHashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache(); + PartitionedHashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache(); const int cache_size = expr_vals_cache->capacity(); const int num_rows = batch->num_rows(); for (int group_start = 0; group_start < num_rows; group_start += cache_size) { @@ -64,10 +62,10 @@ Status NewPartitionedAggregationNode::ProcessBatch(RowBatch* batch, } template <bool AGGREGATED_ROWS> -void IR_ALWAYS_INLINE NewPartitionedAggregationNode::EvalAndHashPrefetchGroup( +void IR_ALWAYS_INLINE PartitionedAggregationNode::EvalAndHashPrefetchGroup( RowBatch* batch, int start_row_idx, - NewPartitionedHashTableCtx* ht_ctx) { - NewPartitionedHashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache(); + PartitionedHashTableCtx* ht_ctx) { + PartitionedHashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache(); const int cache_size = expr_vals_cache->capacity(); expr_vals_cache->Reset(); @@ -82,7 +80,7 @@ void IR_ALWAYS_INLINE NewPartitionedAggregationNode::EvalAndHashPrefetchGroup( // Hoist lookups out of non-null branch to speed up non-null case. const uint32_t hash = expr_vals_cache->CurExprValuesHash(); const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS); - NewPartitionedHashTable* hash_tbl = GetHashTable(partition_idx); + PartitionedHashTable* hash_tbl = GetHashTable(partition_idx); if (is_null) { expr_vals_cache->SetRowNull(); } else if (config::enable_prefetch) { @@ -95,9 +93,9 @@ } template <bool AGGREGATED_ROWS> -Status NewPartitionedAggregationNode::ProcessRow(TupleRow* row, - NewPartitionedHashTableCtx* ht_ctx) { - NewPartitionedHashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache(); +Status PartitionedAggregationNode::ProcessRow(TupleRow* row, + PartitionedHashTableCtx* ht_ctx) { + PartitionedHashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache(); // Hoist lookups out of non-null branch to speed up non-null case. const uint32_t hash = expr_vals_cache->CurExprValuesHash(); const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS); @@ -106,7 +104,7 @@ Status NewPartitionedAggregationNode::ProcessRow(TupleRow* row, // partition's hash table. If we need to insert it and that fails, due to OOM, we // spill the partition. The partition to spill is not necessarily dst_partition, // so we can try again to insert the row. - NewPartitionedHashTable* hash_tbl = GetHashTable(partition_idx); + PartitionedHashTable* hash_tbl = GetHashTable(partition_idx); Partition* dst_partition = hash_partitions_[partition_idx]; DCHECK(dst_partition != nullptr); DCHECK_EQ(dst_partition->is_spilled(), hash_tbl == NULL); @@ -119,7 +117,7 @@ Status NewPartitionedAggregationNode::ProcessRow(TupleRow* row, bool found; // Find the appropriate bucket in the hash table. There will always be a free // bucket because we checked the size above.
- NewPartitionedHashTable::Iterator it = hash_tbl->FindBuildRowBucket(ht_ctx, &found); + PartitionedHashTable::Iterator it = hash_tbl->FindBuildRowBucket(ht_ctx, &found); DCHECK(!it.AtEnd()) << "Hash table had no free buckets"; if (AGGREGATED_ROWS) { // If the row is already an aggregate row, it cannot match anything in the @@ -138,8 +136,8 @@ Status NewPartitionedAggregationNode::ProcessRow(TupleRow* row, } template <bool AGGREGATED_ROWS> -Status NewPartitionedAggregationNode::AddIntermediateTuple(Partition* partition, - TupleRow* row, uint32_t hash, NewPartitionedHashTable::Iterator insert_it) { +Status PartitionedAggregationNode::AddIntermediateTuple(Partition* partition, + TupleRow* row, uint32_t hash, PartitionedHashTable::Iterator insert_it) { while (true) { DCHECK(partition->aggregated_row_stream->is_pinned()); Tuple* intermediate_tuple = ConstructIntermediateTuple(partition->agg_fn_evals, @@ -162,15 +160,15 @@ Status NewPartitionedAggregationNode::AddIntermediateTuple(Partition* partition, } } -Status NewPartitionedAggregationNode::ProcessBatchStreaming(bool needs_serialize, +Status PartitionedAggregationNode::ProcessBatchStreaming(bool needs_serialize, RowBatch* in_batch, RowBatch* out_batch, - NewPartitionedHashTableCtx* ht_ctx, int remaining_capacity[PARTITION_FANOUT]) { + PartitionedHashTableCtx* ht_ctx, int remaining_capacity[PARTITION_FANOUT]) { DCHECK(is_streaming_preagg_); DCHECK_EQ(out_batch->num_rows(), 0); DCHECK_LE(in_batch->num_rows(), out_batch->capacity()); RowBatch::Iterator out_batch_iterator(out_batch, out_batch->num_rows()); - NewPartitionedHashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache(); + PartitionedHashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache(); const int num_rows = in_batch->num_rows(); const int cache_size = expr_vals_cache->capacity(); for (int group_start = 0; group_start < num_rows; group_start += cache_size) { @@ -212,16 +210,16 @@ Status NewPartitionedAggregationNode::ProcessBatchStreaming(bool needs_serialize return Status::OK(); } -bool NewPartitionedAggregationNode::TryAddToHashTable( - NewPartitionedHashTableCtx* ht_ctx, Partition* partition, - NewPartitionedHashTable* hash_tbl, TupleRow* in_row, +bool PartitionedAggregationNode::TryAddToHashTable( + PartitionedHashTableCtx* ht_ctx, Partition* partition, + PartitionedHashTable* hash_tbl, TupleRow* in_row, uint32_t hash, int* remaining_capacity, Status* status) { DCHECK(remaining_capacity != NULL); DCHECK_EQ(hash_tbl, partition->hash_tbl.get()); DCHECK_GE(*remaining_capacity, 0); bool found; // This is called from ProcessBatchStreaming() so the rows are not aggregated. - NewPartitionedHashTable::Iterator it = hash_tbl->FindBuildRowBucket(ht_ctx, &found); + PartitionedHashTable::Iterator it = hash_tbl->FindBuildRowBucket(ht_ctx, &found); Tuple* intermediate_tuple; if (found) { intermediate_tuple = it.GetTuple(); @@ -244,8 +242,8 @@ bool NewPartitionedAggregationNode::TryAddToHashTable( } // Instantiate required templates.
-template Status NewPartitionedAggregationNode::ProcessBatch<false>(RowBatch*, - NewPartitionedHashTableCtx*); -template Status NewPartitionedAggregationNode::ProcessBatch<true>(RowBatch*, - NewPartitionedHashTableCtx*); +template Status PartitionedAggregationNode::ProcessBatch<false>(RowBatch*, + PartitionedHashTableCtx*); +template Status PartitionedAggregationNode::ProcessBatch<true>(RowBatch*, + PartitionedHashTableCtx*); diff --git a/be/src/exec/new_partitioned_hash_table.cc b/be/src/exec/partitioned_hash_table.cc similarity index 86% rename from be/src/exec/new_partitioned_hash_table.cc rename to be/src/exec/partitioned_hash_table.cc index 0c195257c2..9c5075421f 100644 --- a/be/src/exec/new_partitioned_hash_table.cc +++ b/be/src/exec/partitioned_hash_table.cc @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -#include "exec/new_partitioned_hash_table.inline.h" +#include "exec/partitioned_hash_table.inline.h" #include #include @@ -77,7 +77,7 @@ static int64_t NULL_VALUE[] = { HashUtil::FNV_SEED, HashUtil::FNV_SEED, HashUtil::FNV_SEED, HashUtil::FNV_SEED }; -NewPartitionedHashTableCtx::NewPartitionedHashTableCtx(const std::vector& build_exprs, +PartitionedHashTableCtx::PartitionedHashTableCtx(const std::vector& build_exprs, const std::vector& probe_exprs, bool stores_nulls, const std::vector<bool>& finds_nulls, int32_t initial_seed, int max_levels, MemPool* mem_pool, MemPool* expr_results_pool) @@ -108,14 +108,14 @@ NewPartitionedHashTableCtx::NewPartitionedHashTableCtx(const std::vector& } } -Status NewPartitionedHashTableCtx::Init(ObjectPool* pool, RuntimeState* state, int num_build_tuples, +Status PartitionedHashTableCtx::Init(ObjectPool* pool, RuntimeState* state, int num_build_tuples, MemTracker* tracker, const RowDescriptor& row_desc, const RowDescriptor& row_desc_probe) { int scratch_row_size = sizeof(Tuple*) * num_build_tuples; scratch_row_ = reinterpret_cast<TupleRow*>(malloc(scratch_row_size)); if (UNLIKELY(scratch_row_ == NULL)) { return Status::InternalError(Substitute("Failed to allocate $0 bytes for scratch row of " - "NewPartitionedHashTableCtx.", scratch_row_size)); + "PartitionedHashTableCtx.", scratch_row_size)); } // TODO chenhao replace ExprContext with ScalarFnEvaluator @@ -141,20 +141,20 @@ Status NewPartitionedHashTableCtx::Init(ObjectPool* pool, RuntimeState* state, i return expr_values_cache_.Init(state, mem_pool_->mem_tracker(), build_exprs_); } -Status NewPartitionedHashTableCtx::Create(ObjectPool* pool, RuntimeState* state, +Status PartitionedHashTableCtx::Create(ObjectPool* pool, RuntimeState* state, const std::vector& build_exprs, const std::vector& probe_exprs, bool stores_nulls, const std::vector<bool>& finds_nulls, int32_t initial_seed, int max_levels, int num_build_tuples, MemPool* mem_pool, MemPool* expr_results_pool, MemTracker* tracker, const RowDescriptor& row_desc, const RowDescriptor& row_desc_probe, - scoped_ptr<NewPartitionedHashTableCtx>* ht_ctx) { - ht_ctx->reset(new NewPartitionedHashTableCtx(build_exprs, probe_exprs, stores_nulls, + scoped_ptr<PartitionedHashTableCtx>* ht_ctx) { + ht_ctx->reset(new PartitionedHashTableCtx(build_exprs, probe_exprs, stores_nulls, finds_nulls, initial_seed, max_levels, mem_pool, expr_results_pool)); return (*ht_ctx)->Init(pool, state, num_build_tuples, tracker, row_desc, row_desc_probe); } -Status NewPartitionedHashTableCtx::Open(RuntimeState* state) { +Status PartitionedHashTableCtx::Open(RuntimeState* state) { // TODO chenhao replace ExprContext with ScalarFnEvaluator for (int i = 0; i < build_expr_evals_.size(); i++) {
RETURN_IF_ERROR(build_expr_evals_[i]->open(state)); @@ -165,7 +165,7 @@ Status NewPartitionedHashTableCtx::Open(RuntimeState* state) { return Status::OK(); } -void NewPartitionedHashTableCtx::Close(RuntimeState* state) { +void PartitionedHashTableCtx::Close(RuntimeState* state) { free(scratch_row_); scratch_row_ = NULL; expr_values_cache_.Close(mem_pool_->mem_tracker()); @@ -183,27 +183,27 @@ void NewPartitionedHashTableCtx::Close(RuntimeState* state) { probe_expr_evals_.clear(); } -void NewPartitionedHashTableCtx::FreeBuildLocalAllocations() { +void PartitionedHashTableCtx::FreeBuildLocalAllocations() { //ExprContext::FreeLocalAllocations(build_expr_evals_); } -void NewPartitionedHashTableCtx::FreeProbeLocalAllocations() { +void PartitionedHashTableCtx::FreeProbeLocalAllocations() { //ExprContext::FreeLocalAllocations(probe_expr_evals_); } -void NewPartitionedHashTableCtx::FreeLocalAllocations() { +void PartitionedHashTableCtx::FreeLocalAllocations() { FreeBuildLocalAllocations(); FreeProbeLocalAllocations(); } -uint32_t NewPartitionedHashTableCtx::Hash(const void* input, int len, uint32_t hash) const { +uint32_t PartitionedHashTableCtx::Hash(const void* input, int len, uint32_t hash) const { /// Use CRC hash at first level for better performance. Switch to murmur hash at /// subsequent levels since CRC doesn't randomize well with different seed inputs. if (level_ == 0) return HashUtil::hash(input, len, hash); return HashUtil::murmur_hash2_64(input, len, hash); } -uint32_t NewPartitionedHashTableCtx::HashRow( +uint32_t PartitionedHashTableCtx::HashRow( const uint8_t* expr_values, const uint8_t* expr_values_null) const noexcept { DCHECK_LT(level_, seeds_.size()); if (expr_values_cache_.var_result_offset() == -1) { @@ -212,11 +212,11 @@ uint32_t NewPartitionedHashTableCtx::HashRow( return Hash( expr_values, expr_values_cache_.expr_values_bytes_per_row(), seeds_[level_]); } else { - return NewPartitionedHashTableCtx::HashVariableLenRow(expr_values, expr_values_null); + return PartitionedHashTableCtx::HashVariableLenRow(expr_values, expr_values_null); } } -bool NewPartitionedHashTableCtx::EvalRow(TupleRow* row, const vector<ExprContext*>& ctxs, +bool PartitionedHashTableCtx::EvalRow(TupleRow* row, const vector<ExprContext*>& ctxs, uint8_t* expr_values, uint8_t* expr_values_null) noexcept { bool has_null = false; for (int i = 0; i < ctxs.size(); ++i) { @@ -241,7 +241,7 @@ bool NewPartitionedHashTableCtx::EvalRow(TupleRow* row, const vector -bool NewPartitionedHashTableCtx::Equals(TupleRow* build_row, const uint8_t* expr_values, +bool PartitionedHashTableCtx::Equals(TupleRow* build_row, const uint8_t* expr_values, const uint8_t* expr_values_null) const noexcept { for (int i = 0; i < build_expr_evals_.size(); ++i) { void* val = build_expr_evals_[i]->get_value(build_row); @@ -293,12 +293,12 @@ bool NewPartitionedHashTableCtx::Equals(TupleRow* build_row, const uint8_t* expr return true; } -template bool NewPartitionedHashTableCtx::Equals<true>(TupleRow* build_row, +template bool PartitionedHashTableCtx::Equals<true>(TupleRow* build_row, const uint8_t* expr_values, const uint8_t* expr_values_null) const; -template bool NewPartitionedHashTableCtx::Equals<false>(TupleRow* build_row, +template bool PartitionedHashTableCtx::Equals<false>(TupleRow* build_row, const uint8_t* expr_values, const uint8_t* expr_values_null) const; -NewPartitionedHashTableCtx::ExprValuesCache::ExprValuesCache() +PartitionedHashTableCtx::ExprValuesCache::ExprValuesCache() : capacity_(0), cur_expr_values_(NULL), cur_expr_values_null_(NULL), @@ -309,7 +309,
NewPartitionedHashTableCtx::ExprValuesCache::ExprValuesCache() expr_values_hash_array_(NULL), null_bitmap_(0) {} -Status NewPartitionedHashTableCtx::ExprValuesCache::Init(RuntimeState* state, +Status PartitionedHashTableCtx::ExprValuesCache::Init(RuntimeState* state, MemTracker* tracker, const std::vector& build_exprs) { // Initialize the number of expressions. num_exprs_ = build_exprs.size(); @@ -330,7 +330,7 @@ Status NewPartitionedHashTableCtx::ExprValuesCache::Init(RuntimeState* state, int mem_usage = MemUsage(capacity_, expr_values_bytes_per_row_, num_exprs_); if (UNLIKELY(!tracker->try_consume(mem_usage))) { capacity_ = 0; - string details = Substitute("NewPartitionedHashTableCtx::ExprValuesCache failed to allocate $0 bytes.", + string details = Substitute("PartitionedHashTableCtx::ExprValuesCache failed to allocate $0 bytes.", mem_usage); return tracker->MemLimitExceeded(state, details, mem_usage); } @@ -354,7 +354,7 @@ Status NewPartitionedHashTableCtx::ExprValuesCache::Init(RuntimeState* state, return Status::OK(); } -void NewPartitionedHashTableCtx::ExprValuesCache::Close(MemTracker* tracker) { +void PartitionedHashTableCtx::ExprValuesCache::Close(MemTracker* tracker) { if (capacity_ == 0) return; cur_expr_values_ = NULL; cur_expr_values_null_ = NULL; @@ -368,7 +368,7 @@ void NewPartitionedHashTableCtx::ExprValuesCache::Close(MemTracker* tracker) { tracker->release(mem_usage); } -int NewPartitionedHashTableCtx::ExprValuesCache::MemUsage(int capacity, +int PartitionedHashTableCtx::ExprValuesCache::MemUsage(int capacity, int expr_values_bytes_per_row, int num_exprs) { return expr_values_bytes_per_row * capacity + // expr_values_array_ num_exprs * capacity + // expr_values_null_array_ @@ -376,23 +376,23 @@ int NewPartitionedHashTableCtx::ExprValuesCache::MemUsage(int capacity, Bitmap::MemUsage(capacity); // null_bitmap_ } -uint8_t* NewPartitionedHashTableCtx::ExprValuesCache::ExprValuePtr( +uint8_t* PartitionedHashTableCtx::ExprValuesCache::ExprValuePtr( uint8_t* expr_values, int expr_idx) const { return expr_values + expr_values_offsets_[expr_idx]; } -const uint8_t* NewPartitionedHashTableCtx::ExprValuesCache::ExprValuePtr( +const uint8_t* PartitionedHashTableCtx::ExprValuesCache::ExprValuePtr( const uint8_t* expr_values, int expr_idx) const { return expr_values + expr_values_offsets_[expr_idx]; } -void NewPartitionedHashTableCtx::ExprValuesCache::ResetIterators() { +void PartitionedHashTableCtx::ExprValuesCache::ResetIterators() { cur_expr_values_ = expr_values_array_.get(); cur_expr_values_null_ = expr_values_null_array_.get(); cur_expr_values_hash_ = expr_values_hash_array_.get(); } -void NewPartitionedHashTableCtx::ExprValuesCache::Reset() noexcept { +void PartitionedHashTableCtx::ExprValuesCache::Reset() noexcept { ResetIterators(); // Set the end pointer after resetting the other pointers so they point to // the same location. @@ -400,24 +400,24 @@ void NewPartitionedHashTableCtx::ExprValuesCache::Reset() noexcept { null_bitmap_.SetAllBits(false); } -void NewPartitionedHashTableCtx::ExprValuesCache::ResetForRead() { +void PartitionedHashTableCtx::ExprValuesCache::ResetForRead() { // Record the end of hash values iterator to be used in AtEnd(). // Do it before resetting the pointers. 
cur_expr_values_hash_end_ = cur_expr_values_hash_; ResetIterators(); } -constexpr double NewPartitionedHashTable::MAX_FILL_FACTOR; -constexpr int64_t NewPartitionedHashTable::DATA_PAGE_SIZE; +constexpr double PartitionedHashTable::MAX_FILL_FACTOR; +constexpr int64_t PartitionedHashTable::DATA_PAGE_SIZE; -NewPartitionedHashTable* NewPartitionedHashTable::Create(Suballocator* allocator, bool stores_duplicates, +PartitionedHashTable* PartitionedHashTable::Create(Suballocator* allocator, bool stores_duplicates, int num_build_tuples, BufferedTupleStream3* tuple_stream, int64_t max_num_buckets, int64_t initial_num_buckets) { - return new NewPartitionedHashTable(config::enable_quadratic_probing, allocator, stores_duplicates, + return new PartitionedHashTable(config::enable_quadratic_probing, allocator, stores_duplicates, num_build_tuples, tuple_stream, max_num_buckets, initial_num_buckets); } -NewPartitionedHashTable::NewPartitionedHashTable(bool quadratic_probing, Suballocator* allocator, +PartitionedHashTable::PartitionedHashTable(bool quadratic_probing, Suballocator* allocator, bool stores_duplicates, int num_build_tuples, BufferedTupleStream3* stream, int64_t max_num_buckets, int64_t num_buckets) : allocator_(allocator), @@ -443,7 +443,7 @@ NewPartitionedHashTable::NewPartitionedHashTable(bool quadratic_probing, Suballo DCHECK(stores_tuples_ || stream != NULL); } -Status NewPartitionedHashTable::Init(bool* got_memory) { +Status PartitionedHashTable::Init(bool* got_memory) { int64_t buckets_byte_size = num_buckets_ * sizeof(Bucket); RETURN_IF_ERROR(allocator_->Allocate(buckets_byte_size, &bucket_allocation_)); if (bucket_allocation_ == nullptr) { @@ -457,7 +457,7 @@ Status NewPartitionedHashTable::Init(bool* got_memory) { return Status::OK(); } -void NewPartitionedHashTable::Close() { +void PartitionedHashTable::Close() { // Print statistics only for the large or heavily used hash tables. // TODO: Tweak these numbers/conditions, or print them always? const int64_t LARGE_HT = 128 * 1024; @@ -472,8 +472,8 @@ void NewPartitionedHashTable::Close() { if (bucket_allocation_ != nullptr) allocator_->Free(move(bucket_allocation_)); } -Status NewPartitionedHashTable::CheckAndResize( - uint64_t buckets_to_fill, const NewPartitionedHashTableCtx* ht_ctx, bool* got_memory) { +Status PartitionedHashTable::CheckAndResize( + uint64_t buckets_to_fill, const PartitionedHashTableCtx* ht_ctx, bool* got_memory) { uint64_t shift = 0; while (num_filled_buckets_ + buckets_to_fill > (num_buckets_ << shift) * MAX_FILL_FACTOR) { @@ -484,8 +484,8 @@ Status NewPartitionedHashTable::CheckAndResize( return Status::OK(); } -Status NewPartitionedHashTable::ResizeBuckets( - int64_t num_buckets, const NewPartitionedHashTableCtx* ht_ctx, bool* got_memory) { +Status PartitionedHashTable::ResizeBuckets( + int64_t num_buckets, const PartitionedHashTableCtx* ht_ctx, bool* got_memory) { DCHECK_EQ((num_buckets & (num_buckets - 1)), 0) << "num_buckets=" << num_buckets << " must be a power of 2"; DCHECK_GT(num_buckets, num_filled_buckets_) @@ -519,7 +519,7 @@ Status NewPartitionedHashTable::ResizeBuckets( // Walk the old table and copy all the filled buckets to the new (resized) table. // We do not have to do anything with the duplicate nodes. This operation is expected // to succeed. 
-  for (NewPartitionedHashTable::Iterator iter = Begin(ht_ctx); !iter.AtEnd();
+  for (PartitionedHashTable::Iterator iter = Begin(ht_ctx); !iter.AtEnd();
       NextFilledBucket(&iter.bucket_idx_, &iter.node_)) {
     Bucket* bucket_to_copy = &buckets_[iter.bucket_idx_];
     bool found = false;
@@ -540,7 +540,7 @@ Status NewPartitionedHashTable::ResizeBuckets(
   return Status::OK();
 }
 
-bool NewPartitionedHashTable::GrowNodeArray(Status* status) {
+bool PartitionedHashTable::GrowNodeArray(Status* status) {
   std::unique_ptr<Suballocation> allocation;
   *status = allocator_->Allocate(DATA_PAGE_SIZE, &allocation);
   if (!status->ok() || allocation == nullptr) return false;
@@ -552,7 +552,7 @@ bool NewPartitionedHashTable::GrowNodeArray(Status* status) {
   return true;
 }
 
-void NewPartitionedHashTable::DebugStringTuple(std::stringstream& ss, HtData& htdata,
+void PartitionedHashTable::DebugStringTuple(std::stringstream& ss, HtData& htdata,
     const RowDescriptor* desc) {
   if (stores_tuples_) {
     ss << "(" << htdata.tuple << ")";
@@ -565,7 +565,7 @@ void NewPartitionedHashTable::DebugStringTuple(std::stringstream& ss, HtData& ht
   }
 }
 
-string NewPartitionedHashTable::DebugString(bool skip_empty, bool show_match,
+string PartitionedHashTable::DebugString(bool skip_empty, bool show_match,
     const RowDescriptor* desc) {
   std::stringstream ss;
   ss << std::endl;
@@ -602,7 +602,7 @@ string NewPartitionedHashTable::DebugString(bool skip_empty, bool show_match,
   return ss.str();
 }
 
-string NewPartitionedHashTable::PrintStats() const {
+string PartitionedHashTable::PrintStats() const {
   double curr_fill_factor = (double)num_filled_buckets_/(double)num_buckets_;
   double avg_travel = (double)travel_length_/(double)num_probes_;
   double avg_collisions = (double)num_hash_collisions_/(double)num_filled_buckets_;
diff --git a/be/src/exec/new_partitioned_hash_table.h b/be/src/exec/partitioned_hash_table.h
similarity index 96%
rename from be/src/exec/new_partitioned_hash_table.h
rename to be/src/exec/partitioned_hash_table.h
index e4faf7a95a..5b331cbe85 100644
--- a/be/src/exec/new_partitioned_hash_table.h
+++ b/be/src/exec/partitioned_hash_table.h
@@ -38,11 +38,11 @@ namespace doris {
 class Expr;
 class ExprContext;
 class MemTracker;
+class PartitionedHashTable;
 class RowDescriptor;
 class RuntimeState;
 class Tuple;
 class TupleRow;
-class NewPartitionedHashTable;
 
 /// Linear or quadratic probing hash table implementation tailored to the usage pattern
 /// for partitioned hash aggregation and hash joins. The hash table stores TupleRows and
@@ -102,7 +102,7 @@ class NewPartitionedHashTable;
 
 /// Control block for a hash table. This class contains the logic as well as the variables
 /// needed by a thread to operate on a hash table.
-class NewPartitionedHashTableCtx {
+class PartitionedHashTableCtx {
 
  public:
   /// Create a hash table context with the specified parameters, invoke Init() to
@@ -117,7 +117,7 @@ class NewPartitionedHashTableCtx {
       int num_build_tuples, MemPool* mem_pool, MemPool* expr_results_pool,
       MemTracker* tracker, const RowDescriptor& row_desc, const RowDescriptor& row_desc_probe,
-      boost::scoped_ptr<NewPartitionedHashTableCtx>* ht_ctx);
+      boost::scoped_ptr<PartitionedHashTableCtx>* ht_ctx);
 
   /// Initialize the build and probe expression evaluators.
   Status Open(RuntimeState* state);
@@ -210,7 +210,7 @@ class NewPartitionedHashTableCtx {
 
     /// Allocates memory and initializes various data structures. Return error status
     /// if memory allocation leads to the memory limits of the exec node to be exceeded.
-    /// 'tracker' is the memory tracker of the exec node which owns this NewPartitionedHashTableCtx.
+    /// 'tracker' is the memory tracker of the exec node which owns this PartitionedHashTableCtx.
     Status Init(RuntimeState* state, MemTracker* tracker,
         const std::vector<Expr*>& build_exprs);
@@ -287,7 +287,7 @@ class NewPartitionedHashTableCtx {
     }
 
    private:
-    friend class NewPartitionedHashTableCtx;
+    friend class PartitionedHashTableCtx;
 
     /// Resets the iterators to the beginning of the cache values' arrays.
     void ResetIterators();
@@ -358,7 +358,7 @@
   ExprValuesCache* ALWAYS_INLINE expr_values_cache() { return &expr_values_cache_; }
 
  private:
-  friend class NewPartitionedHashTable;
+  friend class PartitionedHashTable;
   friend class HashTableTest_HashEmpty_Test;
 
   /// Construct a hash table context.
@@ -378,7 +378,7 @@
   /// with '<=>' and others with '=', stores_nulls could distinguish between columns
   /// in which nulls are stored and columns in which they are not, which could save
   /// space by not storing some rows we know will never match.
-  NewPartitionedHashTableCtx(const std::vector<Expr*>& build_exprs,
+  PartitionedHashTableCtx(const std::vector<Expr*>& build_exprs,
       const std::vector<Expr*>& probe_exprs, bool stores_nulls,
       const std::vector<bool>& finds_nulls, int32_t initial_seed,
       int max_levels, MemPool* mem_pool, MemPool* expr_results_pool);
@@ -496,7 +496,7 @@
 /// value, the one in the bucket. The data is either a tuple stream index or a Tuple*.
 /// This array of buckets is sparse, we are shooting for up to 3/4 fill factor (75%). The
 /// data allocated by the hash table comes from the BufferPool.
-class NewPartitionedHashTable {
+class PartitionedHashTable {
 
 private:
 
  /// Rows are represented as pointers into the BufferedTupleStream data with one
@@ -563,7 +563,7 @@
  ///    -1, if it is unlimited.
  ///  - initial_num_buckets: number of buckets that the hash table should be initialized
  ///    with.
-  static NewPartitionedHashTable* Create(Suballocator* allocator, bool stores_duplicates,
+  static PartitionedHashTable* Create(Suballocator* allocator, bool stores_duplicates,
       int num_build_tuples, BufferedTupleStream3* tuple_stream,
       int64_t max_num_buckets, int64_t initial_num_buckets);
 
@@ -586,7 +586,7 @@
  /// only one tuple, a pointer to that tuple is stored. Otherwise the 'flat_row' pointer
  /// is stored. The 'row' is not copied by the hash table and the caller must guarantee
  /// it stays in memory. This will not grow the hash table.
-  bool IR_ALWAYS_INLINE Insert(NewPartitionedHashTableCtx* ht_ctx,
+  bool IR_ALWAYS_INLINE Insert(PartitionedHashTableCtx* ht_ctx,
       BufferedTupleStream3::FlatRowPtr flat_row, TupleRow* row,
       Status* status);
 
@@ -602,13 +602,13 @@
  /// row. The matching rows do not need to be evaluated since all the nodes of a bucket
  /// are duplicates. One scan can be in progress for each 'ht_ctx'. Used in the probe
  /// phase of hash joins.
-  Iterator IR_ALWAYS_INLINE FindProbeRow(NewPartitionedHashTableCtx* ht_ctx);
+  Iterator IR_ALWAYS_INLINE FindProbeRow(PartitionedHashTableCtx* ht_ctx);
 
  /// If a match is found in the table, return an iterator as in FindProbeRow(). If a
  /// match was not present, return an iterator pointing to the empty bucket where the key
  /// should be inserted. Returns End() if the table is full. The caller can set the data
  /// in the bucket using a Set*() method on the iterator.
-  Iterator IR_ALWAYS_INLINE FindBuildRowBucket(NewPartitionedHashTableCtx* ht_ctx, bool* found);
+  Iterator IR_ALWAYS_INLINE FindBuildRowBucket(PartitionedHashTableCtx* ht_ctx, bool* found);
 
  /// Returns number of elements inserted in the hash table
  int64_t size() const {
@@ -655,7 +655,7 @@
  /// inserted without need to resize. If there is not enough memory available to
  /// resize the hash table, Status::OK() is returned and 'got_memory' is false. If
  /// another error occurs, an error status may be returned.
-  Status CheckAndResize(uint64_t buckets_to_fill, const NewPartitionedHashTableCtx* ht_ctx,
+  Status CheckAndResize(uint64_t buckets_to_fill, const PartitionedHashTableCtx* ht_ctx,
      bool* got_memory);
 
  /// Returns the number of bytes allocated to the hash table from the block manager.
@@ -665,12 +665,12 @@
 
  /// Returns an iterator at the beginning of the hash table. Advancing this iterator
  /// will traverse all elements.
-  Iterator Begin(const NewPartitionedHashTableCtx* ht_ctx);
+  Iterator Begin(const PartitionedHashTableCtx* ht_ctx);
 
  /// Return an iterator pointing to the first element (Bucket or DuplicateNode, if the
  /// bucket has duplicates) in the hash table that does not have its matched flag set.
  /// Used in right joins and full-outer joins.
-  Iterator FirstUnmatched(NewPartitionedHashTableCtx* ctx);
+  Iterator FirstUnmatched(PartitionedHashTableCtx* ctx);
 
  /// Return true if there was at least one match.
  bool HasMatches() const { return has_matches_; }
@@ -752,17 +752,17 @@
    void IR_ALWAYS_INLINE PrefetchBucket();
 
   private:
-    friend class NewPartitionedHashTable;
+    friend class PartitionedHashTable;
 
    ALWAYS_INLINE
-    Iterator(NewPartitionedHashTable* table, TupleRow* row, int bucket_idx, DuplicateNode* node)
+    Iterator(PartitionedHashTable* table, TupleRow* row, int bucket_idx, DuplicateNode* node)
      : table_(table),
        scratch_row_(row),
        bucket_idx_(bucket_idx),
        node_(node) {
    }
 
-    NewPartitionedHashTable* table_;
+    PartitionedHashTable* table_;
 
    /// Scratch buffer to hold generated rows. Not owned.
    TupleRow* scratch_row_;
@@ -782,7 +782,7 @@
  /// of calling this constructor directly.
  ///  - quadratic_probing: set to true when the probing algorithm is quadratic, as
  ///    opposed to linear.
-  NewPartitionedHashTable(bool quadratic_probing, Suballocator* allocator, bool stores_duplicates,
+  PartitionedHashTable(bool quadratic_probing, Suballocator* allocator, bool stores_duplicates,
      int num_build_tuples, BufferedTupleStream3* tuple_stream,
      int64_t max_num_buckets, int64_t initial_num_buckets);
 
@@ -811,13 +811,13 @@
  /// There are wrappers of this function that perform the Find and Insert logic.
  template <bool FORCE_NULL_EQUALITY>
  int64_t IR_ALWAYS_INLINE Probe(Bucket* buckets, int64_t num_buckets,
-      NewPartitionedHashTableCtx* ht_ctx, uint32_t hash, bool* found);
+      PartitionedHashTableCtx* ht_ctx, uint32_t hash, bool* found);
 
  /// Performs the insert logic. Returns the HtData* of the bucket or duplicate node
  /// where the data should be inserted. Returns NULL if the insert was not successful
  /// and either sets 'status' to OK if it failed because not enough reservation was
  /// available or the error if an error was encountered.
-  HtData* IR_ALWAYS_INLINE InsertInternal(NewPartitionedHashTableCtx* ht_ctx, Status* status);
+  HtData* IR_ALWAYS_INLINE InsertInternal(PartitionedHashTableCtx* ht_ctx, Status* status);
 
  /// Updates 'bucket_idx' to the index of the next non-empty bucket. If the bucket has
  /// duplicates, 'node' will be pointing to the head of the linked list of duplicates.
@@ -826,7 +826,7 @@
  void NextFilledBucket(int64_t* bucket_idx, DuplicateNode** node);
 
  /// Resize the hash table to 'num_buckets'. 'got_memory' is false on OOM.
-  Status ResizeBuckets(int64_t num_buckets, const NewPartitionedHashTableCtx* ht_ctx, bool* got_memory);
+  Status ResizeBuckets(int64_t num_buckets, const PartitionedHashTableCtx* ht_ctx, bool* got_memory);
 
  /// Appends the DuplicateNode pointed by next_node_ to 'bucket' and moves the next_node_
  /// pointer to the next DuplicateNode in the page, updating the remaining node counter.
diff --git a/be/src/exec/new_partitioned_hash_table.inline.h b/be/src/exec/partitioned_hash_table.inline.h
similarity index 82%
rename from be/src/exec/new_partitioned_hash_table.inline.h
rename to be/src/exec/partitioned_hash_table.inline.h
index 43be7760f2..2cdbe01a36 100644
--- a/be/src/exec/new_partitioned_hash_table.inline.h
+++ b/be/src/exec/partitioned_hash_table.inline.h
@@ -18,14 +18,14 @@
 #ifndef DORIS_BE_SRC_EXEC_NEW_PARTITIONED_HASH_TABLE_INLINE_H
 #define DORIS_BE_SRC_EXEC_NEW_PARTITIONED_HASH_TABLE_INLINE_H
 
-#include "exec/new_partitioned_hash_table.h"
+#include "exec/partitioned_hash_table.h"
 
 #include "exprs/expr.h"
 #include "exprs/expr_context.h"
 
 namespace doris {
 
-inline bool NewPartitionedHashTableCtx::EvalAndHashBuild(TupleRow* row) {
+inline bool PartitionedHashTableCtx::EvalAndHashBuild(TupleRow* row) {
   uint8_t* expr_values = expr_values_cache_.cur_expr_values();
   uint8_t* expr_values_null = expr_values_cache_.cur_expr_values_null();
   bool has_null = EvalBuildRow(row, expr_values, expr_values_null);
@@ -34,7 +34,7 @@ inline bool NewPartitionedHashTableCtx::EvalAndHashBuild(TupleRow* row) {
   return true;
 }
 
-inline bool NewPartitionedHashTableCtx::EvalAndHashProbe(TupleRow* row) {
+inline bool PartitionedHashTableCtx::EvalAndHashProbe(TupleRow* row) {
   uint8_t* expr_values = expr_values_cache_.cur_expr_values();
   uint8_t* expr_values_null = expr_values_cache_.cur_expr_values_null();
   bool has_null = EvalProbeRow(row, expr_values, expr_values_null);
@@ -43,7 +43,7 @@
   return true;
 }
 
-inline void NewPartitionedHashTableCtx::ExprValuesCache::NextRow() {
+inline void PartitionedHashTableCtx::ExprValuesCache::NextRow() {
   cur_expr_values_ += expr_values_bytes_per_row_;
   cur_expr_values_null_ += num_exprs_;
   ++cur_expr_values_hash_;
@@ -51,8 +51,8 @@
 }
 
 template <bool FORCE_NULL_EQUALITY>
-inline int64_t NewPartitionedHashTable::Probe(Bucket* buckets, int64_t num_buckets,
-    NewPartitionedHashTableCtx* ht_ctx, uint32_t hash, bool* found) {
+inline int64_t PartitionedHashTable::Probe(Bucket* buckets, int64_t num_buckets,
+    PartitionedHashTableCtx* ht_ctx, uint32_t hash, bool* found) {
   DCHECK(buckets != NULL);
   DCHECK_GT(num_buckets, 0);
   *found = false;
@@ -92,8 +92,8 @@ inline int64_t NewPartitionedHashTable::Probe(Bucket* buckets, int64_t num_bucke
   return Iterator::BUCKET_NOT_FOUND;
 }
 
-inline NewPartitionedHashTable::HtData* NewPartitionedHashTable::InsertInternal(
-    NewPartitionedHashTableCtx* ht_ctx, Status* status) {
+inline PartitionedHashTable::HtData*
PartitionedHashTable::InsertInternal(
+    PartitionedHashTableCtx* ht_ctx, Status* status) {
   ++num_probes_;
   bool found = false;
   uint32_t hash = ht_ctx->expr_values_cache()->CurExprValuesHash();
@@ -110,7 +110,7 @@ inline NewPartitionedHashTable::HtData* NewPartitionedHashTable::InsertInternal(
   }
 }
 
-inline bool NewPartitionedHashTable::Insert(NewPartitionedHashTableCtx* ht_ctx,
+inline bool PartitionedHashTable::Insert(PartitionedHashTableCtx* ht_ctx,
     BufferedTupleStream3::FlatRowPtr flat_row, TupleRow* row,
     Status* status) {
   HtData* htdata = InsertInternal(ht_ctx, status);
   // If successful insert, update the contents of the newly inserted entry with 'idx'.
@@ -126,7 +126,7 @@ inline bool NewPartitionedHashTable::Insert(NewPartitionedHashTableCtx* ht_ctx,
 }
 
 template <bool READ>
-inline void NewPartitionedHashTable::PrefetchBucket(uint32_t hash) {
+inline void PartitionedHashTable::PrefetchBucket(uint32_t hash) {
   int64_t bucket_idx = hash & (num_buckets_ - 1);
   // Two optional arguments:
   // 'rw': 1 means the memory access is write
@@ -136,8 +136,8 @@ inline void NewPartitionedHashTable::PrefetchBucket(uint32_t hash) {
   __builtin_prefetch(&buckets_[bucket_idx], READ ? 0 : 1, 1);
 }
 
-inline NewPartitionedHashTable::Iterator NewPartitionedHashTable::FindProbeRow(
-    NewPartitionedHashTableCtx* ht_ctx) {
+inline PartitionedHashTable::Iterator PartitionedHashTable::FindProbeRow(
+    PartitionedHashTableCtx* ht_ctx) {
   ++num_probes_;
   bool found = false;
   uint32_t hash = ht_ctx->expr_values_cache()->CurExprValuesHash();
@@ -150,8 +150,8 @@ inline NewPartitionedHashTable::Iterator NewPartitionedHashTable::FindProbeRow(
 }
 
 // TODO: support lazy evaluation like HashTable::Insert().
-inline NewPartitionedHashTable::Iterator NewPartitionedHashTable::FindBuildRowBucket(
-    NewPartitionedHashTableCtx* ht_ctx, bool* found) {
+inline PartitionedHashTable::Iterator PartitionedHashTable::FindBuildRowBucket(
+    PartitionedHashTableCtx* ht_ctx, bool* found) {
   ++num_probes_;
   uint32_t hash = ht_ctx->expr_values_cache()->CurExprValuesHash();
   int64_t bucket_idx = Probe<true>(buckets_, num_buckets_, ht_ctx, hash, found);
@@ -162,16 +162,16 @@ inline NewPartitionedHashTable::Iterator NewPartitionedHashTable::FindBuildRowBu
   return Iterator(this, ht_ctx->scratch_row(), bucket_idx, duplicates);
 }
 
-inline NewPartitionedHashTable::Iterator NewPartitionedHashTable::Begin(
-    const NewPartitionedHashTableCtx* ctx) {
+inline PartitionedHashTable::Iterator PartitionedHashTable::Begin(
+    const PartitionedHashTableCtx* ctx) {
   int64_t bucket_idx = Iterator::BUCKET_NOT_FOUND;
   DuplicateNode* node = NULL;
   NextFilledBucket(&bucket_idx, &node);
   return Iterator(this, ctx->scratch_row(), bucket_idx, node);
 }
 
-inline NewPartitionedHashTable::Iterator NewPartitionedHashTable::FirstUnmatched(
-    NewPartitionedHashTableCtx* ctx) {
+inline PartitionedHashTable::Iterator PartitionedHashTable::FirstUnmatched(
+    PartitionedHashTableCtx* ctx) {
   int64_t bucket_idx = Iterator::BUCKET_NOT_FOUND;
   DuplicateNode* node = NULL;
   NextFilledBucket(&bucket_idx, &node);
@@ -186,7 +186,7 @@ inline NewPartitionedHashTable::Iterator NewPartitionedHashTable::FirstUnmatched
   return it;
 }
 
-inline void NewPartitionedHashTable::NextFilledBucket(int64_t* bucket_idx, DuplicateNode** node) {
+inline void PartitionedHashTable::NextFilledBucket(int64_t* bucket_idx, DuplicateNode** node) {
   ++*bucket_idx;
   for (; *bucket_idx < num_buckets_; ++*bucket_idx) {
     if (buckets_[*bucket_idx].filled) {
@@ -199,7 +199,7 @@ inline void NewPartitionedHashTable::NextFilledBucket(int64_t* bucket_idx, Dupli
   *node = NULL;
 }
 
-inline void NewPartitionedHashTable::PrepareBucketForInsert(int64_t bucket_idx, uint32_t hash) {
+inline void PartitionedHashTable::PrepareBucketForInsert(int64_t bucket_idx, uint32_t hash) {
   DCHECK_GE(bucket_idx, 0);
   DCHECK_LT(bucket_idx, num_buckets_);
   Bucket* bucket = &buckets_[bucket_idx];
@@ -211,7 +211,7 @@ inline void NewPartitionedHashTable::PrepareBucketForInsert(int64_t bucket_idx,
   bucket->hash = hash;
 }
 
-inline NewPartitionedHashTable::DuplicateNode* NewPartitionedHashTable::AppendNextNode(Bucket* bucket) {
+inline PartitionedHashTable::DuplicateNode* PartitionedHashTable::AppendNextNode(Bucket* bucket) {
   DCHECK_GT(node_remaining_current_page_, 0);
   bucket->bucketData.duplicates = next_node_;
   ++num_duplicate_nodes_;
@@ -219,7 +219,7 @@ inline NewPartitionedHashTable::DuplicateNode* NewPartitionedHashTable::AppendNe
   return next_node_++;
 }
 
-inline NewPartitionedHashTable::DuplicateNode* NewPartitionedHashTable::InsertDuplicateNode(
+inline PartitionedHashTable::DuplicateNode* PartitionedHashTable::InsertDuplicateNode(
     int64_t bucket_idx, Status* status) {
   DCHECK_GE(bucket_idx, 0);
   DCHECK_LT(bucket_idx, num_buckets_);
@@ -248,7 +248,7 @@ inline NewPartitionedHashTable::DuplicateNode* NewPartitionedHashTable::InsertDu
   return AppendNextNode(bucket);
 }
 
-inline TupleRow* IR_ALWAYS_INLINE NewPartitionedHashTable::GetRow(HtData& htdata, TupleRow* row) const {
+inline TupleRow* IR_ALWAYS_INLINE PartitionedHashTable::GetRow(HtData& htdata, TupleRow* row) const {
   if (stores_tuples()) {
     return reinterpret_cast<TupleRow*>(&htdata.tuple);
   } else {
@@ -258,7 +258,7 @@ inline TupleRow* IR_ALWAYS_INLINE NewPartitionedHashTable::GetRow(HtData& htdata
   }
 }
 
-inline TupleRow* IR_ALWAYS_INLINE NewPartitionedHashTable::GetRow(Bucket* bucket, TupleRow* row) const {
+inline TupleRow* IR_ALWAYS_INLINE PartitionedHashTable::GetRow(Bucket* bucket, TupleRow* row) const {
   DCHECK(bucket != NULL);
   if (UNLIKELY(stores_duplicates() && bucket->hasDuplicates)) {
     DuplicateNode* duplicate = bucket->bucketData.duplicates;
@@ -269,7 +269,7 @@ inline TupleRow* IR_ALWAYS_INLINE NewPartitionedHashTable::GetRow(Bucket* bucket
   }
 }
 
-inline TupleRow* IR_ALWAYS_INLINE NewPartitionedHashTable::Iterator::GetRow() const {
+inline TupleRow* IR_ALWAYS_INLINE PartitionedHashTable::Iterator::GetRow() const {
   DCHECK(!AtEnd());
   DCHECK(table_ != NULL);
   DCHECK(scratch_row_ != NULL);
@@ -282,7 +282,7 @@ inline TupleRow* IR_ALWAYS_INLINE NewPartitionedHashTable::Iterator::GetRow() co
   }
 }
 
-inline Tuple* IR_ALWAYS_INLINE NewPartitionedHashTable::Iterator::GetTuple() const {
+inline Tuple* IR_ALWAYS_INLINE PartitionedHashTable::Iterator::GetTuple() const {
   DCHECK(!AtEnd());
   DCHECK(table_->stores_tuples());
   Bucket* bucket = &table_->buckets_[bucket_idx_];
@@ -295,14 +295,14 @@ inline Tuple* IR_ALWAYS_INLINE NewPartitionedHashTable::Iterator::GetTuple() con
   }
 }
 
-inline void NewPartitionedHashTable::Iterator::SetTuple(Tuple* tuple, uint32_t hash) {
+inline void PartitionedHashTable::Iterator::SetTuple(Tuple* tuple, uint32_t hash) {
   DCHECK(!AtEnd());
   DCHECK(table_->stores_tuples());
   table_->PrepareBucketForInsert(bucket_idx_, hash);
   table_->buckets_[bucket_idx_].bucketData.htdata.tuple = tuple;
 }
 
-inline void NewPartitionedHashTable::Iterator::SetMatched() {
+inline void PartitionedHashTable::Iterator::SetMatched() {
   DCHECK(!AtEnd());
   Bucket* bucket = &table_->buckets_[bucket_idx_];
   if (table_->stores_duplicates() && bucket->hasDuplicates) {
@@ -315,7 +315,7 @@ inline void NewPartitionedHashTable::Iterator::SetMatched() {
   table_->has_matches_ = true;
 }
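The matched flag toggled by SetMatched() above and read by IsMatched() below is what lets right and full-outer joins emit build rows that never found a probe match. A hedged usage sketch built only from the iterator API renamed in these hunks (FirstUnmatched(), AtEnd(), NextUnmatched() and GetRow() are all declared in partitioned_hash_table.h; the surrounding loop and function name are illustrative):

    #include "exec/partitioned_hash_table.inline.h"

    using namespace doris;

    // After the probe phase, walk every bucket or duplicate node whose matched
    // flag was never set and hand its build row to the output stage.
    void emit_unmatched_build_rows(PartitionedHashTable* ht, PartitionedHashTableCtx* ctx) {
        for (PartitionedHashTable::Iterator it = ht->FirstUnmatched(ctx); !it.AtEnd();
                it.NextUnmatched()) {
            TupleRow* build_row = it.GetRow();  // build-side row with no probe match
            // ... emit 'build_row' padded with NULLs on the probe side ...
            (void)build_row;
        }
    }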
-inline bool NewPartitionedHashTable::Iterator::IsMatched() const {
+inline bool PartitionedHashTable::Iterator::IsMatched() const {
   DCHECK(!AtEnd());
   Bucket* bucket = &table_->buckets_[bucket_idx_];
   if (table_->stores_duplicates() && bucket->hasDuplicates) {
@@ -324,13 +324,13 @@ inline bool NewPartitionedHashTable::Iterator::IsMatched() const {
   return bucket->matched;
 }
 
-inline void NewPartitionedHashTable::Iterator::SetAtEnd() {
+inline void PartitionedHashTable::Iterator::SetAtEnd() {
   bucket_idx_ = BUCKET_NOT_FOUND;
   node_ = NULL;
 }
 
 template <bool READ>
-inline void NewPartitionedHashTable::Iterator::PrefetchBucket() {
+inline void PartitionedHashTable::Iterator::PrefetchBucket() {
   if (LIKELY(!AtEnd())) {
     // HashTable::PrefetchBucket() takes a hash value to index into the hash bucket
     // array. Passing 'bucket_idx_' here is sufficient.
@@ -339,7 +339,7 @@ inline void NewPartitionedHashTable::Iterator::PrefetchBucket() {
   }
 }
 
-inline void NewPartitionedHashTable::Iterator::Next() {
+inline void PartitionedHashTable::Iterator::Next() {
   DCHECK(!AtEnd());
   if (table_->stores_duplicates() && table_->buckets_[bucket_idx_].hasDuplicates
       && node_->next != NULL) {
@@ -349,7 +349,7 @@ inline void NewPartitionedHashTable::Iterator::Next() {
   }
 }
 
-inline void NewPartitionedHashTable::Iterator::NextDuplicate() {
+inline void PartitionedHashTable::Iterator::NextDuplicate() {
   DCHECK(!AtEnd());
   if (table_->stores_duplicates() && table_->buckets_[bucket_idx_].hasDuplicates
       && node_->next != NULL) {
@@ -360,7 +360,7 @@ inline void NewPartitionedHashTable::Iterator::NextDuplicate() {
   }
 }
 
-inline void NewPartitionedHashTable::Iterator::NextUnmatched() {
+inline void PartitionedHashTable::Iterator::NextUnmatched() {
   DCHECK(!AtEnd());
   Bucket* bucket = &table_->buckets_[bucket_idx_];
   // Check if there is any remaining unmatched duplicate node in the current bucket.
@@ -387,17 +387,17 @@ inline void NewPartitionedHashTable::Iterator::NextUnmatched() {
   }
 }
 
-inline void NewPartitionedHashTableCtx::set_level(int level) {
+inline void PartitionedHashTableCtx::set_level(int level) {
   DCHECK_GE(level, 0);
   DCHECK_LT(level, seeds_.size());
   level_ = level;
 }
 
-inline int64_t NewPartitionedHashTable::CurrentMemSize() const {
+inline int64_t PartitionedHashTable::CurrentMemSize() const {
   return num_buckets_ * sizeof(Bucket) + num_duplicate_nodes_ * sizeof(DuplicateNode);
 }
 
-inline int64_t NewPartitionedHashTable::NumInsertsBeforeResize() const {
+inline int64_t PartitionedHashTable::NumInsertsBeforeResize() const {
   return std::max<int64_t>(
       0, static_cast<int64_t>(num_buckets_ * MAX_FILL_FACTOR) - num_filled_buckets_);
 }
diff --git a/be/src/exec/new_partitioned_hash_table_ir.cc b/be/src/exec/partitioned_hash_table_ir.cc
similarity index 76%
rename from be/src/exec/new_partitioned_hash_table_ir.cc
rename to be/src/exec/partitioned_hash_table_ir.cc
index 0ea33e2cd5..eabe610028 100644
--- a/be/src/exec/new_partitioned_hash_table_ir.cc
+++ b/be/src/exec/partitioned_hash_table_ir.cc
@@ -16,17 +16,17 @@
 // under the License.
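The renamed translation unit below is also compiled to LLVM IR (the IR_COMPILE guard) so codegen can inline these small accessors into generated code. GetHashSeed() returning seeds_[level_] is the interesting one: when a partition spills and is repartitioned, rehashing at the next level must use a fresh seed, otherwise every row would land back in a single child partition. A hedged sketch of that idea with illustrative names (only seeds_ and level_ come from the patch):

    #include <cstdint>
    #include <vector>

    // Toy seeded mixer standing in for the real hash function; illustrative only.
    static uint32_t seed_hash(uint32_t value, uint32_t seed) {
        uint32_t h = value ^ seed;
        h ^= h >> 16;
        h *= 0x85ebca6bU;
        h ^= h >> 13;
        return h;
    }

    // Choosing a child partition during repartitioning: a per-level seed makes
    // the level-N hash independent of the level-(N-1) hash that grouped the rows.
    uint32_t child_partition(uint32_t value, const std::vector<uint32_t>& seeds,
                             int level, uint32_t fanout) {
        return seed_hash(value, seeds[level]) % fanout;
    }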
 #ifdef IR_COMPILE
-#include "exec/new_partitioned_hash_table.h"
+#include "exec/partitioned_hash_table.h"
 
 using namespace doris;
 
-uint32_t NewPartitionedHashTableCtx::GetHashSeed() const { return seeds_[level_]; }
+uint32_t PartitionedHashTableCtx::GetHashSeed() const { return seeds_[level_]; }
 
-ExprContext* const* NewPartitionedHashTableCtx::build_expr_evals() const {
+ExprContext* const* PartitionedHashTableCtx::build_expr_evals() const {
   return build_expr_evals_.data();
 }
 
-ExprContext* const* NewPartitionedHashTableCtx::probe_expr_evals() const {
+ExprContext* const* PartitionedHashTableCtx::probe_expr_evals() const {
   return probe_expr_evals_.data();
 }
diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp
index 3d9663641b..a35b8f1841 100644
--- a/be/src/runtime/row_batch.cpp
+++ b/be/src/runtime/row_batch.cpp
@@ -55,7 +55,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, int capacity, MemTracker* mem_
   _tuple_ptrs_size = _capacity * _num_tuples_per_row * sizeof(Tuple*);
   DCHECK_GT(_tuple_ptrs_size, 0);
   // TODO: switch to Init() pattern so we can check memory limit and return Status.
-  if (config::enable_partitioned_aggregation || config::enable_new_partitioned_aggregation) {
+  if (config::enable_partitioned_aggregation) {
     _mem_tracker->consume(_tuple_ptrs_size);
     _tuple_ptrs = reinterpret_cast<Tuple**>(malloc(_tuple_ptrs_size));
     DCHECK(_tuple_ptrs != NULL);
@@ -89,7 +89,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc,
   _tuple_ptrs_size = _num_rows * _num_tuples_per_row * sizeof(Tuple*);
   DCHECK_GT(_tuple_ptrs_size, 0);
   // TODO: switch to Init() pattern so we can check memory limit and return Status.
-  if (config::enable_partitioned_aggregation || config::enable_new_partitioned_aggregation) {
+  if (config::enable_partitioned_aggregation) {
     _mem_tracker->consume(_tuple_ptrs_size);
     _tuple_ptrs = reinterpret_cast<Tuple**>(malloc(_tuple_ptrs_size));
     DCHECK(_tuple_ptrs != nullptr);
@@ -187,7 +187,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch,
   _tuple_ptrs_size = _num_rows * input_batch.row_tuples.size() * sizeof(Tuple*);
   DCHECK_GT(_tuple_ptrs_size, 0);
   // TODO: switch to Init() pattern so we can check memory limit and return Status.
-  if (config::enable_partitioned_aggregation || config::enable_new_partitioned_aggregation) {
+  if (config::enable_partitioned_aggregation) {
     _mem_tracker->consume(_tuple_ptrs_size);
     _tuple_ptrs = reinterpret_cast<Tuple**>(malloc(_tuple_ptrs_size));
     DCHECK(_tuple_ptrs != NULL);
@@ -288,7 +288,7 @@ void RowBatch::clear() {
   for (int i = 0; i < _blocks.size(); ++i) {
     _blocks[i]->del();
   }
-  if (config::enable_partitioned_aggregation || config::enable_new_partitioned_aggregation) {
+  if (config::enable_partitioned_aggregation) {
     DCHECK(_tuple_ptrs != NULL);
     free(_tuple_ptrs);
     _mem_tracker->release(_tuple_ptrs_size);
@@ -498,7 +498,7 @@ void RowBatch::reset() {
   }
   _blocks.clear();
   _auxiliary_mem_usage = 0;
-  if (!config::enable_partitioned_aggregation && !config::enable_new_partitioned_aggregation) {
+  if (!config::enable_partitioned_aggregation) {
     _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool->allocate(_tuple_ptrs_size));
   }
   _need_to_return = false;
@@ -590,7 +590,7 @@ void RowBatch::acquire_state(RowBatch* src) {
   _num_rows = src->_num_rows;
   _capacity = src->_capacity;
   _need_to_return = src->_need_to_return;
-  if (!config::enable_partitioned_aggregation && !config::enable_new_partitioned_aggregation) {
+  if (!config::enable_partitioned_aggregation) {
     // Tuple pointers are allocated from tuple_data_pool_ so are transferred.
_tuple_ptrs = src->_tuple_ptrs; src->_tuple_ptrs = NULL;
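With the old flag gone, row_batch.cpp is left with exactly two ownership modes for the tuple-pointer array, as the hunks above show: malloc'd and charged against the MemTracker when partitioned aggregation is enabled, or carved out of the batch's tuple data pool (and recycled with it) when it is not. A condensed sketch of the two paths; the struct and its members are hypothetical stand-ins for the real RowBatch fields:

    #include <cstdint>
    #include <cstdlib>

    struct Tuple;

    // Two ownership modes for the Tuple* array, mirroring the branches above.
    struct TuplePtrArray {
        Tuple** ptrs = nullptr;
        int64_t bytes = 0;
        bool malloced = false;  // true on the partitioned-aggregation path

        void alloc(int64_t num_ptrs, bool partitioned_agg) {
            bytes = num_ptrs * static_cast<int64_t>(sizeof(Tuple*));
            malloced = partitioned_agg;
            if (malloced) {
                // The real code consumes 'bytes' from a MemTracker before malloc.
                ptrs = static_cast<Tuple**>(malloc(bytes));
            } else {
                // The real code allocates from _tuple_data_pool instead; the
                // pointers then live and die with the pool.
                ptrs = nullptr;  // placeholder for the pool allocation
            }
        }

        void clear() {
            if (malloced) {
                free(ptrs);  // the real code also releases the MemTracker bytes
            }
            ptrs = nullptr;
        }
    };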