fix mem tracker limiter (#11376)
This commit is contained in:
@ -63,7 +63,7 @@ Status BaseCompaction::execute_compact_impl() {
|
||||
return Status::OLAPInternalError(OLAP_ERR_BE_CLONE_OCCURRED);
|
||||
}
|
||||
|
||||
SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::COMPACTION);
|
||||
SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::COMPACTION);
|
||||
|
||||
// 2. do base compaction, merge rowsets
|
||||
int64_t permits = get_compaction_permits();
|
||||
|
||||
@ -34,10 +34,10 @@ Compaction::Compaction(TabletSharedPtr tablet, const std::string& label)
|
||||
_input_row_num(0),
|
||||
_state(CompactionState::INITED) {
|
||||
#ifndef BE_TEST
|
||||
_mem_tracker = std::make_unique<MemTrackerLimiter>(
|
||||
_mem_tracker = std::make_shared<MemTrackerLimiter>(
|
||||
-1, label, StorageEngine::instance()->compaction_mem_tracker());
|
||||
#else
|
||||
_mem_tracker = std::make_unique<MemTrackerLimiter>(-1, label);
|
||||
_mem_tracker = std::make_shared<MemTrackerLimiter>(-1, label);
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
@ -76,7 +76,7 @@ protected:
|
||||
|
||||
protected:
|
||||
// the root tracker for this compaction
|
||||
std::unique_ptr<MemTrackerLimiter> _mem_tracker;
|
||||
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
|
||||
|
||||
TabletSharedPtr _tablet;
|
||||
|
||||
|
||||
@ -70,7 +70,7 @@ Status CumulativeCompaction::execute_compact_impl() {
|
||||
return Status::OLAPInternalError(OLAP_ERR_CUMULATIVE_CLONE_OCCURRED);
|
||||
}
|
||||
|
||||
SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::COMPACTION);
|
||||
SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::COMPACTION);
|
||||
|
||||
// 3. do cumulative compaction, merge rowsets
|
||||
int64_t permits = get_compaction_permits();
|
||||
|
||||
@ -30,14 +30,14 @@
|
||||
|
||||
namespace doris {
|
||||
|
||||
Status DeltaWriter::open(WriteRequest* req, DeltaWriter** writer, MemTrackerLimiter* parent_tracker,
|
||||
bool is_vec) {
|
||||
Status DeltaWriter::open(WriteRequest* req, DeltaWriter** writer,
|
||||
const std::shared_ptr<MemTrackerLimiter>& parent_tracker, bool is_vec) {
|
||||
*writer = new DeltaWriter(req, StorageEngine::instance(), parent_tracker, is_vec);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* storage_engine,
|
||||
MemTrackerLimiter* parent_tracker, bool is_vec)
|
||||
const std::shared_ptr<MemTrackerLimiter>& parent_tracker, bool is_vec)
|
||||
: _req(*req),
|
||||
_tablet(nullptr),
|
||||
_cur_rowset(nullptr),
|
||||
@ -98,9 +98,9 @@ Status DeltaWriter::init() {
|
||||
<< ", schema_hash=" << _req.schema_hash;
|
||||
return Status::OLAPInternalError(OLAP_ERR_TABLE_NOT_FOUND);
|
||||
}
|
||||
_mem_tracker = std::make_unique<MemTrackerLimiter>(
|
||||
_mem_tracker = std::make_shared<MemTrackerLimiter>(
|
||||
-1, fmt::format("DeltaWriter:tabletId={}", _tablet->tablet_id()), _parent_tracker);
|
||||
SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
|
||||
SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::LOAD);
|
||||
// check tablet version number
|
||||
if (_tablet->version_count() > config::max_tablet_version_num) {
|
||||
//trigger quick compaction
|
||||
@ -208,7 +208,7 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int>
|
||||
return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
|
||||
}
|
||||
|
||||
SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
|
||||
SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::LOAD);
|
||||
_mem_table->insert(block, row_idxs);
|
||||
|
||||
if (_mem_table->need_to_agg()) {
|
||||
@ -226,7 +226,7 @@ Status DeltaWriter::_flush_memtable_async() {
|
||||
if (++_segment_counter > config::max_segment_num_per_rowset) {
|
||||
return Status::OLAPInternalError(OLAP_ERR_TOO_MANY_SEGMENTS);
|
||||
}
|
||||
return _flush_token->submit(std::move(_mem_table), _mem_tracker.get());
|
||||
return _flush_token->submit(std::move(_mem_table), _mem_tracker);
|
||||
}
|
||||
|
||||
Status DeltaWriter::flush_memtable_and_wait(bool need_wait) {
|
||||
@ -243,7 +243,7 @@ Status DeltaWriter::flush_memtable_and_wait(bool need_wait) {
|
||||
return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
|
||||
}
|
||||
|
||||
SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
|
||||
SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::LOAD);
|
||||
if (mem_consumption() == _mem_table->memory_usage()) {
|
||||
// equal means there is no memtable in flush queue, just flush this memtable
|
||||
VLOG_NOTICE << "flush memtable to reduce mem consumption. memtable size: "
|
||||
@ -297,7 +297,7 @@ Status DeltaWriter::close() {
|
||||
return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
|
||||
}
|
||||
|
||||
SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
|
||||
SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::LOAD);
|
||||
RETURN_NOT_OK(_flush_memtable_async());
|
||||
_mem_table.reset();
|
||||
return Status::OK();
|
||||
@ -312,7 +312,7 @@ Status DeltaWriter::close_wait() {
|
||||
return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
|
||||
}
|
||||
|
||||
SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
|
||||
SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::LOAD);
|
||||
// return error if previous flush failed
|
||||
RETURN_NOT_OK(_flush_token->wait());
|
||||
|
||||
|
||||
@ -56,7 +56,9 @@ struct WriteRequest {
|
||||
class DeltaWriter {
|
||||
public:
|
||||
static Status open(WriteRequest* req, DeltaWriter** writer,
|
||||
MemTrackerLimiter* parent_tracker = nullptr, bool is_vec = false);
|
||||
const std::shared_ptr<MemTrackerLimiter>& parent_tracker =
|
||||
std::shared_ptr<MemTrackerLimiter>(),
|
||||
bool is_vec = false);
|
||||
|
||||
~DeltaWriter();
|
||||
|
||||
@ -101,8 +103,8 @@ public:
|
||||
int64_t get_mem_consumption_snapshot() const;
|
||||
|
||||
private:
|
||||
DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, MemTrackerLimiter* parent_tracker,
|
||||
bool is_vec);
|
||||
DeltaWriter(WriteRequest* req, StorageEngine* storage_engine,
|
||||
const std::shared_ptr<MemTrackerLimiter>& parent_tracker, bool is_vec);
|
||||
|
||||
// push a full memtable to flush executor
|
||||
Status _flush_memtable_async();
|
||||
@ -133,8 +135,8 @@ private:
|
||||
|
||||
StorageEngine* _storage_engine;
|
||||
std::unique_ptr<FlushToken> _flush_token;
|
||||
std::unique_ptr<MemTrackerLimiter> _mem_tracker;
|
||||
MemTrackerLimiter* _parent_tracker;
|
||||
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
|
||||
std::shared_ptr<MemTrackerLimiter> _parent_tracker;
|
||||
|
||||
// The counter of number of segment flushed already.
|
||||
int64_t _segment_counter = 0;
|
||||
|
||||
@ -29,7 +29,7 @@ namespace doris {
|
||||
class MemtableFlushTask final : public Runnable {
|
||||
public:
|
||||
MemtableFlushTask(FlushToken* flush_token, std::unique_ptr<MemTable> memtable,
|
||||
int64_t submit_task_time, MemTrackerLimiter* tracker)
|
||||
int64_t submit_task_time, const std::shared_ptr<MemTrackerLimiter>& tracker)
|
||||
: _flush_token(flush_token),
|
||||
_memtable(std::move(memtable)),
|
||||
_submit_task_time(submit_task_time),
|
||||
@ -47,7 +47,7 @@ private:
|
||||
FlushToken* _flush_token;
|
||||
std::unique_ptr<MemTable> _memtable;
|
||||
int64_t _submit_task_time;
|
||||
MemTrackerLimiter* _tracker;
|
||||
std::shared_ptr<MemTrackerLimiter> _tracker;
|
||||
};
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) {
|
||||
@ -58,7 +58,8 @@ std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) {
|
||||
return os;
|
||||
}
|
||||
|
||||
Status FlushToken::submit(std::unique_ptr<MemTable> mem_table, MemTrackerLimiter* tracker) {
|
||||
Status FlushToken::submit(std::unique_ptr<MemTable> mem_table,
|
||||
const std::shared_ptr<MemTrackerLimiter>& tracker) {
|
||||
ErrorCode s = _flush_status.load();
|
||||
if (s != OLAP_SUCCESS) {
|
||||
return Status::OLAPInternalError(s);
|
||||
|
||||
@ -57,7 +57,8 @@ public:
|
||||
explicit FlushToken(std::unique_ptr<ThreadPoolToken> flush_pool_token)
|
||||
: _flush_token(std::move(flush_pool_token)), _flush_status(OLAP_SUCCESS) {}
|
||||
|
||||
Status submit(std::unique_ptr<MemTable> mem_table, MemTrackerLimiter* tracker);
|
||||
Status submit(std::unique_ptr<MemTable> mem_table,
|
||||
const std::shared_ptr<MemTrackerLimiter>& tracker);
|
||||
|
||||
// error has happpens, so we cancel this token
|
||||
// And remove all tasks in the queue.
|
||||
|
||||
@ -112,7 +112,7 @@ Status StorageEngine::start_bg_threads() {
|
||||
RETURN_IF_ERROR(Thread::create(
|
||||
"StorageEngine", "path_scan_thread",
|
||||
[this, data_dir]() {
|
||||
SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::STORAGE);
|
||||
SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::STORAGE);
|
||||
this->_path_scan_thread_callback(data_dir);
|
||||
},
|
||||
&path_scan_thread));
|
||||
@ -122,7 +122,7 @@ Status StorageEngine::start_bg_threads() {
|
||||
RETURN_IF_ERROR(Thread::create(
|
||||
"StorageEngine", "path_gc_thread",
|
||||
[this, data_dir]() {
|
||||
SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::STORAGE);
|
||||
SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::STORAGE);
|
||||
this->_path_gc_thread_callback(data_dir);
|
||||
},
|
||||
&path_gc_thread));
|
||||
|
||||
@ -113,16 +113,16 @@ StorageEngine::StorageEngine(const EngineOptions& options)
|
||||
_index_stream_lru_cache(nullptr),
|
||||
_file_cache(nullptr),
|
||||
_compaction_mem_tracker(
|
||||
std::make_unique<MemTrackerLimiter>(-1, "StorageEngine::AutoCompaction")),
|
||||
std::make_shared<MemTrackerLimiter>(-1, "StorageEngine::AutoCompaction")),
|
||||
_segment_meta_mem_tracker(std::make_unique<MemTracker>("StorageEngine::SegmentMeta")),
|
||||
_schema_change_mem_tracker(
|
||||
std::make_unique<MemTrackerLimiter>(-1, "StorageEngine::SchemaChange")),
|
||||
_clone_mem_tracker(std::make_unique<MemTrackerLimiter>(-1, "StorageEngine::Clone")),
|
||||
std::make_shared<MemTrackerLimiter>(-1, "StorageEngine::SchemaChange")),
|
||||
_clone_mem_tracker(std::make_shared<MemTrackerLimiter>(-1, "StorageEngine::Clone")),
|
||||
_batch_load_mem_tracker(
|
||||
std::make_unique<MemTrackerLimiter>(-1, "StorageEngine::BatchLoad")),
|
||||
std::make_shared<MemTrackerLimiter>(-1, "StorageEngine::BatchLoad")),
|
||||
_consistency_mem_tracker(
|
||||
std::make_unique<MemTrackerLimiter>(-1, "StorageEngine::Consistency")),
|
||||
_mem_tracker(std::make_unique<MemTrackerLimiter>(-1, "StorageEngine::Self")),
|
||||
std::make_shared<MemTrackerLimiter>(-1, "StorageEngine::Consistency")),
|
||||
_mem_tracker(std::make_shared<MemTrackerLimiter>(-1, "StorageEngine::Self")),
|
||||
_stop_background_threads_latch(1),
|
||||
_tablet_manager(new TabletManager(config::tablet_map_shard_size)),
|
||||
_txn_manager(new TxnManager(config::txn_map_shard_size, config::txn_shard_size)),
|
||||
@ -168,7 +168,7 @@ void StorageEngine::load_data_dirs(const std::vector<DataDir*>& data_dirs) {
|
||||
std::vector<std::thread> threads;
|
||||
for (auto data_dir : data_dirs) {
|
||||
threads.emplace_back([this, data_dir] {
|
||||
SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::STORAGE);
|
||||
SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::STORAGE);
|
||||
auto res = data_dir->load();
|
||||
if (!res.ok()) {
|
||||
LOG(WARNING) << "io error when init load tables. res=" << res
|
||||
@ -220,7 +220,7 @@ Status StorageEngine::_init_store_map() {
|
||||
_tablet_manager.get(), _txn_manager.get());
|
||||
tmp_stores.emplace_back(store);
|
||||
threads.emplace_back([this, store, &error_msg_lock, &error_msg]() {
|
||||
SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::STORAGE);
|
||||
SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::STORAGE);
|
||||
auto st = store->init();
|
||||
if (!st.ok()) {
|
||||
{
|
||||
|
||||
@ -179,12 +179,16 @@ public:
|
||||
|
||||
Status get_compaction_status_json(std::string* result);
|
||||
|
||||
MemTrackerLimiter* compaction_mem_tracker() { return _compaction_mem_tracker.get(); }
|
||||
std::shared_ptr<MemTrackerLimiter> compaction_mem_tracker() { return _compaction_mem_tracker; }
|
||||
MemTracker* segment_meta_mem_tracker() { return _segment_meta_mem_tracker.get(); }
|
||||
MemTrackerLimiter* schema_change_mem_tracker() { return _schema_change_mem_tracker.get(); }
|
||||
MemTrackerLimiter* clone_mem_tracker() { return _clone_mem_tracker.get(); }
|
||||
MemTrackerLimiter* batch_load_mem_tracker() { return _batch_load_mem_tracker.get(); }
|
||||
MemTrackerLimiter* consistency_mem_tracker() { return _consistency_mem_tracker.get(); }
|
||||
std::shared_ptr<MemTrackerLimiter> schema_change_mem_tracker() {
|
||||
return _schema_change_mem_tracker;
|
||||
}
|
||||
std::shared_ptr<MemTrackerLimiter> clone_mem_tracker() { return _clone_mem_tracker; }
|
||||
std::shared_ptr<MemTrackerLimiter> batch_load_mem_tracker() { return _batch_load_mem_tracker; }
|
||||
std::shared_ptr<MemTrackerLimiter> consistency_mem_tracker() {
|
||||
return _consistency_mem_tracker;
|
||||
}
|
||||
|
||||
// check cumulative compaction config
|
||||
void check_cumulative_compaction_config();
|
||||
@ -333,21 +337,21 @@ private:
|
||||
std::unordered_map<std::string, RowsetSharedPtr> _unused_rowsets;
|
||||
|
||||
// Count the memory consumption of all Base and Cumulative tasks.
|
||||
std::unique_ptr<MemTrackerLimiter> _compaction_mem_tracker;
|
||||
std::shared_ptr<MemTrackerLimiter> _compaction_mem_tracker;
|
||||
// This mem tracker is only for tracking memory use by segment meta data such as footer or index page.
|
||||
// The memory consumed by querying is tracked in segment iterator.
|
||||
std::unique_ptr<MemTracker> _segment_meta_mem_tracker;
|
||||
// Count the memory consumption of all SchemaChange tasks.
|
||||
std::unique_ptr<MemTrackerLimiter> _schema_change_mem_tracker;
|
||||
std::shared_ptr<MemTrackerLimiter> _schema_change_mem_tracker;
|
||||
// Count the memory consumption of all EngineCloneTask.
|
||||
// Note: Memory that does not contain make/release snapshots.
|
||||
std::unique_ptr<MemTrackerLimiter> _clone_mem_tracker;
|
||||
std::shared_ptr<MemTrackerLimiter> _clone_mem_tracker;
|
||||
// Count the memory consumption of all EngineBatchLoadTask.
|
||||
std::unique_ptr<MemTrackerLimiter> _batch_load_mem_tracker;
|
||||
std::shared_ptr<MemTrackerLimiter> _batch_load_mem_tracker;
|
||||
// Count the memory consumption of all EngineChecksumTask.
|
||||
std::unique_ptr<MemTrackerLimiter> _consistency_mem_tracker;
|
||||
std::shared_ptr<MemTrackerLimiter> _consistency_mem_tracker;
|
||||
// StorageEngine oneself
|
||||
std::unique_ptr<MemTrackerLimiter> _mem_tracker;
|
||||
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
|
||||
|
||||
CountDownLatch _stop_background_threads_latch;
|
||||
scoped_refptr<Thread> _unused_rowset_monitor_thread;
|
||||
|
||||
@ -25,7 +25,7 @@ namespace doris {
|
||||
|
||||
EngineAlterTabletTask::EngineAlterTabletTask(const TAlterTabletReqV2& request)
|
||||
: _alter_tablet_req(request) {
|
||||
_mem_tracker = std::make_unique<MemTrackerLimiter>(
|
||||
_mem_tracker = std::make_shared<MemTrackerLimiter>(
|
||||
config::memory_limitation_per_thread_for_schema_change_bytes,
|
||||
fmt::format("EngineAlterTabletTask#baseTabletId={}:newTabletId={}",
|
||||
std::to_string(_alter_tablet_req.base_tablet_id),
|
||||
@ -34,7 +34,7 @@ EngineAlterTabletTask::EngineAlterTabletTask(const TAlterTabletReqV2& request)
|
||||
}
|
||||
|
||||
Status EngineAlterTabletTask::execute() {
|
||||
SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::STORAGE);
|
||||
SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::STORAGE);
|
||||
DorisMetrics::instance()->create_rollup_requests_total->increment(1);
|
||||
|
||||
Status res = SchemaChangeHandler::process_alter_tablet_v2(_alter_tablet_req);
|
||||
|
||||
@ -36,7 +36,7 @@ public:
|
||||
private:
|
||||
const TAlterTabletReqV2& _alter_tablet_req;
|
||||
|
||||
std::unique_ptr<MemTrackerLimiter> _mem_tracker;
|
||||
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
|
||||
}; // EngineTask
|
||||
|
||||
} // namespace doris
|
||||
|
||||
@ -53,7 +53,7 @@ EngineBatchLoadTask::EngineBatchLoadTask(TPushReq& push_req, std::vector<TTablet
|
||||
_signature(signature),
|
||||
_res_status(res_status) {
|
||||
_download_status = Status::OK();
|
||||
_mem_tracker = std::make_unique<MemTrackerLimiter>(
|
||||
_mem_tracker = std::make_shared<MemTrackerLimiter>(
|
||||
-1,
|
||||
fmt::format("EngineBatchLoadTask#pushType={}:tabletId={}", _push_req.push_type,
|
||||
std::to_string(_push_req.tablet_id)),
|
||||
@ -63,7 +63,7 @@ EngineBatchLoadTask::EngineBatchLoadTask(TPushReq& push_req, std::vector<TTablet
|
||||
EngineBatchLoadTask::~EngineBatchLoadTask() {}
|
||||
|
||||
Status EngineBatchLoadTask::execute() {
|
||||
SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::STORAGE);
|
||||
SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::STORAGE);
|
||||
Status status = Status::OK();
|
||||
if (_push_req.push_type == TPushType::LOAD || _push_req.push_type == TPushType::LOAD_V2) {
|
||||
status = _init();
|
||||
|
||||
@ -76,7 +76,7 @@ private:
|
||||
Status* _res_status;
|
||||
std::string _remote_file_path;
|
||||
std::string _local_file_path;
|
||||
std::unique_ptr<MemTrackerLimiter> _mem_tracker;
|
||||
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
|
||||
}; // class EngineBatchLoadTask
|
||||
} // namespace doris
|
||||
#endif // DORIS_BE_SRC_OLAP_TASK_ENGINE_BATCH_LOAD_TASK_H
|
||||
|
||||
@ -26,13 +26,13 @@ namespace doris {
|
||||
EngineChecksumTask::EngineChecksumTask(TTabletId tablet_id, TSchemaHash schema_hash,
|
||||
TVersion version, uint32_t* checksum)
|
||||
: _tablet_id(tablet_id), _schema_hash(schema_hash), _version(version), _checksum(checksum) {
|
||||
_mem_tracker = std::make_unique<MemTrackerLimiter>(
|
||||
_mem_tracker = std::make_shared<MemTrackerLimiter>(
|
||||
-1, "EngineChecksumTask#tabletId=" + std::to_string(tablet_id),
|
||||
StorageEngine::instance()->consistency_mem_tracker());
|
||||
}
|
||||
|
||||
Status EngineChecksumTask::execute() {
|
||||
SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::STORAGE);
|
||||
SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::STORAGE);
|
||||
return _compute_checksum();
|
||||
} // execute
|
||||
|
||||
|
||||
@ -44,7 +44,7 @@ private:
|
||||
TSchemaHash _schema_hash;
|
||||
TVersion _version;
|
||||
uint32_t* _checksum;
|
||||
std::unique_ptr<MemTrackerLimiter> _mem_tracker;
|
||||
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
|
||||
}; // EngineTask
|
||||
|
||||
} // namespace doris
|
||||
|
||||
@ -57,14 +57,14 @@ EngineCloneTask::EngineCloneTask(const TCloneReq& clone_req, const TMasterInfo&
|
||||
_res_status(res_status),
|
||||
_signature(signature),
|
||||
_master_info(master_info) {
|
||||
_mem_tracker = std::make_unique<MemTrackerLimiter>(
|
||||
_mem_tracker = std::make_shared<MemTrackerLimiter>(
|
||||
-1, "EngineCloneTask#tabletId=" + std::to_string(_clone_req.tablet_id),
|
||||
StorageEngine::instance()->clone_mem_tracker());
|
||||
}
|
||||
|
||||
Status EngineCloneTask::execute() {
|
||||
// register the tablet to avoid it is deleted by gc thread during clone process
|
||||
SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::STORAGE);
|
||||
SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::STORAGE);
|
||||
StorageEngine::instance()->tablet_manager()->register_clone_tablet(_clone_req.tablet_id);
|
||||
Status st = _do_clone();
|
||||
StorageEngine::instance()->tablet_manager()->unregister_clone_tablet(_clone_req.tablet_id);
|
||||
|
||||
@ -79,7 +79,7 @@ private:
|
||||
const TMasterInfo& _master_info;
|
||||
int64_t _copy_size;
|
||||
int64_t _copy_time_ms;
|
||||
std::unique_ptr<MemTrackerLimiter> _mem_tracker;
|
||||
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
|
||||
}; // EngineTask
|
||||
|
||||
} // namespace doris
|
||||
|
||||
@ -72,9 +72,9 @@ shared_ptr<DataStreamRecvr> DataStreamMgr::create_recvr(
|
||||
DCHECK(profile != nullptr);
|
||||
VLOG_FILE << "creating receiver for fragment=" << fragment_instance_id
|
||||
<< ", node=" << dest_node_id;
|
||||
shared_ptr<DataStreamRecvr> recvr(new DataStreamRecvr(
|
||||
this, row_desc, state->query_mem_tracker(), fragment_instance_id, dest_node_id,
|
||||
num_senders, is_merging, buffer_size, profile, sub_plan_query_statistics_recvr));
|
||||
shared_ptr<DataStreamRecvr> recvr(
|
||||
new DataStreamRecvr(this, row_desc, fragment_instance_id, dest_node_id, num_senders,
|
||||
is_merging, buffer_size, profile, sub_plan_query_statistics_recvr));
|
||||
uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id);
|
||||
lock_guard<mutex> l(_lock);
|
||||
_fragment_stream_set.insert(std::make_pair(fragment_instance_id, dest_node_id));
|
||||
|
||||
@ -447,9 +447,8 @@ void DataStreamRecvr::transfer_all_resources(RowBatch* transfer_batch) {
|
||||
|
||||
DataStreamRecvr::DataStreamRecvr(
|
||||
DataStreamMgr* stream_mgr, const RowDescriptor& row_desc,
|
||||
MemTrackerLimiter* query_mem_tracker, const TUniqueId& fragment_instance_id,
|
||||
PlanNodeId dest_node_id, int num_senders, bool is_merging, int total_buffer_limit,
|
||||
RuntimeProfile* profile,
|
||||
const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
|
||||
bool is_merging, int total_buffer_limit, RuntimeProfile* profile,
|
||||
std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr)
|
||||
: _mgr(stream_mgr),
|
||||
_fragment_instance_id(fragment_instance_id),
|
||||
|
||||
@ -116,9 +116,8 @@ private:
|
||||
class SenderQueue;
|
||||
|
||||
DataStreamRecvr(DataStreamMgr* stream_mgr, const RowDescriptor& row_desc,
|
||||
MemTrackerLimiter* query_mem_tracker, const TUniqueId& fragment_instance_id,
|
||||
PlanNodeId dest_node_id, int num_senders, bool is_merging,
|
||||
int total_buffer_limit, RuntimeProfile* profile,
|
||||
const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
|
||||
bool is_merging, int total_buffer_limit, RuntimeProfile* profile,
|
||||
std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr);
|
||||
|
||||
// If receive queue is full, done is enqueue pending, and return with *done is nullptr
|
||||
|
||||
@ -113,10 +113,12 @@ public:
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
MemTrackerLimiter* process_mem_tracker() { return _process_mem_tracker; }
|
||||
void set_process_mem_tracker(MemTrackerLimiter* tracker) { _process_mem_tracker = tracker; }
|
||||
MemTrackerLimiter* query_pool_mem_tracker() { return _query_pool_mem_tracker; }
|
||||
MemTrackerLimiter* load_pool_mem_tracker() { return _load_pool_mem_tracker; }
|
||||
std::shared_ptr<MemTrackerLimiter> process_mem_tracker() { return _process_mem_tracker; }
|
||||
void set_process_mem_tracker(const std::shared_ptr<MemTrackerLimiter>& tracker) {
|
||||
_process_mem_tracker = tracker;
|
||||
}
|
||||
std::shared_ptr<MemTrackerLimiter> query_pool_mem_tracker() { return _query_pool_mem_tracker; }
|
||||
std::shared_ptr<MemTrackerLimiter> load_pool_mem_tracker() { return _load_pool_mem_tracker; }
|
||||
MemTrackerTaskPool* task_pool_mem_tracker_registry() { return _task_pool_mem_tracker_registry; }
|
||||
ThreadResourceMgr* thread_mgr() { return _thread_mgr; }
|
||||
PriorityThreadPool* scan_thread_pool() { return _scan_thread_pool; }
|
||||
@ -184,11 +186,11 @@ private:
|
||||
|
||||
// The ancestor for all trackers. Every tracker is visible from the process down.
|
||||
// Not limit total memory by process tracker, and it's just used to track virtual memory of process.
|
||||
MemTrackerLimiter* _process_mem_tracker;
|
||||
std::shared_ptr<MemTrackerLimiter> _process_mem_tracker;
|
||||
// The ancestor for all querys tracker.
|
||||
MemTrackerLimiter* _query_pool_mem_tracker;
|
||||
std::shared_ptr<MemTrackerLimiter> _query_pool_mem_tracker;
|
||||
// The ancestor for all load tracker.
|
||||
MemTrackerLimiter* _load_pool_mem_tracker;
|
||||
std::shared_ptr<MemTrackerLimiter> _load_pool_mem_tracker;
|
||||
MemTrackerTaskPool* _task_pool_mem_tracker_registry;
|
||||
|
||||
// The following two thread pools are used in different scenarios.
|
||||
|
||||
@ -193,7 +193,8 @@ Status ExecEnv::_init_mem_tracker() {
|
||||
<< ". Using physical memory instead";
|
||||
global_memory_limit_bytes = MemInfo::physical_mem();
|
||||
}
|
||||
_process_mem_tracker = new MemTrackerLimiter(global_memory_limit_bytes, "Process");
|
||||
_process_mem_tracker =
|
||||
std::make_shared<MemTrackerLimiter>(global_memory_limit_bytes, "Process");
|
||||
thread_context()->_thread_mem_tracker_mgr->init();
|
||||
thread_context()->_thread_mem_tracker_mgr->set_check_attach(false);
|
||||
#if defined(USE_MEM_TRACKER) && !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && \
|
||||
@ -203,10 +204,12 @@ Status ExecEnv::_init_mem_tracker() {
|
||||
}
|
||||
#endif
|
||||
|
||||
_query_pool_mem_tracker = new MemTrackerLimiter(-1, "QueryPool", _process_mem_tracker);
|
||||
_query_pool_mem_tracker =
|
||||
std::make_shared<MemTrackerLimiter>(-1, "QueryPool", _process_mem_tracker);
|
||||
REGISTER_HOOK_METRIC(query_mem_consumption,
|
||||
[this]() { return _query_pool_mem_tracker->consumption(); });
|
||||
_load_pool_mem_tracker = new MemTrackerLimiter(-1, "LoadPool", _process_mem_tracker);
|
||||
_load_pool_mem_tracker =
|
||||
std::make_shared<MemTrackerLimiter>(-1, "LoadPool", _process_mem_tracker);
|
||||
REGISTER_HOOK_METRIC(load_mem_consumption,
|
||||
[this]() { return _load_pool_mem_tracker->consumption(); });
|
||||
LOG(INFO) << "Using global memory limit: "
|
||||
@ -363,9 +366,6 @@ void ExecEnv::_destroy() {
|
||||
SAFE_DELETE(_routine_load_task_executor);
|
||||
SAFE_DELETE(_external_scan_context_mgr);
|
||||
SAFE_DELETE(_heartbeat_flags);
|
||||
SAFE_DELETE(_process_mem_tracker);
|
||||
SAFE_DELETE(_query_pool_mem_tracker);
|
||||
SAFE_DELETE(_load_pool_mem_tracker);
|
||||
SAFE_DELETE(_task_pool_mem_tracker_registry);
|
||||
SAFE_DELETE(_buffer_reservation);
|
||||
|
||||
|
||||
@ -25,11 +25,11 @@
|
||||
|
||||
namespace doris {
|
||||
|
||||
LoadChannel::LoadChannel(const UniqueId& load_id, std::unique_ptr<MemTrackerLimiter> mem_tracker,
|
||||
LoadChannel::LoadChannel(const UniqueId& load_id, std::shared_ptr<MemTrackerLimiter>& mem_tracker,
|
||||
int64_t timeout_s, bool is_high_priority, const std::string& sender_ip,
|
||||
bool is_vec)
|
||||
: _load_id(load_id),
|
||||
_mem_tracker(std::move(mem_tracker)),
|
||||
_mem_tracker(mem_tracker),
|
||||
_timeout_s(timeout_s),
|
||||
_is_high_priority(is_high_priority),
|
||||
_sender_ip(sender_ip),
|
||||
@ -60,7 +60,7 @@ Status LoadChannel::open(const PTabletWriterOpenRequest& params) {
|
||||
} else {
|
||||
// create a new tablets channel
|
||||
TabletsChannelKey key(params.id(), index_id);
|
||||
channel.reset(new TabletsChannel(key, _mem_tracker.get(), _is_high_priority, _is_vec));
|
||||
channel.reset(new TabletsChannel(key, _mem_tracker, _is_high_priority, _is_vec));
|
||||
_tablets_channels.insert({index_id, channel});
|
||||
}
|
||||
}
|
||||
|
||||
@ -39,7 +39,7 @@ class Cache;
|
||||
// corresponding to a certain load job
|
||||
class LoadChannel {
|
||||
public:
|
||||
LoadChannel(const UniqueId& load_id, std::unique_ptr<MemTrackerLimiter> mem_tracker,
|
||||
LoadChannel(const UniqueId& load_id, std::shared_ptr<MemTrackerLimiter>& mem_tracker,
|
||||
int64_t timeout_s, bool is_high_priority, const std::string& sender_ip,
|
||||
bool is_vec);
|
||||
~LoadChannel();
|
||||
@ -99,7 +99,7 @@ private:
|
||||
|
||||
UniqueId _load_id;
|
||||
// Tracks the total memory consumed by current load job on this BE
|
||||
std::unique_ptr<MemTrackerLimiter> _mem_tracker;
|
||||
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
|
||||
|
||||
// lock protect the tablets channel map
|
||||
std::mutex _lock;
|
||||
|
||||
@ -84,7 +84,7 @@ LoadChannelMgr::~LoadChannelMgr() {
|
||||
|
||||
Status LoadChannelMgr::init(int64_t process_mem_limit) {
|
||||
int64_t load_mgr_mem_limit = calc_process_max_load_memory(process_mem_limit);
|
||||
_mem_tracker = std::make_unique<MemTrackerLimiter>(load_mgr_mem_limit, "LoadChannelMgr");
|
||||
_mem_tracker = std::make_shared<MemTrackerLimiter>(load_mgr_mem_limit, "LoadChannelMgr");
|
||||
REGISTER_HOOK_METRIC(load_channel_mem_consumption,
|
||||
[this]() { return _mem_tracker->consumption(); });
|
||||
_last_success_channel = new_lru_cache("LastestSuccessChannelCache", 1024);
|
||||
@ -110,13 +110,13 @@ Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) {
|
||||
int64_t load_mem_limit = params.has_load_mem_limit() ? params.load_mem_limit() : -1;
|
||||
int64_t channel_mem_limit =
|
||||
calc_channel_max_load_memory(load_mem_limit, _mem_tracker->limit());
|
||||
auto channel_mem_tracker = std::make_unique<MemTrackerLimiter>(
|
||||
auto channel_mem_tracker = std::make_shared<MemTrackerLimiter>(
|
||||
channel_mem_limit,
|
||||
fmt::format("LoadChannel#senderIp={}#loadID={}", params.sender_ip(),
|
||||
load_id.to_string()),
|
||||
_mem_tracker.get());
|
||||
channel.reset(new LoadChannel(load_id, std::move(channel_mem_tracker),
|
||||
channel_timeout_s, is_high_priority, params.sender_ip(),
|
||||
_mem_tracker);
|
||||
channel.reset(new LoadChannel(load_id, channel_mem_tracker, channel_timeout_s,
|
||||
is_high_priority, params.sender_ip(),
|
||||
params.is_vectorized()));
|
||||
_load_channels.insert({load_id, channel});
|
||||
}
|
||||
|
||||
@ -78,7 +78,7 @@ protected:
|
||||
Cache* _last_success_channel = nullptr;
|
||||
|
||||
// check the total load channel mem consumption of this Backend
|
||||
std::unique_ptr<MemTrackerLimiter> _mem_tracker;
|
||||
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
|
||||
|
||||
CountDownLatch _stop_background_threads_latch;
|
||||
// thread to clean timeout load channels
|
||||
|
||||
@ -19,6 +19,8 @@
|
||||
|
||||
#include <fmt/format.h>
|
||||
|
||||
#include <boost/stacktrace.hpp>
|
||||
|
||||
#include "gutil/once.h"
|
||||
#include "runtime/runtime_state.h"
|
||||
#include "runtime/thread_context.h"
|
||||
@ -29,7 +31,8 @@
|
||||
namespace doris {
|
||||
|
||||
MemTrackerLimiter::MemTrackerLimiter(int64_t byte_limit, const std::string& label,
|
||||
MemTrackerLimiter* parent, RuntimeProfile* profile)
|
||||
const std::shared_ptr<MemTrackerLimiter>& parent,
|
||||
RuntimeProfile* profile)
|
||||
: MemTracker(label, profile, true) {
|
||||
DCHECK_GE(byte_limit, -1);
|
||||
_limit = byte_limit;
|
||||
@ -42,32 +45,28 @@ MemTrackerLimiter::MemTrackerLimiter(int64_t byte_limit, const std::string& labe
|
||||
while (tracker != nullptr) {
|
||||
_all_ancestors.push_back(tracker);
|
||||
if (tracker->has_limit()) _limited_ancestors.push_back(tracker);
|
||||
tracker = tracker->_parent;
|
||||
tracker = tracker->_parent.get();
|
||||
}
|
||||
DCHECK_GT(_all_ancestors.size(), 0);
|
||||
DCHECK_EQ(_all_ancestors[0], this);
|
||||
if (_parent) _parent->add_child(this);
|
||||
if (_parent) {
|
||||
std::lock_guard<std::mutex> l(_parent->_child_tracker_limiter_lock);
|
||||
_child_tracker_it = _parent->_child_tracker_limiters.insert(
|
||||
_parent->_child_tracker_limiters.end(), this);
|
||||
_had_child_count++;
|
||||
}
|
||||
}
|
||||
|
||||
MemTrackerLimiter::~MemTrackerLimiter() {
|
||||
// TCMalloc hook will be triggered during destructor memtracker, may cause crash.
|
||||
if (_label == "Process") doris::thread_context_ptr._init = false;
|
||||
DCHECK(remain_child_count() == 0 || _label == "Process");
|
||||
if (_parent) _parent->remove_child(this);
|
||||
}
|
||||
|
||||
void MemTrackerLimiter::add_child(MemTrackerLimiter* tracker) {
|
||||
std::lock_guard<std::mutex> l(_child_tracker_limiter_lock);
|
||||
tracker->_child_tracker_it =
|
||||
_child_tracker_limiters.insert(_child_tracker_limiters.end(), tracker);
|
||||
_had_child_count++;
|
||||
}
|
||||
|
||||
void MemTrackerLimiter::remove_child(MemTrackerLimiter* tracker) {
|
||||
std::lock_guard<std::mutex> l(_child_tracker_limiter_lock);
|
||||
if (tracker->_child_tracker_it != _child_tracker_limiters.end()) {
|
||||
_child_tracker_limiters.erase(tracker->_child_tracker_it);
|
||||
tracker->_child_tracker_it = _child_tracker_limiters.end();
|
||||
if (_parent) {
|
||||
std::lock_guard<std::mutex> l(_parent->_child_tracker_limiter_lock);
|
||||
if (_child_tracker_it != _parent->_child_tracker_limiters.end()) {
|
||||
_parent->_child_tracker_limiters.erase(_child_tracker_it);
|
||||
_child_tracker_it = _parent->_child_tracker_limiters.end();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -221,13 +220,14 @@ std::string MemTrackerLimiter::log_usage(int max_recursive_depth,
|
||||
Status MemTrackerLimiter::mem_limit_exceeded(RuntimeState* state, const std::string& details,
|
||||
int64_t failed_allocation_size, Status failed_alloc) {
|
||||
STOP_CHECK_THREAD_MEM_TRACKER_LIMIT();
|
||||
MemTrackerLimiter* process_tracker = ExecEnv::GetInstance()->process_mem_tracker();
|
||||
std::string detail =
|
||||
"Memory exceed limit. fragment={}, details={}, on backend={}. Memory left in process "
|
||||
"limit={}.";
|
||||
detail = fmt::format(detail, state != nullptr ? print_id(state->fragment_instance_id()) : "",
|
||||
details, BackendOptions::get_localhost(),
|
||||
PrettyPrinter::print(process_tracker->spare_capacity(), TUnit::BYTES));
|
||||
detail = fmt::format(
|
||||
detail, state != nullptr ? print_id(state->fragment_instance_id()) : "", details,
|
||||
BackendOptions::get_localhost(),
|
||||
PrettyPrinter::print(ExecEnv::GetInstance()->process_mem_tracker()->spare_capacity(),
|
||||
TUnit::BYTES));
|
||||
if (!failed_alloc) {
|
||||
detail += " failed alloc=<{}>. current tracker={}.";
|
||||
detail = fmt::format(detail, failed_alloc.to_string(), _label);
|
||||
@ -240,13 +240,15 @@ Status MemTrackerLimiter::mem_limit_exceeded(RuntimeState* state, const std::str
|
||||
Status status = Status::MemoryLimitExceeded(detail);
|
||||
if (state != nullptr) state->log_error(detail);
|
||||
|
||||
detail += "\n" + boost::stacktrace::to_string(boost::stacktrace::stacktrace());
|
||||
// only print the tracker log_usage in be log.
|
||||
if (process_tracker->spare_capacity() < failed_allocation_size) {
|
||||
if (ExecEnv::GetInstance()->process_mem_tracker()->spare_capacity() < failed_allocation_size) {
|
||||
// Dumping the process MemTracker is expensive. Limiting the recursive depth to two
|
||||
// levels limits the level of detail to a one-line summary for each query MemTracker.
|
||||
detail += "\n" + process_tracker->log_usage(2);
|
||||
detail += "\n" + ExecEnv::GetInstance()->process_mem_tracker()->log_usage(2);
|
||||
} else {
|
||||
detail += "\n" + log_usage();
|
||||
}
|
||||
detail += "\n" + log_usage();
|
||||
|
||||
LOG(WARNING) << detail;
|
||||
return status;
|
||||
|
||||
@ -42,8 +42,10 @@ class RuntimeState;
|
||||
class MemTrackerLimiter final : public MemTracker {
|
||||
public:
|
||||
// Creates and adds the tracker limiter to the tree
|
||||
MemTrackerLimiter(int64_t byte_limit = -1, const std::string& label = std::string(),
|
||||
MemTrackerLimiter* parent = nullptr, RuntimeProfile* profile = nullptr);
|
||||
MemTrackerLimiter(
|
||||
int64_t byte_limit = -1, const std::string& label = std::string(),
|
||||
const std::shared_ptr<MemTrackerLimiter>& parent = std::shared_ptr<MemTrackerLimiter>(),
|
||||
RuntimeProfile* profile = nullptr);
|
||||
|
||||
// If the final consumption is not as expected, this usually means that the same memory is calling
|
||||
// consume and release on different trackers. If the two trackers have a parent-child relationship,
|
||||
@ -51,10 +53,7 @@ public:
|
||||
// no parent-child relationship, the two tracker consumptions are wrong.
|
||||
~MemTrackerLimiter();
|
||||
|
||||
MemTrackerLimiter* parent() const { return _parent; }
|
||||
|
||||
void add_child(MemTrackerLimiter* tracker);
|
||||
void remove_child(MemTrackerLimiter* tracker);
|
||||
std::shared_ptr<MemTrackerLimiter> parent() const { return _parent; }
|
||||
|
||||
size_t remain_child_count() const { return _child_tracker_limiters.size(); }
|
||||
size_t had_child_count() const { return _had_child_count; }
|
||||
@ -187,7 +186,7 @@ private:
|
||||
// Group number in MemTracker::mem_tracker_pool, generated by the timestamp.
|
||||
int64_t _group_num;
|
||||
|
||||
MemTrackerLimiter* _parent; // The parent of this tracker.
|
||||
std::shared_ptr<MemTrackerLimiter> _parent; // The parent of this tracker.
|
||||
|
||||
// this tracker limiter plus all of its ancestors
|
||||
std::vector<MemTrackerLimiter*> _all_ancestors;
|
||||
@ -199,13 +198,12 @@ private:
|
||||
// update that of its children).
|
||||
mutable std::mutex _child_tracker_limiter_lock;
|
||||
std::list<MemTrackerLimiter*> _child_tracker_limiters;
|
||||
// Iterator into parent_->_child_tracker_limiters for this object. Stored to have O(1) remove.
|
||||
std::list<MemTrackerLimiter*>::iterator _child_tracker_it;
|
||||
|
||||
// The number of child trackers that have been added.
|
||||
std::atomic_size_t _had_child_count = 0;
|
||||
|
||||
// Iterator into parent_->_child_tracker_limiters for this object. Stored to have O(1) remove.
|
||||
std::list<MemTrackerLimiter*>::iterator _child_tracker_it;
|
||||
|
||||
// Lock to protect gc_memory(). This prevents many GCs from occurring at once.
|
||||
std::mutex _gc_lock;
|
||||
// Functions to call after the limit is reached to free memory.
|
||||
|
||||
@ -23,47 +23,49 @@
|
||||
|
||||
namespace doris {
|
||||
|
||||
MemTrackerLimiter* MemTrackerTaskPool::register_task_mem_tracker_impl(const std::string& task_id,
|
||||
int64_t mem_limit,
|
||||
const std::string& label,
|
||||
MemTrackerLimiter* parent) {
|
||||
std::shared_ptr<MemTrackerLimiter> MemTrackerTaskPool::register_task_mem_tracker_impl(
|
||||
const std::string& task_id, int64_t mem_limit, const std::string& label,
|
||||
const std::shared_ptr<MemTrackerLimiter>& parent) {
|
||||
DCHECK(!task_id.empty());
|
||||
// First time this task_id registered, make a new object, otherwise do nothing.
|
||||
// Combine new tracker and emplace into one operation to avoid the use of locks
|
||||
// Name for task MemTrackers. '$0' is replaced with the task id.
|
||||
std::shared_ptr<MemTrackerLimiter> tracker;
|
||||
bool new_emplace = _task_mem_trackers.lazy_emplace_l(
|
||||
task_id, [&](std::shared_ptr<MemTrackerLimiter>) {},
|
||||
task_id, [&](const std::shared_ptr<MemTrackerLimiter>& v) { tracker = v; },
|
||||
[&](const auto& ctor) {
|
||||
ctor(task_id, std::make_shared<MemTrackerLimiter>(mem_limit, label, parent));
|
||||
tracker = std::make_shared<MemTrackerLimiter>(mem_limit, label, parent);
|
||||
ctor(task_id, tracker);
|
||||
});
|
||||
if (new_emplace) {
|
||||
LOG(INFO) << "Register query/load memory tracker, query/load id: " << task_id
|
||||
<< " limit: " << PrettyPrinter::print(mem_limit, TUnit::BYTES);
|
||||
}
|
||||
return _task_mem_trackers[task_id].get();
|
||||
return tracker;
|
||||
}
|
||||
|
||||
MemTrackerLimiter* MemTrackerTaskPool::register_query_mem_tracker(const std::string& query_id,
|
||||
int64_t mem_limit) {
|
||||
std::shared_ptr<MemTrackerLimiter> MemTrackerTaskPool::register_query_mem_tracker(
|
||||
const std::string& query_id, int64_t mem_limit) {
|
||||
return register_task_mem_tracker_impl(query_id, mem_limit,
|
||||
fmt::format("Query#queryId={}", query_id),
|
||||
ExecEnv::GetInstance()->query_pool_mem_tracker());
|
||||
}
|
||||
|
||||
MemTrackerLimiter* MemTrackerTaskPool::register_load_mem_tracker(const std::string& load_id,
|
||||
int64_t mem_limit) {
|
||||
std::shared_ptr<MemTrackerLimiter> MemTrackerTaskPool::register_load_mem_tracker(
|
||||
const std::string& load_id, int64_t mem_limit) {
|
||||
// In load, the query id of the fragment is executed, which is the same as the load id of the load channel.
|
||||
return register_task_mem_tracker_impl(load_id, mem_limit,
|
||||
fmt::format("Load#queryId={}", load_id),
|
||||
ExecEnv::GetInstance()->load_pool_mem_tracker());
|
||||
}
|
||||
|
||||
MemTrackerLimiter* MemTrackerTaskPool::get_task_mem_tracker(const std::string& task_id) {
|
||||
std::shared_ptr<MemTrackerLimiter> MemTrackerTaskPool::get_task_mem_tracker(
|
||||
const std::string& task_id) {
|
||||
DCHECK(!task_id.empty());
|
||||
MemTrackerLimiter* tracker = nullptr;
|
||||
std::shared_ptr<MemTrackerLimiter> tracker = nullptr;
|
||||
// Avoid using locks to resolve erase conflicts
|
||||
_task_mem_trackers.if_contains(
|
||||
task_id, [&tracker](std::shared_ptr<MemTrackerLimiter> v) { tracker = v.get(); });
|
||||
task_id, [&tracker](const std::shared_ptr<MemTrackerLimiter>& v) { tracker = v; });
|
||||
return tracker;
|
||||
}
|
||||
|
||||
@ -74,8 +76,8 @@ void MemTrackerTaskPool::logout_task_mem_tracker() {
|
||||
// Unknown exception case with high concurrency, after _task_mem_trackers.erase,
|
||||
// the key still exists in _task_mem_trackers. https://github.com/apache/incubator-doris/issues/10006
|
||||
expired_task_ids.emplace_back(it->first);
|
||||
} else if (it->second->remain_child_count() == 0 && it->second->had_child_count() != 0) {
|
||||
// No RuntimeState uses this task MemTracker, it is only referenced by this map,
|
||||
} else if (it->second.use_count() == 1 && it->second->had_child_count() != 0) {
|
||||
// No RuntimeState uses this task MemTrackerLimiter, it is only referenced by this map,
|
||||
// and tracker was not created soon, delete it.
|
||||
//
|
||||
// If consumption is not equal to 0 before query mem tracker is destructed,
|
||||
@ -92,20 +94,12 @@ void MemTrackerTaskPool::logout_task_mem_tracker() {
|
||||
it->second->parent()->consumption_revise(-it->second->consumption());
|
||||
LOG(INFO) << "Deregister query/load memory tracker, queryId/loadId: " << it->first;
|
||||
expired_task_ids.emplace_back(it->first);
|
||||
} else {
|
||||
// Log limit exceeded query tracker.
|
||||
if (it->second->limit_exceeded()) {
|
||||
it->second->mem_limit_exceeded(
|
||||
nullptr,
|
||||
fmt::format("Task mem limit exceeded but no cancel, queryId:{}", it->first),
|
||||
0, Status::OK());
|
||||
}
|
||||
}
|
||||
}
|
||||
for (auto tid : expired_task_ids) {
|
||||
// Verify the condition again to make sure the tracker is not being used again.
|
||||
_task_mem_trackers.erase_if(tid, [&](std::shared_ptr<MemTrackerLimiter> v) {
|
||||
return !v || v->remain_child_count() == 0;
|
||||
_task_mem_trackers.erase_if(tid, [&](const std::shared_ptr<MemTrackerLimiter>& v) {
|
||||
return !v || v.use_count() == 1;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@ -32,26 +32,28 @@ using TaskTrackersMap = phmap::parallel_flat_hash_map<
|
||||
// Global task pool for query MemTrackers. Owned by ExecEnv.
|
||||
class MemTrackerTaskPool {
|
||||
public:
|
||||
// Construct a MemTracker object for 'task_id' with 'mem_limit' as the memory limit.
|
||||
// The MemTracker is a child of the pool MemTracker, Calling this with the same
|
||||
// 'task_id' will return the same MemTracker object. This is used to track the local
|
||||
// Construct a MemTrackerLimiter object for 'task_id' with 'mem_limit' as the memory limit.
|
||||
// The MemTrackerLimiter is a child of the pool MemTrackerLimiter, Calling this with the same
|
||||
// 'task_id' will return the same MemTrackerLimiter object. This is used to track the local
|
||||
// memory usage of all tasks executing. The first time this is called for a task,
|
||||
// a new MemTracker object is created with the pool tracker as its parent.
|
||||
// a new MemTrackerLimiter object is created with the pool tracker as its parent.
|
||||
// Newly created trackers will always have a limit of -1.
|
||||
MemTrackerLimiter* register_task_mem_tracker_impl(const std::string& task_id, int64_t mem_limit,
|
||||
const std::string& label,
|
||||
MemTrackerLimiter* parent);
|
||||
MemTrackerLimiter* register_query_mem_tracker(const std::string& query_id, int64_t mem_limit);
|
||||
MemTrackerLimiter* register_load_mem_tracker(const std::string& load_id, int64_t mem_limit);
|
||||
std::shared_ptr<MemTrackerLimiter> register_task_mem_tracker_impl(
|
||||
const std::string& task_id, int64_t mem_limit, const std::string& label,
|
||||
const std::shared_ptr<MemTrackerLimiter>& parent);
|
||||
std::shared_ptr<MemTrackerLimiter> register_query_mem_tracker(const std::string& query_id,
|
||||
int64_t mem_limit);
|
||||
std::shared_ptr<MemTrackerLimiter> register_load_mem_tracker(const std::string& load_id,
|
||||
int64_t mem_limit);
|
||||
|
||||
MemTrackerLimiter* get_task_mem_tracker(const std::string& task_id);
|
||||
std::shared_ptr<MemTrackerLimiter> get_task_mem_tracker(const std::string& task_id);
|
||||
|
||||
// Remove the mem tracker that has ended the query.
|
||||
void logout_task_mem_tracker();
|
||||
|
||||
private:
|
||||
// All per-task MemTracker objects.
|
||||
// The life cycle of task memtracker in the process is the same as task runtime state,
|
||||
// All per-task MemTrackerLimiter objects.
|
||||
// The life cycle of task MemTrackerLimiter in the process is the same as task runtime state,
|
||||
// MemTrackers will be removed from this map after query finish or cancel.
|
||||
TaskTrackersMap _task_mem_trackers;
|
||||
};
|
||||
|
||||
@ -24,10 +24,10 @@
|
||||
|
||||
namespace doris {
|
||||
|
||||
void ThreadMemTrackerMgr::attach_limiter_tracker(const std::string& cancel_msg,
|
||||
const std::string& task_id,
|
||||
const TUniqueId& fragment_instance_id,
|
||||
MemTrackerLimiter* mem_tracker) {
|
||||
void ThreadMemTrackerMgr::attach_limiter_tracker(
|
||||
const std::string& cancel_msg, const std::string& task_id,
|
||||
const TUniqueId& fragment_instance_id,
|
||||
const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
|
||||
DCHECK(mem_tracker);
|
||||
flush_untracked_mem<false>();
|
||||
_task_id = task_id;
|
||||
@ -37,8 +37,7 @@ void ThreadMemTrackerMgr::attach_limiter_tracker(const std::string& cancel_msg,
|
||||
}
|
||||
|
||||
void ThreadMemTrackerMgr::detach_limiter_tracker() {
|
||||
// Do not flush untracked mem, instance executor thread may exit after instance fragment executor thread,
|
||||
// `instance_mem_tracker` will be null pointer, which is not a graceful exit.
|
||||
flush_untracked_mem<false>();
|
||||
_task_id = "";
|
||||
_fragment_instance_id = TUniqueId();
|
||||
_exceed_cb.cancel_msg = "";
|
||||
|
||||
@ -77,7 +77,7 @@ public:
|
||||
// After attach, the current thread TCMalloc Hook starts to consume/release task mem_tracker
|
||||
void attach_limiter_tracker(const std::string& cancel_msg, const std::string& task_id,
|
||||
const TUniqueId& fragment_instance_id,
|
||||
MemTrackerLimiter* mem_tracker);
|
||||
const std::shared_ptr<MemTrackerLimiter>& mem_tracker);
|
||||
|
||||
void detach_limiter_tracker();
|
||||
|
||||
@ -116,7 +116,7 @@ public:
|
||||
|
||||
bool is_attach_task() { return _task_id != ""; }
|
||||
|
||||
MemTrackerLimiter* limiter_mem_tracker() { return _limiter_tracker; }
|
||||
std::shared_ptr<MemTrackerLimiter> limiter_mem_tracker() { return _limiter_tracker; }
|
||||
|
||||
void set_check_limit(bool check_limit) { _check_limit = check_limit; }
|
||||
void set_check_attach(bool check_attach) { _check_attach = check_attach; }
|
||||
@ -145,7 +145,7 @@ private:
|
||||
// Frequent calls to unordered_map _untracked_mems[] in consume will degrade performance.
|
||||
int64_t _untracked_mem = 0;
|
||||
|
||||
MemTrackerLimiter* _limiter_tracker;
|
||||
std::shared_ptr<MemTrackerLimiter> _limiter_tracker;
|
||||
std::vector<MemTracker*> _consumer_tracker_stack;
|
||||
|
||||
// If true, call memtracker try_consume, otherwise call consume.
|
||||
|
||||
@ -241,9 +241,10 @@ Status RuntimeState::init_mem_trackers(const TUniqueId& query_id) {
|
||||
print_id(query_id), bytes_limit);
|
||||
} else {
|
||||
DCHECK(false);
|
||||
_query_mem_tracker = ExecEnv::GetInstance()->query_pool_mem_tracker();
|
||||
}
|
||||
|
||||
_instance_mem_tracker = std::make_unique<MemTrackerLimiter>(
|
||||
_instance_mem_tracker = std::make_shared<MemTrackerLimiter>(
|
||||
bytes_limit, "RuntimeState:instance:" + print_id(_fragment_instance_id),
|
||||
_query_mem_tracker, &_profile);
|
||||
|
||||
@ -263,7 +264,7 @@ Status RuntimeState::init_mem_trackers(const TUniqueId& query_id) {
|
||||
|
||||
Status RuntimeState::init_instance_mem_tracker() {
|
||||
_query_mem_tracker = nullptr;
|
||||
_instance_mem_tracker = std::make_unique<MemTrackerLimiter>(-1, "RuntimeState:instance");
|
||||
_instance_mem_tracker = std::make_shared<MemTrackerLimiter>(-1, "RuntimeState:instance");
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
||||
@ -127,8 +127,8 @@ public:
|
||||
const TUniqueId& query_id() const { return _query_id; }
|
||||
const TUniqueId& fragment_instance_id() const { return _fragment_instance_id; }
|
||||
ExecEnv* exec_env() { return _exec_env; }
|
||||
MemTrackerLimiter* query_mem_tracker() { return _query_mem_tracker; }
|
||||
MemTrackerLimiter* instance_mem_tracker() { return _instance_mem_tracker.get(); }
|
||||
std::shared_ptr<MemTrackerLimiter> query_mem_tracker() { return _query_mem_tracker; }
|
||||
std::shared_ptr<MemTrackerLimiter> instance_mem_tracker() { return _instance_mem_tracker; }
|
||||
ThreadResourceMgr::ResourcePool* resource_pool() { return _resource_pool; }
|
||||
|
||||
void set_fragment_root_id(PlanNodeId id) {
|
||||
@ -390,10 +390,10 @@ private:
|
||||
|
||||
// MemTracker that is shared by all fragment instances running on this host.
|
||||
// The query mem tracker must be released after the _instance_mem_tracker.
|
||||
MemTrackerLimiter* _query_mem_tracker;
|
||||
std::shared_ptr<MemTrackerLimiter> _query_mem_tracker;
|
||||
|
||||
// Memory usage of this fragment instance
|
||||
std::unique_ptr<MemTrackerLimiter> _instance_mem_tracker;
|
||||
std::shared_ptr<MemTrackerLimiter> _instance_mem_tracker;
|
||||
|
||||
// put runtime state before _obj_pool, so that it will be deconstructed after
|
||||
// _obj_pool. Because some of object in _obj_pool will use profile when deconstructing.
|
||||
|
||||
@ -182,7 +182,7 @@ private:
|
||||
// signal of new batch or the eos/cancelled condition
|
||||
std::condition_variable _batch_prepared_cv;
|
||||
|
||||
void process_sorted_run_task(MemTrackerLimiter* mem_tracker) {
|
||||
void process_sorted_run_task(const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
|
||||
SCOPED_ATTACH_TASK(mem_tracker, ThreadContext::TaskType::QUERY);
|
||||
std::unique_lock<std::mutex> lock(_mutex);
|
||||
while (true) {
|
||||
|
||||
@ -30,14 +30,15 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(tablet_writer_count, MetricUnit::NOUNIT);
|
||||
|
||||
std::atomic<uint64_t> TabletsChannel::_s_tablet_writer_count;
|
||||
|
||||
TabletsChannel::TabletsChannel(const TabletsChannelKey& key, MemTrackerLimiter* parent_tracker,
|
||||
TabletsChannel::TabletsChannel(const TabletsChannelKey& key,
|
||||
const std::shared_ptr<MemTrackerLimiter>& parent_tracker,
|
||||
bool is_high_priority, bool is_vec)
|
||||
: _key(key),
|
||||
_state(kInitialized),
|
||||
_closed_senders(64),
|
||||
_is_high_priority(is_high_priority),
|
||||
_is_vec(is_vec) {
|
||||
_mem_tracker = std::make_unique<MemTrackerLimiter>(
|
||||
_mem_tracker = std::make_shared<MemTrackerLimiter>(
|
||||
-1, fmt::format("TabletsChannel#indexID={}", key.index_id), parent_tracker);
|
||||
static std::once_flag once_flag;
|
||||
std::call_once(once_flag, [] {
|
||||
@ -240,7 +241,7 @@ Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request
|
||||
wrequest.ptable_schema_param = request.schema();
|
||||
|
||||
DeltaWriter* writer = nullptr;
|
||||
auto st = DeltaWriter::open(&wrequest, &writer, _mem_tracker.get(), _is_vec);
|
||||
auto st = DeltaWriter::open(&wrequest, &writer, _mem_tracker, _is_vec);
|
||||
if (!st.ok()) {
|
||||
std::stringstream ss;
|
||||
ss << "open delta writer failed, tablet_id=" << tablet.tablet_id()
|
||||
|
||||
@ -60,8 +60,9 @@ class OlapTableSchemaParam;
|
||||
// Write channel for a particular (load, index).
|
||||
class TabletsChannel {
|
||||
public:
|
||||
TabletsChannel(const TabletsChannelKey& key, MemTrackerLimiter* parent_tracker,
|
||||
bool is_high_priority, bool is_vec);
|
||||
TabletsChannel(const TabletsChannelKey& key,
|
||||
const std::shared_ptr<MemTrackerLimiter>& parent_tracker, bool is_high_priority,
|
||||
bool is_vec);
|
||||
|
||||
~TabletsChannel();
|
||||
|
||||
@ -144,7 +145,7 @@ private:
|
||||
|
||||
static std::atomic<uint64_t> _s_tablet_writer_count;
|
||||
|
||||
std::unique_ptr<MemTrackerLimiter> _mem_tracker;
|
||||
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
|
||||
|
||||
bool _is_high_priority = false;
|
||||
|
||||
|
||||
@ -29,8 +29,9 @@ ThreadContextPtr::ThreadContextPtr() {
|
||||
_init = true;
|
||||
}
|
||||
|
||||
AttachTask::AttachTask(MemTrackerLimiter* mem_tracker, const ThreadContext::TaskType& type,
|
||||
const std::string& task_id, const TUniqueId& fragment_instance_id) {
|
||||
AttachTask::AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker,
|
||||
const ThreadContext::TaskType& type, const std::string& task_id,
|
||||
const TUniqueId& fragment_instance_id) {
|
||||
DCHECK(mem_tracker);
|
||||
#ifdef USE_MEM_TRACKER
|
||||
thread_context()->attach_task(type, task_id, fragment_instance_id, mem_tracker);
|
||||
|
||||
@ -130,7 +130,8 @@ public:
|
||||
}
|
||||
|
||||
void attach_task(const TaskType& type, const std::string& task_id,
|
||||
const TUniqueId& fragment_instance_id, MemTrackerLimiter* mem_tracker) {
|
||||
const TUniqueId& fragment_instance_id,
|
||||
const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
|
||||
DCHECK((_type == TaskType::UNKNOWN || _type == TaskType::BRPC) && _task_id == "")
|
||||
<< ",new tracker label: " << mem_tracker->label() << ",old tracker label: "
|
||||
<< _thread_mem_tracker_mgr->limiter_mem_tracker()->label();
|
||||
@ -195,7 +196,7 @@ static ThreadContext* thread_context() {
|
||||
|
||||
class AttachTask {
|
||||
public:
|
||||
explicit AttachTask(MemTrackerLimiter* mem_tracker,
|
||||
explicit AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker,
|
||||
const ThreadContext::TaskType& type = ThreadContext::TaskType::UNKNOWN,
|
||||
const std::string& task_id = "",
|
||||
const TUniqueId& fragment_instance_id = TUniqueId());
|
||||
|
||||
@ -118,20 +118,20 @@ void PInternalServiceImpl::_transmit_data(google::protobuf::RpcController* cntl_
|
||||
const Status& extract_st) {
|
||||
std::string query_id;
|
||||
TUniqueId finst_id;
|
||||
std::unique_ptr<MemTrackerLimiter> transmit_tracker;
|
||||
std::shared_ptr<MemTrackerLimiter> transmit_tracker;
|
||||
if (request->has_query_id()) {
|
||||
query_id = print_id(request->query_id());
|
||||
finst_id.__set_hi(request->finst_id().hi());
|
||||
finst_id.__set_lo(request->finst_id().lo());
|
||||
// In some cases, query mem tracker does not exist in BE when transmit block, will get null pointer.
|
||||
transmit_tracker = std::make_unique<MemTrackerLimiter>(
|
||||
transmit_tracker = std::make_shared<MemTrackerLimiter>(
|
||||
-1, fmt::format("QueryTransmit#queryId={}", query_id),
|
||||
_exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id));
|
||||
} else {
|
||||
query_id = "unkown_transmit_data";
|
||||
transmit_tracker = std::make_unique<MemTrackerLimiter>(-1, "unkown_transmit_data");
|
||||
transmit_tracker = std::make_shared<MemTrackerLimiter>(-1, "unkown_transmit_data");
|
||||
}
|
||||
SCOPED_ATTACH_TASK(transmit_tracker.get(), ThreadContext::TaskType::QUERY, query_id, finst_id);
|
||||
SCOPED_ATTACH_TASK(transmit_tracker, ThreadContext::TaskType::QUERY, query_id, finst_id);
|
||||
VLOG_ROW << "transmit data: fragment_instance_id=" << print_id(request->finst_id())
|
||||
<< " query_id=" << query_id << " node=" << request->node_id();
|
||||
// The response is accessed when done->Run is called in transmit_data(),
|
||||
@ -649,20 +649,20 @@ void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* cntl
|
||||
const Status& extract_st) {
|
||||
std::string query_id;
|
||||
TUniqueId finst_id;
|
||||
std::unique_ptr<MemTrackerLimiter> transmit_tracker;
|
||||
std::shared_ptr<MemTrackerLimiter> transmit_tracker;
|
||||
if (request->has_query_id()) {
|
||||
query_id = print_id(request->query_id());
|
||||
finst_id.__set_hi(request->finst_id().hi());
|
||||
finst_id.__set_lo(request->finst_id().lo());
|
||||
// In some cases, query mem tracker does not exist in BE when transmit block, will get null pointer.
|
||||
transmit_tracker = std::make_unique<MemTrackerLimiter>(
|
||||
transmit_tracker = std::make_shared<MemTrackerLimiter>(
|
||||
-1, fmt::format("QueryTransmit#queryId={}", query_id),
|
||||
_exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id));
|
||||
} else {
|
||||
query_id = "unkown_transmit_block";
|
||||
transmit_tracker = std::make_unique<MemTrackerLimiter>(-1, "unkown_transmit_block");
|
||||
transmit_tracker = std::make_shared<MemTrackerLimiter>(-1, "unkown_transmit_block");
|
||||
}
|
||||
SCOPED_ATTACH_TASK(transmit_tracker.get(), ThreadContext::TaskType::QUERY, query_id, finst_id);
|
||||
SCOPED_ATTACH_TASK(transmit_tracker, ThreadContext::TaskType::QUERY, query_id, finst_id);
|
||||
VLOG_ROW << "transmit block: fragment_instance_id=" << print_id(request->finst_id())
|
||||
<< " query_id=" << query_id << " node=" << request->node_id();
|
||||
// The response is accessed when done->Run is called in transmit_block(),
|
||||
|
||||
@ -53,8 +53,8 @@ std::shared_ptr<VDataStreamRecvr> VDataStreamMgr::create_recvr(
|
||||
VLOG_FILE << "creating receiver for fragment=" << fragment_instance_id
|
||||
<< ", node=" << dest_node_id;
|
||||
std::shared_ptr<VDataStreamRecvr> recvr(new VDataStreamRecvr(
|
||||
this, row_desc, state->query_mem_tracker(), fragment_instance_id, dest_node_id,
|
||||
num_senders, is_merging, buffer_size, profile, sub_plan_query_statistics_recvr));
|
||||
this, row_desc, fragment_instance_id, dest_node_id, num_senders, is_merging,
|
||||
buffer_size, profile, sub_plan_query_statistics_recvr));
|
||||
uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id);
|
||||
std::lock_guard<std::mutex> l(_lock);
|
||||
_fragment_stream_set.insert(std::make_pair(fragment_instance_id, dest_node_id));
|
||||
|
||||
@ -249,9 +249,8 @@ void VDataStreamRecvr::SenderQueue::close() {
|
||||
|
||||
VDataStreamRecvr::VDataStreamRecvr(
|
||||
VDataStreamMgr* stream_mgr, const RowDescriptor& row_desc,
|
||||
MemTrackerLimiter* query_mem_tracker, const TUniqueId& fragment_instance_id,
|
||||
PlanNodeId dest_node_id, int num_senders, bool is_merging, int total_buffer_limit,
|
||||
RuntimeProfile* profile,
|
||||
const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
|
||||
bool is_merging, int total_buffer_limit, RuntimeProfile* profile,
|
||||
std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr)
|
||||
: _mgr(stream_mgr),
|
||||
_fragment_instance_id(fragment_instance_id),
|
||||
|
||||
@ -51,9 +51,9 @@ class VExprContext;
|
||||
class VDataStreamRecvr {
|
||||
public:
|
||||
VDataStreamRecvr(VDataStreamMgr* stream_mgr, const RowDescriptor& row_desc,
|
||||
MemTrackerLimiter* query_mem_tracker, const TUniqueId& fragment_instance_id,
|
||||
PlanNodeId dest_node_id, int num_senders, bool is_merging,
|
||||
int total_buffer_limit, RuntimeProfile* profile,
|
||||
const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id,
|
||||
int num_senders, bool is_merging, int total_buffer_limit,
|
||||
RuntimeProfile* profile,
|
||||
std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr);
|
||||
|
||||
~VDataStreamRecvr();
|
||||
|
||||
@ -53,9 +53,9 @@ TEST(MemTestTest, SingleTrackerWithLimit) {
|
||||
}
|
||||
|
||||
TEST(MemTestTest, TrackerHierarchy) {
|
||||
auto p = std::make_unique<MemTrackerLimiter>(100);
|
||||
auto c1 = std::make_unique<MemTrackerLimiter>(80, "c1", p.get());
|
||||
auto c2 = std::make_unique<MemTrackerLimiter>(50, "c2", p.get());
|
||||
auto p = std::make_shared<MemTrackerLimiter>(100);
|
||||
auto c1 = std::make_unique<MemTrackerLimiter>(80, "c1", p);
|
||||
auto c2 = std::make_unique<MemTrackerLimiter>(50, "c2", p);
|
||||
|
||||
// everything below limits
|
||||
c1->consume(60);
|
||||
@ -96,9 +96,9 @@ TEST(MemTestTest, TrackerHierarchy) {
|
||||
}
|
||||
|
||||
TEST(MemTestTest, TrackerHierarchyTryConsume) {
|
||||
auto p = std::make_unique<MemTrackerLimiter>(100);
|
||||
auto c1 = std::make_unique<MemTrackerLimiter>(80, "c1", p.get());
|
||||
auto c2 = std::make_unique<MemTrackerLimiter>(50, "c2", p.get());
|
||||
auto p = std::make_shared<MemTrackerLimiter>(100);
|
||||
auto c1 = std::make_unique<MemTrackerLimiter>(80, "c1", p);
|
||||
auto c2 = std::make_unique<MemTrackerLimiter>(50, "c2", p);
|
||||
|
||||
// everything below limits
|
||||
bool consumption = c1->try_consume(60).ok();
|
||||
|
||||
@ -29,7 +29,8 @@
|
||||
#include "util/mem_info.h"
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
doris::MemTrackerLimiter* process_mem_tracker = new doris::MemTrackerLimiter(-1, "Process");
|
||||
std::shared_ptr<doris::MemTrackerLimiter> process_mem_tracker =
|
||||
std::make_shared<doris::MemTrackerLimiter>(-1, "Process");
|
||||
doris::ExecEnv::GetInstance()->set_process_mem_tracker(process_mem_tracker);
|
||||
doris::thread_context()->_thread_mem_tracker_mgr->init();
|
||||
doris::StoragePageCache::create_global_cache(1 << 30, 10);
|
||||
|
||||
Reference in New Issue
Block a user