diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index bbb9074514..e7ade20214 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -383,9 +383,14 @@ Status DeltaWriter::cancel() { return Status::OK(); } -int64_t DeltaWriter::save_memtable_consumption_snapshot() { +void DeltaWriter::save_mem_consumption_snapshot() { + _mem_consumption_snapshot = mem_consumption(); _memtable_consumption_snapshot = memtable_consumption(); - return _memtable_consumption_snapshot; +} + +int64_t DeltaWriter::get_memtable_consumption_inflush() const { + if (_flush_token->get_stats().flush_running_count == 0) return 0; + return _mem_consumption_snapshot - _memtable_consumption_snapshot; } int64_t DeltaWriter::get_memtable_consumption_snapshot() const { diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h index 4ad7df38ed..a7770326fd 100644 --- a/be/src/olap/delta_writer.h +++ b/be/src/olap/delta_writer.h @@ -104,7 +104,9 @@ public: int64_t memtable_consumption() const; - int64_t save_memtable_consumption_snapshot(); + void save_mem_consumption_snapshot(); + + int64_t get_memtable_consumption_inflush() const; int64_t get_memtable_consumption_snapshot() const; @@ -161,6 +163,9 @@ private: // use in vectorized load bool _is_vec; + // mem_consumption() snapshot for current delta_writer, used to + // compute the memtable consumption that is currently in flush + int64_t _mem_consumption_snapshot = 0; // memory consumption snapshot for current memtable, only // used for std::sort int64_t _memtable_consumption_snapshot = 0; diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h index da2b43ceaa..a75639e433 100644 --- a/be/src/runtime/load_channel.h +++ b/be/src/runtime/load_channel.h @@ -177,9 +177,6 @@ Status LoadChannel::handle_mem_exceed_limit(TabletWriterAddResult* response) { { // lock so that only one thread can check mem limit std::lock_guard l(_lock); - - LOG(INFO) << "reducing memory of " << *this - << " ,mem consumption: " << 
_mem_tracker->consumption(); found = _find_largest_consumption_channel(&channel); } // Release lock so that other threads can still call add_batch concurrently. diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index fa72d23ad0..d833b5d887 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -199,10 +199,12 @@ Status TabletsChannel::reduce_mem_usage(TabletWriterAddResult* response) { if (_try_to_wait_flushing()) { // `_try_to_wait_flushing()` returns true means other thread already // reduced the mem usage, and current thread do not need to reduce again. + LOG(INFO) << "Duplicate reduce mem usage on TabletsChannel, txn_id: " << _txn_id + << ", index_id: " << _index_id; return Status::OK(); } - std::vector writers_to_flush; + std::vector writers_to_wait_flush; { std::lock_guard l(_lock); if (_state == kFinished) { @@ -214,9 +216,16 @@ Status TabletsChannel::reduce_mem_usage(TabletWriterAddResult* response) { // Sort the DeltaWriters by mem consumption in descend order. std::vector writers; for (auto& it : _tablet_writers) { - it.second->save_memtable_consumption_snapshot(); + it.second->save_mem_consumption_snapshot(); writers.push_back(it.second); } + int64_t total_memtable_consumption_in_flush = 0; + for (auto writer : writers) { + if (writer->get_memtable_consumption_inflush() > 0) { + writers_to_wait_flush.push_back(writer); + total_memtable_consumption_in_flush += writer->get_memtable_consumption_inflush(); + } + } std::sort(writers.begin(), writers.end(), [](const DeltaWriter* lhs, const DeltaWriter* rhs) { return lhs->get_memtable_consumption_snapshot() > @@ -235,48 +244,62 @@ Status TabletsChannel::reduce_mem_usage(TabletWriterAddResult* response) { // the tablet that has not been flushed before will accumulate more data, thereby reducing the number of flushes. 
int64_t mem_to_flushed = _mem_tracker->consumption() / 3; - int counter = 0; - int64_t sum = 0; - for (auto writer : writers) { - if (writer->mem_consumption() <= 0) { - break; + if (total_memtable_consumption_in_flush < mem_to_flushed) { + mem_to_flushed -= total_memtable_consumption_in_flush; + int counter = 0; + int64_t sum = 0; + for (auto writer : writers) { + if (writer->mem_consumption() <= 0) { + break; + } + ++counter; + sum += writer->mem_consumption(); + if (sum > mem_to_flushed) { + break; + } } - ++counter; - sum += writer->mem_consumption(); - if (sum > mem_to_flushed) { - break; + std::ostringstream ss; + ss << "total size of memtables in flush: " << total_memtable_consumption_in_flush + << " will flush " << counter << " more memtables to reduce memory: " << sum; + if (counter > 0) { + ss << ", the size of smallest memtable to flush is " + << writers[counter - 1]->get_memtable_consumption_snapshot() << " bytes"; } + LOG(INFO) << ss.str(); + google::protobuf::RepeatedPtrField* tablet_errors = + response->mutable_tablet_errors(); + // following loop flush memtable async, we'll do it with _lock + for (int i = 0; i < counter; i++) { + Status st = writers[i]->flush_memtable_and_wait(false); + if (!st.ok()) { + auto err_msg = strings::Substitute( + "tablet writer failed to reduce mem consumption by flushing memtable, " + "tablet_id=$0, txn_id=$1, err=$2, errcode=$3, msg:$4", + writers[i]->tablet_id(), _txn_id, st.code(), st.precise_code(), + st.get_error_msg()); + LOG(WARNING) << err_msg; + PTabletError* error = tablet_errors->Add(); + error->set_tablet_id(writers[i]->tablet_id()); + error->set_msg(err_msg); + _broken_tablets.insert(writers[i]->tablet_id()); + } + } + for (int i = 0; i < counter; i++) { + if (_broken_tablets.find(writers[i]->tablet_id()) != _broken_tablets.end()) { + // skip broken tablets + continue; + } + writers_to_wait_flush.push_back(writers[i]); + } + _reducing_mem_usage = true; + } else { + LOG(INFO) << "total size of memtables in 
flush is big enough: " + << total_memtable_consumption_in_flush + << " bytes, will not flush more memtables"; } - VLOG_CRITICAL << "flush " << counter << " memtables to reduce memory: " << sum; - google::protobuf::RepeatedPtrField* tablet_errors = - response->mutable_tablet_errors(); - // following loop flush memtable async, we'll do it with _lock - for (int i = 0; i < counter; i++) { - Status st = writers[i]->flush_memtable_and_wait(false); - if (!st.ok()) { - auto err_msg = strings::Substitute( - "tablet writer failed to reduce mem consumption by flushing memtable, " - "tablet_id=$0, txn_id=$1, err=$2, errcode=$3, msg:$4", - writers[i]->tablet_id(), _txn_id, st.code(), st.precise_code(), - st.get_error_msg()); - LOG(WARNING) << err_msg; - PTabletError* error = tablet_errors->Add(); - error->set_tablet_id(writers[i]->tablet_id()); - error->set_msg(err_msg); - _broken_tablets.insert(writers[i]->tablet_id()); - } - } - for (int i = 0; i < counter; i++) { - if (_broken_tablets.find(writers[i]->tablet_id()) != _broken_tablets.end()) { - // skip broken tablets - continue; - } - writers_to_flush.push_back(writers[i]); - } - _reducing_mem_usage = true; } - for (auto writer : writers_to_flush) { + for (auto writer : writers_to_wait_flush) { Status st = writer->wait_flush(); if (!st.ok()) { return Status::InternalError(