From 1fc5515a780a00ad96644ab92a46e83c57621ea8 Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Wed, 24 Aug 2022 08:49:34 +0800 Subject: [PATCH] [enhancement](memory) Remove unused reservation tracker (#11969) --- be/src/exec/exec_node.cpp | 13 +- be/src/exec/exec_node.h | 5 - be/src/exec/partitioned_aggregation_node.cc | 58 +-- be/src/exec/partitioned_aggregation_node.h | 24 -- be/src/runtime/CMakeLists.txt | 3 - be/src/runtime/buffered_tuple_stream3.cc | 224 +--------- be/src/runtime/buffered_tuple_stream3.h | 52 +-- be/src/runtime/bufferpool/buffer_pool.cc | 101 +---- be/src/runtime/bufferpool/buffer_pool.h | 78 +--- .../runtime/bufferpool/buffer_pool_internal.h | 20 +- .../runtime/bufferpool/reservation_tracker.cc | 401 ------------------ .../runtime/bufferpool/reservation_tracker.h | 290 ------------- .../bufferpool/reservation_tracker_counters.h | 38 -- be/src/runtime/bufferpool/reservation_util.cc | 40 -- be/src/runtime/bufferpool/reservation_util.h | 71 ---- be/src/runtime/bufferpool/suballocator.cc | 5 - be/src/runtime/exec_env.h | 5 +- be/src/runtime/exec_env_init.cpp | 4 - be/src/runtime/initial_reservations.cc | 83 ---- be/src/runtime/initial_reservations.h | 78 ---- be/src/runtime/memory/mem_tracker_limiter.cpp | 45 +- be/src/runtime/runtime_state.cpp | 56 +-- be/src/runtime/runtime_state.h | 36 -- be/test/exec/tablet_sink_test.cpp | 3 - be/test/runtime/test_env.cc | 4 +- be/test/util/arrow/arrow_work_flow_test.cpp | 3 - be/test/vec/exec/vtablet_sink_test.cpp | 3 - 27 files changed, 52 insertions(+), 1691 deletions(-) delete mode 100644 be/src/runtime/bufferpool/reservation_tracker.cc delete mode 100644 be/src/runtime/bufferpool/reservation_tracker.h delete mode 100644 be/src/runtime/bufferpool/reservation_tracker_counters.h delete mode 100644 be/src/runtime/bufferpool/reservation_util.cc delete mode 100644 be/src/runtime/bufferpool/reservation_util.h delete mode 100644 be/src/runtime/initial_reservations.cc delete mode 100644 
be/src/runtime/initial_reservations.h diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 66827019f9..88cbb8cdf1 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -53,7 +53,6 @@ #include "odbc_scan_node.h" #include "runtime/descriptors.h" #include "runtime/exec_env.h" -#include "runtime/initial_reservations.h" #include "runtime/memory/mem_tracker.h" #include "runtime/row_batch.h" #include "runtime/runtime_state.h" @@ -286,9 +285,6 @@ Status ExecNode::close(RuntimeState* state) { } if (_buffer_pool_client.is_registered()) { - VLOG_FILE << _id << " returning reservation " << _resource_profile.min_reservation; - state->initial_reservations()->Return(&_buffer_pool_client, - _resource_profile.min_reservation); state->exec_env()->buffer_pool()->DeregisterClient(&_buffer_pool_client); } @@ -727,11 +723,8 @@ Status ExecNode::claim_buffer_reservation(RuntimeState* state) { } ss << print_plan_node_type(_type) << " id=" << _id << " ptr=" << this; - RETURN_IF_ERROR(buffer_pool->RegisterClient(ss.str(), state->instance_buffer_reservation(), - buffer_pool->GetSystemBytesLimit(), - runtime_profile(), &_buffer_pool_client)); + RETURN_IF_ERROR(buffer_pool->RegisterClient(ss.str(), runtime_profile(), &_buffer_pool_client)); - state->initial_reservations()->Claim(&_buffer_pool_client, _resource_profile.min_reservation); /* if (debug_action_ == TDebugAction::SET_DENY_RESERVATION_PROBABILITY && (debug_phase_ == TExecNodePhase::PREPARE || debug_phase_ == TExecNodePhase::OPEN)) { @@ -744,10 +737,6 @@ Status ExecNode::claim_buffer_reservation(RuntimeState* state) { return Status::OK(); } -Status ExecNode::release_unused_reservation() { - return _buffer_pool_client.DecreaseReservationTo(_resource_profile.min_reservation); -} - void ExecNode::release_block_memory(vectorized::Block& block, uint16_t child_idx) { DCHECK(child_idx < _children.size()); block.clear_column_data(child(child_idx)->row_desc().num_materialized_slots()); diff --git 
a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h index 101636f0e8..6695faf7e7 100644 --- a/be/src/exec/exec_node.h +++ b/be/src/exec/exec_node.h @@ -210,11 +210,6 @@ protected: /// as the initial reservation is not released before Close(). Status claim_buffer_reservation(RuntimeState* state); - /// Release any unused reservation in excess of the node's initial reservation. Returns - /// an error if releasing the reservation requires flushing pages to disk, and that - /// fails. - Status release_unused_reservation(); - /// Release all memory of block which got from child. The block // 1. clear mem of valid column get from child, make sure child can reuse the mem // 2. delete and release the column which create by function all and other reason diff --git a/be/src/exec/partitioned_aggregation_node.cc b/be/src/exec/partitioned_aggregation_node.cc index 12b648f6b2..16332151b0 100644 --- a/be/src/exec/partitioned_aggregation_node.cc +++ b/be/src/exec/partitioned_aggregation_node.cc @@ -253,7 +253,6 @@ Status PartitionedAggregationNode::open(RuntimeState* state) { // Claim reservation after the child has been opened to reduce the peak reservation // requirement. if (!_buffer_pool_client.is_registered() && !grouping_exprs_.empty()) { - DCHECK_GE(_resource_profile.min_reservation, MinReservation()); RETURN_IF_ERROR(claim_buffer_reservation(state)); } @@ -278,12 +277,7 @@ Status PartitionedAggregationNode::open(RuntimeState* state) { state, &intermediate_row_desc_, &_buffer_pool_client, _resource_profile.spillable_buffer_size)); RETURN_IF_ERROR(serialize_stream_->Init(id(), false)); - bool got_buffer; - // Reserve the memory for 'serialize_stream_' so we don't need to scrounge up - // another buffer during spilling. 
- RETURN_IF_ERROR(serialize_stream_->PrepareForWrite(&got_buffer)); - // DCHECK(got_buffer) - // << "Accounted in min reservation" << _buffer_pool_client.DebugString(); + RETURN_IF_ERROR(serialize_stream_->PrepareForWrite()); DCHECK(serialize_stream_->has_write_iterator()); } } @@ -743,13 +737,7 @@ Status PartitionedAggregationNode::Partition::InitStreams() { parent->state_, &parent->intermediate_row_desc_, &parent->_buffer_pool_client, parent->_resource_profile.spillable_buffer_size, external_varlen_slots)); RETURN_IF_ERROR(aggregated_row_stream->Init(parent->id(), true)); - bool got_buffer; - RETURN_IF_ERROR(aggregated_row_stream->PrepareForWrite(&got_buffer)); - // TODO(zxy) If exec_mem_limit is very small, DCHECK(false) will occur, the logic of - // reservation tracker needs to be deleted or refactored - // DCHECK(got_buffer) << "Buffer included in reservation " << parent->_id << "\n" - // << parent->_buffer_pool_client.DebugString() << "\n" - // << parent->DebugString(2); + RETURN_IF_ERROR(aggregated_row_stream->PrepareForWrite()); if (!parent->is_streaming_preagg_) { unaggregated_row_stream.reset(new BufferedTupleStream3( @@ -828,9 +816,7 @@ Status PartitionedAggregationNode::Partition::SerializeStreamForSpilling() { parent->_resource_profile.spillable_buffer_size)); status = parent->serialize_stream_->Init(parent->id(), false); if (status.ok()) { - bool got_buffer; - status = parent->serialize_stream_->PrepareForWrite(&got_buffer); - // DCHECK(!status.ok() || got_buffer) << "Accounted in min reservation"; + status = parent->serialize_stream_->PrepareForWrite(); } if (!status.ok()) { hash_tbl->Close(); @@ -873,10 +859,7 @@ Status PartitionedAggregationNode::Partition::Spill(bool more_aggregate_rows) { // aggregated_row_stream->UnpinStream(BufferedTupleStream3::UNPIN_ALL_EXCEPT_CURRENT); } else { // aggregated_row_stream->UnpinStream(BufferedTupleStream3::UNPIN_ALL); - bool got_buffer; - RETURN_IF_ERROR(unaggregated_row_stream->PrepareForWrite(&got_buffer)); - 
// DCHECK(got_buffer) << "Accounted in min reservation" - // << parent->_buffer_pool_client.DebugString(); + RETURN_IF_ERROR(unaggregated_row_stream->PrepareForWrite()); } COUNTER_UPDATE(parent->num_spilled_partitions_, 1); @@ -1196,16 +1179,6 @@ Status PartitionedAggregationNode::CheckAndResizeHashPartitions( Status PartitionedAggregationNode::NextPartition() { DCHECK(output_partition_ == nullptr); - if (!is_in_subplan() && spilled_partitions_.empty()) { - // All partitions are in memory. Release reservation that was used for previous - // partitions that is no longer needed. If we have spilled partitions, we want to - // hold onto all reservation in case it is needed to process the spilled partitions. - DCHECK(!_buffer_pool_client.has_unpinned_pages()); - Status status = release_unused_reservation(); - DCHECK(status.ok()) << "Should not fail - all partitions are in memory so there are " - << "no unpinned pages. " << status.get_error_msg(); - } - // Keep looping until we get to a partition that fits in memory. Partition* partition = nullptr; while (true) { @@ -1217,12 +1190,6 @@ Status PartitionedAggregationNode::NextPartition() { break; } - // No aggregated partitions in memory - we should not be using any reservation aside - // from 'serialize_stream_'. - DCHECK_EQ(serialize_stream_ != nullptr ? serialize_stream_->BytesPinned(false) : 0, - _buffer_pool_client.GetUsedReservation()) - << _buffer_pool_client.DebugString(); - // Try to fit a single spilled partition in memory. We can often do this because // we only need to fit 1/PARTITION_FANOUT of the data in memory. // TODO: in some cases when the partition probably won't fit in memory it could @@ -1281,11 +1248,6 @@ Status PartitionedAggregationNode::BuildSpilledPartition(Partition** built_parti if (dst_partition->is_spilled()) { PushSpilledPartition(dst_partition); *built_partition = nullptr; - // Spilled the partition - we should not be using any reservation except from - // 'serialize_stream_'. 
- DCHECK_EQ(serialize_stream_ != nullptr ? serialize_stream_->BytesPinned(false) : 0, - _buffer_pool_client.GetUsedReservation()) - << _buffer_pool_client.DebugString(); } else { *built_partition = dst_partition; } @@ -1315,9 +1277,7 @@ Status PartitionedAggregationNode::RepartitionSpilledPartition() { // The aggregated rows have been repartitioned. Free up at least a buffer's worth of // reservation and use it to pin the unaggregated write buffer. // hash_partition->aggregated_row_stream->UnpinStream(BufferedTupleStream3::UNPIN_ALL); - bool got_buffer; - RETURN_IF_ERROR(hash_partition->unaggregated_row_stream->PrepareForWrite(&got_buffer)); - // DCHECK(got_buffer) << "Accounted in min reservation" << _buffer_pool_client.DebugString(); + RETURN_IF_ERROR(hash_partition->unaggregated_row_stream->PrepareForWrite()); } RETURN_IF_ERROR(ProcessStream(partition->unaggregated_row_stream.get())); @@ -1339,13 +1299,7 @@ template Status PartitionedAggregationNode::ProcessStream(BufferedTupleStream3* input_stream) { DCHECK(!is_streaming_preagg_); if (input_stream->num_rows() > 0) { - while (true) { - bool got_buffer = false; - RETURN_IF_ERROR(input_stream->PrepareForRead(true, &got_buffer)); - if (got_buffer) break; - // Did not have a buffer to read the input stream. Spill and try again. - RETURN_IF_ERROR(SpillPartition(AGGREGATED_ROWS)); - } + RETURN_IF_ERROR(input_stream->PrepareForRead(true)); bool eos = false; const RowDescriptor* desc = diff --git a/be/src/exec/partitioned_aggregation_node.h b/be/src/exec/partitioned_aggregation_node.h index c4f95d5036..c5d9a505d1 100644 --- a/be/src/exec/partitioned_aggregation_node.h +++ b/be/src/exec/partitioned_aggregation_node.h @@ -691,30 +691,6 @@ private: /// Calls finalizes on all tuples starting at 'it'. void CleanupHashTbl(const std::vector& agg_fn_evals, PartitionedHashTable::Iterator it); - - /// Compute minimum buffer reservation for grouping aggregations. 
- /// We need one buffer per partition, which is used either as the write buffer for the - /// aggregated stream or the unaggregated stream. We need an additional buffer to read - /// the stream we are currently repartitioning. The read buffer needs to be a max-sized - /// buffer to hold a max-sized row and we need one max-sized write buffer that is used - /// temporarily to append a row to any stream. - /// - /// If we need to serialize, we need an additional buffer while spilling a partition - /// as the partitions aggregate stream needs to be serialized and rewritten. - /// We do not spill streaming preaggregations, so we do not need to reserve any buffers. - int64_t MinReservation() const { - //DCHECK(!grouping_exprs_.empty()); - // Must be kept in sync with AggregationNode.computeNodeResourceProfile() in fe. - //if (is_streaming_preagg_) { - // Reserve at least one buffer and a 64kb hash table per partition. - // return (_resource_profile.spillable_buffer_size + 64 * 1024) * PARTITION_FANOUT; - //} - //int num_buffers = PARTITION_FANOUT + 1 + (needs_serialize_ ? 1 : 0); - // Two of the buffers must fit the maximum row. 
- //return _resource_profile.spillable_buffer_size * (num_buffers - 2) + - //_resource_profile.max_row_buffer_size * 2; - return 0; - } }; } // namespace doris diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt index 5ff765f137..39769f323e 100644 --- a/be/src/runtime/CMakeLists.txt +++ b/be/src/runtime/CMakeLists.txt @@ -75,11 +75,8 @@ set(RUNTIME_FILES tablets_channel.cpp bufferpool/buffer_allocator.cc bufferpool/buffer_pool.cc - bufferpool/reservation_tracker.cc - bufferpool/reservation_util.cc bufferpool/suballocator.cc bufferpool/system_allocator.cc - initial_reservations.cc snapshot_loader.cpp query_statistics.cpp message_body_sink.cpp diff --git a/be/src/runtime/buffered_tuple_stream3.cc b/be/src/runtime/buffered_tuple_stream3.cc index 8c05608831..2a35f5c70c 100644 --- a/be/src/runtime/buffered_tuple_stream3.cc +++ b/be/src/runtime/buffered_tuple_stream3.cc @@ -55,7 +55,6 @@ BufferedTupleStream3::BufferedTupleStream3(RuntimeState* state, const RowDescrip num_pages_(0), total_byte_size_(0), has_read_iterator_(false), - read_page_reservation_(buffer_pool_client_), read_page_rows_returned_(-1), read_ptr_(nullptr), read_end_ptr_(nullptr), @@ -64,7 +63,6 @@ BufferedTupleStream3::BufferedTupleStream3(RuntimeState* state, const RowDescrip rows_returned_(0), has_write_iterator_(false), write_page_(nullptr), - write_page_reservation_(buffer_pool_client_), bytes_pinned_(0), num_rows_(0), default_page_len_(default_page_len), @@ -139,16 +137,6 @@ void BufferedTupleStream3::CheckConsistencyFast() const { // flight and this would required blocking on that write. 
DCHECK_GE(read_end_ptr_, read_ptr_); } - if (NeedReadReservation()) { - DCHECK_EQ(default_page_len_, read_page_reservation_.GetReservation()) << DebugString(); - } else if (!read_page_reservation_.is_closed()) { - DCHECK_EQ(0, read_page_reservation_.GetReservation()); - } - if (NeedWriteReservation()) { - DCHECK_EQ(default_page_len_, write_page_reservation_.GetReservation()); - } else if (!write_page_reservation_.is_closed()) { - DCHECK_EQ(0, write_page_reservation_.GetReservation()); - } } void BufferedTupleStream3::CheckPageConsistency(const Page* page) const { @@ -172,19 +160,6 @@ string BufferedTupleStream3::DebugString() const { } else { ss << &*read_page_; } - ss << "\n" - << " read_page_reservation="; - if (read_page_reservation_.is_closed()) { - ss << ""; - } else { - ss << read_page_reservation_.GetReservation(); - } - ss << " write_page_reservation="; - if (write_page_reservation_.is_closed()) { - ss << ""; - } else { - ss << write_page_reservation_.GetReservation(); - } ss << "\n # pages=" << num_pages_ << " pages=[\n"; for (const Page& page : pages_) { ss << "{" << page.DebugString() << "}"; @@ -205,7 +180,7 @@ Status BufferedTupleStream3::Init(int node_id, bool pinned) { return Status::OK(); } -Status BufferedTupleStream3::PrepareForWrite(bool* got_reservation) { +Status BufferedTupleStream3::PrepareForWrite() { // This must be the first iterator created. DCHECK(pages_.empty()); DCHECK(!delete_on_read_); @@ -213,16 +188,11 @@ Status BufferedTupleStream3::PrepareForWrite(bool* got_reservation) { DCHECK(!has_read_iterator()); CHECK_CONSISTENCY_FULL(); - *got_reservation = buffer_pool_client_->IncreaseReservationToFit(default_page_len_); - if (!*got_reservation) return Status::OK(); has_write_iterator_ = true; - // Save reservation for the write iterators. 
- buffer_pool_client_->SaveReservation(&write_page_reservation_, default_page_len_); - CHECK_CONSISTENCY_FULL(); return Status::OK(); } -Status BufferedTupleStream3::PrepareForReadWrite(bool delete_on_read, bool* got_reservation) { +Status BufferedTupleStream3::PrepareForReadWrite(bool delete_on_read) { // This must be the first iterator created. DCHECK(pages_.empty()); DCHECK(!delete_on_read_); @@ -230,12 +200,7 @@ Status BufferedTupleStream3::PrepareForReadWrite(bool delete_on_read, bool* got_ DCHECK(!has_read_iterator()); CHECK_CONSISTENCY_FULL(); - *got_reservation = buffer_pool_client_->IncreaseReservationToFit(2 * default_page_len_); - if (!*got_reservation) return Status::OK(); has_write_iterator_ = true; - // Save reservation for both the read and write iterators. - buffer_pool_client_->SaveReservation(&read_page_reservation_, default_page_len_); - buffer_pool_client_->SaveReservation(&write_page_reservation_, default_page_len_); RETURN_IF_ERROR(PrepareForReadInternal(delete_on_read)); return Status::OK(); } @@ -254,8 +219,6 @@ void BufferedTupleStream3::Close(RowBatch* batch, RowBatch::FlushMode flush) { buffer_pool_->DestroyPage(buffer_pool_client_, &page.handle); } } - read_page_reservation_.Close(); - write_page_reservation_.Close(); pages_.clear(); num_pages_ = 0; bytes_pinned_ = 0; @@ -297,62 +260,6 @@ void BufferedTupleStream3::UnpinPageIfNeeded(Page* page, bool stream_pinned) { } } -bool BufferedTupleStream3::NeedWriteReservation() const { - return NeedWriteReservation(pinned_); -} - -bool BufferedTupleStream3::NeedWriteReservation(bool stream_pinned) const { - return NeedWriteReservation(stream_pinned, num_pages_, has_write_iterator(), - write_page_ != nullptr, has_read_write_page()); -} - -bool BufferedTupleStream3::NeedWriteReservation(bool stream_pinned, int64_t num_pages, - bool has_write_iterator, bool has_write_page, - bool has_read_write_page) { - if (!has_write_iterator) return false; - // If the stream is empty the write reservation hasn't 
been used yet. - if (num_pages == 0) return true; - if (stream_pinned) { - // Make sure we've saved the write reservation for the next page if the only - // page is a read/write page. - return has_read_write_page && num_pages == 1; - } else { - // Make sure we've saved the write reservation if it's not being used to pin - // a page in the stream. - return !has_write_page || has_read_write_page; - } -} - -bool BufferedTupleStream3::NeedReadReservation() const { - return NeedReadReservation(pinned_); -} - -bool BufferedTupleStream3::NeedReadReservation(bool stream_pinned) const { - return NeedReadReservation(stream_pinned, num_pages_, has_read_iterator(), - read_page_ != pages_.end()); -} - -bool BufferedTupleStream3::NeedReadReservation(bool stream_pinned, int64_t num_pages, - bool has_read_iterator, bool has_read_page) const { - return NeedReadReservation(stream_pinned, num_pages, has_read_iterator, has_read_page, - has_write_iterator(), write_page_ != nullptr); -} - -bool BufferedTupleStream3::NeedReadReservation(bool stream_pinned, int64_t num_pages, - bool has_read_iterator, bool has_read_page, - bool has_write_iterator, bool has_write_page) { - if (!has_read_iterator) return false; - if (stream_pinned) { - // Need reservation if there are no pages currently pinned for reading but we may add - // a page. - return num_pages == 0 && has_write_iterator; - } else { - // Only need to save reservation for an unpinned stream if there is no read page - // and we may advance to one in the future. 
- return (has_write_iterator || num_pages > 0) && !has_read_page; - } -} - Status BufferedTupleStream3::NewWritePage(int64_t page_len) noexcept { DCHECK(!closed_); DCHECK(write_page_ == nullptr); @@ -377,59 +284,19 @@ void BufferedTupleStream3::CalcPageLenForRow(int64_t row_size, int64_t* page_len *page_len = std::max(default_page_len_, BitUtil::RoundUpToPowerOfTwo(row_size)); } -Status BufferedTupleStream3::AdvanceWritePage(int64_t row_size, bool* got_reservation) noexcept { +Status BufferedTupleStream3::AdvanceWritePage(int64_t row_size) noexcept { DCHECK(has_write_iterator()); CHECK_CONSISTENCY_FAST(); int64_t page_len; CalcPageLenForRow(row_size, &page_len); - - // Reservation may have been saved for the next write page, e.g. by PrepareForWrite() - // if the stream is empty. - int64_t write_reservation_to_restore = 0, read_reservation_to_restore = 0; - if (NeedWriteReservation(pinned_, num_pages_, true, write_page_ != nullptr, - has_read_write_page()) && - !NeedWriteReservation(pinned_, num_pages_ + 1, true, true, false)) { - write_reservation_to_restore = default_page_len_; - } - // If the stream is pinned, we need to keep the previous write page pinned for reading. - // Check if we saved reservation for this case. - if (NeedReadReservation(pinned_, num_pages_, has_read_iterator(), read_page_ != pages_.end(), - true, write_page_ != nullptr) && - !NeedReadReservation(pinned_, num_pages_ + 1, has_read_iterator(), - read_page_ != pages_.end(), true, true)) { - read_reservation_to_restore = default_page_len_; - } - - // We may reclaim reservation by unpinning a page that was pinned for writing. - int64_t write_page_reservation_to_reclaim = - (write_page_ != nullptr && !pinned_ && !has_read_write_page()) ? write_page_->len() : 0; - // Check to see if we can get the reservation before changing the state of the stream. 
- if (!buffer_pool_client_->IncreaseReservationToFit(page_len - write_reservation_to_restore - - read_reservation_to_restore - - write_page_reservation_to_reclaim)) { - DCHECK(pinned_ || page_len > default_page_len_) - << "If the stream is unpinned, this should only fail for large pages"; - CHECK_CONSISTENCY_FAST(); - *got_reservation = false; - return Status::OK(); - } - if (write_reservation_to_restore > 0) { - buffer_pool_client_->RestoreReservation(&write_page_reservation_, - write_reservation_to_restore); - } - if (read_reservation_to_restore > 0) { - buffer_pool_client_->RestoreReservation(&read_page_reservation_, - read_reservation_to_restore); - } ResetWritePage(); //RETURN_IF_ERROR(NewWritePage(page_len)); Status status = NewWritePage(page_len); if (UNLIKELY(!status.ok())) { return status; } - *got_reservation = true; return Status::OK(); } @@ -450,15 +317,6 @@ void BufferedTupleStream3::InvalidateWriteIterator() { if (!has_write_iterator()) return; ResetWritePage(); has_write_iterator_ = false; - // No more pages will be appended to stream - do not need any write reservation. - write_page_reservation_.Close(); - // May not need a read reservation once the write iterator is invalidated. - if (NeedReadReservation(pinned_, num_pages_, has_read_iterator(), read_page_ != pages_.end(), - true, write_page_ != nullptr) && - !NeedReadReservation(pinned_, num_pages_, has_read_iterator(), read_page_ != pages_.end(), - false, false)) { - buffer_pool_client_->RestoreReservation(&read_page_reservation_, default_page_len_); - } } Status BufferedTupleStream3::NextReadPage() { @@ -470,10 +328,6 @@ Status BufferedTupleStream3::NextReadPage() { // No rows read yet - start reading at first page. If the stream is unpinned, we can // use the reservation saved in PrepareForReadWrite() to pin the first page. 
read_page_ = pages_.begin(); - if (NeedReadReservation(pinned_, num_pages_, true, false) && - !NeedReadReservation(pinned_, num_pages_, true, true)) { - buffer_pool_client_->RestoreReservation(&read_page_reservation_, default_page_len_); - } } else if (delete_on_read_) { DCHECK(read_page_ == pages_.begin()) << read_page_->DebugString() << " " << DebugString(); DCHECK_NE(&*read_page_, write_page_); @@ -494,18 +348,6 @@ Status BufferedTupleStream3::NextReadPage() { return Status::OK(); } - if (!pinned_ && read_page_->len() > default_page_len_ && - buffer_pool_client_->GetUnusedReservation() < read_page_->len()) { - // If we are iterating over an unpinned stream and encounter a page that is larger - // than the default page length, then unpinning the previous page may not have - // freed up enough reservation to pin the next one. The client is responsible for - // ensuring the reservation is available, so this indicates a bug. - std::stringstream err_stream; - err_stream << "Internal error: couldn't pin large page of " << read_page_->len() - << " bytes, client only had " << buffer_pool_client_->GetUnusedReservation() - << " bytes of unused reservation:" << buffer_pool_client_->DebugString() << "\n"; - return Status::InternalError(err_stream.str()); - } // Ensure the next page is pinned for reading. By this point we should have enough // reservation to pin the page. If the stream is pinned, the page is already pinned. // If the stream is unpinned, we freed up enough memory for a default-sized page by @@ -521,14 +363,6 @@ Status BufferedTupleStream3::NextReadPage() { read_ptr_ = read_buffer->data(); read_end_ptr_ = read_ptr_ + read_buffer->len(); - // We may need to save reservation for the write page in the case when the write page - // became a read/write page. 
- if (!NeedWriteReservation(pinned_, num_pages_, has_write_iterator(), write_page_ != nullptr, - false) && - NeedWriteReservation(pinned_, num_pages_, has_write_iterator(), write_page_ != nullptr, - has_read_write_page())) { - buffer_pool_client_->SaveReservation(&write_page_reservation_, default_page_len_); - } CHECK_CONSISTENCY_FAST(); return Status::OK(); } @@ -545,22 +379,15 @@ void BufferedTupleStream3::InvalidateReadIterator() { UnpinPageIfNeeded(prev_read_page, pinned_); } has_read_iterator_ = false; - if (read_page_reservation_.GetReservation() > 0) { - buffer_pool_client_->RestoreReservation(&read_page_reservation_, default_page_len_); - } // It is safe to re-read a delete-on-read stream if no rows were read and no pages // were therefore deleted. if (rows_returned_ == 0) delete_on_read_ = false; } -Status BufferedTupleStream3::PrepareForRead(bool delete_on_read, bool* got_reservation) { +Status BufferedTupleStream3::PrepareForRead(bool delete_on_read) { CHECK_CONSISTENCY_FULL(); InvalidateWriteIterator(); InvalidateReadIterator(); - // If already pinned, no additional pin is needed (see ExpectedPinCount()). - *got_reservation = pinned_ || pages_.empty() || - buffer_pool_client_->IncreaseReservationToFit(default_page_len_); - if (!*got_reservation) return Status::OK(); return PrepareForReadInternal(delete_on_read); } @@ -603,28 +430,6 @@ Status BufferedTupleStream3::PinStream(bool* pinned) { return Status::OK(); } *pinned = false; - // First, make sure we have the reservation to pin all the pages for reading. - int64_t bytes_to_pin = 0; - for (Page& page : pages_) { - bytes_to_pin += (ExpectedPinCount(true, &page) - page.pin_count()) * page.len(); - } - - // Check if we have some reservation to restore. 
- bool restore_write_reservation = NeedWriteReservation(false) && !NeedWriteReservation(true); - bool restore_read_reservation = NeedReadReservation(false) && !NeedReadReservation(true); - int64_t increase_needed = bytes_to_pin - (restore_write_reservation ? default_page_len_ : 0) - - (restore_read_reservation ? default_page_len_ : 0); - bool reservation_granted = buffer_pool_client_->IncreaseReservationToFit(increase_needed); - if (!reservation_granted) return Status::OK(); - - // If there is no current write page we should have some saved reservation to use. - // Only continue saving it if the stream is empty and need it to pin the first page. - if (restore_write_reservation) { - buffer_pool_client_->RestoreReservation(&write_page_reservation_, default_page_len_); - } - if (restore_read_reservation) { - buffer_pool_client_->RestoreReservation(&read_page_reservation_, default_page_len_); - } // At this point success is guaranteed - go through to pin the pages we need to pin. // If the page data was evicted from memory, the read I/O can happen in parallel @@ -652,13 +457,6 @@ void BufferedTupleStream3::UnpinStream(UnpinMode mode) { // be unpinned at this point. for (Page& page : pages_) UnpinPageIfNeeded(&page, false); - // Check to see if we need to save some of the reservation we freed up. 
- if (!NeedWriteReservation(true) && NeedWriteReservation(false)) { - buffer_pool_client_->SaveReservation(&write_page_reservation_, default_page_len_); - } - if (!NeedReadReservation(true) && NeedReadReservation(false)) { - buffer_pool_client_->SaveReservation(&read_page_reservation_, default_page_len_); - } pinned_ = false; } CHECK_CONSISTENCY_FULL(); @@ -674,9 +472,7 @@ Status BufferedTupleStream3::GetRows(std::unique_ptr* batch, bool* got } RETURN_IF_ERROR(PinStream(got_rows)); if (!*got_rows) return Status::OK(); - bool got_reservation; - RETURN_IF_ERROR(PrepareForRead(false, &got_reservation)); - DCHECK(got_reservation) << "Stream was pinned"; + RETURN_IF_ERROR(PrepareForRead(false)); // TODO chenhao // capacity in RowBatch use int, but _num_rows is int64_t @@ -886,9 +682,8 @@ bool BufferedTupleStream3::AddRowSlow(TupleRow* row, Status* status) noexcept { } uint8_t* BufferedTupleStream3::AddRowCustomBeginSlow(int64_t size, Status* status) noexcept { - bool got_reservation = false; - *status = AdvanceWritePage(size, &got_reservation); - if (!status->ok() || !got_reservation) { + *status = AdvanceWritePage(size); + if (!status->ok()) { return nullptr; } // We have a large-enough page so now success is guaranteed. @@ -902,11 +697,6 @@ void BufferedTupleStream3::AddLargeRowCustomEnd(int64_t size) noexcept { // Immediately unpin the large write page so that we're not using up extra reservation // and so we don't append another row to the page. ResetWritePage(); - // Save some of the reservation we freed up so we can create the next write page when - // needed. - if (NeedWriteReservation()) { - buffer_pool_client_->SaveReservation(&write_page_reservation_, default_page_len_); - } // The stream should be in a consistent state once the row is added. 
CHECK_CONSISTENCY_FAST(); } diff --git a/be/src/runtime/buffered_tuple_stream3.h b/be/src/runtime/buffered_tuple_stream3.h index 9af6d21034..6f5ba3dae4 100644 --- a/be/src/runtime/buffered_tuple_stream3.h +++ b/be/src/runtime/buffered_tuple_stream3.h @@ -228,7 +228,7 @@ public: /// 'got_reservation': set to true if there was enough reservation to initialize the /// first write page and false if there was not enough reservation and no other /// error was encountered. Undefined if an error status is returned. - Status PrepareForWrite(bool* got_reservation) WARN_UNUSED_RESULT; + Status PrepareForWrite() WARN_UNUSED_RESULT; /// Prepares the stream for interleaved reads and writes by saving enough reservation /// for default-sized read and write pages. Called after Init() and before the first @@ -237,7 +237,7 @@ public: /// 'got_reservation': set to true if there was enough reservation to initialize the /// read and write pages and false if there was not enough reservation and no other /// error was encountered. Undefined if an error status is returned. - Status PrepareForReadWrite(bool delete_on_read, bool* got_reservation) WARN_UNUSED_RESULT; + Status PrepareForReadWrite(bool delete_on_read) WARN_UNUSED_RESULT; /// Prepares the stream for reading, invalidating the write iterator (if there is one). /// Therefore must be called after the last AddRow() or AddRowCustomEnd() and before @@ -248,7 +248,7 @@ public: /// 'got_reservation': set to true if there was enough reservation to initialize the /// first read page and false if there was not enough reservation and no other /// error was encountered. Undefined if an error status is returned. - Status PrepareForRead(bool delete_on_read, bool* got_reservation) WARN_UNUSED_RESULT; + Status PrepareForRead(bool delete_on_read) WARN_UNUSED_RESULT; /// Adds a single row to the stream. There are three possible outcomes: /// a) The append succeeds. True is returned. @@ -447,11 +447,6 @@ private: /// status was returned. 
std::list::iterator read_page_; - /// Saved reservation for read iterator. 'default_page_len_' reservation is saved if - /// there is a read iterator, no pinned read page, and the possibility that the read - /// iterator will advance to a valid page. - BufferPool::SubReservation read_page_reservation_; - /// Number of rows returned from the current read_page_. uint32_t read_page_rows_returned_; @@ -479,15 +474,6 @@ private: /// appending a larger row between AddRowCustomBegin() and AddRowCustomEnd(). Page* write_page_; - /// Saved reservation for write iterator. 'default_page_len_' reservation is saved if - /// there is a write iterator, no page currently pinned for writing and the possibility - /// that a pin count will be needed for the write iterator in future. Specifically if: - /// * no rows have been appended to the stream and 'pages_' is empty, or - /// * the stream is unpinned, 'write_page_' is null and and the last page in 'pages_' - /// is a large page that we advanced past, or - /// * there is only one pinned page in the stream and it is already pinned for reading. - BufferPool::SubReservation write_page_reservation_; - /// Total bytes of pinned pages in pages_, stored to avoid iterating over the list /// to compute it. int64_t bytes_pinned_; @@ -570,7 +556,7 @@ private: /// allocated. Returns an error if the row cannot fit in a page. Returns OK and sets /// 'got_reservation' to false if the reservation could not be increased and no other /// error was encountered. - Status AdvanceWritePage(int64_t row_size, bool* got_reservation) noexcept WARN_UNUSED_RESULT; + Status AdvanceWritePage(int64_t row_size) noexcept WARN_UNUSED_RESULT; /// Reset the write page, if there is one, and unpin pages accordingly. If there /// is an active write iterator, the next row will be appended to a new page. @@ -618,36 +604,6 @@ private: /// read and write pages and whether the stream is pinned. 
int ExpectedPinCount(bool stream_pinned, const Page* page) const; - /// Return true if the stream in its current state needs to have a reservation for - /// a write page stored in 'write_page_reservation_'. - bool NeedWriteReservation() const; - - /// Same as above, except assume the stream's 'pinned_' state is 'stream_pinned'. - bool NeedWriteReservation(bool stream_pinned) const; - - /// Same as above, except assume the stream has 'num_pages' pages and different - /// iterator state. - static bool NeedWriteReservation(bool stream_pinned, int64_t num_pages, bool has_write_iterator, - bool has_write_page, bool has_read_write_page); - - /// Return true if the stream in its current state needs to have a reservation for - /// a read page stored in 'read_page_reservation_'. - bool NeedReadReservation() const; - - /// Same as above, except assume the stream's 'pinned_' state is 'stream_pinned'. - bool NeedReadReservation(bool stream_pinned) const; - - /// Same as above, except assume the stream has 'num_pages' pages and a different - /// read iterator state. - bool NeedReadReservation(bool stream_pinned, int64_t num_pages, bool has_read_iterator, - bool has_read_page) const; - - /// Same as above, except assume the stream has 'num_pages' pages and a different - /// write iterator state. - static bool NeedReadReservation(bool stream_pinned, int64_t num_pages, bool has_read_iterator, - bool has_read_page, bool has_write_iterator, - bool has_write_page); - /// Templated GetNext implementations. 
template Status GetNextInternal(RowBatch* batch, bool* eos, std::vector* flat_rows); diff --git a/be/src/runtime/bufferpool/buffer_pool.cc b/be/src/runtime/bufferpool/buffer_pool.cc index eda74e3530..26c43f2b9e 100644 --- a/be/src/runtime/bufferpool/buffer_pool.cc +++ b/be/src/runtime/bufferpool/buffer_pool.cc @@ -114,13 +114,11 @@ BufferPool::BufferPool(int64_t min_buffer_len, int64_t buffer_bytes_limit, BufferPool::~BufferPool() {} -Status BufferPool::RegisterClient(const string& name, ReservationTracker* parent_reservation, - int64_t reservation_limit, RuntimeProfile* profile, +Status BufferPool::RegisterClient(const string& name, RuntimeProfile* profile, ClientHandle* client) { DCHECK(!client->is_registered()); - DCHECK(parent_reservation != nullptr); client->impl_ = new Client(this, //file_group, - name, parent_reservation, reservation_limit, profile); + name, profile); return Status::OK(); } @@ -179,7 +177,6 @@ Status BufferPool::Pin(ClientHandle* client, PageHandle* handle) { } // Update accounting last to avoid complicating the error return path above. ++page->pin_count; - client->impl_->reservation()->AllocateFrom(page->len); return Status::OK(); } @@ -190,8 +187,6 @@ void BufferPool::Unpin(ClientHandle* client, PageHandle* handle) { // If handle is pinned, we can assume that the page itself is pinned. 
DCHECK(handle->is_pinned()); Page* page = handle->page_; - ReservationTracker* reservation = client->impl_->reservation(); - reservation->ReleaseTo(page->len); if (--page->pin_count > 0) return; //if (page->pin_in_flight) { @@ -249,8 +244,6 @@ Status BufferPool::TransferBuffer(ClientHandle* src_client, BufferHandle* src, DCHECK_NE(src, dst); DCHECK_NE(src_client, dst_client); - dst_client->impl_->reservation()->AllocateFrom(src->len()); - src_client->impl_->reservation()->ReleaseTo(src->len()); *dst = std::move(*src); dst->client_ = dst_client; return Status::OK(); @@ -292,82 +285,12 @@ int64_t BufferPool::GetFreeBufferBytes() const { return allocator_->GetFreeBufferBytes(); } -bool BufferPool::ClientHandle::IncreaseReservation(int64_t bytes) { - return impl_->reservation()->IncreaseReservation(bytes); -} - -bool BufferPool::ClientHandle::IncreaseReservationToFit(int64_t bytes) { - return impl_->reservation()->IncreaseReservationToFit(bytes); -} - -Status BufferPool::ClientHandle::DecreaseReservationTo(int64_t target_bytes) { - return impl_->DecreaseReservationTo(target_bytes); -} - -int64_t BufferPool::ClientHandle::GetReservation() const { - return impl_->reservation()->GetReservation(); -} - -int64_t BufferPool::ClientHandle::GetUsedReservation() const { - return impl_->reservation()->GetUsedReservation(); -} - -int64_t BufferPool::ClientHandle::GetUnusedReservation() const { - return impl_->reservation()->GetUnusedReservation(); -} - -bool BufferPool::ClientHandle::TransferReservationFrom(ReservationTracker* src, int64_t bytes) { - return src->TransferReservationTo(impl_->reservation(), bytes); -} - -bool BufferPool::ClientHandle::TransferReservationTo(ReservationTracker* dst, int64_t bytes) { - return impl_->reservation()->TransferReservationTo(dst, bytes); -} - -void BufferPool::ClientHandle::SaveReservation(SubReservation* dst, int64_t bytes) { - DCHECK_EQ(dst->tracker_->parent(), impl_->reservation()); - bool success = 
impl_->reservation()->TransferReservationTo(dst->tracker_.get(), bytes); - DCHECK(success); // SubReservation should not have a limit, so this shouldn't fail. -} - -void BufferPool::ClientHandle::RestoreReservation(SubReservation* src, int64_t bytes) { - DCHECK_EQ(src->tracker_->parent(), impl_->reservation()); - bool success = src->tracker_->TransferReservationTo(impl_->reservation(), bytes); - DCHECK(success); // Transferring reservation to parent shouldn't fail. -} - -void BufferPool::ClientHandle::SetDebugDenyIncreaseReservation(double probability) { - impl_->reservation()->SetDebugDenyIncreaseReservation(probability); -} - bool BufferPool::ClientHandle::has_unpinned_pages() const { return impl_->has_unpinned_pages(); } -BufferPool::SubReservation::SubReservation(ClientHandle* client) { - tracker_.reset(new ReservationTracker); - tracker_->InitChildTracker(nullptr, client->impl_->reservation(), - numeric_limits::max()); -} - -BufferPool::SubReservation::~SubReservation() {} - -int64_t BufferPool::SubReservation::GetReservation() const { - return tracker_->GetReservation(); -} - -void BufferPool::SubReservation::Close() { - // Give any reservation back to the client. - if (is_closed()) return; - bool success = tracker_->TransferReservationTo(tracker_->parent(), tracker_->GetReservation()); - DCHECK(success); // Transferring reservation to parent shouldn't fail. - tracker_->Close(); - tracker_.reset(); -} - BufferPool::Client::Client(BufferPool* pool, //TmpFileMgr::FileGroup* file_group, - const string& name, ReservationTracker* parent_reservation, - int64_t reservation_limit, RuntimeProfile* profile) + const string& name, RuntimeProfile* profile) : pool_(pool), //file_group_(file_group), name_(name), @@ -376,7 +299,6 @@ BufferPool::Client::Client(BufferPool* pool, //TmpFileMgr::FileGroup* file_group buffers_allocated_bytes_(0) { // Set up a child profile with buffer pool info. 
RuntimeProfile* child_profile = profile->create_child("Buffer pool", true, true); - reservation_.InitChildTracker(child_profile, parent_reservation, reservation_limit); counters_.alloc_time = ADD_TIMER(child_profile, "AllocTime"); counters_.cumulative_allocations = ADD_COUNTER(child_profile, "CumulativeAllocations", TUnit::UNIT); @@ -544,25 +466,11 @@ Status BufferPool::Client::PrepareToAllocateBuffer(int64_t len) { // Clean enough pages to allow allocation to proceed without violating our eviction // policy. This can fail, so only update the accounting once success is ensured. //RETURN_IF_ERROR(CleanPages(&lock, len)); - reservation_.AllocateFrom(len); buffers_allocated_bytes_ += len; DCHECK_CONSISTENCY(); return Status::OK(); } -Status BufferPool::Client::DecreaseReservationTo(int64_t target_bytes) { - std::unique_lock lock(lock_); - int64_t current_reservation = reservation_.GetReservation(); - DCHECK_GE(current_reservation, target_bytes); - int64_t amount_to_free = - std::min(reservation_.GetUnusedReservation(), current_reservation - target_bytes); - if (amount_to_free == 0) return Status::OK(); - // Clean enough pages to allow us to safely release reservation. 
- //RETURN_IF_ERROR(CleanPages(&lock, amount_to_free)); - reservation_.DecreaseReservation(amount_to_free); - return Status::OK(); -} - Status BufferPool::Client::CleanPages(std::unique_lock* client_lock, int64_t len) { DCheckHoldsLock(*client_lock); DCHECK_CONSISTENCY(); @@ -693,8 +601,7 @@ string BufferPool::Client::DebugString() { << buffers_allocated_bytes_ << " num_pages: " << num_pages_ << " pinned_bytes: " << pinned_pages_.bytes() << " dirty_unpinned_bytes: " << dirty_unpinned_pages_.bytes() - << " in_flight_write_bytes: " << in_flight_write_pages_.bytes() - << " reservation: " << reservation_.DebugString(); + << " in_flight_write_bytes: " << in_flight_write_pages_.bytes(); ss << "\n " << pinned_pages_.size() << " pinned pages: "; pinned_pages_.iterate(std::bind(Page::DebugStringCallback, &ss, std::placeholders::_1)); ss << "\n " << dirty_unpinned_pages_.size() << " dirty unpinned pages: "; diff --git a/be/src/runtime/bufferpool/buffer_pool.h b/be/src/runtime/bufferpool/buffer_pool.h index 9a378934a2..469f5071db 100644 --- a/be/src/runtime/bufferpool/buffer_pool.h +++ b/be/src/runtime/bufferpool/buffer_pool.h @@ -34,7 +34,6 @@ namespace doris { -class ReservationTracker; class RuntimeProfile; class SystemAllocator; class MemTracker; @@ -149,7 +148,6 @@ public: class BufferHandle; class ClientHandle; class PageHandle; - class SubReservation; /// Constructs a new buffer pool. /// 'min_buffer_len': the minimum buffer length for the pool. Must be a power of two. /// 'buffer_bytes_limit': the maximum physical memory in bytes that can be used by the @@ -167,12 +165,7 @@ public: /// not allowed for this client. Counters for this client are added to the (non-nullptr) /// 'profile'. 'client' is the client to register. 'client' must not already be /// registered. - /// - /// The client's reservation is created as a child of 'parent_reservation' with limit - /// 'reservation_limit' and associated with MemTracker 'mem_tracker'. The initial - /// reservation is 0 bytes. 
- Status RegisterClient(const std::string& name, ReservationTracker* parent_reservation, - int64_t reservation_limit, RuntimeProfile* profile, + Status RegisterClient(const std::string& name, RuntimeProfile* profile, ClientHandle* client) WARN_UNUSED_RESULT; /// Deregister 'client' if it is registered. All pages must be destroyed and buffers @@ -315,49 +308,6 @@ public: /// Client must be deregistered. ~ClientHandle() { DCHECK(!is_registered()); } - /// Request to increase reservation for this client by 'bytes' by calling - /// ReservationTracker::IncreaseReservation(). Returns true if the reservation was - /// successfully increased. - bool IncreaseReservation(int64_t bytes) WARN_UNUSED_RESULT; - - /// Tries to ensure that 'bytes' of unused reservation is available for this client - /// to use by calling ReservationTracker::IncreaseReservationToFit(). Returns true - /// if successful, after which 'bytes' can be used. - bool IncreaseReservationToFit(int64_t bytes) WARN_UNUSED_RESULT; - - /// Try to decrease this client's reservation down to a minimum of 'target_bytes' by - /// releasing unused reservation to ancestor ReservationTrackers, all the way up to - /// the root of the ReservationTracker tree. May block waiting for unpinned pages to - /// be flushed. This client's reservation must be at least 'target_bytes' before - /// calling this method. May fail if decreasing the reservation requires flushing - /// unpinned pages to disk and a write to disk fails. - Status DecreaseReservationTo(int64_t target_bytes) WARN_UNUSED_RESULT; - - /// Move some of this client's reservation to the SubReservation. 'bytes' of unused - /// reservation must be available in this tracker. - void SaveReservation(SubReservation* dst, int64_t bytes); - - /// Move some of src's reservation to this client. 'bytes' of unused reservation must be - /// available in 'src'. 
- void RestoreReservation(SubReservation* src, int64_t bytes); - - /// Accessors for this client's reservation corresponding to the identically-named - /// methods in ReservationTracker. - int64_t GetReservation() const; - int64_t GetUsedReservation() const; - int64_t GetUnusedReservation() const; - - /// Try to transfer 'bytes' of reservation from 'src' to this client using - /// ReservationTracker::TransferReservationTo(). - bool TransferReservationFrom(ReservationTracker* src, int64_t bytes); - - /// Transfer 'bytes' of reservation from this client to 'dst' using - /// ReservationTracker::TransferReservationTo(). - bool TransferReservationTo(ReservationTracker* dst, int64_t bytes); - - /// Call SetDebugDenyIncreaseReservation() on this client's ReservationTracker. - void SetDebugDenyIncreaseReservation(double probability); - bool is_registered() const { return impl_ != nullptr; } /// Return true if there are any unpinned pages for this client. @@ -368,7 +318,6 @@ public: private: friend class BufferPool; friend class BufferPoolTest; - friend class SubReservation; DISALLOW_COPY_AND_ASSIGN(ClientHandle); /// Internal state for the client. nullptr means the client isn't registered. @@ -376,31 +325,6 @@ private: Client* impl_; }; -/// Helper class that allows dividing up a client's reservation into separate buckets. -class BufferPool::SubReservation { -public: - SubReservation(ClientHandle* client); - ~SubReservation(); - - /// Returns the amount of reservation stored in this sub-reservation. - int64_t GetReservation() const; - - /// Releases the sub-reservation to the client's tracker. Must be called before - /// destruction. - void Close(); - - bool is_closed() const { return tracker_ == nullptr; } - -private: - friend class BufferPool::ClientHandle; - DISALLOW_COPY_AND_ASSIGN(SubReservation); - - /// Child of the client's tracker used to track the sub-reservation. 
Usage is not - /// tracked against this tracker - instead the reservation is always transferred back - /// to the client's tracker before use. - std::unique_ptr tracker_; -}; - /// A handle to a buffer allocated from the buffer pool. Each BufferHandle should only /// be used by a single thread at a time: concurrently calling BufferHandle methods or /// BufferPool methods with the BufferHandle as an argument is not supported. diff --git a/be/src/runtime/bufferpool/buffer_pool_internal.h b/be/src/runtime/bufferpool/buffer_pool_internal.h index 08549cc947..2b0a083268 100644 --- a/be/src/runtime/bufferpool/buffer_pool_internal.h +++ b/be/src/runtime/bufferpool/buffer_pool_internal.h @@ -23,7 +23,6 @@ #include "runtime/bufferpool/buffer_pool.h" #include "runtime/bufferpool/buffer_pool_counters.h" -#include "runtime/bufferpool/reservation_tracker.h" // Ensure that DCheckConsistency() function calls get removed in release builds. #ifndef NDEBUG @@ -132,16 +131,14 @@ private: class BufferPool::Client { public: Client(BufferPool* pool, //TmpFileMgr::FileGroup* file_group, - const std::string& name, ReservationTracker* parent_reservation, - int64_t reservation_limit, RuntimeProfile* profile); + const std::string& name, RuntimeProfile* profile); ~Client() { DCHECK_EQ(0, num_pages_); DCHECK_EQ(0, buffers_allocated_bytes_); } - /// Release reservation for this client. - void Close() { reservation_.Close(); } + void Close() {} /// Create a pinned page using 'buffer', which was allocated using AllocateBuffer(). /// No client or page locks should be held by the caller. @@ -181,15 +178,11 @@ public: /// client locks should be held by the caller. Status PrepareToAllocateBuffer(int64_t len) WARN_UNUSED_RESULT; - /// Implementation of ClientHandle::DecreaseReservationTo(). 
- Status DecreaseReservationTo(int64_t target_bytes) WARN_UNUSED_RESULT; - /// Called after a buffer of 'len' is freed via the FreeBuffer() API to update /// internal accounting and release the buffer to the client's reservation. No page or /// client locks should be held by the caller. void FreedBuffer(int64_t len) { std::lock_guard cl(lock_); - reservation_.ReleaseTo(len); buffers_allocated_bytes_ -= len; DCHECK_CONSISTENCY(); } @@ -208,7 +201,6 @@ public: DCHECK(client_lock.mutex() == &lock_ && client_lock.owns_lock()); } - ReservationTracker* reservation() { return &reservation_; } const BufferPoolClientCounters& counters() const { return counters_; } //bool spilling_enabled() const { return file_group_ != nullptr; } void set_debug_write_delay_ms(int val) { debug_write_delay_ms_ = val; } @@ -230,10 +222,6 @@ private: DCHECK_LE( pinned_pages_.size() + dirty_unpinned_pages_.size() + in_flight_write_pages_.size(), num_pages_); - // Check that we flushed enough pages to disk given our eviction policy. - DCHECK_GE(reservation_.GetReservation(), buffers_allocated_bytes_ + pinned_pages_.bytes() + - dirty_unpinned_pages_.bytes() + - in_flight_write_pages_.bytes()); } /// Must be called once before allocating or reclaiming a buffer of 'len'. Ensures that @@ -269,10 +257,6 @@ private: /// A name identifying the client. const std::string name_; - /// The reservation tracker for the client. All pages pinned by the client count as - /// usage against 'reservation_'. - ReservationTracker reservation_; - /// The RuntimeProfile counters for this client, owned by the client's RuntimeProfile. /// All non-nullptr. 
BufferPoolClientCounters counters_; diff --git a/be/src/runtime/bufferpool/reservation_tracker.cc b/be/src/runtime/bufferpool/reservation_tracker.cc deleted file mode 100644 index 6985edaef7..0000000000 --- a/be/src/runtime/bufferpool/reservation_tracker.cc +++ /dev/null @@ -1,401 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#include "runtime/bufferpool/reservation_tracker.h" - -#include -#include - -#include "common/object_pool.h" -#include "gutil/strings/substitute.h" -#include "olap/utils.h" -#include "runtime/memory/mem_tracker_limiter.h" -#include "runtime/thread_context.h" -#include "util/dummy_runtime_profile.h" -#include "util/runtime_profile.h" - -namespace doris { - -ReservationTracker::ReservationTracker() {} - -ReservationTracker::~ReservationTracker() { - DCHECK(!initialized_); -} - -void ReservationTracker::InitRootTracker(RuntimeProfile* profile, int64_t reservation_limit) { - std::lock_guard l(lock_); - DCHECK(!initialized_); - parent_ = nullptr; - mem_tracker_ = nullptr; - reservation_limit_ = reservation_limit; - reservation_ = 0; - used_reservation_ = 0; - child_reservations_ = 0; - initialized_ = true; - - InitCounters(profile, reservation_limit_); - COUNTER_SET(counters_.peak_reservation, reservation_); - - CheckConsistency(); -} - -void ReservationTracker::InitChildTracker(RuntimeProfile* profile, ReservationTracker* parent, - int64_t reservation_limit) { - DCHECK(parent != nullptr); - DCHECK_GE(reservation_limit, 0); - - std::lock_guard l(lock_); - DCHECK(!initialized_); - parent_ = parent; - mem_tracker_ = nullptr; // TODO(zxy) remove ReservationTracker later - - reservation_limit_ = reservation_limit; - reservation_ = 0; - used_reservation_ = 0; - child_reservations_ = 0; - initialized_ = true; - - if (mem_tracker_ != nullptr) { - MemTracker* parent_mem_tracker = GetParentMemTracker(); - if (parent_mem_tracker != nullptr) { - // Make sure the parent links of the MemTrackers correspond to our parent links. - // DCHECK_EQ(parent_mem_tracker, mem_tracker_->parent()); - } else { - // Make sure we didn't leave a gap in the links. E.g. this tracker's grandparent - // shouldn't have a MemTracker. 
- ReservationTracker* ancestor = parent_; - while (ancestor != nullptr) { - DCHECK(ancestor->mem_tracker_ == nullptr); - ancestor = ancestor->parent_; - } - } - } - - InitCounters(profile, reservation_limit_); - - CheckConsistency(); -} - -void ReservationTracker::InitCounters(RuntimeProfile* profile, int64_t reservation_limit) { - if (profile == nullptr) { - dummy_profile_.reset(new DummyProfile); - profile = dummy_profile_->profile(); - } - - // Check that another tracker's counters aren't already registered in the profile. - DCHECK(profile->get_counter("PeakReservation") == nullptr); - counters_.peak_reservation = profile->AddHighWaterMarkCounter("PeakReservation", TUnit::BYTES); - counters_.peak_used_reservation = - profile->AddHighWaterMarkCounter("PeakUsedReservation", TUnit::BYTES); - // Only show the limit if set. - counters_.reservation_limit = nullptr; - if (reservation_limit != numeric_limits::max()) { - counters_.reservation_limit = ADD_COUNTER(profile, "ReservationLimit", TUnit::BYTES); - COUNTER_SET(counters_.reservation_limit, reservation_limit); - } -} - -void ReservationTracker::Close() { - std::lock_guard l(lock_); - if (!initialized_) return; - CheckConsistency(); - DCHECK_EQ(used_reservation_, 0); - DCHECK_EQ(child_reservations_, 0); - // Release any reservation to parent. 
- if (parent_ != nullptr) DecreaseReservationLocked(reservation_, false); - mem_tracker_ = nullptr; - parent_ = nullptr; - initialized_ = false; -} - -bool ReservationTracker::IncreaseReservation(int64_t bytes) { - std::lock_guard l(lock_); - return IncreaseReservationInternalLocked(bytes, false, false); -} - -bool ReservationTracker::IncreaseReservationToFit(int64_t bytes) { - std::lock_guard l(lock_); - return IncreaseReservationInternalLocked(bytes, true, false); -} - -bool ReservationTracker::IncreaseReservationInternalLocked(int64_t bytes, - bool use_existing_reservation, - bool is_child_reservation) { - DCHECK(initialized_); - int64_t reservation_increase = - use_existing_reservation ? std::max(0, bytes - unused_reservation()) : bytes; - DCHECK_GE(reservation_increase, 0); - - bool granted; - // Check if the increase is allowed, starting at the bottom of hierarchy. - if (reservation_increase == 0) { - granted = true; - } else if (increase_deny_probability_ != 0.0 && - rand() < increase_deny_probability_ * (RAND_MAX + 1L)) { - // Randomly deny reservation if requested. Use rand() to avoid needing to set up a RNG. - // Should be good enough. If the probability is 0.0, this never triggers. If it is 1.0 - // it always triggers. - granted = false; - } else if (reservation_ + reservation_increase > reservation_limit_) { - granted = false; - } else { - if (parent_ == nullptr) { - granted = true; - } else { - std::lock_guard l(parent_->lock_); - granted = parent_->IncreaseReservationInternalLocked(reservation_increase, true, true); - } - if (granted && !TryConsumeFromMemTracker(reservation_increase)) { - granted = false; - // Roll back changes to ancestors if MemTracker update fails. - parent_->DecreaseReservation(reservation_increase, true); - } - } - - if (granted) { - // The reservation was granted and state updated in all ancestors: we can modify - // this tracker's state now. 
- UpdateReservation(reservation_increase); - if (is_child_reservation) child_reservations_ += bytes; - } - - CheckConsistency(); - return granted; -} - -bool ReservationTracker::TryConsumeFromMemTracker(int64_t reservation_increase) { - DCHECK_GE(reservation_increase, 0); - if (mem_tracker_ == nullptr) return true; - if (GetParentMemTracker() == nullptr) { - // At the topmost link, which may be a MemTracker with a limit, we need to use - // TryConsume() to check the limit. - Status st = thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->check_limit( - reservation_increase); - WARN_IF_ERROR(st, "TryConsumeFromMemTracker failed"); - mem_tracker_->consume(reservation_increase); - return st.ok(); - } else { - // For lower links, there shouldn't be a limit to enforce, so we just need to - // update the consumption of the linked MemTracker since the reservation is - // already reflected in its parent. - mem_tracker_->consume(reservation_increase); - return true; - } -} - -void ReservationTracker::ReleaseToMemTracker(int64_t reservation_decrease) { - DCHECK_GE(reservation_decrease, 0); - if (mem_tracker_ == nullptr) return; - mem_tracker_->release(reservation_decrease); -} - -void ReservationTracker::DecreaseReservation(int64_t bytes, bool is_child_reservation) { - std::lock_guard l(lock_); - DecreaseReservationLocked(bytes, is_child_reservation); -} - -void ReservationTracker::DecreaseReservationLocked(int64_t bytes, bool is_child_reservation) { - DCHECK(initialized_); - DCHECK_GE(reservation_, bytes); - if (bytes == 0) return; - if (is_child_reservation) child_reservations_ -= bytes; - UpdateReservation(-bytes); - ReleaseToMemTracker(bytes); - // The reservation should be returned up the tree. - if (parent_ != nullptr) parent_->DecreaseReservation(bytes, true); - CheckConsistency(); -} - -bool ReservationTracker::TransferReservationTo(ReservationTracker* other, int64_t bytes) { - if (other == this) return true; - // Find the path to the root from both. 
The root is guaranteed to be a common ancestor. - std::vector path_to_common = FindPathToRoot(); - std::vector other_path_to_common = other->FindPathToRoot(); - DCHECK_EQ(path_to_common.back(), other_path_to_common.back()); - ReservationTracker* common_ancestor = path_to_common.back(); - // Remove any common ancestors - they do not need to be updated for this transfer. - while (!path_to_common.empty() && !other_path_to_common.empty() && - path_to_common.back() == other_path_to_common.back()) { - common_ancestor = path_to_common.back(); - path_to_common.pop_back(); - other_path_to_common.pop_back(); - } - - // At this point, we have three cases: - // 1. 'common_ancestor' == 'other'. 'other_path_to_common' is empty because 'other' is - // the lowest common ancestor. To transfer, we decrease the reservation on the - // trackers under 'other', down to 'this'. - // 2. 'common_ancestor' == 'this'. 'path_to_common' is empty because 'this' is the - // lowest common ancestor. To transfer, we increase the reservation on the trackers - // under 'this', down to 'other'. - // 3. Neither is an ancestor of the other. Both 'other_path_to_common' and - // 'path_to_common' are non-empty. We increase the reservation on trackers from - // 'other' up to one below the common ancestor (checking limits as needed) and if - // successful, decrease reservations on trackers from 'this' up to one below the - // common ancestor. - - // Lock all of the trackers so we can do the update atomically. Need to be careful to - // lock subtrees in the correct order. 
- std::vector> locks; - bool lock_first = - path_to_common.empty() || other_path_to_common.empty() || - lock_sibling_subtree_first(path_to_common.back(), other_path_to_common.back()); - if (lock_first) { - for (ReservationTracker* tracker : path_to_common) locks.emplace_back(tracker->lock_); - } - for (ReservationTracker* tracker : other_path_to_common) { - locks.emplace_back(tracker->lock_); - } - if (!lock_first) { - for (ReservationTracker* tracker : path_to_common) locks.emplace_back(tracker->lock_); - } - - // Check reservation limits will not be violated before applying any updates. - for (ReservationTracker* tracker : other_path_to_common) { - if (tracker->reservation_ + bytes > tracker->reservation_limit_) return false; - } - - // Do the updates now that we have checked the limits. We're holding all the locks - // so this is all atomic. - for (ReservationTracker* tracker : other_path_to_common) { - tracker->UpdateReservation(bytes); - bool success = tracker->TryConsumeFromMemTracker(bytes); - DCHECK(success); - if (tracker != other_path_to_common[0]) tracker->child_reservations_ += bytes; - } - - for (ReservationTracker* tracker : path_to_common) { - if (tracker != path_to_common[0]) tracker->child_reservations_ -= bytes; - tracker->UpdateReservation(-bytes); - tracker->ReleaseToMemTracker(bytes); - } - - // Update the 'child_reservations_' on the common ancestor if needed. - // Case 1: reservation was pushed up to 'other'. - if (common_ancestor == other) { - std::lock_guard l(other->lock_); - other->child_reservations_ -= bytes; - other->CheckConsistency(); - } - // Case 2: reservation was pushed down below 'this'. 
- if (common_ancestor == this) { - std::lock_guard l(lock_); - child_reservations_ += bytes; - CheckConsistency(); - } - return true; -} - -std::vector ReservationTracker::FindPathToRoot() { - std::vector path_to_root; - ReservationTracker* curr = this; - do { - path_to_root.push_back(curr); - curr = curr->parent_; - } while (curr != nullptr); - return path_to_root; -} - -void ReservationTracker::AllocateFrom(int64_t bytes) { - std::lock_guard l(lock_); - DCHECK(initialized_); - DCHECK_GE(bytes, 0); - DCHECK_LE(bytes, unused_reservation()); - UpdateUsedReservation(bytes); - CheckConsistency(); -} - -void ReservationTracker::ReleaseTo(int64_t bytes) { - std::lock_guard l(lock_); - DCHECK(initialized_); - DCHECK_GE(bytes, 0); - DCHECK_LE(bytes, used_reservation_); - UpdateUsedReservation(-bytes); - CheckConsistency(); -} - -int64_t ReservationTracker::GetReservation() { - std::lock_guard l(lock_); - DCHECK(initialized_); - return reservation_; -} - -int64_t ReservationTracker::GetUsedReservation() { - std::lock_guard l(lock_); - DCHECK(initialized_); - return used_reservation_; -} - -int64_t ReservationTracker::GetUnusedReservation() { - std::lock_guard l(lock_); - DCHECK(initialized_); - return unused_reservation(); -} - -int64_t ReservationTracker::GetChildReservations() { - std::lock_guard l(lock_); - DCHECK(initialized_); - return child_reservations_; -} - -void ReservationTracker::CheckConsistency() const { - // Check internal invariants. 
- DCHECK_GE(reservation_, 0); - DCHECK_LE(reservation_, reservation_limit_); - DCHECK_GE(child_reservations_, 0); - DCHECK_GE(used_reservation_, 0); - DCHECK_LE(used_reservation_ + child_reservations_, reservation_); - - DCHECK_EQ(reservation_, counters_.peak_reservation->current_value()); - DCHECK_LE(reservation_, counters_.peak_reservation->value()); - DCHECK_EQ(used_reservation_, counters_.peak_used_reservation->current_value()); - DCHECK_LE(used_reservation_, counters_.peak_used_reservation->value()); - if (counters_.reservation_limit != nullptr) { - DCHECK_EQ(reservation_limit_, counters_.reservation_limit->value()); - } -} - -void ReservationTracker::UpdateUsedReservation(int64_t delta) { - used_reservation_ += delta; - COUNTER_SET(counters_.peak_used_reservation, used_reservation_); - VLOG_QUERY << "peak:" << counters_.peak_reservation->current_value() - << " used reservation:" << reservation_; - CheckConsistency(); -} - -void ReservationTracker::UpdateReservation(int64_t delta) { - reservation_ += delta; - //LOG(INFO) << "chenhao tracker:" << tracker_name_ << " reservation:" << reservation_ - // << " delta:" << delta << " limit:" << reservation_limit_; - COUNTER_SET(counters_.peak_reservation, reservation_); - counters_.peak_reservation->set(reservation_); - CheckConsistency(); -} - -std::string ReservationTracker::DebugString() { - //std::lock_guard l(lock_); - if (!initialized_) return ": uninitialized"; - - std::string parent_debug_string = parent_ == nullptr ? 
"NULL" : parent_->DebugString(); - std::stringstream ss; - ss << ": reservation_limit " << reservation_limit_ << " reservation " - << reservation_ << " used_reservation " << used_reservation_ << " child_reservations " - << child_reservations_ << " parent:\n" - << parent_debug_string; - return ss.str(); -} -} // namespace doris diff --git a/be/src/runtime/bufferpool/reservation_tracker.h b/be/src/runtime/bufferpool/reservation_tracker.h deleted file mode 100644 index 80408aa6eb..0000000000 --- a/be/src/runtime/bufferpool/reservation_tracker.h +++ /dev/null @@ -1,290 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include - -#include - -#include "common/status.h" -#include "runtime/bufferpool/reservation_tracker_counters.h" -#include "util/spinlock.h" - -namespace doris { - -class DummyProfile; -class MemTracker; -class RuntimeProfile; - -/// A tracker for a hierarchy of buffer pool memory reservations, denominated in bytes. -/// A hierarchy of ReservationTrackers provides a mechanism for subdividing buffer pool -/// memory and enforcing upper and lower bounds on memory usage. -/// -/// The root of the tracker tree enforces a global maximum, which is distributed among its -/// children. 
Each tracker in the tree has a 'reservation': the total bytes of buffer pool -/// memory it is entitled to use. The reservation is inclusive of any memory that is -/// already allocated from the reservation, i.e. using a reservation to allocate memory -/// does not subtract from the reservation. -/// -/// A reservation can be used directly at the tracker by calling AllocateFrom(), or -/// distributed to children of the tracker for the childrens' reservations. Each tracker -/// in the tree can use up to its reservation without checking parent trackers. To -/// increase its reservation, a tracker must use some of its parent's reservation (and -/// perhaps increase reservations all the way to the root of the tree). -/// -/// Each tracker also has a maximum reservation that is enforced. E.g. if the root of the -/// tracker hierarchy is the global tracker for the Impala daemon and the next level of -/// the hierarchy is made up of per-query trackers, then the maximum reservation -/// mechanism can enforce both process-level and query-level limits on reservations. -/// -/// Invariants: -/// * A tracker's reservation is at most its reservation limit: reservation <= limit -/// * A tracker's reservation is at least the sum of its childrens' reservations plus -/// the amount of the reservation used directly at this tracker. The difference is -/// the unused reservation: -/// child_reservations + used_reservation + unused_reservation = reservation. -/// -/// Thread-safety: -/// All public ReservationTracker methods are thread-safe. If multiple threads -/// concurrently invoke methods on a ReservationTracker, each operation is applied -/// atomically to leave the ReservationTracker in a consistent state. Calling threads -/// are responsible for coordinating to avoid violating any method preconditions, -/// e.g. ensuring that there is sufficient unused reservation before calling AllocateTo(). 
-/// -/// Integration with MemTracker hierarchy: -/// TODO: we will remove MemTracker and this integration once all memory is accounted via -/// reservations. -/// -/// Each ReservationTracker can optionally have a linked MemTracker. E.g. an exec -/// node's ReservationTracker can be linked with the exec node's MemTracker, so that -/// reservations are included in query memory consumption for the purposes of enforcing -/// memory limits, reporting and logging. The reservation is accounted as consumption -/// against the linked MemTracker and its ancestors because reserved memory is committed. -/// Allocating from a reservation therefore does not change the consumption reflected in -/// the MemTracker hierarchy. -/// -/// MemTracker limits are only checked via the topmost link (i.e. the query-level -/// trackers): we require that no MemTrackers below this level have limits. -/// -/// We require that the MemTracker hierarchy is consistent with the ReservationTracker -/// hierarchy. I.e. if a ReservationTracker is linked to a MemTracker "A", and its parent -/// is linked to a MemTracker "B", then "B" must be the parent of "A"'. -class ReservationTracker { -public: - ReservationTracker(); - virtual ~ReservationTracker(); - - /// Initializes the root tracker with the given reservation limit in bytes. The initial - /// reservation is 0. - /// if 'profile' is not nullptr, the counters defined in ReservationTrackerCounters are - /// added to 'profile'. - void InitRootTracker(RuntimeProfile* profile, int64_t reservation_limit); - - /// Initializes a new ReservationTracker with a parent. - /// If 'mem_tracker' is not nullptr, reservations for this ReservationTracker and its - /// children will be counted as consumption against 'mem_tracker'. - /// 'reservation_limit' is the maximum reservation for this tracker in bytes. - /// if 'profile' is not nullptr, the counters in 'counters_' are added to 'profile'. 
- void InitChildTracker(RuntimeProfile* profile, ReservationTracker* parent, - int64_t reservation_limit); - - /// If the tracker is initialized, deregister the ReservationTracker from its parent, - /// relinquishing all this tracker's reservation. All of the reservation must be unused - /// and all the tracker's children must be closed before calling this method. - /// TODO: decide on and implement policy for how far to release the reservation up - /// the tree. Currently the reservation is released all the way to the root. - void Close(); - - /// Request to increase reservation by 'bytes'. The request is either granted in - /// full or not at all. Uses any unused reservation on ancestors and increase - /// ancestors' reservations if needed to fit the increased reservation. - /// Returns true if the reservation increase is granted, or false if not granted. - /// If the reservation is not granted, no modifications are made to the state of - /// any ReservationTrackers. - bool IncreaseReservation(int64_t bytes) WARN_UNUSED_RESULT; - - /// Tries to ensure that 'bytes' of unused reservation is available. If not already - /// available, tries to increase the reservation such that the unused reservation is - /// exactly equal to 'bytes'. Uses any unused reservation on ancestors and increase - /// ancestors' reservations if needed to fit the increased reservation. - /// Returns true if the reservation increase was successful or not necessary. - bool IncreaseReservationToFit(int64_t bytes) WARN_UNUSED_RESULT; - - /// Decrease reservation by 'bytes' on this tracker and all ancestors. This tracker's - /// reservation must be at least 'bytes' before calling this method. - void DecreaseReservation(int64_t bytes) { DecreaseReservation(bytes, false); } - - /// Transfer reservation from this tracker to 'other'. Both trackers must be in the - /// same query subtree of the hierarchy. One tracker can be the ancestor of the other, - /// or they can share a common ancestor. 
The subtree root must be at the query level - /// or below so that the transfer cannot cause a MemTracker limit to be exceeded - /// (because linked MemTrackers with limits below the query level are not supported). - /// Returns true on success or false if the transfer would have caused a reservation - /// limit to be exceeded. - bool TransferReservationTo(ReservationTracker* other, int64_t bytes) WARN_UNUSED_RESULT; - - /// Allocate 'bytes' from the reservation. The tracker must have at least 'bytes' - /// unused reservation before calling this method. - void AllocateFrom(int64_t bytes); - - /// Release 'bytes' of previously allocated memory. The used reservation is - /// decreased by 'bytes'. Before the call, the used reservation must be at least - /// 'bytes' before calling this method. - void ReleaseTo(int64_t bytes); - - /// Returns the amount of the reservation in bytes. - int64_t GetReservation(); - - /// Returns the current amount of the reservation used at this tracker, not including - /// reservations of children in bytes. - int64_t GetUsedReservation(); - - /// Returns the amount of the reservation neither used nor given to childrens' - /// reservations at this tracker in bytes. - int64_t GetUnusedReservation(); - - /// Returns the total reservations of children in bytes. - int64_t GetChildReservations(); - - /// Support for debug actions: deny reservation increase with probability 'probability'. - void SetDebugDenyIncreaseReservation(double probability) { - increase_deny_probability_ = probability; - } - - ReservationTracker* parent() const { return parent_; } - - std::string DebugString(); - -private: - /// Returns the amount of 'reservation_' that is unused. - int64_t unused_reservation() const { - return reservation_ - used_reservation_ - child_reservations_; - } - - /// Returns the parent's memtracker if 'parent_' is non-nullptr, or nullptr otherwise. - MemTracker* GetParentMemTracker() const { - return parent_ == nullptr ? 
nullptr : parent_->mem_tracker_; - } - - /// Initializes 'counters_', storing the counters in 'profile'. - /// If 'profile' is nullptr, creates a dummy profile to store the counters. - void InitCounters(RuntimeProfile* profile, int64_t max_reservation); - - /// Internal helper for IncreaseReservation(). If 'use_existing_reservation' is true, - /// increase by the minimum amount so that 'bytes' fits in the reservation, otherwise - /// just increase by 'bytes'. If 'is_child_reservation' is true, also increase - /// 'child_reservations_' by 'bytes'. - /// 'lock_' must be held by caller. - bool IncreaseReservationInternalLocked(int64_t bytes, bool use_existing_reservation, - bool is_child_reservation); - - /// Increase consumption on linked MemTracker to reflect an increase in reservation - /// of 'reservation_increase'. For the topmost link, return false if this failed - /// because it would exceed a memory limit. If there is no linked MemTracker, just - /// returns true. - /// TODO: remove once we account all memory via ReservationTrackers. - bool TryConsumeFromMemTracker(int64_t reservation_increase); - - /// Decrease consumption on linked MemTracker to reflect a decrease in reservation of - /// 'reservation_decrease'. If there is no linked MemTracker, does nothing. - /// TODO: remove once we account all memory via ReservationTrackers. - void ReleaseToMemTracker(int64_t reservation_decrease); - - /// Decrease reservation by 'bytes' on this tracker and all ancestors. This tracker's - /// reservation must be at least 'bytes' before calling this method. If - /// 'is_child_reservation' is true it decreases 'child_reservations_' by 'bytes' - void DecreaseReservation(int64_t bytes, bool is_child_reservation); - - /// Same as DecreaseReservation(), but 'lock_' must be held by caller. - void DecreaseReservationLocked(int64_t bytes, bool is_child_reservation); - - /// Return a vector containing the trackers on the path to the root tracker. 
Includes - /// the current tracker and the root tracker. - std::vector FindPathToRoot(); - - /// Return true if trackers in the subtree rooted at 'subtree1' precede trackers in - /// the subtree rooted at 'subtree2' in the lock order. 'subtree1' and 'subtree2' - /// must share the same parent. - static bool lock_sibling_subtree_first(ReservationTracker* subtree1, - ReservationTracker* subtree2) { - DCHECK_EQ(subtree1->parent_, subtree2->parent_); - return reinterpret_cast(subtree1) < reinterpret_cast(subtree2); - } - - /// Check the internal consistency of the ReservationTracker and DCHECKs if in an - /// inconsistent state. - /// 'lock_' must be held by caller. - void CheckConsistency() const; - - /// Increase or decrease 'used_reservation_' and update profile counters accordingly. - /// 'lock_' must be held by caller. - void UpdateUsedReservation(int64_t delta); - - /// Increase or decrease 'reservation_' and update profile counters accordingly. - /// 'lock_' must be held by caller. - void UpdateReservation(int64_t delta); - - /// Support for debug actions: see SetDebugDenyIncreaseReservation() for behaviour. - double increase_deny_probability_ = 0.0; - - /// lock_ protects all below members. The lock order in a tree of ReservationTrackers is - /// based on a post-order traversal of the tree, with children visited in order of the - /// memory address of the ReservationTracker object. The following rules can be applied - /// to determine the relative positions of two trackers t1 and t2 in the lock order: - /// * If t1 is a descendent of t2, t1's lock must be acquired before t2's lock (i.e. - /// locks are acquired bottom-up). - /// * If neither t1 or t2 is a descendant of the other, they must be in subtrees of - /// under a common ancestor. If the memory address of t1's subtree's root is less - /// than the memory address of t2's subtree's root, t1's lock must be acquired before - /// t2's lock. This check is implemented in lock_sibling_subtree_first(). 
- SpinLock lock_; - - /// True if the tracker is initialized. - bool initialized_ = false; - - /// A dummy profile to hold the counters in 'counters_' in the case that no profile - /// is provided. - std::unique_ptr dummy_profile_; - - /// The RuntimeProfile counters for this tracker. - /// All non-nullptr if 'initialized_' is true. - ReservationTrackerCounters counters_; - - /// The parent of this tracker in the hierarchy. Does not change after initialization. - ReservationTracker* parent_ = nullptr; - - /// If non-nullptr, reservations are counted as memory consumption against this tracker. - /// Does not change after initialization. Not owned. - /// TODO: remove once all memory is accounted via ReservationTrackers. - MemTracker* mem_tracker_ = nullptr; - - /// The maximum reservation in bytes that this tracker can have. - int64_t reservation_limit_; - - /// This tracker's current reservation in bytes. 'reservation_' <= 'reservation_limit_'. - int64_t reservation_; - - /// Total reservation of children in bytes. This is included in 'reservation_'. - /// 'used_reservation_' + 'child_reservations_' <= 'reservation_'. - int64_t child_reservations_; - - /// The amount of the reservation currently used by this tracker in bytes. - /// 'used_reservation_' + 'child_reservations_' <= 'reservation_'. - int64_t used_reservation_; -}; -} // namespace doris diff --git a/be/src/runtime/bufferpool/reservation_tracker_counters.h b/be/src/runtime/bufferpool/reservation_tracker_counters.h deleted file mode 100644 index 4383c6819c..0000000000 --- a/be/src/runtime/bufferpool/reservation_tracker_counters.h +++ /dev/null @@ -1,38 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. 
The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include "util/runtime_profile.h" - -namespace doris { - -/// A set of counters for each ReservationTracker for reporting purposes. -/// -/// If the ReservationTracker is linked to a profile these have the same lifetime as that -/// profile, otherwise they have the same lifetime as the ReservationTracker itself. -struct ReservationTrackerCounters { - /// The tracker's peak reservation in bytes. - RuntimeProfile::HighWaterMarkCounter* peak_reservation; - - /// The tracker's peak usage in bytes. - RuntimeProfile::HighWaterMarkCounter* peak_used_reservation; - - /// The hard limit on the tracker's reservations - RuntimeProfile::Counter* reservation_limit; -}; -} // namespace doris diff --git a/be/src/runtime/bufferpool/reservation_util.cc b/be/src/runtime/bufferpool/reservation_util.cc deleted file mode 100644 index 2f7350c3ea..0000000000 --- a/be/src/runtime/bufferpool/reservation_util.cc +++ /dev/null @@ -1,40 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "runtime/bufferpool/reservation_util.h" - -#include - -namespace doris { - -// Most operators that accumulate memory use reservations, so the majority of memory -// should be allocated to buffer reservations, as a heuristic. -const double ReservationUtil::RESERVATION_MEM_FRACTION = 0.8; -const int64_t ReservationUtil::RESERVATION_MEM_MIN_REMAINING = 75 * 1024 * 1024; - -int64_t ReservationUtil::GetReservationLimitFromMemLimit(int64_t mem_limit) { - int64_t max_reservation = std::min(RESERVATION_MEM_FRACTION * mem_limit, - mem_limit - RESERVATION_MEM_MIN_REMAINING); - return std::max(0, max_reservation); -} - -int64_t ReservationUtil::GetMinMemLimitFromReservation(int64_t buffer_reservation) { - buffer_reservation = std::max(0, buffer_reservation); - return std::max(buffer_reservation * (1.0 / ReservationUtil::RESERVATION_MEM_FRACTION), - buffer_reservation + ReservationUtil::RESERVATION_MEM_MIN_REMAINING); -} -} // namespace doris diff --git a/be/src/runtime/bufferpool/reservation_util.h b/be/src/runtime/bufferpool/reservation_util.h deleted file mode 100644 index 606e2f6475..0000000000 --- a/be/src/runtime/bufferpool/reservation_util.h +++ /dev/null @@ -1,71 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. 
The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include - -namespace doris { - -/// Utility code related to buffer reservations. -class ReservationUtil { -public: - /// There are currently two classes of memory: reserved memory (i.e. memory that is - /// reserved with reservation trackers/allocated by the buffer pool), and unreserved - /// memory (i.e. everything else; code that hasn't yet been updated to use reserved - /// memory). Eventually, all memory should be in the former category, but each operator - /// must be converted to use reserved memory and that work is ongoing. See IMPALA-4834. - /// In the meantime, the system memory must be shared between these two classes of - /// memory. RESERVATION_MEM_FRACTION and RESERVATION_MEM_MIN_REMAINING are used to - /// determine an upper bound on reserved memory for a query. Operators operate reliably - /// when they are using bounded reserved memory (e.g. staying under a limit by - /// spilling), but will generally fail if they hit a limit when trying to allocate - /// unreserved memory. Thus we need to ensure there is always space left in the query - /// memory limit for unreserved memory. - - /// The fraction of the query mem limit that is used as the maximum buffer reservation - /// limit, i.e. the bound on reserved memory. It is expected that unreserved memory - /// (i.e. 
not accounted by buffer reservation trackers) stays within - /// (1 - RESERVATION_MEM_FRACTION). - /// TODO: remove once all operators use buffer reservations. - static const double RESERVATION_MEM_FRACTION; - - /// The minimum amount of memory that should be left after buffer reservations, i.e. - /// this is the minimum amount of memory that should be left for unreserved memory. - /// TODO: remove once all operators use buffer reservations. - static const int64_t RESERVATION_MEM_MIN_REMAINING; - - /// Helper function to get the query buffer reservation limit (in bytes) given a query - /// mem_limit. In other words, this determines the maximum portion of the mem_limit - /// that should go to reserved memory. The limit on reservations is computed as: - /// min(query_limit * RESERVATION_MEM_FRACTION, - /// query_limit - RESERVATION_MEM_MIN_REMAINING) - /// TODO: remove once all operators use buffer reservations. - static int64_t GetReservationLimitFromMemLimit(int64_t mem_limit); - - /// Helper function to get the minimum query mem_limit (in bytes) that will be large - /// enough for a buffer reservation of size 'buffer_reservation' bytes. In other words, - /// this determines the minimum mem_limit that will be large enough to accomidate - /// 'buffer_reservation' reserved memory, as well as some amount of unreserved memory - /// (determined by a heuristic). - /// The returned mem_limit X satisfies: - /// buffer_reservation <= GetReservationLimitFromMemLimit(X) - /// TODO: remove once all operators use buffer reservations. 
- static int64_t GetMinMemLimitFromReservation(int64_t buffer_reservation); -}; - -} // namespace doris diff --git a/be/src/runtime/bufferpool/suballocator.cc b/be/src/runtime/bufferpool/suballocator.cc index 021a4922df..f26aee6205 100644 --- a/be/src/runtime/bufferpool/suballocator.cc +++ b/be/src/runtime/bufferpool/suballocator.cc @@ -20,7 +20,6 @@ #include #include "gutil/strings/substitute.h" -#include "runtime/bufferpool/reservation_tracker.h" #include "util/bit_util.h" namespace doris { @@ -97,10 +96,6 @@ uint64_t Suballocator::ComputeAllocateBufferSize(int64_t bytes) const { Status Suballocator::AllocateBuffer(int64_t bytes, std::unique_ptr* result) { DCHECK_LE(bytes, MAX_ALLOCATION_BYTES); const int64_t buffer_len = std::max(min_buffer_len_, BitUtil::RoundUpToPowerOfTwo(bytes)); - if (!client_->IncreaseReservationToFit(buffer_len)) { - *result = nullptr; - return Status::OK(); - } std::unique_ptr free_node; RETURN_IF_ERROR(Suballocation::Create(&free_node)); diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index bb10b75aaa..0cd9878046 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -48,7 +48,6 @@ class StorageEngine; class MemTrackerTaskPool; class PriorityThreadPool; class PriorityWorkStealingThreadPool; -class ReservationTracker; class ResultBufferMgr; class ResultQueueMgr; class TMasterInfo; @@ -141,7 +140,6 @@ public: BrpcClientCache* brpc_function_client_cache() const { return _function_client_cache; } - ReservationTracker* buffer_reservation() { return _buffer_reservation; } BufferPool* buffer_pool() { return _buffer_pool; } LoadChannelMgr* load_channel_mgr() { return _load_channel_mgr; } LoadStreamMgr* load_stream_mgr() { return _load_stream_mgr; } @@ -163,7 +161,7 @@ private: void _destroy(); Status _init_mem_tracker(); - /// Initialise 'buffer_pool_' and 'buffer_reservation_' with given capacity. + /// Initialise 'buffer_pool_' with given capacity. 
void _init_buffer_pool(int64_t min_page_len, int64_t capacity, int64_t clean_pages_limit); void _register_metrics(); @@ -224,7 +222,6 @@ private: BrpcClientCache* _internal_client_cache = nullptr; BrpcClientCache* _function_client_cache = nullptr; - ReservationTracker* _buffer_reservation = nullptr; BufferPool* _buffer_pool = nullptr; StorageEngine* _storage_engine = nullptr; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 3057681596..d628ec7c84 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -27,7 +27,6 @@ #include "olap/storage_policy_mgr.h" #include "runtime/broker_mgr.h" #include "runtime/bufferpool/buffer_pool.h" -#include "runtime/bufferpool/reservation_tracker.h" #include "runtime/cache/result_cache.h" #include "runtime/client_cache.h" #include "runtime/data_stream_mgr.h" @@ -311,8 +310,6 @@ void ExecEnv::_init_buffer_pool(int64_t min_page_size, int64_t capacity, int64_t clean_pages_limit) { DCHECK(_buffer_pool == nullptr); _buffer_pool = new BufferPool(min_page_size, capacity, clean_pages_limit); - _buffer_reservation = new ReservationTracker(); - _buffer_reservation->InitRootTracker(nullptr, capacity); } void ExecEnv::_register_metrics() { @@ -364,7 +361,6 @@ void ExecEnv::_destroy() { SAFE_DELETE(_external_scan_context_mgr); SAFE_DELETE(_heartbeat_flags); SAFE_DELETE(_task_pool_mem_tracker_registry); - SAFE_DELETE(_buffer_reservation); SAFE_DELETE(_scanner_scheduler); DEREGISTER_HOOK_METRIC(query_mem_consumption); diff --git a/be/src/runtime/initial_reservations.cc b/be/src/runtime/initial_reservations.cc deleted file mode 100644 index 73757eaac4..0000000000 --- a/be/src/runtime/initial_reservations.cc +++ /dev/null @@ -1,83 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. 
The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -// This file is copied from -// https://github.com/apache/impala/blob/branch-2.10.0/be/src/runtime/initial-reservations.cc -// and modified by Doris - -#include "runtime/initial_reservations.h" - -#include -#include - -#include "common/logging.h" -#include "common/object_pool.h" -#include "runtime/exec_env.h" -#include "util/pretty_printer.h" -#include "util/uid_util.h" - -using std::numeric_limits; - -namespace doris { - -InitialReservations::InitialReservations(ObjectPool* obj_pool, - ReservationTracker* query_reservation, - int64_t initial_reservation_total_claims) - : remaining_initial_reservation_claims_(initial_reservation_total_claims) { - initial_reservations_.InitChildTracker(nullptr, query_reservation, - numeric_limits::max()); -} - -Status InitialReservations::Init(const TUniqueId& query_id, int64_t query_min_reservation) { - DCHECK_EQ(0, initial_reservations_.GetReservation()) << "Already inited"; - if (!initial_reservations_.IncreaseReservation(query_min_reservation)) { - std::stringstream ss; - ss << "Minimum reservation unavailable: " << query_min_reservation - << " query id:" << query_id; - return Status::MinimumReservationUnavailable(ss.str()); - } - VLOG_QUERY << "Successfully claimed initial reservations (" - << PrettyPrinter::print(query_min_reservation, TUnit::BYTES) << ") for" - << " query " << print_id(query_id); - return Status::OK(); -} - -void 
InitialReservations::Claim(BufferPool::ClientHandle* dst, int64_t bytes) { - DCHECK_GE(bytes, 0); - std::lock_guard l(lock_); - DCHECK_LE(bytes, remaining_initial_reservation_claims_); - bool success = dst->TransferReservationFrom(&initial_reservations_, bytes); - DCHECK(success) << "Planner computation should ensure enough initial reservations"; - remaining_initial_reservation_claims_ -= bytes; -} - -void InitialReservations::Return(BufferPool::ClientHandle* src, int64_t bytes) { - std::lock_guard l(lock_); - bool success = src->TransferReservationTo(&initial_reservations_, bytes); - // No limits on our tracker - no way this should fail. - DCHECK(success); - // Check to see if we can release any reservation. - int64_t excess_reservation = - initial_reservations_.GetReservation() - remaining_initial_reservation_claims_; - if (excess_reservation > 0) { - initial_reservations_.DecreaseReservation(excess_reservation); - } -} - -void InitialReservations::ReleaseResources() { - initial_reservations_.Close(); -} -} // namespace doris diff --git a/be/src/runtime/initial_reservations.h b/be/src/runtime/initial_reservations.h deleted file mode 100644 index 9ffb3ab367..0000000000 --- a/be/src/runtime/initial_reservations.h +++ /dev/null @@ -1,78 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. 
See the License for the -// specific language governing permissions and limitations -// under the License. -// This file is copied from -// https://github.com/apache/impala/blob/branch-2.10.0/be/src/runtime/initial-reservations.h -// and modified by Doris - -#pragma once - -#include "common/status.h" -#include "gen_cpp/Types_types.h" // for TUniqueId -#include "runtime/bufferpool/buffer_pool.h" -#include "runtime/bufferpool/reservation_tracker.h" -#include "util/spinlock.h" - -namespace doris { - -class ObjectPool; - -/** - * Manages the pool of initial reservations for different nodes in the plan tree. - * Each plan node and sink claims its initial reservation from here, then returns it when - * it is done executing. The frontend is responsible for making sure that enough initial - * reservation is in this pool for all of the concurrent claims. - */ -class InitialReservations { -public: - /// 'query_reservation' and 'query_mem_tracker' are the top-level trackers for the - /// query. This creates trackers for initial reservations under those. - /// 'initial_reservation_total_claims' is the total of initial reservations that will be - /// claimed over the lifetime of the query. The total bytes claimed via Claim() - /// cannot exceed this. Allocated objects are stored in 'obj_pool'. - InitialReservations(ObjectPool* obj_pool, ReservationTracker* query_reservation, - int64_t initial_reservation_total_claims); - - /// Initialize the query's pool of initial reservations by acquiring the minimum - /// reservation required for the query on this host. Fails if the reservation could - /// not be acquired, e.g. because it would exceed a pool or process limit. - Status Init(const TUniqueId& query_id, int64_t query_min_reservation) WARN_UNUSED_RESULT; - - /// Claim the initial reservation of 'bytes' for 'dst'. Assumes that the transfer will - /// not violate any reservation limits on 'dst'. 
- void Claim(BufferPool::ClientHandle* dst, int64_t bytes); - - /// Return the initial reservation of 'bytes' from 'src'. The reservation is returned - /// to the pool of reservations if it may be needed to satisfy a subsequent claim or - /// otherwise is released. - void Return(BufferPool::ClientHandle* src, int64_t bytes); - - /// Release any reservations held onto by this object. - void ReleaseResources(); - -private: - // Protects all below members to ensure that the internal state is consistent. - SpinLock lock_; - - // The pool of initial reservations that Claim() returns reservations from and - // Return() returns reservations to. - ReservationTracker initial_reservations_; - - /// The total bytes of additional reservations that we expect to be claimed. - /// initial_reservations_->GetReservation() <= remaining_initial_reservation_claims_. - int64_t remaining_initial_reservation_claims_; -}; -} // namespace doris diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index 84209888ef..df7becf04d 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -252,29 +252,30 @@ Status MemTrackerLimiter::mem_limit_exceeded_log(const std::string& msg) { Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg, int64_t failed_consume_size) { STOP_CHECK_THREAD_MEM_TRACKER_LIMIT(); DCHECK(!_limited_ancestors.empty()); + std::string detail = fmt::format("memory limit exceeded:has_limit() && - tracker->limit() < tracker->peak_consumption() + failed_consume_size) { - std::string detail; - if (failed_consume_size != 0) { - detail = fmt::format( - "memory limit exceeded:, " - "executing:<{}>", - _label, PrettyPrinter::print(failed_consume_size, TUnit::BYTES), - tracker->label(), tracker->limit(), tracker->peak_consumption(), - tracker->consumption(), msg); - } else { - detail = fmt::format( - "memory limit exceeded:, executing:<{}>", - tracker->label(), 
tracker->limit(), tracker->peak_consumption(), - tracker->consumption(), msg); - } - return tracker->mem_limit_exceeded_log(detail); + int64_t max_consumption = tracker->peak_consumption() > tracker->consumption() + ? tracker->peak_consumption() + : tracker->consumption(); + if (tracker->has_limit() && tracker->limit() < max_consumption + failed_consume_size) { + exceeded_tracker = tracker; + break; + } + if (tracker->has_limit() && tracker->limit() - max_consumption < free_size) { + free_size = tracker->limit() - max_consumption; + exceeded_tracker = tracker; } } - return Status::MemoryLimitExceeded("no mem tracker exceed limit"); + detail += fmt::format( + "exceeded_tracker={}, limit={}, peak_used={}, current_used={}>, executing_msg:<{}>", + exceeded_tracker->label(), exceeded_tracker->limit(), + exceeded_tracker->peak_consumption(), exceeded_tracker->consumption(), msg); + return exceeded_tracker->mem_limit_exceeded_log(detail); } Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg, @@ -282,8 +283,8 @@ Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg, Status failed_try_consume_st) { STOP_CHECK_THREAD_MEM_TRACKER_LIMIT(); std::string detail = - fmt::format("memory limit exceeded:, executing:<{}>", _label, - failed_try_consume_st.get_error_msg(), msg); + fmt::format("memory limit exceeded:, executing_msg:<{}>", + _label, failed_try_consume_st.get_error_msg(), msg); return failed_tracker->mem_limit_exceeded_log(detail); } diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 208130f928..e8a5e2846c 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -31,10 +31,7 @@ #include "common/status.h" #include "exec/exec_node.h" #include "runtime/buffered_block_mgr2.h" -#include "runtime/bufferpool/reservation_tracker.h" -#include "runtime/bufferpool/reservation_util.h" #include "runtime/exec_env.h" -#include "runtime/initial_reservations.h" #include 
"runtime/load_path_mgr.h" #include "runtime/memory/mem_tracker.h" #include "runtime/memory/mem_tracker_task_pool.h" @@ -68,8 +65,7 @@ RuntimeState::RuntimeState(const TUniqueId& fragment_instance_id, _normal_row_number(0), _error_row_number(0), _error_log_file_path(""), - _error_log_file(nullptr), - _instance_buffer_reservation(new ReservationTracker) { + _error_log_file(nullptr) { Status status = init(fragment_instance_id, query_options, query_globals, exec_env); DCHECK(status.ok()); } @@ -94,8 +90,7 @@ RuntimeState::RuntimeState(const TPlanFragmentExecParams& fragment_exec_params, _normal_row_number(0), _error_row_number(0), _error_log_file_path(""), - _error_log_file(nullptr), - _instance_buffer_reservation(new ReservationTracker) { + _error_log_file(nullptr) { if (fragment_exec_params.__isset.runtime_filter_params) { _runtime_filter_mgr->set_runtime_filter_params(fragment_exec_params.runtime_filter_params); } @@ -157,19 +152,6 @@ RuntimeState::~RuntimeState() { _error_hub->close(); } - // Release the reservation, which should be unused at the point. - if (_instance_buffer_reservation != nullptr) { - _instance_buffer_reservation->Close(); - } - - if (_initial_reservations != nullptr) { - _initial_reservations->ReleaseResources(); - } - - if (_buffer_reservation != nullptr) { - _buffer_reservation->Close(); - } - // Manually release the child mem tracker before _instance_mem_tracker is destructed. 
_obj_pool->clear(); _runtime_filter_mgr.reset(); @@ -248,17 +230,6 @@ Status RuntimeState::init_mem_trackers(const TUniqueId& query_id) { -1, "RuntimeState:instance:" + print_id(_fragment_instance_id), _query_mem_tracker, &_profile); - RETURN_IF_ERROR(init_buffer_poolstate()); - - _initial_reservations = _obj_pool->add(new InitialReservations( - _obj_pool.get(), _buffer_reservation, _query_options.initial_reservation_total_claims)); - RETURN_IF_ERROR(_initial_reservations->Init(_query_id, min_reservation())); - DCHECK_EQ(0, _initial_reservation_refcnt.load()); - - if (_instance_buffer_reservation != nullptr) { - _instance_buffer_reservation->InitChildTracker(&_profile, _buffer_reservation, - std::numeric_limits<int64_t>::max()); - } return Status::OK(); } @@ -268,29 +239,6 @@ Status RuntimeState::init_instance_mem_tracker() { return Status::OK(); } -Status RuntimeState::init_buffer_poolstate() { - ExecEnv* exec_env = ExecEnv::GetInstance(); - int64_t mem_limit = _query_mem_tracker->get_lowest_limit(); - int64_t max_reservation; - if (query_options().__isset.buffer_pool_limit && query_options().buffer_pool_limit > 0) { - max_reservation = query_options().buffer_pool_limit; - } else if (mem_limit == -1) { - // No query mem limit. The process-wide reservation limit is the only limit on - // reservations.
- max_reservation = std::numeric_limits<int64_t>::max(); - } else { - DCHECK_GE(mem_limit, 0); - max_reservation = ReservationUtil::GetReservationLimitFromMemLimit(mem_limit); - } - - VLOG_QUERY << "Buffer pool limit for " << print_id(_query_id) << ": " << max_reservation; - - _buffer_reservation = _obj_pool->add(new ReservationTracker); - _buffer_reservation->InitChildTracker(nullptr, exec_env->buffer_reservation(), max_reservation); - - return Status::OK(); -} - Status RuntimeState::create_block_mgr() { DCHECK(_block_mgr2.get() == nullptr); RETURN_IF_ERROR(BufferedBlockMgr2::create(this, runtime_profile(), _exec_env->tmp_file_mgr(), diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 033f19ae3f..46838af5cb 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -56,8 +56,6 @@ class TmpFileMgr; class BufferedBlockMgr; class BufferedBlockMgr2; class LoadErrorHub; -class ReservationTracker; -class InitialReservations; class RowDescriptor; class RuntimeFilterMgr; @@ -95,9 +93,6 @@ public: // for ut only Status init_instance_mem_tracker(); - /// Called from Init() to set up buffer reservations and the file group. - Status init_buffer_poolstate(); - // Gets/Creates the query wide block mgr.
Status create_block_mgr(); @@ -319,12 +314,6 @@ public: int num_per_fragment_instances() const { return _num_per_fragment_instances; } - ReservationTracker* instance_buffer_reservation() { return _instance_buffer_reservation.get(); } - - int64_t min_reservation() const { return _query_options.min_reservation; } - - int64_t max_reservation() const { return _query_options.max_reservation; } - bool disable_stream_preaggregations() const { return _query_options.disable_stream_preaggregations; } @@ -360,11 +349,6 @@ public: return segment_v2::CompressionTypePB::SNAPPY; } - // the following getters are only valid after Prepare() - InitialReservations* initial_reservations() const { return _initial_reservations; } - - ReservationTracker* buffer_reservation() const { return _buffer_reservation; } - const std::vector<TTabletCommitInfo>& tablet_commit_infos() const { return _tablet_commit_infos; } @@ -515,26 +499,6 @@ private: std::vector<TTabletCommitInfo> _tablet_commit_infos; std::vector<TErrorTabletInfo> _error_tablet_infos; - //TODO chenhao , remove this to QueryState - /// Pool of buffer reservations used to distribute initial reservations to operators - /// in the query. Contains a ReservationTracker that is a child of - /// 'buffer_reservation_'. Owned by 'obj_pool_'. Set in Prepare(). - ReservationTracker* _buffer_reservation = nullptr; - - /// Buffer reservation for this fragment instance - a child of the query buffer - /// reservation. Non-nullptr if 'query_state_' is not nullptr. - std::unique_ptr<ReservationTracker> _instance_buffer_reservation; - - /// Pool of buffer reservations used to distribute initial reservations to operators - /// in the query. Contains a ReservationTracker that is a child of - /// 'buffer_reservation_'. Owned by 'obj_pool_'. Set in Prepare(). - InitialReservations* _initial_reservations = nullptr; - - /// Number of fragment instances executing, which may need to claim - /// from 'initial_reservations_'. - /// TODO: not needed if we call ReleaseResources() in a timely manner (IMPALA-1575).
- std::atomic _initial_reservation_refcnt {0}; - QueryFragmentsCtx* _query_ctx; // true if max_filter_ratio is 0 diff --git a/be/test/exec/tablet_sink_test.cpp b/be/test/exec/tablet_sink_test.cpp index 812313750f..4e2d36bc88 100644 --- a/be/test/exec/tablet_sink_test.cpp +++ b/be/test/exec/tablet_sink_test.cpp @@ -22,7 +22,6 @@ #include "common/config.h" #include "gen_cpp/HeartbeatService_types.h" #include "gen_cpp/internal_service.pb.h" -#include "runtime/bufferpool/reservation_tracker.h" #include "runtime/decimalv2_value.h" #include "runtime/descriptor_helper.h" #include "runtime/exec_env.h" @@ -57,7 +56,6 @@ public: _env->_load_stream_mgr = new LoadStreamMgr(); _env->_internal_client_cache = new BrpcClientCache(); _env->_function_client_cache = new BrpcClientCache(); - _env->_buffer_reservation = new ReservationTracker(); _env->_task_pool_mem_tracker_registry = new MemTrackerTaskPool(); ThreadPoolBuilder("SendBatchThreadPool") .set_min_threads(1) @@ -74,7 +72,6 @@ public: SAFE_DELETE(_env->_load_stream_mgr); SAFE_DELETE(_env->_master_info); SAFE_DELETE(_env->_thread_mgr); - SAFE_DELETE(_env->_buffer_reservation); SAFE_DELETE(_env->_task_pool_mem_tracker_registry); if (_server) { _server->Stop(100); diff --git a/be/test/runtime/test_env.cc b/be/test/runtime/test_env.cc index 6892cafd31..e48dba9f42 100644 --- a/be/test/runtime/test_env.cc +++ b/be/test/runtime/test_env.cc @@ -22,8 +22,8 @@ #include #include "olap/storage_engine.h" +#include "runtime/bufferpool/buffer_pool.h" #include "runtime/fragment_mgr.h" -#include "runtime/initial_reservations.h" #include "runtime/memory/mem_tracker_task_pool.h" #include "runtime/result_queue_mgr.h" #include "util/disk_info.h" @@ -35,7 +35,6 @@ TestEnv::TestEnv() { // Some code will use ExecEnv::GetInstance(), so init the global ExecEnv singleton _exec_env = ExecEnv::GetInstance(); _exec_env->_thread_mgr = new ThreadResourceMgr(2); - _exec_env->_buffer_reservation = new ReservationTracker(); 
_exec_env->_task_pool_mem_tracker_registry = new MemTrackerTaskPool(); _exec_env->_disk_io_mgr = new DiskIoMgr(1, 1, 1, 10); _exec_env->disk_io_mgr()->init(-1); @@ -64,7 +63,6 @@ TestEnv::~TestEnv() { SAFE_DELETE(_exec_env->_scan_thread_pool); SAFE_DELETE(_exec_env->_disk_io_mgr); SAFE_DELETE(_exec_env->_task_pool_mem_tracker_registry); - SAFE_DELETE(_exec_env->_buffer_reservation); SAFE_DELETE(_exec_env->_thread_mgr); if (_engine == StorageEngine::_s_instance) { diff --git a/be/test/util/arrow/arrow_work_flow_test.cpp b/be/test/util/arrow/arrow_work_flow_test.cpp index c160047de4..240a9e7ea2 100644 --- a/be/test/util/arrow/arrow_work_flow_test.cpp +++ b/be/test/util/arrow/arrow_work_flow_test.cpp @@ -28,7 +28,6 @@ #include "gen_cpp/PlanNodes_types.h" #include "gen_cpp/Types_types.h" #include "olap/row.h" -#include "runtime/bufferpool/reservation_tracker.h" #include "runtime/exec_env.h" #include "runtime/memory/mem_tracker_task_pool.h" #include "runtime/result_queue_mgr.h" @@ -68,7 +67,6 @@ protected: if (_exec_env) { delete _exec_env->_result_queue_mgr; delete _exec_env->_thread_mgr; - delete _exec_env->_buffer_reservation; delete _exec_env->_task_pool_mem_tracker_registry; } } @@ -95,7 +93,6 @@ void ArrowWorkFlowTest::init() { void ArrowWorkFlowTest::init_runtime_state() { _exec_env->_result_queue_mgr = new ResultQueueMgr(); _exec_env->_thread_mgr = new ThreadResourceMgr(); - _exec_env->_buffer_reservation = new ReservationTracker(); _exec_env->_task_pool_mem_tracker_registry = new MemTrackerTaskPool(); _exec_env->_is_init = true; TQueryOptions query_options; diff --git a/be/test/vec/exec/vtablet_sink_test.cpp b/be/test/vec/exec/vtablet_sink_test.cpp index 39d98e38ad..f2b7248fce 100644 --- a/be/test/vec/exec/vtablet_sink_test.cpp +++ b/be/test/vec/exec/vtablet_sink_test.cpp @@ -25,7 +25,6 @@ #include "common/config.h" #include "gen_cpp/HeartbeatService_types.h" #include "gen_cpp/internal_service.pb.h" -#include "runtime/bufferpool/reservation_tracker.h" #include 
"runtime/decimalv2_value.h" #include "runtime/descriptor_helper.h" #include "runtime/exec_env.h" @@ -59,7 +58,6 @@ public: _env->_load_stream_mgr = new LoadStreamMgr(); _env->_internal_client_cache = new BrpcClientCache(); _env->_function_client_cache = new BrpcClientCache(); - _env->_buffer_reservation = new ReservationTracker(); _env->_task_pool_mem_tracker_registry = new MemTrackerTaskPool(); ThreadPoolBuilder("SendBatchThreadPool") .set_min_threads(1) @@ -76,7 +74,6 @@ public: SAFE_DELETE(_env->_load_stream_mgr); SAFE_DELETE(_env->_master_info); SAFE_DELETE(_env->_thread_mgr); - SAFE_DELETE(_env->_buffer_reservation); SAFE_DELETE(_env->_task_pool_mem_tracker_registry); if (_server) { _server->Stop(100);