[fix](load) fix memtracking orphan too large (#28600)
This commit is contained in:
@ -586,6 +586,8 @@ DEFINE_mInt32(memtable_soft_limit_active_percent, "50");
|
||||
// Alignment
|
||||
DEFINE_Int32(memory_max_alignment, "16");
|
||||
|
||||
// memtable insert memory tracker will multiply input block size with this ratio
|
||||
DEFINE_mDouble(memtable_insert_memory_ratio, "1.4");
|
||||
// max write buffer size before flush, default 200MB
|
||||
DEFINE_mInt64(write_buffer_size, "209715200");
|
||||
// max buffer size used in memtable for the aggregated table, default 400MB
|
||||
|
||||
@ -645,6 +645,8 @@ DECLARE_mInt32(memtable_soft_limit_active_percent);
|
||||
// Alignment
|
||||
DECLARE_Int32(memory_max_alignment);
|
||||
|
||||
// memtable insert memory tracker will multiply input block size with this ratio
|
||||
DECLARE_mDouble(memtable_insert_memory_ratio);
|
||||
// max write buffer size before flush, default 200MB
|
||||
DECLARE_mInt64(write_buffer_size);
|
||||
// max buffer size used in memtable for the aggregated table, default 400MB
|
||||
|
||||
@ -44,6 +44,7 @@
|
||||
namespace doris {
|
||||
|
||||
bvar::Adder<int64_t> g_memtable_cnt("memtable_cnt");
|
||||
bvar::Adder<int64_t> g_memtable_input_block_allocated_size("memtable_input_block_allocated_size");
|
||||
|
||||
using namespace ErrorCode;
|
||||
|
||||
@ -137,6 +138,7 @@ void MemTable::_init_agg_functions(const vectorized::Block* block) {
|
||||
}
|
||||
|
||||
MemTable::~MemTable() {
|
||||
g_memtable_input_block_allocated_size << -_input_mutable_block.allocated_bytes();
|
||||
g_memtable_cnt << -1;
|
||||
if (_keys_type != KeysType::DUP_KEYS) {
|
||||
for (auto it = _row_in_blocks.begin(); it != _row_in_blocks.end(); it++) {
|
||||
@ -198,6 +200,7 @@ void MemTable::insert(const vectorized::Block* input_block, const std::vector<ui
|
||||
|
||||
auto num_rows = row_idxs.size();
|
||||
size_t cursor_in_mutableblock = _input_mutable_block.rows();
|
||||
auto block_size0 = _input_mutable_block.allocated_bytes();
|
||||
if (is_append) {
|
||||
// Append the block, call insert range from
|
||||
_input_mutable_block.add_rows(&target_block, 0, target_block.rows());
|
||||
@ -205,7 +208,10 @@ void MemTable::insert(const vectorized::Block* input_block, const std::vector<ui
|
||||
} else {
|
||||
_input_mutable_block.add_rows(&target_block, row_idxs.data(), row_idxs.data() + num_rows);
|
||||
}
|
||||
size_t input_size = target_block.bytes() * num_rows / target_block.rows();
|
||||
auto block_size1 = _input_mutable_block.allocated_bytes();
|
||||
g_memtable_input_block_allocated_size << block_size1 - block_size0;
|
||||
size_t input_size = target_block.bytes() * num_rows / target_block.rows() *
|
||||
config::memtable_insert_memory_ratio;
|
||||
_mem_usage += input_size;
|
||||
_insert_mem_tracker->consume(input_size);
|
||||
for (int i = 0; i < num_rows; i++) {
|
||||
@ -504,6 +510,9 @@ std::unique_ptr<vectorized::Block> MemTable::to_block() {
|
||||
!_tablet_schema->cluster_key_idxes().empty()) {
|
||||
_sort_by_cluster_keys();
|
||||
}
|
||||
_input_mutable_block.clear();
|
||||
_insert_mem_tracker->release(_mem_usage);
|
||||
_mem_usage = 0;
|
||||
return vectorized::Block::create_unique(_output_mutable_block.to_block());
|
||||
}
|
||||
|
||||
|
||||
@ -38,6 +38,7 @@ bvar::Status<int64_t> g_memtable_flush_memory("mm_limiter_mem_flush", 0);
|
||||
bvar::Status<int64_t> g_memtable_load_memory("mm_limiter_mem_load", 0);
|
||||
bvar::Status<int64_t> g_load_hard_mem_limit("mm_limiter_limit_hard", 0);
|
||||
bvar::Status<int64_t> g_load_soft_mem_limit("mm_limiter_limit_soft", 0);
|
||||
bvar::Status<int64_t> g_orphan_memory("mm_limiter_mem_orphan", 0);
|
||||
|
||||
// Calculate the total memory limit of all load tasks on this BE
|
||||
static int64_t calc_process_max_load_memory(int64_t process_mem_limit) {
|
||||
@ -236,6 +237,7 @@ void MemTableMemoryLimiter::_refresh_mem_tracker() {
|
||||
g_memtable_load_memory.set_value(_mem_usage);
|
||||
VLOG_DEBUG << "refreshed mem_tracker, num writers: " << _writers.size();
|
||||
THREAD_MEM_TRACKER_TRANSFER_TO(_mem_usage - _mem_tracker->consumption(), _mem_tracker.get());
|
||||
g_orphan_memory.set_value(ExecEnv::GetInstance()->orphan_mem_tracker()->consumption());
|
||||
if (!_hard_limit_reached()) {
|
||||
_hard_limit_end_cond.notify_all();
|
||||
}
|
||||
|
||||
@ -17,6 +17,7 @@
|
||||
|
||||
#include "runtime/tablets_channel.h"
|
||||
|
||||
#include <bvar/bvar.h>
|
||||
#include <fmt/format.h>
|
||||
#include <gen_cpp/internal_service.pb.h>
|
||||
#include <gen_cpp/types.pb.h>
|
||||
@ -41,6 +42,7 @@
|
||||
#include "olap/storage_engine.h"
|
||||
#include "olap/txn_manager.h"
|
||||
#include "runtime/load_channel.h"
|
||||
#include "util/defer_op.h"
|
||||
#include "util/doris_metrics.h"
|
||||
#include "util/metrics.h"
|
||||
#include "vec/core/block.h"
|
||||
@ -48,6 +50,9 @@
|
||||
namespace doris {
|
||||
class SlotDescriptor;
|
||||
|
||||
bvar::Adder<int64_t> g_tablets_channel_send_data_allocated_size(
|
||||
"tablets_channel_send_data_allocated_size");
|
||||
|
||||
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(tablet_writer_count, MetricUnit::NOUNIT);
|
||||
|
||||
std::atomic<uint64_t> BaseTabletsChannel::_s_tablet_writer_count;
|
||||
@ -521,6 +526,10 @@ Status BaseTabletsChannel::add_batch(const PTabletWriterAddBlockRequest& request
|
||||
<< "block rows: " << send_data.rows()
|
||||
<< ", tablet_ids_size: " << request.tablet_ids_size();
|
||||
|
||||
g_tablets_channel_send_data_allocated_size << send_data.allocated_bytes();
|
||||
Defer defer {
|
||||
[&]() { g_tablets_channel_send_data_allocated_size << -send_data.allocated_bytes(); }};
|
||||
|
||||
auto write_tablet_data = [&](uint32_t tablet_id,
|
||||
std::function<Status(BaseDeltaWriter * writer)> write_func) {
|
||||
google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors =
|
||||
|
||||
Reference in New Issue
Block a user