[refactor] separate agg and flush in memtable (#15713)
This commit is contained in:
@ -490,13 +490,12 @@ CONF_mInt64(load_channel_memory_refresh_sleep_time_ms, "100");
|
||||
// Alignment
|
||||
CONF_Int32(memory_max_alignment, "16");
|
||||
|
||||
// write buffer size before flush
|
||||
// max write buffer size before flush, default 200MB
|
||||
CONF_mInt64(write_buffer_size, "209715200");
|
||||
|
||||
// max buffer size used in memtable for the aggregated table
|
||||
CONF_mInt64(memtable_max_buffer_size, "419430400");
|
||||
// max buffer size used in memtable for the aggregated table, default 400MB
|
||||
CONF_mInt64(write_buffer_size_for_agg, "419430400");
|
||||
// write buffer size in push task for sparkload, default 1GB
|
||||
CONF_mInt64(flush_size_for_sparkload, "1073741824");
|
||||
CONF_mInt64(write_buffer_size_for_sparkload, "1073741824");
|
||||
|
||||
// following 2 configs limit the memory consumption of load process on a Backend.
|
||||
// eg: memory limit to 80% of mem limit config but up to 100GB(default)
|
||||
|
||||
@ -169,14 +169,14 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int>
|
||||
_total_received_rows += row_idxs.size();
|
||||
_mem_table->insert(block, row_idxs);
|
||||
|
||||
if (_mem_table->need_to_agg()) {
|
||||
if (UNLIKELY(_mem_table->need_agg())) {
|
||||
_mem_table->shrink_memtable_by_agg();
|
||||
if (_mem_table->is_flush()) {
|
||||
auto s = _flush_memtable_async();
|
||||
_reset_mem_table();
|
||||
if (UNLIKELY(!s.ok())) {
|
||||
return s;
|
||||
}
|
||||
}
|
||||
if (UNLIKELY(_mem_table->need_flush())) {
|
||||
auto s = _flush_memtable_async();
|
||||
_reset_mem_table();
|
||||
if (UNLIKELY(!s.ok())) {
|
||||
return s;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -296,13 +296,16 @@ void MemTable::shrink_memtable_by_agg() {
|
||||
_collect_vskiplist_results<false>();
|
||||
}
|
||||
|
||||
bool MemTable::is_flush() const {
|
||||
bool MemTable::need_flush() const {
|
||||
return memory_usage() >= config::write_buffer_size;
|
||||
}
|
||||
|
||||
bool MemTable::need_to_agg() {
|
||||
return _keys_type == KeysType::DUP_KEYS ? is_flush()
|
||||
: memory_usage() >= config::memtable_max_buffer_size;
|
||||
bool MemTable::need_agg() const {
|
||||
if (_keys_type == KeysType::AGG_KEYS) {
|
||||
return memory_usage() >= config::write_buffer_size_for_agg;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
Status MemTable::_generate_delete_bitmap(int64_t atomic_num_segments_before_flush,
|
||||
|
||||
@ -58,9 +58,9 @@ public:
|
||||
|
||||
void shrink_memtable_by_agg();
|
||||
|
||||
bool is_flush() const;
|
||||
bool need_flush() const;
|
||||
|
||||
bool need_to_agg();
|
||||
bool need_agg() const;
|
||||
|
||||
/// Flush
|
||||
Status flush();
|
||||
|
||||
@ -250,7 +250,7 @@ Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, RowsetSharedPtr* cur
|
||||
VLOG_NOTICE << "start to convert etl file to delta.";
|
||||
while (!reader->eof()) {
|
||||
if (reader->mem_pool()->mem_tracker()->consumption() >
|
||||
config::flush_size_for_sparkload) {
|
||||
config::write_buffer_size_for_sparkload) {
|
||||
RETURN_NOT_OK(rowset_writer->flush());
|
||||
reader->mem_pool()->free_all();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user