mem_tracker_factor_v2 (#10743)
This commit is contained in:
53
be/src/runtime/memory/mem_tracker_base.cpp
Normal file
53
be/src/runtime/memory/mem_tracker_base.cpp
Normal file
@ -0,0 +1,53 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
// This file is copied from
|
||||
// https://github.com/apache/impala/blob/branch-2.9.0/be/src/runtime/mem-tracker.cpp
|
||||
// and modified by Doris
|
||||
|
||||
#include "runtime/memory/mem_tracker_base.h"
|
||||
|
||||
#include "util/time.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
// Name under which the consumption counter is registered when a RuntimeProfile is supplied
// to the MemTrackerBase constructor.
const std::string MemTrackerBase::COUNTER_NAME = "PeakMemoryUsage";
|
||||
|
||||
// Builds a tracker named `label` whose parent is `parent` (may be nullptr).
// When `profile` is non-null the consumption counter is registered on that profile under
// COUNTER_NAME; otherwise a stand-alone high-water-mark counter (in bytes) is created.
MemTrackerBase::MemTrackerBase(const std::string& label, MemTrackerLimiter* parent,
                               RuntimeProfile* profile)
        : _label(label),
          // Cheap pseudo-id: the microsecond clock (mod 1e6) scaled by 100, plus the label
          // length. Uniqueness is NOT guaranteed; it is used because it is faster than
          // hashing the label and converting the hash to an integer.
          _id((GetCurrentTimeMicros() % 1000000) * 100 + _label.length()),
          _parent(parent) {
    if (profile != nullptr) {
        _consumption = profile->AddSharedHighWaterMarkCounter(COUNTER_NAME, TUnit::BYTES);
        return;
    }
    _consumption = std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES);
}
|
||||
|
||||
// Convenience constructor: a tracker with neither a parent nor a runtime profile.
MemTrackerBase::MemTrackerBase(const std::string& label)
        : MemTrackerBase(label, /*parent=*/nullptr, /*profile=*/nullptr) {}
