[improvement](load) avoid producing small segment (#20852)

avoid producing small segment
This commit is contained in:
Yongqiang YANG
2023-06-19 18:34:44 +08:00
committed by GitHub
parent 415f1053a4
commit a3342cb088
6 changed files with 36 additions and 44 deletions

View File

@ -529,27 +529,6 @@ Status DeltaWriter::cancel_with_status(const Status& st) {
return Status::OK();
}
void DeltaWriter::save_mem_consumption_snapshot() {
std::lock_guard<std::mutex> l(_lock);
_mem_consumption_snapshot = mem_consumption(MemType::ALL);
if (_mem_table == nullptr) {
_memtable_consumption_snapshot = 0;
} else {
_memtable_consumption_snapshot = _mem_table->memory_usage();
}
}
int64_t DeltaWriter::get_memtable_consumption_inflush() const {
if (!_is_init || _flush_token->get_stats().flush_running_count == 0) {
return 0;
}
return _mem_consumption_snapshot - _memtable_consumption_snapshot;
}
int64_t DeltaWriter::get_memtable_consumption_snapshot() const {
return _memtable_consumption_snapshot;
}
int64_t DeltaWriter::mem_consumption(MemType mem) {
if (_flush_token == nullptr) {
// This method may be called before this writer is initialized.
@ -573,6 +552,23 @@ int64_t DeltaWriter::mem_consumption(MemType mem) {
return mem_usage;
}
int64_t DeltaWriter::active_memtable_mem_consumption() {
if (_flush_token == nullptr) {
// This method may be called before this writer is initialized.
// So _flush_token may be null.
return 0;
}
int64_t mem_usage = 0;
{
std::lock_guard<SpinLock> l(_mem_table_tracker_lock);
if (_mem_table_insert_trackers.size() > 0) {
mem_usage += (*_mem_table_insert_trackers.rbegin())->consumption();
mem_usage += (*_mem_table_flush_trackers.rbegin())->consumption();
}
}
return mem_usage;
}
int64_t DeltaWriter::partition_id() const {
return _req.partition_id;
}

View File

@ -116,6 +116,7 @@ public:
int64_t partition_id() const;
int64_t mem_consumption(MemType mem);
int64_t active_memtable_mem_consumption();
// Wait all memtable in flush queue to be flushed
Status wait_flush();
@ -124,12 +125,6 @@ public:
int32_t schema_hash() { return _tablet->schema_hash(); }
void save_mem_consumption_snapshot();
int64_t get_memtable_consumption_inflush() const;
int64_t get_memtable_consumption_snapshot() const;
void finish_slave_tablet_pull_rowset(int64_t node_id, bool is_succeed);
int64_t total_received_rows() const { return _total_received_rows; }
@ -181,13 +176,6 @@ private:
std::mutex _lock;
// memory consumption snapshot for current delta_writer, only
// used for std::sort
int64_t _mem_consumption_snapshot = 0;
// memory consumption snapshot for current memtable, only
// used for std::sort
int64_t _memtable_consumption_snapshot = 0;
std::unordered_set<int64_t> _unfinished_slave_node;
PSuccessSlaveTabletNodeIds _success_slave_node_ids;
std::shared_mutex _slave_node_lock;

View File

@ -87,13 +87,13 @@ public:
return mem_usage;
}
void get_writers_mem_consumption_snapshot(
void get_active_memtable_mem_consumption(
std::vector<std::pair<int64_t, std::multimap<int64_t, int64_t, std::greater<int64_t>>>>*
writers_mem_snap) {
std::lock_guard<SpinLock> l(_tablets_channels_lock);
for (auto& it : _tablets_channels) {
std::multimap<int64_t, int64_t, std::greater<int64_t>> tablets_channel_mem;
it.second->get_writers_mem_consumption_snapshot(&tablets_channel_mem);
it.second->get_active_memtable_mem_consumption(&tablets_channel_mem);
writers_mem_snap->emplace_back(it.first, std::move(tablets_channel_mem));
}
}

View File

@ -360,12 +360,12 @@ void LoadChannelMgr::_handle_mem_exceed_limit() {
for (auto& kv : _load_channels) {
if (kv.second->is_high_priority()) {
// do not select high priority channel to reduce memory
// to avoid blocking them.
// to avoid blocking the.
continue;
}
std::vector<std::pair<int64_t, std::multimap<int64_t, int64_t, std::greater<int64_t>>>>
writers_mem_snap;
kv.second->get_writers_mem_consumption_snapshot(&writers_mem_snap);
kv.second->get_active_memtable_mem_consumption(&writers_mem_snap);
for (auto item : writers_mem_snap) {
// multimap is empty
if (item.second.empty()) {
@ -412,8 +412,9 @@ void LoadChannelMgr::_handle_mem_exceed_limit() {
<< " delta writers (total mem: "
<< PrettyPrinter::print_bytes(mem_consumption_in_picked_writer) << ", max mem: "
<< PrettyPrinter::print_bytes(std::get<3>(writers_to_reduce_mem.front()))
<< ", tablet_id: " << std::get<2>(writers_to_reduce_mem.front())
<< ", min mem:" << PrettyPrinter::print_bytes(std::get<3>(writers_to_reduce_mem.back()))
<< "), ";
<< ", tablet_id: " << std::get<2>(writers_to_reduce_mem.back()) << "), ";
if (proc_mem_no_allocator_cache < process_soft_mem_limit) {
oss << "because total load mem consumption "
<< PrettyPrinter::print_bytes(_mem_tracker->consumption()) << " has exceeded";

View File

@ -282,6 +282,16 @@ int64_t TabletsChannel::mem_consumption() {
return write_mem_usage + flush_mem_usage;
}
void TabletsChannel::get_active_memtable_mem_consumption(
std::multimap<int64_t, int64_t, std::greater<int64_t>>* mem_consumptions) {
mem_consumptions->clear();
std::lock_guard<SpinLock> l(_tablet_writers_lock);
for (auto& it : _tablet_writers) {
int64_t active_memtable_mem = it.second->active_memtable_mem_consumption();
mem_consumptions->emplace(active_memtable_mem, it.first);
}
}
// Old logic,used for opening all writers of all partitions.
Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request) {
std::vector<SlotDescriptor*>* index_slots = nullptr;

View File

@ -115,11 +115,8 @@ public:
int64_t mem_consumption();
void get_writers_mem_consumption_snapshot(
std::multimap<int64_t, int64_t, std::greater<int64_t>>* mem_consumptions) {
std::lock_guard<SpinLock> l(_tablet_writers_lock);
*mem_consumptions = _mem_consumptions;
}
void get_active_memtable_mem_consumption(
std::multimap<int64_t, int64_t, std::greater<int64_t>>* mem_consumptions);
void flush_memtable_async(int64_t tablet_id);
void wait_flush(int64_t tablet_id);