diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index df9167484b..5c0d7dc2d2 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -529,27 +529,6 @@ Status DeltaWriter::cancel_with_status(const Status& st) { return Status::OK(); } -void DeltaWriter::save_mem_consumption_snapshot() { - std::lock_guard l(_lock); - _mem_consumption_snapshot = mem_consumption(MemType::ALL); - if (_mem_table == nullptr) { - _memtable_consumption_snapshot = 0; - } else { - _memtable_consumption_snapshot = _mem_table->memory_usage(); - } -} - -int64_t DeltaWriter::get_memtable_consumption_inflush() const { - if (!_is_init || _flush_token->get_stats().flush_running_count == 0) { - return 0; - } - return _mem_consumption_snapshot - _memtable_consumption_snapshot; -} - -int64_t DeltaWriter::get_memtable_consumption_snapshot() const { - return _memtable_consumption_snapshot; -} - int64_t DeltaWriter::mem_consumption(MemType mem) { if (_flush_token == nullptr) { // This method may be called before this writer is initialized. @@ -573,6 +552,23 @@ int64_t DeltaWriter::mem_consumption(MemType mem) { return mem_usage; } +int64_t DeltaWriter::active_memtable_mem_consumption() { + if (_flush_token == nullptr) { + // This method may be called before this writer is initialized. + // So _flush_token may be null. + return 0; + } + int64_t mem_usage = 0; + { + std::lock_guard l(_mem_table_tracker_lock); + if (_mem_table_insert_trackers.size() > 0) { + mem_usage += (*_mem_table_insert_trackers.rbegin())->consumption(); + mem_usage += (*_mem_table_flush_trackers.rbegin())->consumption(); + } + } + return mem_usage; +} + int64_t DeltaWriter::partition_id() const { return _req.partition_id; } diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h index b91144751c..52b407876f 100644 --- a/be/src/olap/delta_writer.h +++ b/be/src/olap/delta_writer.h @@ -116,6 +116,7 @@ public: int64_t partition_id() const; int64_t mem_consumption(MemType mem); + int64_t active_memtable_mem_consumption(); // Wait all memtable in flush queue to be flushed Status wait_flush(); @@ -124,12 +125,6 @@ public: int32_t schema_hash() { return _tablet->schema_hash(); } - void save_mem_consumption_snapshot(); - - int64_t get_memtable_consumption_inflush() const; - - int64_t get_memtable_consumption_snapshot() const; - void finish_slave_tablet_pull_rowset(int64_t node_id, bool is_succeed); int64_t total_received_rows() const { return _total_received_rows; } @@ -181,13 +176,6 @@ private: std::mutex _lock; - // memory consumption snapshot for current delta_writer, only - // used for std::sort - int64_t _mem_consumption_snapshot = 0; - // memory consumption snapshot for current memtable, only - // used for std::sort - int64_t _memtable_consumption_snapshot = 0; - std::unordered_set _unfinished_slave_node; PSuccessSlaveTabletNodeIds _success_slave_node_ids; std::shared_mutex _slave_node_lock; diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h index e746014fde..5648cb0ae1 100644 --- a/be/src/runtime/load_channel.h +++ b/be/src/runtime/load_channel.h @@ -87,13 +87,13 @@ public: return mem_usage; } - void get_writers_mem_consumption_snapshot( + void get_active_memtable_mem_consumption( std::vector>>>* writers_mem_snap) { std::lock_guard l(_tablets_channels_lock); for (auto& it : _tablets_channels) { std::multimap> tablets_channel_mem; - it.second->get_writers_mem_consumption_snapshot(&tablets_channel_mem); + it.second->get_active_memtable_mem_consumption(&tablets_channel_mem); writers_mem_snap->emplace_back(it.first, std::move(tablets_channel_mem)); } } diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp index 1a9fd86f2d..a9d94aedb9 100644 --- a/be/src/runtime/load_channel_mgr.cpp +++ b/be/src/runtime/load_channel_mgr.cpp @@ -360,12 +360,12 @@ void LoadChannelMgr::_handle_mem_exceed_limit() { for (auto& kv : _load_channels) { if (kv.second->is_high_priority()) { // do not select high priority channel to reduce memory - // to avoid blocking them. + // to avoid blocking the. continue; } std::vector>>> writers_mem_snap; - kv.second->get_writers_mem_consumption_snapshot(&writers_mem_snap); + kv.second->get_active_memtable_mem_consumption(&writers_mem_snap); for (auto item : writers_mem_snap) { // multimap is empty if (item.second.empty()) { @@ -412,8 +412,9 @@ void LoadChannelMgr::_handle_mem_exceed_limit() { << " delta writers (total mem: " << PrettyPrinter::print_bytes(mem_consumption_in_picked_writer) << ", max mem: " << PrettyPrinter::print_bytes(std::get<3>(writers_to_reduce_mem.front())) + << ", tablet_id: " << std::get<2>(writers_to_reduce_mem.front()) << ", min mem:" << PrettyPrinter::print_bytes(std::get<3>(writers_to_reduce_mem.back())) - << "), "; + << ", tablet_id: " << std::get<2>(writers_to_reduce_mem.back()) << "), "; if (proc_mem_no_allocator_cache < process_soft_mem_limit) { oss << "because total load mem consumption " << PrettyPrinter::print_bytes(_mem_tracker->consumption()) << " has exceeded"; diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index b7ac5dcd64..05b17c5931 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -282,6 +282,16 @@ int64_t TabletsChannel::mem_consumption() { return write_mem_usage + flush_mem_usage; } +void TabletsChannel::get_active_memtable_mem_consumption( + std::multimap>* mem_consumptions) { + mem_consumptions->clear(); + std::lock_guard l(_tablet_writers_lock); + for (auto& it : _tablet_writers) { + int64_t active_memtable_mem = it.second->active_memtable_mem_consumption(); + mem_consumptions->emplace(active_memtable_mem, it.first); + } +} + // Old logic,used for opening all writers of all partitions. Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request) { std::vector* index_slots = nullptr; diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h index 58cfd961ff..b9235925b0 100644 --- a/be/src/runtime/tablets_channel.h +++ b/be/src/runtime/tablets_channel.h @@ -115,11 +115,8 @@ public: int64_t mem_consumption(); - void get_writers_mem_consumption_snapshot( - std::multimap>* mem_consumptions) { - std::lock_guard l(_tablet_writers_lock); - *mem_consumptions = _mem_consumptions; - } + void get_active_memtable_mem_consumption( + std::multimap>* mem_consumptions); void flush_memtable_async(int64_t tablet_id); void wait_flush(int64_t tablet_id);