[Enhancement](load) consider memtable in flush while reducing load me… (#13480)

We should consider memory which are being flushed from memtable to disk when trying to reduce memory by flushing memtable. Otherwise, we might not release memory space as expected. (e.g. lots of large memtable is in flush, the reduce_mem_usage method picks some small memtables to flush, it can't release enough memory and also can generate lots of small segments, which can cause -238 error)
This commit is contained in:
zhannngchen
2022-10-21 08:35:35 +08:00
committed by GitHub
parent e62d3dd8e5
commit 1b0dafcaa1
4 changed files with 75 additions and 45 deletions

View File

@ -383,9 +383,14 @@ Status DeltaWriter::cancel() {
return Status::OK();
}
int64_t DeltaWriter::save_memtable_consumption_snapshot() {
void DeltaWriter::save_mem_consumption_snapshot() {
_mem_consumption_snapshot = mem_consumption();
_memtable_consumption_snapshot = memtable_consumption();
return _memtable_consumption_snapshot;
}
int64_t DeltaWriter::get_memtable_consumption_inflush() const {
if (_flush_token->get_stats().flush_running_count == 0) return 0;
return _mem_consumption_snapshot - _memtable_consumption_snapshot;
}
int64_t DeltaWriter::get_memtable_consumption_snapshot() const {

View File

@ -104,7 +104,9 @@ public:
int64_t memtable_consumption() const;
int64_t save_memtable_consumption_snapshot();
void save_mem_consumption_snapshot();
int64_t get_memtable_consumption_inflush() const;
int64_t get_memtable_consumption_snapshot() const;
@ -161,6 +163,9 @@ private:
// use in vectorized load
bool _is_vec;
// memory consumption snapshot for current delta_writer, only
// used for std::sort
int64_t _mem_consumption_snapshot = 0;
// memory consumption snapshot for current memtable, only
// used for std::sort
int64_t _memtable_consumption_snapshot = 0;

View File

@ -177,9 +177,6 @@ Status LoadChannel::handle_mem_exceed_limit(TabletWriterAddResult* response) {
{
// lock so that only one thread can check mem limit
std::lock_guard<std::mutex> l(_lock);
LOG(INFO) << "reducing memory of " << *this
<< " ,mem consumption: " << _mem_tracker->consumption();
found = _find_largest_consumption_channel(&channel);
}
// Release lock so that other threads can still call add_batch concurrently.

View File

@ -199,10 +199,12 @@ Status TabletsChannel::reduce_mem_usage(TabletWriterAddResult* response) {
if (_try_to_wait_flushing()) {
// `_try_to_wait_flushing()` returns true means other thread already
// reduced the mem usage, and current thread do not need to reduce again.
LOG(INFO) << "Duplicate reduce mem usage on TabletsChannel, txn_id: " << _txn_id
<< ", index_id: " << _index_id;
return Status::OK();
}
std::vector<DeltaWriter*> writers_to_flush;
std::vector<DeltaWriter*> writers_to_wait_flush;
{
std::lock_guard<std::mutex> l(_lock);
if (_state == kFinished) {
@ -214,9 +216,16 @@ Status TabletsChannel::reduce_mem_usage(TabletWriterAddResult* response) {
// Sort the DeltaWriters by mem consumption in descend order.
std::vector<DeltaWriter*> writers;
for (auto& it : _tablet_writers) {
it.second->save_memtable_consumption_snapshot();
it.second->save_mem_consumption_snapshot();
writers.push_back(it.second);
}
int64_t total_memtable_consumption_in_flush = 0;
for (auto writer : writers) {
if (writer->get_memtable_consumption_inflush() > 0) {
writers_to_wait_flush.push_back(writer);
total_memtable_consumption_in_flush += writer->get_memtable_consumption_inflush();
}
}
std::sort(writers.begin(), writers.end(),
[](const DeltaWriter* lhs, const DeltaWriter* rhs) {
return lhs->get_memtable_consumption_snapshot() >
@ -235,48 +244,62 @@ Status TabletsChannel::reduce_mem_usage(TabletWriterAddResult* response) {
// the tablet that has not been flushed before will accumulate more data, thereby reducing the number of flushes.
int64_t mem_to_flushed = _mem_tracker->consumption() / 3;
int counter = 0;
int64_t sum = 0;
for (auto writer : writers) {
if (writer->mem_consumption() <= 0) {
break;
if (total_memtable_consumption_in_flush < mem_to_flushed) {
mem_to_flushed -= total_memtable_consumption_in_flush;
int counter = 0;
int64_t sum = 0;
for (auto writer : writers) {
if (writer->mem_consumption() <= 0) {
break;
}
++counter;
sum += writer->mem_consumption();
if (sum > mem_to_flushed) {
break;
}
}
++counter;
sum += writer->mem_consumption();
if (sum > mem_to_flushed) {
break;
std::ostringstream ss;
ss << "total size of memtables in flush: " << total_memtable_consumption_in_flush
<< " will flush " << counter << " more memtables to reduce memory: " << sum;
if (counter > 0) {
ss << ", the size of smallest memtable to flush is "
<< writers[counter - 1]->get_memtable_consumption_snapshot() << " bytes";
}
LOG(INFO) << ss.str();
google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors =
response->mutable_tablet_errors();
// following loop flush memtable async, we'll do it with _lock
for (int i = 0; i < counter; i++) {
Status st = writers[i]->flush_memtable_and_wait(false);
if (!st.ok()) {
auto err_msg = strings::Substitute(
"tablet writer failed to reduce mem consumption by flushing memtable, "
"tablet_id=$0, txn_id=$1, err=$2, errcode=$3, msg:$4",
writers[i]->tablet_id(), _txn_id, st.code(), st.precise_code(),
st.get_error_msg());
LOG(WARNING) << err_msg;
PTabletError* error = tablet_errors->Add();
error->set_tablet_id(writers[i]->tablet_id());
error->set_msg(err_msg);
_broken_tablets.insert(writers[i]->tablet_id());
}
}
for (int i = 0; i < counter; i++) {
if (_broken_tablets.find(writers[i]->tablet_id()) != _broken_tablets.end()) {
// skip broken tablets
continue;
}
writers_to_wait_flush.push_back(writers[i]);
}
_reducing_mem_usage = true;
} else {
LOG(INFO) << "total size of memtables in flush is big enough: "
<< total_memtable_consumption_in_flush
<< " bytes, will not flush more memtables";
}
VLOG_CRITICAL << "flush " << counter << " memtables to reduce memory: " << sum;
google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors =
response->mutable_tablet_errors();
// following loop flush memtable async, we'll do it with _lock
for (int i = 0; i < counter; i++) {
Status st = writers[i]->flush_memtable_and_wait(false);
if (!st.ok()) {
auto err_msg = strings::Substitute(
"tablet writer failed to reduce mem consumption by flushing memtable, "
"tablet_id=$0, txn_id=$1, err=$2, errcode=$3, msg:$4",
writers[i]->tablet_id(), _txn_id, st.code(), st.precise_code(),
st.get_error_msg());
LOG(WARNING) << err_msg;
PTabletError* error = tablet_errors->Add();
error->set_tablet_id(writers[i]->tablet_id());
error->set_msg(err_msg);
_broken_tablets.insert(writers[i]->tablet_id());
}
}
for (int i = 0; i < counter; i++) {
if (_broken_tablets.find(writers[i]->tablet_id()) != _broken_tablets.end()) {
// skip broken tablets
continue;
}
writers_to_flush.push_back(writers[i]);
}
_reducing_mem_usage = true;
}
for (auto writer : writers_to_flush) {
for (auto writer : writers_to_wait_flush) {
Status st = writer->wait_flush();
if (!st.ok()) {
return Status::InternalError(