|
||||
} // namespace doris
|
||||
78
be/src/runtime/memory/mem_tracker_base.h
Normal file
78
be/src/runtime/memory/mem_tracker_base.h
Normal file
@ -0,0 +1,78 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
// This file is copied from
|
||||
// https://github.com/apache/impala/blob/branch-2.9.0/be/src/runtime/mem-tracker.h
|
||||
// and modified by Doris
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "util/runtime_profile.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
class MemTrackerLimiter;
|
||||
|
||||
// A MemTracker tracks memory consumption.
|
||||
// This class is thread-safe.
|
||||
class MemTrackerBase {
|
||||
public:
|
||||
const std::string& label() const { return _label; }
|
||||
|
||||
// Returns the memory consumed in bytes.
|
||||
int64_t consumption() const { return _consumption->current_value(); }
|
||||
int64_t peak_consumption() const { return _consumption->value(); }
|
||||
|
||||
MemTrackerBase(const std::string& label, MemTrackerLimiter* parent, RuntimeProfile* profile);
|
||||
|
||||
// this is used for creating an orphan mem tracker, or for unit test.
|
||||
// If a mem tracker has parent, it should be created by `create_tracker()`
|
||||
MemTrackerBase(const std::string& label = std::string());
|
||||
|
||||
MemTrackerLimiter* parent() const { return _parent; }
|
||||
int64_t id() { return _id; }
|
||||
bool is_limited() { return _is_limited; } // MemTrackerLimiter
|
||||
bool is_observed() { return _is_observed; }
|
||||
void set_is_limited() { _is_limited = true; } // MemTrackerObserve
|
||||
void set_is_observed() { _is_observed = true; }
|
||||
|
||||
// Usually, a negative values means that the statistics are not accurate,
|
||||
// 1. The released memory is not consumed.
|
||||
// 2. The same block of memory, tracker A calls consume, and tracker B calls release.
|
||||
// 3. Repeated releases of MemTacker. When the consume is called on the child MemTracker,
|
||||
// after the release is called on the parent MemTracker,
|
||||
// the child ~MemTracker will cause repeated releases.
|
||||
void memory_leak_check() { DCHECK_EQ(_consumption->current_value(), 0); }
|
||||
|
||||
static const std::string COUNTER_NAME;
|
||||
|
||||
protected:
|
||||
// label used in the usage string (log_usage())
|
||||
std::string _label;
|
||||
|
||||
// Automatically generated, unique for each mem tracker.
|
||||
int64_t _id;
|
||||
|
||||
std::shared_ptr<RuntimeProfile::HighWaterMarkCounter> _consumption; // in bytes
|
||||
|
||||
bool _is_limited = false; // is MemTrackerLimiter
|
||||
|
||||
bool _is_observed = false; // is MemTrackerObserve
|
||||
|
||||
MemTrackerLimiter* _parent; // The parent of this tracker.
|
||||
};
|
||||
|
||||
} // namespace doris
|
||||
333
be/src/runtime/memory/mem_tracker_limiter.cpp
Normal file
333
be/src/runtime/memory/mem_tracker_limiter.cpp
Normal file
@ -0,0 +1,333 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#include "runtime/memory/mem_tracker_limiter.h"
|
||||
|
||||
#include <fmt/format.h>
|
||||
|
||||
#include "gutil/once.h"
|
||||
#include "runtime/memory/mem_tracker_observe.h"
|
||||
#include "runtime/thread_context.h"
|
||||
#include "service/backend_options.h"
|
||||
#include "util/pretty_printer.h"
|
||||
#include "util/string_util.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
// The ancestor for all trackers. Every tracker is visible from the process down.
|
||||
// All manually created trackers should specify the process tracker as the parent.
|
||||
static MemTrackerLimiter* process_tracker;
|
||||
static GoogleOnceType process_tracker_once = GOOGLE_ONCE_INIT;
|
||||
|
||||
// Allocates a limiter tracker labelled "[Limit]-<label>" with limit `byte_limit`, links it
// under `parent` (the process tracker when `parent` is null) and returns it.
// NOTE(review): ~MemTrackerLimiter only deletes observe children, so the caller is
// presumably responsible for deleting the returned tracker - confirm.
MemTrackerLimiter* MemTrackerLimiter::create_tracker(int64_t byte_limit, const std::string& label,
                                                     MemTrackerLimiter* parent,
                                                     RuntimeProfile* profile) {
    // Do not check limit exceed when add_child_tracker, otherwise it will cause deadlock
    // when log_usage is called.
    STOP_CHECK_THREAD_MEM_TRACKER_LIMIT();
    MemTrackerLimiter* const effective_parent =
            parent != nullptr ? parent : MemTrackerLimiter::get_process_tracker();
    auto* created = new MemTrackerLimiter("[Limit]-" + label, effective_parent, profile);
    effective_parent->add_child_tracker(created);
    created->set_is_limited();
    created->init(byte_limit);
    return created;
}
|
||||
|
||||
void MemTrackerLimiter::init(int64_t limit) {
|
||||
DCHECK_GE(limit, -1);
|
||||
_limit = limit;
|
||||
MemTrackerLimiter* tracker = this;
|
||||
while (tracker != nullptr) {
|
||||
_ancestor_all_trackers.push_back(tracker);
|
||||
if (tracker->has_limit()) _ancestor_limiter_trackers.push_back(tracker);
|
||||
tracker = tracker->_parent;
|
||||
}
|
||||
DCHECK_GT(_ancestor_all_trackers.size(), 0);
|
||||
DCHECK_EQ(_ancestor_all_trackers[0], this);
|
||||
}
|
||||
|
||||
// Flushes the pending untracked memory, unlinks this tracker from its parent's list of
// limiter children, and deletes the observe trackers registered under this tracker.
MemTrackerLimiter::~MemTrackerLimiter() {
    // The TCMalloc hook can be triggered while a mem tracker is being destructed and may
    // cause a crash, so the thread-local context is disabled when the process tracker dies.
    if (_label == "Process") doris::thread_local_ctx._init = false;
    flush_untracked_mem();
    if (parent()) {
        // Do not call release on the parent tracker to avoid repeated releases.
        // Ensure that all consume/release are triggered by TCMalloc new/delete hook.
        std::lock_guard<SpinLock> l(_parent->_child_trackers_lock);
        if (_child_tracker_it != _parent->_child_limiter_trackers.end()) {
            _parent->_child_limiter_trackers.erase(_child_tracker_it);
            // Reset to end() so the tracker reads as "not linked" afterwards.
            _child_tracker_it = _parent->_child_limiter_trackers.end();
        }
    }
    // The life cycle of a child observe tracker is controlled by its parent limiter tracker.
    // NOTE(review): this assumes ~MemTrackerObserve does not erase itself from
    // _child_observe_trackers while the list is being iterated - confirm.
    for (auto* tracker : _child_observe_trackers) {
        delete tracker;
    }
    DCHECK_EQ(_untracked_mem, 0);
}
|
||||
|
||||
// Appends `tracker` to the limiter children of this tracker (under the child-list lock) and
// stores the resulting list position in the child so it can be erased later.
void MemTrackerLimiter::add_child_tracker(MemTrackerLimiter* tracker) {
    std::lock_guard<SpinLock> guard(_child_trackers_lock);
    auto position = _child_limiter_trackers.emplace(_child_limiter_trackers.end(), tracker);
    tracker->_child_tracker_it = position;
}
|
||||
|
||||
// Appends `tracker` to the observe children of this tracker (under the child-list lock) and
// stores the resulting list position in the child so it can be erased later.
void MemTrackerLimiter::add_child_tracker(MemTrackerObserve* tracker) {
    std::lock_guard<SpinLock> guard(_child_trackers_lock);
    auto position = _child_observe_trackers.emplace(_child_observe_trackers.end(), tracker);
    tracker->_child_tracker_it = position;
}
|
||||
|
||||
// Unlinks `tracker` from the limiter children of this tracker. A tracker whose stored
// iterator already equals end() is treated as not linked and is left untouched.
void MemTrackerLimiter::remove_child_tracker(MemTrackerLimiter* tracker) {
    std::lock_guard<SpinLock> guard(_child_trackers_lock);
    const bool linked = tracker->_child_tracker_it != _child_limiter_trackers.end();
    if (!linked) return;
    _child_limiter_trackers.erase(tracker->_child_tracker_it);
    // Reset to end() so that a second removal becomes a no-op.
    tracker->_child_tracker_it = _child_limiter_trackers.end();
}
|
||||
|
||||
// Unlinks `tracker` from the observe children of this tracker. A tracker whose stored
// iterator already equals end() is treated as not linked and is left untouched.
void MemTrackerLimiter::remove_child_tracker(MemTrackerObserve* tracker) {
    std::lock_guard<SpinLock> guard(_child_trackers_lock);
    const bool linked = tracker->_child_tracker_it != _child_observe_trackers.end();
    if (!linked) return;
    _child_observe_trackers.erase(tracker->_child_tracker_it);
    // Reset to end() so that a second removal becomes a no-op.
    tracker->_child_tracker_it = _child_observe_trackers.end();
}
|
||||
|
||||
// Creates the root "Process" tracker: no parent, no profile, and a limit of -1 (no limit).
// Invoked through GoogleOnceInit from get_process_tracker().
void MemTrackerLimiter::create_process_tracker() {
    // The global is assigned before init() runs, exactly as the callers have always seen it.
    process_tracker = new MemTrackerLimiter("Process", /*parent=*/nullptr, /*profile=*/nullptr);
    process_tracker->init(-1);
}
|
||||
|
||||
// Returns the process tracker, creating it on the first call.
MemTrackerLimiter* MemTrackerLimiter::get_process_tracker() {
    // The once-guard makes create_process_tracker() run a single time.
    GoogleOnceInit(&process_tracker_once, &MemTrackerLimiter::create_process_tracker);
    MemTrackerLimiter* const root = process_tracker;
    return root;
}
|
||||
|
||||
// Fills `trackers` (cleared first) with the process tracker and every limiter tracker below
// it. The observe children of each visited limiter are appended as well, but only when
// config::show_observe_tracker is set.
void MemTrackerLimiter::list_process_trackers(std::vector<MemTrackerBase*>* trackers) {
    trackers->clear();
    // Explicit stack for the walk: the most recently added child is visited next.
    std::vector<MemTrackerLimiter*> pending;
    pending.push_back(get_process_tracker());
    while (!pending.empty()) {
        MemTrackerLimiter* current = pending.back();
        pending.pop_back();
        trackers->push_back(current);

        // Copy the child lists while holding the lock; the copies are walked unlocked.
        std::list<MemTrackerLimiter*> limiter_snapshot;
        std::list<MemTrackerObserve*> observe_snapshot;
        {
            std::lock_guard<SpinLock> guard(current->_child_trackers_lock);
            limiter_snapshot = current->_child_limiter_trackers;
            observe_snapshot = current->_child_observe_trackers;
        }
        for (MemTrackerLimiter* limiter_child : limiter_snapshot) {
            pending.push_back(limiter_child);
        }
        if (!config::show_observe_tracker) continue;
        for (MemTrackerObserve* observe_child : observe_snapshot) {
            trackers->push_back(observe_child);
        }
    }
}
|
||||
|
||||
// Returns the deepest tracker that appears in the ancestor chains of both this tracker and
// `dst` (each chain includes the tracker itself). Returns `dst` when both ids are equal.
// Both trackers must share the same root.
MemTrackerLimiter* MemTrackerLimiter::common_ancestor(MemTrackerLimiter* dst) {
    if (id() == dst->id()) return dst;
    DCHECK_EQ(_ancestor_all_trackers.back(), dst->_ancestor_all_trackers.back())
            << "Must have same ancestor";
    // The chains are stored leaf-first, so both walks start at the shared root (last
    // element) and step towards the leaves for as long as the next elements still agree.
    int self_idx = static_cast<int>(_ancestor_all_trackers.size()) - 1;
    int dst_idx = static_cast<int>(dst->_ancestor_all_trackers.size()) - 1;
    for (; self_idx > 0 && dst_idx > 0; --self_idx, --dst_idx) {
        if (_ancestor_all_trackers[self_idx - 1] != dst->_ancestor_all_trackers[dst_idx - 1]) {
            break;
        }
    }
    return _ancestor_all_trackers[self_idx];
}
|
||||
|
||||
// Returns the first entry of `_ancestor_limiter_trackers` (this tracker and its ancestors
// that have a limit) whose limit is exceeded, or nullptr when none is.
MemTrackerLimiter* MemTrackerLimiter::limit_exceeded_tracker() const {
    for (MemTrackerLimiter* candidate : _ancestor_limiter_trackers) {
        if (!candidate->limit_exceeded()) continue;
        return candidate;
    }
    return nullptr;
}
|
||||
|
||||
int64_t MemTrackerLimiter::spare_capacity() const {
|
||||
int64_t result = std::numeric_limits<int64_t>::max();
|
||||
for (const auto& tracker : _ancestor_limiter_trackers) {
|
||||
int64_t mem_left = tracker->limit() - tracker->consumption();
|
||||
result = std::min(result, mem_left);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
int64_t MemTrackerLimiter::get_lowest_limit() const {
|
||||
if (_ancestor_limiter_trackers.empty()) return -1;
|
||||
int64_t min_limit = std::numeric_limits<int64_t>::max();
|
||||
for (const auto& tracker : _ancestor_limiter_trackers) {
|
||||
DCHECK(tracker->has_limit());
|
||||
min_limit = std::min(min_limit, tracker->limit());
|
||||
}
|
||||
return min_limit;
|
||||
}
|
||||
|
||||
bool MemTrackerLimiter::gc_memory(int64_t max_consumption) {
|
||||
if (max_consumption < 0) return true;
|
||||
std::lock_guard<std::mutex> l(_gc_lock);
|
||||
int64_t pre_gc_consumption = consumption();
|
||||
// Check if someone gc'd before us
|
||||
if (pre_gc_consumption < max_consumption) return false;
|
||||
|
||||
int64_t curr_consumption = pre_gc_consumption;
|
||||
// Free some extra memory to avoid frequent GC, 4M is an empirical value, maybe it will be tested later.
|
||||
const int64_t EXTRA_BYTES_TO_FREE = 4L * 1024L * 1024L * 1024L;
|
||||
// Try to free up some memory
|
||||
for (int i = 0; i < _gc_functions.size(); ++i) {
|
||||
// Try to free up the amount we are over plus some extra so that we don't have to
|
||||
// immediately GC again. Don't free all the memory since that can be unnecessarily
|
||||
// expensive.
|
||||
int64_t bytes_to_free = curr_consumption - max_consumption + EXTRA_BYTES_TO_FREE;
|
||||
_gc_functions[i](bytes_to_free);
|
||||
curr_consumption = consumption();
|
||||
if (max_consumption - curr_consumption <= EXTRA_BYTES_TO_FREE) break;
|
||||
}
|
||||
|
||||
return curr_consumption > max_consumption;
|
||||
}
|
||||
|
||||
// Runs gc_memory() with a target of (`_limit` - `bytes`). Returns MemoryLimitExceeded when
// the target is still exceeded after GC, otherwise logs the success and returns OK.
Status MemTrackerLimiter::try_gc_memory(int64_t bytes) {
    const bool still_exceeded = gc_memory(_limit - bytes);
    if (UNLIKELY(still_exceeded)) {
        return Status::MemoryLimitExceeded(
                fmt::format("label={} TryConsume failed size={}, used={}, limit={}", label(), bytes,
                            _consumption->current_value(), _limit));
    }
    VLOG_NOTICE << "GC succeeded, TryConsume bytes=" << bytes
                << " consumption=" << _consumption->current_value() << " limit=" << _limit;
    return Status::OK();
}
|
||||
|
||||
// Calling this on the query tracker results in output like:
|
||||
//
|
||||
// Query(4a4c81fedaed337d:4acadfda00000000) Limit=10.00 GB Total=508.28 MB Peak=508.45 MB
|
||||
// Fragment 4a4c81fedaed337d:4acadfda00000000: Total=8.00 KB Peak=8.00 KB
|
||||
// EXCHANGE_NODE (id=4): Total=0 Peak=0
|
||||
// DataStreamRecvr: Total=0 Peak=0
|
||||
// Block Manager: Limit=6.68 GB Total=394.00 MB Peak=394.00 MB
|
||||
// Fragment 4a4c81fedaed337d:4acadfda00000006: Total=233.72 MB Peak=242.24 MB
|
||||
// AGGREGATION_NODE (id=1): Total=139.21 MB Peak=139.84 MB
|
||||
// HDFS_SCAN_NODE (id=0): Total=93.94 MB Peak=102.24 MB
|
||||
// DataStreamSender (dst_id=2): Total=45.99 KB Peak=85.99 KB
|
||||
// Fragment 4a4c81fedaed337d:4acadfda00000003: Total=274.55 MB Peak=274.62 MB
|
||||
// AGGREGATION_NODE (id=3): Total=274.50 MB Peak=274.50 MB
|
||||
// EXCHANGE_NODE (id=2): Total=0 Peak=0
|
||||
// DataStreamRecvr: Total=45.91 KB Peak=684.07 KB
|
||||
// DataStreamSender (dst_id=4): Total=680.00 B Peak=680.00 B
|
||||
//
|
||||
// If 'reservation_metrics_' are set, we ge a more granular breakdown:
|
||||
// TrackerName: Limit=5.00 MB Reservation=5.00 MB OtherMemory=1.04 MB
|
||||
// Total=6.04 MB Peak=6.45 MB
|
||||
//
|
||||
std::string MemTrackerLimiter::log_usage(int max_recursive_depth, int64_t* logged_consumption) {
|
||||
// Make sure the consumption is up to date.
|
||||
int64_t curr_consumption = consumption();
|
||||
int64_t peak_consumption = _consumption->value();
|
||||
if (logged_consumption != nullptr) *logged_consumption = curr_consumption;
|
||||
|
||||
std::string detail =
|
||||
"MemTracker log_usage Label: {}, Limit: {}, Total: {}, Peak: {}, Exceeded: {}";
|
||||
detail = fmt::format(detail, _label, PrettyPrinter::print(_limit, TUnit::BYTES),
|
||||
PrettyPrinter::print(curr_consumption, TUnit::BYTES),
|
||||
PrettyPrinter::print(peak_consumption, TUnit::BYTES),
|
||||
limit_exceeded() ? "true" : "false");
|
||||
|
||||
// This call does not need the children, so return early.
|
||||
if (max_recursive_depth == 0) return detail;
|
||||
|
||||
// Recurse and get information about the children
|
||||
int64_t child_consumption;
|
||||
std::string child_trackers_usage;
|
||||
std::list<MemTrackerLimiter*> limiter_children;
|
||||
std::list<MemTrackerObserve*> observe_children;
|
||||
{
|
||||
std::lock_guard<SpinLock> l(_child_trackers_lock);
|
||||
limiter_children = _child_limiter_trackers;
|
||||
observe_children = _child_observe_trackers;
|
||||
}
|
||||
child_trackers_usage = log_usage(max_recursive_depth - 1, limiter_children, &child_consumption);
|
||||
for (const auto& child : observe_children) {
|
||||
child_trackers_usage += "\n" + child->log_usage(&child_consumption);
|
||||
}
|
||||
if (!child_trackers_usage.empty()) detail += "\n" + child_trackers_usage;
|
||||
return detail;
|
||||
}
|
||||
|
||||
std::string MemTrackerLimiter::log_usage(int max_recursive_depth,
|
||||
const std::list<MemTrackerLimiter*>& trackers,
|
||||
int64_t* logged_consumption) {
|
||||
*logged_consumption = 0;
|
||||
std::vector<std::string> usage_strings;
|
||||
for (const auto& tracker : trackers) {
|
||||
if (tracker) {
|
||||
int64_t tracker_consumption;
|
||||
std::string usage_string =
|
||||
tracker->log_usage(max_recursive_depth, &tracker_consumption);
|
||||
if (!usage_string.empty()) usage_strings.push_back(usage_string);
|
||||
*logged_consumption += tracker_consumption;
|
||||
}
|
||||
}
|
||||
return join(usage_strings, "\n");
|
||||
}
|
||||
|
||||
// Builds a MemoryLimitExceeded status describing the failed allocation, records the message
// on `state` when it is non-null, and writes the message plus tracker usage to the BE log.
// The returned status carries the message without the usage dump.
//
// Each part of the message is formatted on its own and then appended. The text built so far
// contains caller-supplied `details`, so it must never be passed to fmt::format as a format
// string: any '{' or '}' inside it would be parsed as a replacement field.
Status MemTrackerLimiter::mem_limit_exceeded(RuntimeState* state, const std::string& details,
                                             int64_t failed_allocation_size, Status failed_alloc) {
    STOP_CHECK_THREAD_MEM_TRACKER_LIMIT();
    MemTrackerLimiter* process_mem_tracker = MemTrackerLimiter::get_process_tracker();
    std::string detail = fmt::format(
            "Memory exceed limit. fragment={}, details={}, on backend={}. Memory left in process "
            "limit={}.",
            state != nullptr ? print_id(state->fragment_instance_id()) : "", details,
            BackendOptions::get_localhost(),
            PrettyPrinter::print(process_mem_tracker->spare_capacity(), TUnit::BYTES));
    // NOTE(review): presumably `!failed_alloc` is true for a non-OK status - confirm against
    // the Status class.
    if (!failed_alloc) {
        detail += fmt::format(" failed alloc=<{}>. current tracker={}.", failed_alloc.to_string(),
                              _label);
    } else {
        detail += fmt::format(
                " current tracker <label={}, used={}, limit={}, failed alloc size={}>.", _label,
                _consumption->current_value(), _limit,
                PrettyPrinter::print(failed_allocation_size, TUnit::BYTES));
    }
    detail += " If this is a query, can change the limit by session variable exec_mem_limit.";
    Status status = Status::MemoryLimitExceeded(detail);
    if (state != nullptr) state->log_error(detail);

    // only print the tracker log_usage in be log.
    if (process_mem_tracker->spare_capacity() < failed_allocation_size) {
        // Dumping the process MemTracker is expensive. Limiting the recursive depth to two
        // levels limits the level of detail to a one-line summary for each query MemTracker.
        detail += "\n" + process_mem_tracker->log_usage(2);
    }
    detail += "\n" + log_usage();

    LOG(WARNING) << detail;
    return status;
}
|
||||
|
||||
} // namespace doris
|
||||
348
be/src/runtime/memory/mem_tracker_limiter.h
Normal file
348
be/src/runtime/memory/mem_tracker_limiter.h
Normal file
@ -0,0 +1,348 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "common/config.h"
|
||||
#include "runtime/memory/mem_tracker_base.h"
|
||||
#include "runtime/runtime_state.h"
|
||||
#include "util/mem_info.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
class MemTrackerObserve;
|
||||
|
||||
// Tracker contains a limit, and can be arranged into a tree structure such that the consumption
|
||||
// tracked by a MemTracker is also tracked by its ancestors.
|
||||
// Used for:
|
||||
// 1. Track and limit the memory usage of process and query.
|
||||
// Automatic memory consume based on system memory allocation (Currently, based on TCMalloc hook).
|
||||
// 2. Execution logic that requires memory size to participate in control.
|
||||
// Manual consumption, but will not affect the overall statistics of the process.
|
||||
//
|
||||
// We use a five-level hierarchy of mem trackers: process, query pool, query, instance,
|
||||
// node. Specific parts of the fragment (exec nodes, sinks, etc) will add a
|
||||
// fifth level when they are initialized.
|
||||
//
|
||||
// GcFunctions can be attached to a MemTracker in order to free up memory if the limit is
|
||||
// reached. If limit_exceeded() is called and the limit is exceeded, it will first call
|
||||
// the GcFunctions to try to free memory and recheck the limit. For example, the process
|
||||
// tracker has a GcFunction that releases any unused memory still held by tcmalloc, so
|
||||
// this will be called before the process limit is reported as exceeded. GcFunctions are
|
||||
// called in the order they are added, so expensive functions should be added last.
|
||||
// GcFunctions are called with a global lock held, so should be non-blocking and not
|
||||
// call back into MemTrackers, except to release memory.
|
||||
class MemTrackerLimiter final : public MemTrackerBase {
|
||||
public:
|
||||
// Creates and adds the tracker to the tree
|
||||
static MemTrackerLimiter* create_tracker(int64_t byte_limit, const std::string& label,
|
||||
MemTrackerLimiter* parent = nullptr,
|
||||
RuntimeProfile* profile = nullptr);
|
||||
|
||||
// Walks the MemTrackerLimiter hierarchy and populates _ancestor_all_trackers and _ancestor_limiter_trackers
|
||||
void init(int64_t limit);
|
||||
|
||||
~MemTrackerLimiter();
|
||||
|
||||
// Adds tracker to _child_trackers
|
||||
void add_child_tracker(MemTrackerLimiter* tracker);
|
||||
void add_child_tracker(MemTrackerObserve* tracker);
|
||||
// Remove tracker from _child_trackers
|
||||
void remove_child_tracker(MemTrackerLimiter* tracker);
|
||||
void remove_child_tracker(MemTrackerObserve* tracker);
|
||||
|
||||
// Leaf tracker, without any child
|
||||
bool is_leaf() { _child_limiter_trackers.size() + _child_observe_trackers.size() == 0; }
|
||||
|
||||
// Gets a "process" tracker, creating it if necessary.
|
||||
static MemTrackerLimiter* get_process_tracker();
|
||||
|
||||
// Returns a list of all the valid trackers.
|
||||
static void list_process_trackers(std::vector<MemTrackerBase*>* trackers);
|
||||
|
||||
public:
|
||||
// The following func, for execution logic that requires memory size to participate in control.
|
||||
// this does not change the value of process tracker.
|
||||
|
||||
// only consume self, will not sync to parent. Usually used to manually record the specified memory,
|
||||
// It is independent of the automatically recording of thread local tracker, so the same block of memory
|
||||
// will be recorded in the thread local tracker and the current tracker at the same time.
|
||||
void consume_self(int64_t bytes);
|
||||
void release_self(int64_t bytes) { consume_self(-bytes); }
|
||||
|
||||
// Consumes 'bytes' on this tracker and its ancestors up to (but not including) end_tracker.
|
||||
// This is useful if we want to move tracking between trackers that share a common (i.e. end_tracker)
|
||||
// ancestor. This happens when we want to update tracking on a particular mem tracker but the consumption
|
||||
// against the limit recorded in one of its ancestors already happened.
|
||||
void consume_local(int64_t bytes, MemTrackerLimiter* end_tracker);
|
||||
void release_local(int64_t bytes, MemTrackerLimiter* end_tracker) {
|
||||
consume_local(-bytes, end_tracker);
|
||||
}
|
||||
|
||||
// Transfer 'bytes' of consumption from this tracker to 'dst'.
|
||||
// Forced transfer, 'dst' may limit exceed, and more ancestor trackers will be updated.
|
||||
void transfer_to(MemTrackerLimiter* dst, int64_t bytes);
|
||||
|
||||
// When the accumulated untracked memory value exceeds the upper limit,
|
||||
// the current value is returned and set to 0.
|
||||
// Thread safety.
|
||||
int64_t add_untracked_mem(int64_t bytes);
|
||||
|
||||
// In most cases, no need to call flush_untracked_mem on the child tracker,
|
||||
// because when it is destructed, theoretically all its children have been destructed.
|
||||
void flush_untracked_mem() { consume(_untracked_mem.exchange(0)); }
|
||||
|
||||
// Find the common ancestor and update trackers between 'this'/'dst' and
|
||||
// the common ancestor. This logic handles all cases, including the
|
||||
// two trackers being the same or being ancestors of each other because
|
||||
// 'all_trackers_' includes the current tracker.
|
||||
MemTrackerLimiter* common_ancestor(MemTrackerLimiter* dst);
|
||||
|
||||
public:
|
||||
// The following func, for mem limit.
|
||||
|
||||
// Returns MemoryLimitExceeded when MemInfo is initialized and the current process memory
// plus `bytes` reaches or exceeds MemInfo::mem_limit(); returns OK otherwise.
Status check_sys_mem_info(int64_t bytes) {
    // TODO add mmap
    const bool within_limit =
            !MemInfo::initialized() || MemInfo::current_mem() + bytes < MemInfo::mem_limit();
    if (within_limit) {
        return Status::OK();
    }
    return Status::MemoryLimitExceeded(fmt::format(
            "{}: TryConsume failed, bytes={} process whole consumption={} mem limit={}", _label,
            bytes, MemInfo::current_mem(), MemInfo::mem_limit()));
}
|
||||
|
||||
bool has_limit() const { return _limit >= 0; }
|
||||
int64_t limit() const { return _limit; }
|
||||
void update_limit(int64_t limit) {
|
||||
DCHECK(has_limit());
|
||||
_limit = limit;
|
||||
}
|
||||
bool limit_exceeded() const { return _limit >= 0 && _limit < consumption(); }
|
||||
bool any_limit_exceeded() const { return limit_exceeded_tracker() != nullptr; }
|
||||
|
||||
// Returns the first tracker, among this tracker and its ancestors, whose valid limit is exceeded, or nullptr if none is.
|
||||
MemTrackerLimiter* limit_exceeded_tracker() const;
|
||||
|
||||
Status check_limit(int64_t bytes);
|
||||
|
||||
// Returns the maximum consumption that can be made without exceeding the limit on
|
||||
// this tracker or any of its parents. Returns int64_t::max() if there are no
|
||||
// limits and a negative value if any limit is already exceeded.
|
||||
int64_t spare_capacity() const;
|
||||
|
||||
// Returns the lowest limit for this tracker and its ancestors. Returns -1 if there is no limit.
|
||||
int64_t get_lowest_limit() const;
|
||||
|
||||
typedef std::function<void(int64_t bytes_to_free)> GcFunction;
|
||||
/// Add a function 'f' to be called if the limit is reached, if none of the other
|
||||
/// previously-added GC functions were successful at freeing up enough memory.
|
||||
/// 'f' does not need to be thread-safe as long as it is added to only one MemTrackerLimiter.
|
||||
/// Note that 'f' must be valid for the lifetime of this MemTrackerLimiter.
|
||||
void add_gc_function(GcFunction f) { _gc_functions.push_back(f); }
|
||||
|
||||
// If consumption is higher than max_consumption, attempts to free memory by calling
|
||||
// any added GC functions. Returns true if max_consumption is still exceeded. Takes gc_lock.
|
||||
// Note: If the cache of segment/chunk is released due to insufficient query memory at a certain moment,
|
||||
// the performance of subsequent queries may be degraded, so the use of gc function should be careful enough.
|
||||
bool gc_memory(int64_t max_consumption);
|
||||
Status try_gc_memory(int64_t bytes);
|
||||
|
||||
/// Logs the usage of this tracker and optionally its children (recursively).
|
||||
/// If 'logged_consumption' is non-nullptr, sets the consumption value logged.
|
||||
/// 'max_recursive_depth' specifies the maximum number of levels of children
|
||||
/// to include in the dump. If it is zero, then no children are dumped.
|
||||
/// Limiting the recursive depth reduces the cost of dumping, particularly
|
||||
/// for the process MemTracker.
|
||||
std::string log_usage(int max_recursive_depth = INT_MAX, int64_t* logged_consumption = nullptr);
|
||||
|
||||
// Log the memory usage when memory limit is exceeded and return a status object with
|
||||
// details of the allocation which caused the limit to be exceeded.
|
||||
// If 'failed_allocation_size' is greater than zero, logs the allocation size. If
|
||||
// 'failed_allocation_size' is zero, nothing about the allocation size is logged.
|
||||
// If 'state' is non-nullptr, logs the error to 'state'.
|
||||
Status mem_limit_exceeded(RuntimeState* state, const std::string& details = std::string(),
|
||||
int64_t failed_allocation = -1, Status failed_alloc = Status::OK());
|
||||
|
||||
// Returns a one-line summary of this tracker: limit, current consumption, label, the sizes of
// the two cached ancestor lists, and whether the parent is null.
std::string debug_string() {
    std::stringstream out;
    out << "limit: " << _limit << "; ";
    out << "consumption: " << _consumption->current_value() << "; ";
    out << "label: " << _label << "; ";
    out << "all tracker size: " << _ancestor_all_trackers.size() << "; ";
    out << "limit trackers size: " << _ancestor_limiter_trackers.size() << "; ";
    out << "parent is null: " << (_parent == nullptr ? "true" : "false") << "; ";
    return out.str();
}
|
||||
|
||||
private:
|
||||
// The following func, for automatic memory tracking and limiting based on system memory allocation.
|
||||
friend class ThreadMemTrackerMgr;
|
||||
|
||||
// Private constructor: forwards 'label', 'parent' and 'profile' unchanged to the
// MemTrackerBase constructor and performs no other initialization itself.
MemTrackerLimiter(const std::string& label, MemTrackerLimiter* parent, RuntimeProfile* profile)
|
||||
: MemTrackerBase(label, parent, profile) {}
|
||||
|
||||
// Creates the process tracker.
|
||||
static void create_process_tracker();
|
||||
|
||||
// Increases consumption of this tracker and its ancestors by 'bytes'.
|
||||
void consume(int64_t bytes);
|
||||
|
||||
// Decreases consumption of this tracker and its ancestors by 'bytes'.
// Implemented as consume(-bytes), so a zero argument is a no-op.
|
||||
void release(int64_t bytes) { consume(-bytes); }
|
||||
|
||||
// Increases consumption of this tracker and its ancestors by 'bytes' only if
|
||||
// they can all consume 'bytes' without exceeding limit. If limit would be exceed,
|
||||
// no MemTrackers are updated. Returns true if the consumption was successfully updated.
|
||||
WARN_UNUSED_RESULT
|
||||
Status try_consume(int64_t bytes);
|
||||
|
||||
/// Log consumption of all the trackers provided. Returns the sum of consumption in
|
||||
/// 'logged_consumption'. 'max_recursive_depth' specifies the maximum number of levels
|
||||
/// of children to include in the dump. If it is zero, then no children are dumped.
|
||||
static std::string log_usage(int max_recursive_depth,
|
||||
const std::list<MemTrackerLimiter*>& trackers,
|
||||
int64_t* logged_consumption);
|
||||
|
||||
private:
|
||||
// Limit on memory consumption, in bytes. If _limit == -1, there is no consumption limit. Used in log_usage.
|
||||
int64_t _limit;
|
||||
|
||||
// Consume size smaller than mem_tracker_consume_min_size_bytes will continue to accumulate
|
||||
// to avoid frequent calls to consume/release of MemTracker.
|
||||
std::atomic<int64_t> _untracked_mem = 0;
|
||||
|
||||
// All the child trackers of this tracker. Used for error reporting and
|
||||
// listing only (i.e. updating the consumption of a parent tracker does not
|
||||
// update that of its children).
|
||||
SpinLock _child_trackers_lock;
|
||||
std::list<MemTrackerLimiter*> _child_limiter_trackers;
|
||||
std::list<MemTrackerObserve*> _child_observe_trackers;
|
||||
// Iterator into parent_->_child_limiter_trackers for this object. Stored to have O(1) remove.
|
||||
std::list<MemTrackerLimiter*>::iterator _child_tracker_it;
|
||||
|
||||
// this tracker plus all of its ancestors
|
||||
std::vector<MemTrackerLimiter*> _ancestor_all_trackers;
|
||||
// _ancestor_all_trackers with valid limits
|
||||
std::vector<MemTrackerLimiter*> _ancestor_limiter_trackers;
|
||||
|
||||
// Lock to protect gc_memory(). This prevents many GCs from occurring at once.
|
||||
std::mutex _gc_lock;
|
||||
// Functions to call after the limit is reached to free memory.
|
||||
std::vector<GcFunction> _gc_functions;
|
||||
};
|
||||
|
||||
inline void MemTrackerLimiter::consume(int64_t bytes) {
|
||||
if (bytes == 0) {
|
||||
return;
|
||||
} else {
|
||||
for (auto& tracker : _ancestor_all_trackers) {
|
||||
tracker->_consumption->add(bytes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Tries to add 'bytes' to this tracker and all trackers in _ancestor_all_trackers.
//
// - bytes <= 0: the (non-positive) delta is applied unconditionally and OK is returned.
// - bytes > 0: check_sys_mem_info() must pass first. The ancestor list is then walked
//   from its last element down to index 0. Trackers with a negative limit are charged
//   directly; limited trackers are charged with try_add() against their limit, calling
//   try_gc_memory() and retrying whenever try_add() fails.
// - If try_gc_memory() reports an error, every tracker charged earlier in the walk is
//   rolled back and that error is returned.
inline Status MemTrackerLimiter::try_consume(int64_t bytes) {
    if (bytes <= 0) {
        release(-bytes);
        return Status::OK();
    }
    RETURN_IF_ERROR(check_sys_mem_info(bytes));

    const int last = static_cast<int>(_ancestor_all_trackers.size()) - 1;
    int i = last;
    // Walk the tracker tree top-down.
    for (; i >= 0; --i) {
        MemTrackerLimiter* tracker = _ancestor_all_trackers[i];
        if (tracker->limit() < 0) {
            // No limit at this tracker.
            tracker->_consumption->add(bytes);
            continue;
        }
        // No lock is taken before updating _consumption, so with concurrent consumers
        // the add may need several attempts, each preceded by a GC attempt.
        while (!LIKELY(tracker->_consumption->try_add(bytes, tracker->limit()))) {
            Status st = tracker->try_gc_memory(bytes);
            if (!st) {
                // Failed for this mem tracker. Roll back the ones that succeeded.
                for (int j = last; j > i; --j) {
                    _ancestor_all_trackers[j]->_consumption->add(-bytes);
                }
                return st;
            }
        }
    }
    // Everyone succeeded, return.
    DCHECK_EQ(i, -1);
    return Status::OK();
}
|
||||
|
||||
inline void MemTrackerLimiter::consume_self(int64_t bytes) {
|
||||
int64_t consume_bytes = add_untracked_mem(bytes);
|
||||
if (consume_bytes != 0) {
|
||||
_consumption->add(consume_bytes);
|
||||
}
|
||||
}
|
||||
|
||||
inline void MemTrackerLimiter::consume_local(int64_t bytes, MemTrackerLimiter* end_tracker) {
|
||||
DCHECK(end_tracker);
|
||||
if (bytes == 0) return;
|
||||
for (auto& tracker : _ancestor_all_trackers) {
|
||||
if (tracker == end_tracker) return;
|
||||
tracker->consume_self(bytes);
|
||||
}
|
||||
}
|
||||
|
||||
// Moves 'bytes' of consumption from this tracker to 'dst'. Does nothing when both
// trackers report the same id. Otherwise the amount is released locally on this side
// and consumed locally on 'dst', in both cases stopping at the process tracker.
// 'dst' is expected to be a limited tracker (DCHECK).
inline void MemTrackerLimiter::transfer_to(MemTrackerLimiter* dst, int64_t bytes) {
    DCHECK(dst->is_limited());
    if (id() != dst->id()) {
        release_local(bytes, MemTrackerLimiter::get_process_tracker());
        dst->consume_local(bytes, MemTrackerLimiter::get_process_tracker());
    }
}
|
||||
|
||||
inline int64_t MemTrackerLimiter::add_untracked_mem(int64_t bytes) {
|
||||
_untracked_mem += bytes;
|
||||
if (std::abs(_untracked_mem) >= config::mem_tracker_consume_min_size_bytes) {
|
||||
return _untracked_mem.exchange(0);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Checks whether 'bytes' more could be consumed without updating any counter.
//
// Non-positive requests return OK immediately. Otherwise check_sys_mem_info() must
// pass, and then every tracker in _ancestor_all_trackers with a positive limit is
// visited from the last element down to index 0: while current consumption plus
// 'bytes' is not strictly below the limit, try_gc_memory() is invoked and any error
// it reports is returned to the caller.
inline Status MemTrackerLimiter::check_limit(int64_t bytes) {
    if (bytes <= 0) return Status::OK();
    RETURN_IF_ERROR(check_sys_mem_info(bytes));

    // Walk the tracker tree top-down.
    for (int idx = static_cast<int>(_ancestor_all_trackers.size()) - 1; idx >= 0; --idx) {
        MemTrackerLimiter* tracker = _ancestor_all_trackers[idx];
        if (tracker->limit() <= 0) continue;
        while (!LIKELY(tracker->_consumption->current_value() + bytes < tracker->limit())) {
            RETURN_IF_ERROR(tracker->try_gc_memory(bytes));
        }
    }
    return Status::OK();
}
|
||||
|
||||
// Helper macros that return from the enclosing function with the result of
// mem_limit_exceeded() on a tracker.
//
// Macro arguments that are dereferenced are wrapped in parentheses so that
// expression arguments (e.g. `cond ? a : b`) bind as a whole before `->` is applied.
// The expansions keep their trailing ';' so existing call sites are unaffected.

// Unconditionally returns tracker->mem_limit_exceeded(<remaining arguments>).
#define RETURN_LIMIT_EXCEEDED(tracker, ...) return (tracker)->mem_limit_exceeded(__VA_ARGS__);

// Returns tracker->mem_limit_exceeded(state, msg) when tracker->any_limit_exceeded().
#define RETURN_IF_LIMIT_EXCEEDED(tracker, state, msg) \
    if ((tracker)->any_limit_exceeded()) RETURN_LIMIT_EXCEEDED(tracker, state, msg);

// Same check, performed on state->instance_mem_tracker().
#define RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, msg)          \
    if ((state)->instance_mem_tracker()->any_limit_exceeded()) \
        RETURN_LIMIT_EXCEEDED((state)->instance_mem_tracker(), state, msg);
|
||||
|
||||
} // namespace doris
|
||||
87
be/src/runtime/memory/mem_tracker_observe.cpp
Normal file
87
be/src/runtime/memory/mem_tracker_observe.cpp
Normal file
@ -0,0 +1,87 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#include "runtime/memory/mem_tracker_observe.h"
|
||||
|
||||
#include <fmt/format.h>
|
||||
#include <parallel_hashmap/phmap.h>
|
||||
|
||||
#include "runtime/memory/mem_tracker_limiter.h"
|
||||
#include "runtime/thread_context.h"
|
||||
#include "util/pretty_printer.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
using TemporaryTrackersMap = phmap::parallel_flat_hash_map<
|
||||
std::string, MemTrackerObserve*, phmap::priv::hash_default_hash<std::string>,
|
||||
phmap::priv::hash_default_eq<std::string>,
|
||||
std::allocator<std::pair<const std::string, MemTrackerObserve*>>, 12, std::mutex>;
|
||||
|
||||
static TemporaryTrackersMap _temporary_mem_trackers;
|
||||
|
||||
// Creates a MemTrackerObserve whose parent is the calling thread's limiter tracker
// and registers it as a child of that parent.
//
// The tracker label is "[Observe]-<label>". If the parent's label contains '#', the
// parent's label from that '#' to its end is appended after another '#'.
// NOTE(review): because the appended suffix itself starts with '#', the resulting
// label contains two consecutive '#' characters — confirm this is intended.
MemTrackerObserve* MemTrackerObserve::create_tracker(const std::string& label,
                                                     RuntimeProfile* profile) {
    STOP_CHECK_THREAD_MEM_TRACKER_LIMIT();
    MemTrackerLimiter* parent = tls_ctx()->_thread_mem_tracker_mgr->limiter_mem_tracker();
    DCHECK(parent);

    const std::string parent_label = parent->label();
    const auto hash_pos = parent_label.find_first_of("#");
    const std::string reset_label =
            (hash_pos == parent_label.npos)
                    ? fmt::format("[Observe]-{}", label)
                    : fmt::format("[Observe]-{}#{}", label, parent_label.substr(hash_pos, -1));

    auto* tracker = new MemTrackerObserve(reset_label, parent, profile);
    parent->add_child_tracker(tracker);
    tracker->set_is_observed();
    return tracker;
}
|
||||
|
||||
// Unregisters this tracker from its parent's child list, when it has a parent.
MemTrackerObserve::~MemTrackerObserve() {
    if (!parent()) return;
    parent()->remove_child_tracker(this);
}
|
||||
|
||||
// Count the memory in the scope to a temporary tracker with the specified label name.
|
||||
// This is very useful when debugging. You can find the position where the tracker statistics are
|
||||
// inaccurate through the temporary tracker layer by layer. As well as finding memory hotspots.
|
||||
// TODO(zxy) track specifies the memory for each line in the code segment, instead of manually adding
|
||||
// a switch temporary tracker to each line. Maybe there are open source tools to do this?
|
||||
// Returns the temporary tracker registered under 'label', creating and registering
// it ("[Temporary]-<label>") the first time the label is requested.
//
// The tracker is only constructed when the label is not yet present. Passing
// create_tracker(...) directly as an argument to try_emplace_l() would evaluate it
// on every call, allocating a tracker (and registering it under the parent) that is
// never freed whenever the label already exists.
MemTrackerObserve* MemTrackerObserve::get_temporary_mem_tracker(const std::string& label) {
    // Fast path: label already registered.
    MemTrackerObserve* found = nullptr;
    _temporary_mem_trackers.if_contains(label, [&found](MemTrackerObserve* v) { found = v; });
    if (found != nullptr) return found;

    // Slow path: build a candidate and try to publish it. try_emplace_l() reports
    // whether the key was newly inserted; otherwise it hands the existing value to
    // the callback.
    MemTrackerObserve* candidate =
            MemTrackerObserve::create_tracker(fmt::format("[Temporary]-{}", label));
    const bool inserted = _temporary_mem_trackers.try_emplace_l(
            label, [&found](MemTrackerObserve* existing) { found = existing; }, candidate);
    if (inserted) return candidate;

    // Another caller registered the label first; discard our unused candidate
    // (its destructor removes it from the parent's child list).
    delete candidate;
    return found;
}
|
||||
|
||||
std::string MemTrackerObserve::log_usage(int64_t* logged_consumption) {
|
||||
// Make sure the consumption is up to date.
|
||||
int64_t curr_consumption = consumption();
|
||||
int64_t peak_consumption = _consumption->value();
|
||||
if (logged_consumption != nullptr) *logged_consumption = curr_consumption;
|
||||
if (curr_consumption == 0) return "";
|
||||
std::string detail = "MemTracker log_usage Label: {}, Total: {}, Peak: {}";
|
||||
detail = fmt::format(detail, _label, PrettyPrinter::print(curr_consumption, TUnit::BYTES),
|
||||
PrettyPrinter::print(peak_consumption, TUnit::BYTES));
|
||||
return detail;
|
||||
}
|
||||
|
||||
} // namespace doris
|
||||
91
be/src/runtime/memory/mem_tracker_observe.h
Normal file
91
be/src/runtime/memory/mem_tracker_observe.h
Normal file
@ -0,0 +1,91 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "runtime/memory/mem_tracker_base.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
class MemTrackerLimiter;
|
||||
|
||||
// Used to manually track memory usage at specified locations, including all exec node trackers.
|
||||
//
|
||||
// There is no parent-child relationship between MemTrackerObserves. Their parents are all fragment instance trackers,
|
||||
// but their consumption is not synchronously added to the fragment instance trackers. Therefore, errors in statistics
|
||||
// will not affect the memory tracking and restrictions of processes and Query.
|
||||
class MemTrackerObserve final : public MemTrackerBase {
|
||||
public:
|
||||
// Creates and adds the tracker to the tree
|
||||
static MemTrackerObserve* create_tracker(const std::string& label,
|
||||
RuntimeProfile* profile = nullptr);
|
||||
|
||||
~MemTrackerObserve();
|
||||
|
||||
// Get a temporary tracker with a specified label, and the tracker will be created when the label is first get.
|
||||
// Temporary trackers are not automatically destructed, which is usually used for debugging.
|
||||
static MemTrackerObserve* get_temporary_mem_tracker(const std::string& label);
|
||||
|
||||
public:
|
||||
void consume(int64_t bytes);
|
||||
|
||||
void release(int64_t bytes) { consume(-bytes); }
|
||||
|
||||
static void batch_consume(int64_t bytes, const std::vector<MemTrackerObserve*>& trackers) {
|
||||
for (auto& tracker : trackers) {
|
||||
tracker->consume(bytes);
|
||||
}
|
||||
}
|
||||
|
||||
// Transfer 'bytes' of consumption from this tracker to 'dst'.
|
||||
void transfer_to(MemTrackerObserve* dst, int64_t bytes);
|
||||
|
||||
bool limit_exceeded(int64_t limit) const { return limit >= 0 && limit < consumption(); }
|
||||
|
||||
std::string log_usage(int64_t* logged_consumption = nullptr);
|
||||
|
||||
std::string debug_string() {
|
||||
std::stringstream msg;
|
||||
msg << "label: " << _label << "; "
|
||||
<< "consumption: " << _consumption->current_value() << "; "
|
||||
<< "parent is null: " << ((_parent == nullptr) ? "true" : "false") << "; ";
|
||||
return msg.str();
|
||||
}
|
||||
|
||||
// Iterator into parent_->_child_observe_trackers for this object. Stored to have O(1) remove.
|
||||
std::list<MemTrackerObserve*>::iterator _child_tracker_it;
|
||||
|
||||
private:
|
||||
MemTrackerObserve(const std::string& label, MemTrackerLimiter* parent, RuntimeProfile* profile)
|
||||
: MemTrackerBase(label, parent, profile) {}
|
||||
};
|
||||
|
||||
// Adds 'bytes' (possibly negative) to this tracker's consumption counter.
// A zero delta leaves the counter untouched.
inline void MemTrackerObserve::consume(int64_t bytes) {
    if (bytes != 0) {
        _consumption->add(bytes);
    }
}
|
||||
|
||||
// Moves 'bytes' of consumption from this tracker to 'dst': released here first,
// then consumed on 'dst'. Nothing happens when both trackers report the same id.
inline void MemTrackerObserve::transfer_to(MemTrackerObserve* dst, int64_t bytes) {
    const bool same_tracker = (id() == dst->id());
    if (!same_tracker) {
        release(bytes);
        dst->consume(bytes);
    }
}
|
||||
|
||||
} // namespace doris
|
||||
153
be/src/runtime/memory/mem_tracker_task_pool.cpp
Normal file
153
be/src/runtime/memory/mem_tracker_task_pool.cpp
Normal file
@ -0,0 +1,153 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#include "runtime/memory/mem_tracker_task_pool.h"
|
||||
|
||||
#include "common/config.h"
|
||||
#include "runtime/exec_env.h"
|
||||
#include "util/pretty_printer.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
// Returns the tracker registered for 'task_id', creating it (with 'mem_limit',
// 'label' and 'parent') the first time the task id is seen.
//
// The tracker is only constructed when no tracker is registered yet. Passing
// MemTrackerLimiter::create_tracker(...) directly as an argument to try_emplace_l()
// would evaluate it on every call, so each repeated registration of the same task
// would allocate a tracker that is never stored in the map and never deleted.
//
// The returned pointer is whatever get_task_mem_tracker(task_id) reports after
// registration, which may be nullptr if the map holds a null entry for the task.
MemTrackerLimiter* MemTrackerTaskPool::register_task_mem_tracker_impl(const std::string& task_id,
                                                                      int64_t mem_limit,
                                                                      const std::string& label,
                                                                      MemTrackerLimiter* parent) {
    DCHECK(!task_id.empty());

    // Fast path: the task already has a tracker.
    MemTrackerLimiter* registered = get_task_mem_tracker(task_id);
    if (registered != nullptr) return registered;

    // Slow path: build a candidate and try to publish it. try_emplace_l() reports
    // whether the key was newly inserted.
    MemTrackerLimiter* candidate = MemTrackerLimiter::create_tracker(mem_limit, label, parent);
    const bool inserted = _task_mem_trackers.try_emplace_l(
            task_id, [](MemTrackerLimiter*) {}, candidate);
    if (!inserted) {
        // The key was already present (e.g. a concurrent registration won);
        // discard the candidate that did not make it into the map.
        delete candidate;
    }
    return get_task_mem_tracker(task_id);
}
|
||||
|
||||
// Registers (or fetches) the tracker for a query. The tracker is labelled
// "Query#queryId=<query_id>" and parented to the query pool tracker of ExecEnv.
MemTrackerLimiter* MemTrackerTaskPool::register_query_mem_tracker(const std::string& query_id,
                                                                  int64_t mem_limit) {
    VLOG_FILE << "Register Query memory tracker, query id: " << query_id
              << " limit: " << PrettyPrinter::print(mem_limit, TUnit::BYTES);

    const std::string tracker_label = fmt::format("Query#queryId={}", query_id);
    MemTrackerLimiter* pool_tracker = ExecEnv::GetInstance()->query_pool_mem_tracker();
    return register_task_mem_tracker_impl(query_id, mem_limit, tracker_label, pool_tracker);
}
|
||||
|
||||
// Registers (or fetches) the tracker for a load. The tracker is labelled
// "Load#loadId=<load_id>" and parented to the load pool tracker of ExecEnv.
MemTrackerLimiter* MemTrackerTaskPool::register_load_mem_tracker(const std::string& load_id,
                                                                 int64_t mem_limit) {
    // In load, the query id of the fragment is executed, which is the same as the load id of the load channel.
    VLOG_FILE << "Register Load memory tracker, load id: " << load_id
              << " limit: " << PrettyPrinter::print(mem_limit, TUnit::BYTES);

    const std::string tracker_label = fmt::format("Load#loadId={}", load_id);
    MemTrackerLimiter* pool_tracker = ExecEnv::GetInstance()->load_pool_mem_tracker();
    return register_task_mem_tracker_impl(load_id, mem_limit, tracker_label, pool_tracker);
}
|
||||
|
||||
// Looks up the tracker stored for 'task_id'. Returns nullptr when the id is not in
// the map (or when the stored value itself is null). 'task_id' must be non-empty.
MemTrackerLimiter* MemTrackerTaskPool::get_task_mem_tracker(const std::string& task_id) {
    DCHECK(!task_id.empty());
    MemTrackerLimiter* found = nullptr;
    // The lookup goes through if_contains(), which runs the callback on the stored
    // value, instead of taking and holding an iterator.
    _task_mem_trackers.if_contains(task_id,
                                   [&found](MemTrackerLimiter* stored) { found = stored; });
    return found;
}
|
||||
|
||||
void MemTrackerTaskPool::logout_task_mem_tracker() {
|
||||
std::vector<std::string> expired_tasks;
|
||||
for (auto it = _task_mem_trackers.begin(); it != _task_mem_trackers.end(); it++) {
|
||||
if (!it->second) {
|
||||
// https://github.com/apache/incubator-doris/issues/10006
|
||||
expired_tasks.emplace_back(it->first);
|
||||
} else if (it->second->is_leaf() == true && it->second->peak_consumption() > 0) {
|
||||
// No RuntimeState uses this task MemTracker, it is only referenced by this map,
|
||||
// and tracker was not created soon, delete it.
|
||||
if (config::memory_leak_detection && it->second->consumption() != 0) {
|
||||
// If consumption is not equal to 0 before query mem tracker is destructed,
|
||||
// there are two possibilities in theory.
|
||||
// 1. A memory leak occurs.
|
||||
// 2. Some of the memory consumed/released on the query mem tracker is actually released/consume on
|
||||
// other trackers such as the process mem tracker, and there is no manual transfer between the two trackers.
|
||||
//
|
||||
// The second case should be eliminated in theory, but it has not been done so far, so the query memory leak
|
||||
// cannot be located, and the value of the query pool mem tracker statistics will be inaccurate.
|
||||
LOG(WARNING) << "Task memory tracker memory leak:" << it->second->debug_string();
|
||||
}
|
||||
// In order to ensure that the query pool mem tracker is the sum of all currently running query mem trackers,
|
||||
// the effect of the ended query mem tracker on the query pool mem tracker should be cleared, that is,
|
||||
// the negative number of the current value of consume.
|
||||
it->second->parent()->consume_local(-it->second->consumption(),
|
||||
MemTrackerLimiter::get_process_tracker());
|
||||
expired_tasks.emplace_back(it->first);
|
||||
} else {
|
||||
// Log limit exceeded query tracker.
|
||||
if (it->second->limit_exceeded()) {
|
||||
it->second->mem_limit_exceeded(
|
||||
nullptr,
|
||||
fmt::format("Task mem limit exceeded but no cancel, queryId:{}", it->first),
|
||||
0, Status::OK());
|
||||
}
|
||||
}
|
||||
}
|
||||
for (auto tid : expired_tasks) {
|
||||
if (!_task_mem_trackers[tid]) {
|
||||
_task_mem_trackers.erase(tid);
|
||||
VLOG_FILE << "Deregister null task mem tracker, task id: " << tid;
|
||||
} else {
|
||||
delete _task_mem_trackers[tid];
|
||||
_task_mem_trackers.erase(tid);
|
||||
VLOG_FILE << "Deregister not used task mem tracker, task id: " << tid;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(zxy) More observable methods
|
||||
// /// Logs the usage of 'limit' number of queries based on maximum total memory
|
||||
// /// consumption.
|
||||
// std::string MemTracker::LogTopNQueries(int limit) {
|
||||
// if (limit == 0) return "";
|
||||
// priority_queue<pair<int64_t, string>, std::vector<pair<int64_t, string>>,
|
||||
// std::greater<pair<int64_t, string>>>
|
||||
// min_pq;
|
||||
// GetTopNQueries(min_pq, limit);
|
||||
// std::vector<string> usage_strings(min_pq.size());
|
||||
// while (!min_pq.empty()) {
|
||||
// usage_strings.push_back(min_pq.top().second);
|
||||
// min_pq.pop();
|
||||
// }
|
||||
// std::reverse(usage_strings.begin(), usage_strings.end());
|
||||
// return join(usage_strings, "\n");
|
||||
// }
|
||||
|
||||
// /// Helper function for LogTopNQueries that iterates through the MemTracker hierarchy
|
||||
// /// and populates 'min_pq' with 'limit' number of elements (that contain state related
|
||||
// /// to query MemTrackers) based on maximum total memory consumption.
|
||||
// void MemTracker::GetTopNQueries(
|
||||
// priority_queue<pair<int64_t, string>, std::vector<pair<int64_t, string>>,
|
||||
// greater<pair<int64_t, string>>>& min_pq,
|
||||
// int limit) {
|
||||
// list<weak_ptr<MemTracker>> children;
|
||||
// {
|
||||
// lock_guard<SpinLock> l(child_trackers_lock_);
|
||||
// children = child_trackers_;
|
||||
// }
|
||||
// for (const auto& child_weak : children) {
|
||||
// shared_ptr<MemTracker> child = child_weak.lock();
|
||||
// if (child) {
|
||||
// child->GetTopNQueries(min_pq, limit);
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
} // namespace doris
|
||||
58
be/src/runtime/memory/mem_tracker_task_pool.h
Normal file
58
be/src/runtime/memory/mem_tracker_task_pool.h
Normal file
@ -0,0 +1,58 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <parallel_hashmap/phmap.h>
|
||||
|
||||
#include "runtime/memory/mem_tracker_limiter.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
using TaskTrackersMap = phmap::parallel_flat_hash_map<
|
||||
std::string, MemTrackerLimiter*, phmap::priv::hash_default_hash<std::string>,
|
||||
phmap::priv::hash_default_eq<std::string>,
|
||||
std::allocator<std::pair<const std::string, MemTrackerLimiter*>>, 12, std::mutex>;
|
||||
|
||||
// Global task pool for query MemTrackers. Owned by ExecEnv.
|
||||
class MemTrackerTaskPool {
|
||||
public:
|
||||
// Construct a MemTracker object for 'task_id' with 'mem_limit' as the memory limit.
|
||||
// The MemTracker is a child of the pool MemTracker, Calling this with the same
|
||||
// 'task_id' will return the same MemTracker object. This is used to track the local
|
||||
// memory usage of all tasks executing. The first time this is called for a task,
|
||||
// a new MemTracker object is created with the pool tracker as its parent.
|
||||
// Newly created trackers will always have a limit of -1.
|
||||
MemTrackerLimiter* register_task_mem_tracker_impl(const std::string& task_id, int64_t mem_limit,
|
||||
const std::string& label,
|
||||
MemTrackerLimiter* parent);
|
||||
MemTrackerLimiter* register_query_mem_tracker(const std::string& query_id, int64_t mem_limit);
|
||||
MemTrackerLimiter* register_load_mem_tracker(const std::string& load_id, int64_t mem_limit);
|
||||
|
||||
MemTrackerLimiter* get_task_mem_tracker(const std::string& task_id);
|
||||
|
||||
// Remove the mem tracker that has ended the query.
|
||||
void logout_task_mem_tracker();
|
||||
|
||||
private:
|
||||
// All per-task MemTracker objects.
|
||||
// The life cycle of task memtracker in the process is the same as task runtime state,
|
||||
// MemTrackers will be removed from this map after query finish or cancel.
|
||||
TaskTrackersMap _task_mem_trackers;
|
||||
};
|
||||
|
||||
} // namespace doris
|
||||
Reference in New Issue
Block a user