From 9581d2b4eb22f5983fffc5debcb711233edeebc7 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Tue, 8 Aug 2023 22:02:42 +0800 Subject: [PATCH] [refactor](load) split memtable writer out of delta writer (#21892) --- be/src/olap/delta_writer.cpp | 265 +----------- be/src/olap/delta_writer.h | 48 +-- be/src/olap/memtable_writer.cpp | 377 ++++++++++++++++++ be/src/olap/memtable_writer.h | 163 ++++++++ be/test/olap/delta_writer_test.cpp | 52 ++- .../engine_storage_migration_task_test.cpp | 14 +- be/test/olap/memtable_memory_limiter_test.cpp | 13 +- be/test/olap/remote_rowset_gc_test.cpp | 13 +- be/test/olap/tablet_cooldown_test.cpp | 19 +- 9 files changed, 646 insertions(+), 318 deletions(-) create mode 100644 be/src/olap/memtable_writer.cpp create mode 100644 be/src/olap/memtable_writer.h diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index c3fed74c39..947999a076 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -36,9 +36,7 @@ #include "exec/tablet_info.h" #include "gutil/strings/numbers.h" #include "io/fs/file_writer.h" // IWYU pragma: keep -#include "olap/memtable.h" #include "olap/memtable_flush_executor.h" -#include "olap/memtable_memory_limiter.h" #include "olap/olap_define.h" #include "olap/rowset/beta_rowset.h" #include "olap/rowset/beta_rowset_writer.h" @@ -52,7 +50,6 @@ #include "olap/tablet_meta.h" #include "olap/txn_manager.h" #include "runtime/exec_env.h" -#include "runtime/memory/mem_tracker.h" #include "service/backend_options.h" #include "util/brpc_client_cache.h" #include "util/mem_info.h" @@ -76,6 +73,7 @@ DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, Runti _tablet(nullptr), _cur_rowset(nullptr), _rowset_writer(nullptr), + _memtable_writer(*req, profile), _tablet_schema(new TabletSchema), _delta_written_success(false), _storage_engine(storage_engine), @@ -85,20 +83,7 @@ DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, Runti void 
DeltaWriter::_init_profile(RuntimeProfile* profile) { _profile = profile->create_child(fmt::format("DeltaWriter {}", _req.tablet_id), true, true); - _lock_timer = ADD_TIMER(_profile, "LockTime"); - _sort_timer = ADD_TIMER(_profile, "MemTableSortTime"); - _agg_timer = ADD_TIMER(_profile, "MemTableAggTime"); - _memtable_duration_timer = ADD_TIMER(_profile, "MemTableDurationTime"); - _segment_writer_timer = ADD_TIMER(_profile, "SegmentWriterTime"); - _wait_flush_timer = ADD_TIMER(_profile, "MemTableWaitFlushTime"); - _put_into_output_timer = ADD_TIMER(_profile, "MemTablePutIntoOutputTime"); - _delete_bitmap_timer = ADD_TIMER(_profile, "DeleteBitmapTime"); _close_wait_timer = ADD_TIMER(_profile, "DeltaWriterCloseWaitTime"); - _sort_times = ADD_COUNTER(_profile, "MemTableSortTimes", TUnit::UNIT); - _agg_times = ADD_COUNTER(_profile, "MemTableAggTimes", TUnit::UNIT); - _segment_num = ADD_COUNTER(_profile, "SegmentNum", TUnit::UNIT); - _raw_rows_num = ADD_COUNTER(_profile, "RawRowNum", TUnit::UNIT); - _merged_rows_num = ADD_COUNTER(_profile, "MergedRowNum", TUnit::UNIT); } DeltaWriter::~DeltaWriter() { @@ -110,15 +95,13 @@ DeltaWriter::~DeltaWriter() { return; } - if (_flush_token != nullptr) { - // cancel and wait all memtables in flush queue to be finished - _flush_token->cancel(); + // cancel and wait all memtables in flush queue to be finished + _memtable_writer.cancel(); - if (_tablet != nullptr) { - const FlushStatistic& stat = _flush_token->get_stats(); - _tablet->flush_bytes->increment(stat.flush_size_bytes); - _tablet->flush_finish_count->increment(stat.flush_finish_count); - } + if (_tablet != nullptr) { + const FlushStatistic& stat = _memtable_writer.get_flush_token_stats(); + _tablet->flush_bytes->increment(stat.flush_size_bytes); + _tablet->flush_finish_count->increment(stat.flush_finish_count); } if (_calc_delete_bitmap_token != nullptr) { @@ -129,8 +112,6 @@ DeltaWriter::~DeltaWriter() { _tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX + 
_rowset_writer->rowset_id().to_string()); } - - _mem_table.reset(); } void DeltaWriter::_garbage_collection() { @@ -214,16 +195,11 @@ Status DeltaWriter::init() { context.write_type = DataWriteType::TYPE_DIRECT; context.mow_context = std::make_shared(_cur_max_version, _req.txn_id, _rowset_ids, _delete_bitmap); - RETURN_IF_ERROR(_tablet->create_rowset_writer(context, &_rowset_writer)); - - _reset_mem_table(); - - // create flush handler - // by assigning segment_id to memtable before submiting to flush executor, - // we can make sure same keys sort in the same order in all replicas. - bool should_serial = false; - RETURN_IF_ERROR(_storage_engine->memtable_flush_executor()->create_flush_token( - _flush_token, _rowset_writer.get(), should_serial, _req.is_high_priority)); + std::unique_ptr rowset_writer; + RETURN_IF_ERROR(_tablet->create_rowset_writer(context, &rowset_writer)); + _rowset_writer = std::move(rowset_writer); + _memtable_writer.init(_rowset_writer, _tablet_schema, + _tablet->enable_unique_key_merge_on_write()); _calc_delete_bitmap_token = _storage_engine->calc_delete_bitmap_executor()->create_token(); _is_init = true; @@ -245,118 +221,15 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector if (!_is_init && !_is_cancelled) { RETURN_IF_ERROR(init()); } - - if (_is_cancelled) { - return _cancel_status; - } - - if (_is_closed) { - return Status::Error( - "write block after closed tablet_id={}, load_id={}-{}, txn_id={}", _req.tablet_id, - _req.load_id.hi(), _req.load_id.lo(), _req.txn_id); - } - - if (is_append) { - _total_received_rows += block->rows(); - } else { - _total_received_rows += row_idxs.size(); - } - _mem_table->insert(block, row_idxs, is_append); - - if (UNLIKELY(_mem_table->need_agg() && config::enable_shrink_memory)) { - _mem_table->shrink_memtable_by_agg(); - } - if (UNLIKELY(_mem_table->need_flush())) { - auto s = _flush_memtable_async(); - _reset_mem_table(); - if (UNLIKELY(!s.ok())) { - return s; - } - } - - return 
Status::OK(); -} - -Status DeltaWriter::_flush_memtable_async() { - return _flush_token->submit(std::move(_mem_table)); + return _memtable_writer.write(block, row_idxs, is_append); } Status DeltaWriter::flush_memtable_and_wait(bool need_wait) { - std::lock_guard l(_lock); - if (!_is_init) { - // This writer is not initialized before flushing. Do nothing - // But we return OK instead of Status::Error(), - // Because this method maybe called when trying to reduce mem consumption, - // and at that time, the writer may not be initialized yet and that is a normal case. - return Status::OK(); - } - - if (_is_cancelled) { - return _cancel_status; - } - - VLOG_NOTICE << "flush memtable to reduce mem consumption. memtable size: " - << _mem_table->memory_usage() << ", tablet: " << _req.tablet_id - << ", load id: " << print_id(_req.load_id); - auto s = _flush_memtable_async(); - _reset_mem_table(); - if (UNLIKELY(!s.ok())) { - return s; - } - - if (need_wait) { - // wait all memtables in flush queue to be flushed. 
- SCOPED_TIMER(_wait_flush_timer); - RETURN_IF_ERROR(_flush_token->wait()); - } - return Status::OK(); + return _memtable_writer.flush_memtable_and_wait(need_wait); } Status DeltaWriter::wait_flush() { - { - std::lock_guard l(_lock); - if (!_is_init) { - // return OK instead of Status::Error() for same reason - // as described in flush_memtable_and_wait() - return Status::OK(); - } - if (_is_cancelled) { - return _cancel_status; - } - } - SCOPED_TIMER(_wait_flush_timer); - RETURN_IF_ERROR(_flush_token->wait()); - return Status::OK(); -} - -void DeltaWriter::_reset_mem_table() { -#ifndef BE_TEST - auto mem_table_insert_tracker = std::make_shared( - fmt::format("MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}", - std::to_string(tablet_id()), _mem_table_num, _load_id.to_string()), - ExecEnv::GetInstance()->memtable_memory_limiter()->mem_tracker()); - auto mem_table_flush_tracker = std::make_shared( - fmt::format("MemTableHookFlush:TabletId={}:MemTableNum={}#loadID={}", - std::to_string(tablet_id()), _mem_table_num++, _load_id.to_string()), - ExecEnv::GetInstance()->memtable_memory_limiter()->mem_tracker()); -#else - auto mem_table_insert_tracker = std::make_shared( - fmt::format("MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}", - std::to_string(tablet_id()), _mem_table_num, _load_id.to_string())); - auto mem_table_flush_tracker = std::make_shared( - fmt::format("MemTableHookFlush:TabletId={}:MemTableNum={}#loadID={}", - std::to_string(tablet_id()), _mem_table_num++, _load_id.to_string())); -#endif - { - std::lock_guard l(_mem_table_tracker_lock); - _mem_table_insert_trackers.push_back(mem_table_insert_tracker); - _mem_table_flush_trackers.push_back(mem_table_flush_tracker); - } - _mem_table.reset(new MemTable(_req.tablet_id, _tablet_schema.get(), _req.slots, _req.tuple_desc, - _tablet->enable_unique_key_merge_on_write(), - mem_table_insert_tracker, mem_table_flush_tracker)); - - COUNTER_UPDATE(_segment_num, 1); + return 
_memtable_writer.wait_flush(); } Status DeltaWriter::close() { @@ -371,25 +244,7 @@ Status DeltaWriter::close() { // for this tablet when being closed. RETURN_IF_ERROR(init()); } - - if (_is_cancelled) { - return _cancel_status; - } - - if (_is_closed) { - LOG(WARNING) << "close after closed tablet_id=" << _req.tablet_id - << " load_id=" << _req.load_id << " txn_id=" << _req.txn_id; - return Status::OK(); - } - - auto s = _flush_memtable_async(); - _mem_table.reset(); - _is_closed = true; - if (UNLIKELY(!s.ok())) { - return s; - } else { - return Status::OK(); - } + return _memtable_writer.close(); } Status DeltaWriter::build_rowset() { @@ -397,31 +252,8 @@ Status DeltaWriter::build_rowset() { DCHECK(_is_init) << "delta writer is supposed be to initialized before build_rowset() being called"; - if (_is_cancelled) { - return _cancel_status; - } + RETURN_IF_ERROR(_memtable_writer.close_wait()); - Status st; - // return error if previous flush failed - { - SCOPED_TIMER(_wait_flush_timer); - st = _flush_token->wait(); - } - if (UNLIKELY(!st.ok())) { - LOG(WARNING) << "previous flush failed tablet " << _tablet->tablet_id(); - return st; - } - - _mem_table.reset(); - - if (_rowset_writer->num_rows() + _flush_token->memtable_stat().merged_rows != - _total_received_rows) { - LOG(WARNING) << "the rows number written doesn't match, rowset num rows written to file: " - << _rowset_writer->num_rows() - << ", merged_rows: " << _flush_token->memtable_stat().merged_rows - << ", total received rows: " << _total_received_rows; - return Status::InternalError("rows number written by delta writer dosen't match"); - } // use rowset meta manager to save meta _cur_rowset = _rowset_writer->build(); if (_cur_rowset == nullptr) { @@ -505,31 +337,11 @@ Status DeltaWriter::commit_txn(const PSlaveTabletNodes& slave_tablet_nodes, _delta_written_success = true; - // const FlushStatistic& stat = _flush_token->get_stats(); - // print slow log if wait more than 1s - /*if 
(_wait_flush_timer->elapsed_time() > 1000UL * 1000 * 1000) { - LOG(INFO) << "close delta writer for tablet: " << _tablet->tablet_id() - << ", load id: " << print_id(_req.load_id) << ", wait close for " - << _wait_flush_timer->elapsed_time() << "(ns), stats: " << stat; - }*/ - if (write_single_replica) { for (auto node_info : slave_tablet_nodes.slave_nodes()) { _request_slave_tablet_pull_rowset(node_info); } } - COUNTER_UPDATE(_lock_timer, _lock_watch.elapsed_time() / 1000); - COUNTER_SET(_delete_bitmap_timer, _rowset_writer->delete_bitmap_ns()); - COUNTER_SET(_segment_writer_timer, _rowset_writer->segment_writer_ns()); - const auto& memtable_stat = _flush_token->memtable_stat(); - COUNTER_SET(_sort_timer, memtable_stat.sort_ns); - COUNTER_SET(_agg_timer, memtable_stat.agg_ns); - COUNTER_SET(_memtable_duration_timer, memtable_stat.duration_ns); - COUNTER_SET(_put_into_output_timer, memtable_stat.put_into_output_ns); - COUNTER_SET(_sort_times, memtable_stat.sort_times); - COUNTER_SET(_agg_times, memtable_stat.agg_times); - COUNTER_SET(_raw_rows_num, memtable_stat.raw_rows); - COUNTER_SET(_merged_rows_num, memtable_stat.merged_rows); return Status::OK(); } @@ -558,60 +370,23 @@ Status DeltaWriter::cancel_with_status(const Status& st) { if (_is_cancelled) { return Status::OK(); } + RETURN_IF_ERROR(_memtable_writer.cancel_with_status(st)); if (_rowset_writer && _rowset_writer->is_doing_segcompaction()) { _rowset_writer->wait_flying_segcompaction(); /* already cancel, ignore the return status */ } - _mem_table.reset(); - if (_flush_token != nullptr) { - // cancel and wait all memtables in flush queue to be finished - _flush_token->cancel(); - } if (_calc_delete_bitmap_token != nullptr) { _calc_delete_bitmap_token->cancel(); } _is_cancelled = true; - _cancel_status = st; return Status::OK(); } int64_t DeltaWriter::mem_consumption(MemType mem) { - if (_flush_token == nullptr) { - // This method may be called before this writer is initialized. 
- // So _flush_token may be null. - return 0; - } - int64_t mem_usage = 0; - { - std::lock_guard l(_mem_table_tracker_lock); - if ((mem & MemType::WRITE) == MemType::WRITE) { // 3 & 2 = 2 - for (auto mem_table_tracker : _mem_table_insert_trackers) { - mem_usage += mem_table_tracker->consumption(); - } - } - if ((mem & MemType::FLUSH) == MemType::FLUSH) { // 3 & 1 = 1 - for (auto mem_table_tracker : _mem_table_flush_trackers) { - mem_usage += mem_table_tracker->consumption(); - } - } - } - return mem_usage; + return _memtable_writer.mem_consumption(mem); } int64_t DeltaWriter::active_memtable_mem_consumption() { - if (_flush_token == nullptr) { - // This method may be called before this writer is initialized. - // So _flush_token may be null. - return 0; - } - int64_t mem_usage = 0; - { - std::lock_guard l(_mem_table_tracker_lock); - if (_mem_table_insert_trackers.size() > 0) { - mem_usage += (*_mem_table_insert_trackers.rbegin())->consumption(); - mem_usage += (*_mem_table_flush_trackers.rbegin())->consumption(); - } - } - return mem_usage; + return _memtable_writer.active_memtable_mem_consumption(); } int64_t DeltaWriter::partition_id() const { diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h index e45a8752e4..4c452eef8a 100644 --- a/be/src/olap/delta_writer.h +++ b/be/src/olap/delta_writer.h @@ -30,7 +30,7 @@ #include #include "common/status.h" -#include "olap/memtable.h" +#include "olap/memtable_writer.h" #include "olap/olap_common.h" #include "olap/rowset/rowset.h" #include "olap/tablet.h" @@ -54,20 +54,12 @@ namespace vectorized { class Block; } // namespace vectorized -enum MemType { WRITE = 1, FLUSH = 2, ALL = 3 }; - -struct WriteRequest { - int64_t tablet_id; +struct WriteRequest : MemTableWriter::WriteRequest { int32_t schema_hash; int64_t txn_id; int64_t partition_id; - PUniqueId load_id; - TupleDescriptor* tuple_desc; - // slots are in order of tablet's schema - const std::vector* slots; - bool is_high_priority = false; - 
OlapTableSchemaParam* table_schema_param; int64_t index_id = 0; + OlapTableSchemaParam* table_schema_param; }; // Writer for a particular (load, index, tablet). @@ -138,13 +130,8 @@ private: DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, RuntimeProfile* profile, const UniqueId& load_id); - // push a full memtable to flush executor - Status _flush_memtable_async(); - void _garbage_collection(); - void _reset_mem_table(); - void _build_current_tablet_schema(int64_t index_id, const OlapTableSchemaParam* table_schema_param, const TabletSchema& ori_tablet_schema); @@ -155,28 +142,16 @@ private: bool _is_init = false; bool _is_cancelled = false; - bool _is_closed = false; - Status _cancel_status; WriteRequest _req; TabletSharedPtr _tablet; RowsetSharedPtr _cur_rowset; - std::unique_ptr _rowset_writer; - // TODO: Recheck the lifetime of _mem_table, Look should use unique_ptr - std::unique_ptr _mem_table; - //const TabletSchema* _tablet_schema; - // tablet schema owned by delta writer, all write will use this tablet schema - // it's build from tablet_schema(stored when create tablet) and OlapTableSchema - // every request will have it's own tablet schema so simple schema change can work + std::shared_ptr _rowset_writer; + MemTableWriter _memtable_writer; TabletSchemaSPtr _tablet_schema; bool _delta_written_success; StorageEngine* _storage_engine; UniqueId _load_id; - std::unique_ptr _flush_token; - std::vector> _mem_table_insert_trackers; - std::vector> _mem_table_flush_trackers; - SpinLock _mem_table_tracker_lock; - std::atomic _mem_table_num = 1; std::mutex _lock; @@ -195,20 +170,7 @@ private: int64_t _total_received_rows = 0; RuntimeProfile* _profile = nullptr; - RuntimeProfile::Counter* _lock_timer = nullptr; - RuntimeProfile::Counter* _sort_timer = nullptr; - RuntimeProfile::Counter* _agg_timer = nullptr; - RuntimeProfile::Counter* _wait_flush_timer = nullptr; - RuntimeProfile::Counter* _delete_bitmap_timer = nullptr; - RuntimeProfile::Counter* 
_segment_writer_timer = nullptr; - RuntimeProfile::Counter* _memtable_duration_timer = nullptr; - RuntimeProfile::Counter* _put_into_output_timer = nullptr; - RuntimeProfile::Counter* _sort_times = nullptr; - RuntimeProfile::Counter* _agg_times = nullptr; RuntimeProfile::Counter* _close_wait_timer = nullptr; - RuntimeProfile::Counter* _segment_num = nullptr; - RuntimeProfile::Counter* _raw_rows_num = nullptr; - RuntimeProfile::Counter* _merged_rows_num = nullptr; MonotonicStopWatch _lock_watch; }; diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp new file mode 100644 index 0000000000..8284001e47 --- /dev/null +++ b/be/src/olap/memtable_writer.cpp @@ -0,0 +1,377 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "olap/memtable_writer.h" + +#include + +#include +#include +#include +#include + +// IWYU pragma: no_include +#include "common/compiler_util.h" // IWYU pragma: keep +#include "common/config.h" +#include "common/logging.h" +#include "common/status.h" +#include "exec/tablet_info.h" +#include "gutil/strings/numbers.h" +#include "io/fs/file_writer.h" // IWYU pragma: keep +#include "olap/memtable.h" +#include "olap/memtable_flush_executor.h" +#include "olap/memtable_memory_limiter.h" +#include "olap/rowset/beta_rowset_writer.h" +#include "olap/rowset/rowset_writer.h" +#include "olap/schema_change.h" +#include "olap/storage_engine.h" +#include "runtime/exec_env.h" +#include "runtime/memory/mem_tracker.h" +#include "service/backend_options.h" +#include "util/mem_info.h" +#include "util/stopwatch.hpp" +#include "vec/core/block.h" + +namespace doris { +using namespace ErrorCode; + +MemTableWriter::MemTableWriter(const WriteRequest& req, RuntimeProfile* profile) : _req(req) { + _init_profile(profile); +} + +void MemTableWriter::_init_profile(RuntimeProfile* profile) { + _profile = profile->create_child(fmt::format("MemTableWriter {}", _req.tablet_id), true, true); + _lock_timer = ADD_TIMER(_profile, "LockTime"); + _sort_timer = ADD_TIMER(_profile, "MemTableSortTime"); + _agg_timer = ADD_TIMER(_profile, "MemTableAggTime"); + _memtable_duration_timer = ADD_TIMER(_profile, "MemTableDurationTime"); + _segment_writer_timer = ADD_TIMER(_profile, "SegmentWriterTime"); + _wait_flush_timer = ADD_TIMER(_profile, "MemTableWaitFlushTime"); + _put_into_output_timer = ADD_TIMER(_profile, "MemTablePutIntoOutputTime"); + _delete_bitmap_timer = ADD_TIMER(_profile, "DeleteBitmapTime"); + _close_wait_timer = ADD_TIMER(_profile, "MemTableWriterCloseWaitTime"); + _sort_times = ADD_COUNTER(_profile, "MemTableSortTimes", TUnit::UNIT); + _agg_times = ADD_COUNTER(_profile, "MemTableAggTimes", TUnit::UNIT); + _segment_num = ADD_COUNTER(_profile, "SegmentNum", TUnit::UNIT); + _raw_rows_num 
= ADD_COUNTER(_profile, "RawRowNum", TUnit::UNIT); + _merged_rows_num = ADD_COUNTER(_profile, "MergedRowNum", TUnit::UNIT); +} + +MemTableWriter::~MemTableWriter() { + if (!_is_init) { + return; + } + if (_flush_token != nullptr) { + // cancel and wait all memtables in flush queue to be finished + _flush_token->cancel(); + } + _mem_table.reset(); +} + +Status MemTableWriter::init(std::shared_ptr rowset_writer, + TabletSchemaSPtr tablet_schema, bool unique_key_mow) { + _rowset_writer = rowset_writer; + _tablet_schema = tablet_schema; + _unique_key_mow = unique_key_mow; + + _reset_mem_table(); + + // create flush handler + // by assigning segment_id to memtable before submiting to flush executor, + // we can make sure same keys sort in the same order in all replicas. + bool should_serial = false; + RETURN_IF_ERROR(StorageEngine::instance()->memtable_flush_executor()->create_flush_token( + _flush_token, _rowset_writer.get(), should_serial, _req.is_high_priority)); + + _is_init = true; + return Status::OK(); +} + +Status MemTableWriter::append(const vectorized::Block* block) { + return write(block, {}, true); +} + +Status MemTableWriter::write(const vectorized::Block* block, const std::vector& row_idxs, + bool is_append) { + if (UNLIKELY(row_idxs.empty() && !is_append)) { + return Status::OK(); + } + _lock_watch.start(); + std::lock_guard l(_lock); + _lock_watch.stop(); + if (_is_cancelled) { + return _cancel_status; + } + if (!_is_init) { + return Status::Error("delta segment writer has not been initialized"); + } + if (_is_closed) { + return Status::Error("write block after closed tablet_id={}, load_id={}-{}", + _req.tablet_id, _req.load_id.hi(), _req.load_id.lo()); + } + + if (is_append) { + _total_received_rows += block->rows(); + } else { + _total_received_rows += row_idxs.size(); + } + _mem_table->insert(block, row_idxs, is_append); + + if (UNLIKELY(_mem_table->need_agg() && config::enable_shrink_memory)) { + _mem_table->shrink_memtable_by_agg(); + } + if 
(UNLIKELY(_mem_table->need_flush())) { + auto s = _flush_memtable_async(); + _reset_mem_table(); + if (UNLIKELY(!s.ok())) { + return s; + } + } + + return Status::OK(); +} + +Status MemTableWriter::_flush_memtable_async() { + DCHECK(_flush_token != nullptr); + return _flush_token->submit(std::move(_mem_table)); +} + +Status MemTableWriter::flush_memtable_and_wait(bool need_wait) { + std::lock_guard l(_lock); + if (!_is_init) { + // This writer is not initialized before flushing. Do nothing + // But we return OK instead of Status::Error(), + // Because this method maybe called when trying to reduce mem consumption, + // and at that time, the writer may not be initialized yet and that is a normal case. + return Status::OK(); + } + + if (_is_cancelled) { + return _cancel_status; + } + + VLOG_NOTICE << "flush memtable to reduce mem consumption. memtable size: " + << _mem_table->memory_usage() << ", tablet: " << _req.tablet_id + << ", load id: " << print_id(_req.load_id); + auto s = _flush_memtable_async(); + _reset_mem_table(); + if (UNLIKELY(!s.ok())) { + return s; + } + + if (need_wait) { + // wait all memtables in flush queue to be flushed. 
+ SCOPED_TIMER(_wait_flush_timer); + RETURN_IF_ERROR(_flush_token->wait()); + } + return Status::OK(); +} + +Status MemTableWriter::wait_flush() { + { + std::lock_guard l(_lock); + if (!_is_init) { + // return OK instead of Status::Error() for same reason + // as described in flush_memtable_and_wait() + return Status::OK(); + } + if (_is_cancelled) { + return _cancel_status; + } + } + SCOPED_TIMER(_wait_flush_timer); + RETURN_IF_ERROR(_flush_token->wait()); + return Status::OK(); +} + +void MemTableWriter::_reset_mem_table() { +#ifndef BE_TEST + auto mem_table_insert_tracker = std::make_shared( + fmt::format("MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}", + std::to_string(tablet_id()), _mem_table_num, + UniqueId(_req.load_id).to_string()), + ExecEnv::GetInstance()->memtable_memory_limiter()->mem_tracker()); + auto mem_table_flush_tracker = std::make_shared( + fmt::format("MemTableHookFlush:TabletId={}:MemTableNum={}#loadID={}", + std::to_string(tablet_id()), _mem_table_num++, + UniqueId(_req.load_id).to_string()), + ExecEnv::GetInstance()->memtable_memory_limiter()->mem_tracker()); +#else + auto mem_table_insert_tracker = std::make_shared(fmt::format( + "MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}", + std::to_string(tablet_id()), _mem_table_num, UniqueId(_req.load_id).to_string())); + auto mem_table_flush_tracker = std::make_shared(fmt::format( + "MemTableHookFlush:TabletId={}:MemTableNum={}#loadID={}", std::to_string(tablet_id()), + _mem_table_num++, UniqueId(_req.load_id).to_string())); +#endif + { + std::lock_guard l(_mem_table_tracker_lock); + _mem_table_insert_trackers.push_back(mem_table_insert_tracker); + _mem_table_flush_trackers.push_back(mem_table_flush_tracker); + } + _mem_table.reset(new MemTable(_req.tablet_id, _tablet_schema.get(), _req.slots, _req.tuple_desc, + _unique_key_mow, mem_table_insert_tracker, + mem_table_flush_tracker)); + + COUNTER_UPDATE(_segment_num, 1); +} + +Status MemTableWriter::close() { + 
_lock_watch.start(); + std::lock_guard l(_lock); + _lock_watch.stop(); + if (_is_cancelled) { + return _cancel_status; + } + if (!_is_init) { + return Status::Error("delta segment writer has not been initialized"); + } + if (_is_closed) { + LOG(WARNING) << "close after closed tablet_id=" << _req.tablet_id + << " load_id=" << _req.load_id; + return Status::OK(); + } + + auto s = _flush_memtable_async(); + _mem_table.reset(); + _is_closed = true; + if (UNLIKELY(!s.ok())) { + return s; + } else { + return Status::OK(); + } +} + +Status MemTableWriter::close_wait() { + SCOPED_TIMER(_close_wait_timer); + std::lock_guard l(_lock); + DCHECK(_is_init) + << "delta writer is supposed be to initialized before close_wait() being called"; + + if (_is_cancelled) { + return _cancel_status; + } + + Status st; + // return error if previous flush failed + { + SCOPED_TIMER(_wait_flush_timer); + st = _flush_token->wait(); + } + if (UNLIKELY(!st.ok())) { + LOG(WARNING) << "previous flush failed tablet " << _req.tablet_id; + return st; + } + + _mem_table.reset(); + + if (_rowset_writer->num_rows() + _flush_token->memtable_stat().merged_rows != + _total_received_rows) { + LOG(WARNING) << "the rows number written doesn't match, rowset num rows written to file: " + << _rowset_writer->num_rows() + << ", merged_rows: " << _flush_token->memtable_stat().merged_rows + << ", total received rows: " << _total_received_rows; + return Status::InternalError("rows number written by delta writer dosen't match"); + } + + // const FlushStatistic& stat = _flush_token->get_stats(); + // print slow log if wait more than 1s + /*if (_wait_flush_timer->elapsed_time() > 1000UL * 1000 * 1000) { + LOG(INFO) << "close delta writer for tablet: " << req.tablet_id + << ", load id: " << print_id(_req.load_id) << ", wait close for " + << _wait_flush_timer->elapsed_time() << "(ns), stats: " << stat; + }*/ + + COUNTER_UPDATE(_lock_timer, _lock_watch.elapsed_time() / 1000); + COUNTER_SET(_delete_bitmap_timer, 
_rowset_writer->delete_bitmap_ns()); + COUNTER_SET(_segment_writer_timer, _rowset_writer->segment_writer_ns()); + const auto& memtable_stat = _flush_token->memtable_stat(); + COUNTER_SET(_sort_timer, memtable_stat.sort_ns); + COUNTER_SET(_agg_timer, memtable_stat.agg_ns); + COUNTER_SET(_memtable_duration_timer, memtable_stat.duration_ns); + COUNTER_SET(_put_into_output_timer, memtable_stat.put_into_output_ns); + COUNTER_SET(_sort_times, memtable_stat.sort_times); + COUNTER_SET(_agg_times, memtable_stat.agg_times); + COUNTER_SET(_raw_rows_num, memtable_stat.raw_rows); + COUNTER_SET(_merged_rows_num, memtable_stat.merged_rows); + return Status::OK(); +} + +Status MemTableWriter::cancel() { + return cancel_with_status(Status::Cancelled("already cancelled")); +} + +Status MemTableWriter::cancel_with_status(const Status& st) { + std::lock_guard l(_lock); + if (_is_cancelled) { + return Status::OK(); + } + _mem_table.reset(); + if (_flush_token != nullptr) { + // cancel and wait all memtables in flush queue to be finished + _flush_token->cancel(); + } + _is_cancelled = true; + _cancel_status = st; + return Status::OK(); +} + +const FlushStatistic& MemTableWriter::get_flush_token_stats() { + return _flush_token->get_stats(); +} + +int64_t MemTableWriter::mem_consumption(MemType mem) { + if (_flush_token == nullptr) { + // This method may be called before this writer is initialized. + // So _flush_token may be null. 
+ return 0; + } + int64_t mem_usage = 0; + { + std::lock_guard l(_mem_table_tracker_lock); + if ((mem & MemType::WRITE) == MemType::WRITE) { // 3 & 2 = 2 + for (auto mem_table_tracker : _mem_table_insert_trackers) { + mem_usage += mem_table_tracker->consumption(); + } + } + if ((mem & MemType::FLUSH) == MemType::FLUSH) { // 3 & 1 = 1 + for (auto mem_table_tracker : _mem_table_flush_trackers) { + mem_usage += mem_table_tracker->consumption(); + } + } + } + return mem_usage; +} + +int64_t MemTableWriter::active_memtable_mem_consumption() { + if (_flush_token == nullptr) { + // This method may be called before this writer is initialized. + // So _flush_token may be null. + return 0; + } + int64_t mem_usage = 0; + { + std::lock_guard l(_mem_table_tracker_lock); + if (_mem_table_insert_trackers.size() > 0) { + mem_usage += (*_mem_table_insert_trackers.rbegin())->consumption(); + mem_usage += (*_mem_table_flush_trackers.rbegin())->consumption(); + } + } + return mem_usage; +} + +} // namespace doris diff --git a/be/src/olap/memtable_writer.h b/be/src/olap/memtable_writer.h new file mode 100644 index 0000000000..92600e450e --- /dev/null +++ b/be/src/olap/memtable_writer.h @@ -0,0 +1,163 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "common/status.h" +#include "olap/memtable.h" +#include "olap/olap_common.h" +#include "olap/rowset/rowset.h" +#include "olap/tablet.h" +#include "olap/tablet_meta.h" +#include "olap/tablet_schema.h" +#include "util/spinlock.h" +#include "util/uid_util.h" + +namespace doris { + +class FlushToken; +class MemTable; +class MemTracker; +class StorageEngine; +class TupleDescriptor; +class SlotDescriptor; +class OlapTableSchemaParam; +class RowsetWriter; +struct FlushStatistic; + +namespace vectorized { +class Block; +} // namespace vectorized + +enum MemType { WRITE = 1, FLUSH = 2, ALL = 3 }; + +// Writer for a particular (load, index, tablet). +// This class is NOT thread-safe, external synchronization is required. +class MemTableWriter { +public: + struct WriteRequest { + int64_t tablet_id; + PUniqueId load_id; + TupleDescriptor* tuple_desc; + // slots are in order of tablet's schema + const std::vector* slots; + bool is_high_priority = false; + }; + + MemTableWriter(const WriteRequest& req, RuntimeProfile* profile); + + ~MemTableWriter(); + + Status init(std::shared_ptr rowset_writer, TabletSchemaSPtr tablet_schema, + bool unique_key_mow = false); + + Status write(const vectorized::Block* block, const std::vector& row_idxs, + bool is_append = false); + + Status append(const vectorized::Block* block); + + // flush the last memtable to flush queue, must call it before close_wait() + Status close(); + // wait for all memtables to be flushed. + // mem_consumption() should be 0 after this function returns. + Status close_wait(); + + // abandon current memtable and wait for all pending-flushing memtables to be destructed. + // mem_consumption() should be 0 after this function returns. 
+ Status cancel(); + Status cancel_with_status(const Status& st); + + // submit current memtable to flush queue, and wait all memtables in flush queue + // to be flushed. + // This is currently for reducing mem consumption of this delta writer. + // If need_wait is true, it will wait for all memtable in flush queue to be flushed. + // Otherwise, it will just put memtables to the flush queue and return. + Status flush_memtable_and_wait(bool need_wait); + + int64_t mem_consumption(MemType mem); + int64_t active_memtable_mem_consumption(); + + // Wait all memtable in flush queue to be flushed + Status wait_flush(); + + int64_t tablet_id() { return _req.tablet_id; } + + int64_t total_received_rows() const { return _total_received_rows; } + + const FlushStatistic& get_flush_token_stats(); + +private: + // push a full memtable to flush executor + Status _flush_memtable_async(); + + void _reset_mem_table(); + + void _init_profile(RuntimeProfile* profile); + + bool _is_init = false; + bool _is_cancelled = false; + bool _is_closed = false; + Status _cancel_status; + WriteRequest _req; + std::shared_ptr _rowset_writer; + std::unique_ptr _mem_table; + TabletSchemaSPtr _tablet_schema; + bool _unique_key_mow = false; + + std::unique_ptr _flush_token; + std::vector> _mem_table_insert_trackers; + std::vector> _mem_table_flush_trackers; + SpinLock _mem_table_tracker_lock; + std::atomic _mem_table_num = 1; + + std::mutex _lock; + + // total rows num written by MemTableWriter + int64_t _total_received_rows = 0; + + RuntimeProfile* _profile = nullptr; + RuntimeProfile::Counter* _lock_timer = nullptr; + RuntimeProfile::Counter* _sort_timer = nullptr; + RuntimeProfile::Counter* _agg_timer = nullptr; + RuntimeProfile::Counter* _wait_flush_timer = nullptr; + RuntimeProfile::Counter* _delete_bitmap_timer = nullptr; + RuntimeProfile::Counter* _segment_writer_timer = nullptr; + RuntimeProfile::Counter* _memtable_duration_timer = nullptr; + RuntimeProfile::Counter* _put_into_output_timer = 
nullptr; + RuntimeProfile::Counter* _sort_times = nullptr; + RuntimeProfile::Counter* _agg_times = nullptr; + RuntimeProfile::Counter* _close_wait_timer = nullptr; + RuntimeProfile::Counter* _segment_num = nullptr; + RuntimeProfile::Counter* _raw_rows_num = nullptr; + RuntimeProfile::Counter* _merged_rows_num = nullptr; + + MonotonicStopWatch _lock_watch; +}; + +} // namespace doris diff --git a/be/test/olap/delta_writer_test.cpp b/be/test/olap/delta_writer_test.cpp index 407d40d6e9..1ca33b1de7 100644 --- a/be/test/olap/delta_writer_test.cpp +++ b/be/test/olap/delta_writer_test.cpp @@ -486,9 +486,16 @@ TEST_F(TestDeltaWriter, open) { PUniqueId load_id; load_id.set_hi(0); load_id.set_lo(0); - WriteRequest write_req = { - 10003, 270068375, 20001, 30001, load_id, tuple_desc, &(tuple_desc->slots()), - true, &param}; + WriteRequest write_req; + write_req.tablet_id = 10003; + write_req.schema_hash = 270068375; + write_req.txn_id = 20001; + write_req.partition_id = 30001; + write_req.load_id = load_id; + write_req.tuple_desc = tuple_desc; + write_req.slots = &(tuple_desc->slots()); + write_req.is_high_priority = true; + write_req.table_schema_param = &param; DeltaWriter* delta_writer = nullptr; // test vec delta writer @@ -525,9 +532,16 @@ TEST_F(TestDeltaWriter, vec_write) { PUniqueId load_id; load_id.set_hi(0); load_id.set_lo(0); - WriteRequest write_req = { - 10004, 270068376, 20002, 30002, load_id, tuple_desc, &(tuple_desc->slots()), - false, &param}; + WriteRequest write_req; + write_req.tablet_id = 10004; + write_req.schema_hash = 270068376; + write_req.txn_id = 20002; + write_req.partition_id = 30002; + write_req.load_id = load_id; + write_req.tuple_desc = tuple_desc; + write_req.slots = &(tuple_desc->slots()); + write_req.is_high_priority = false; + write_req.table_schema_param = &param; DeltaWriter* delta_writer = nullptr; std::unique_ptr profile; profile = std::make_unique("LoadChannels"); @@ -682,9 +696,16 @@ TEST_F(TestDeltaWriter, vec_sequence_col) { PUniqueId load_id;
load_id.set_hi(0); load_id.set_lo(0); - WriteRequest write_req = { - 10005, 270068377, 20003, 30003, load_id, tuple_desc, &(tuple_desc->slots()), - false, &param}; + WriteRequest write_req; + write_req.tablet_id = 10005; + write_req.schema_hash = 270068377; + write_req.txn_id = 20003; + write_req.partition_id = 30003; + write_req.load_id = load_id; + write_req.tuple_desc = tuple_desc; + write_req.slots = &(tuple_desc->slots()); + write_req.is_high_priority = false; + write_req.table_schema_param = &param; DeltaWriter* delta_writer = nullptr; std::unique_ptr profile; profile = std::make_unique("LoadChannels"); @@ -792,9 +813,16 @@ TEST_F(TestDeltaWriter, vec_sequence_col_concurrent_write) { PUniqueId load_id; load_id.set_hi(0); load_id.set_lo(0); - WriteRequest write_req = { - 10005, 270068377, 20003, 30003, load_id, tuple_desc, &(tuple_desc->slots()), - false, &param}; + WriteRequest write_req; + write_req.tablet_id = 10005; + write_req.schema_hash = 270068377; + write_req.txn_id = 20003; + write_req.partition_id = 30003; + write_req.load_id = load_id; + write_req.tuple_desc = tuple_desc; + write_req.slots = &(tuple_desc->slots()); + write_req.is_high_priority = false; + write_req.table_schema_param = &param; DeltaWriter* delta_writer1 = nullptr; DeltaWriter* delta_writer2 = nullptr; std::unique_ptr profile1; diff --git a/be/test/olap/engine_storage_migration_task_test.cpp b/be/test/olap/engine_storage_migration_task_test.cpp index 2226f03653..39cb847548 100644 --- a/be/test/olap/engine_storage_migration_task_test.cpp +++ b/be/test/olap/engine_storage_migration_task_test.cpp @@ -182,9 +182,17 @@ TEST_F(TestEngineStorageMigrationTask, write_and_migration) { PUniqueId load_id; load_id.set_hi(0); load_id.set_lo(0); - WriteRequest write_req = { - 10005, 270068377, 20003, 30003, load_id, tuple_desc, &(tuple_desc->slots()), - false, &param}; + WriteRequest write_req; + write_req.tablet_id = 10005; + write_req.schema_hash = 270068377; + write_req.txn_id = 20003; + write_req.partition_id =
30003; + write_req.load_id = load_id; + write_req.tuple_desc = tuple_desc; + write_req.slots = &(tuple_desc->slots()); + write_req.is_high_priority = false; + write_req.table_schema_param = &param; + DeltaWriter* delta_writer = nullptr; std::unique_ptr profile; diff --git a/be/test/olap/memtable_memory_limiter_test.cpp b/be/test/olap/memtable_memory_limiter_test.cpp index 7b49b22b32..4dce31da0b 100644 --- a/be/test/olap/memtable_memory_limiter_test.cpp +++ b/be/test/olap/memtable_memory_limiter_test.cpp @@ -127,9 +127,16 @@ TEST_F(MemTableMemoryLimiterTest, handle_memtable_flush_test) { PUniqueId load_id; load_id.set_hi(0); load_id.set_lo(0); - WriteRequest write_req = { - 10000, 270068372, 20002, 30002, load_id, tuple_desc, &(tuple_desc->slots()), - false, &param}; + WriteRequest write_req; + write_req.tablet_id = 10000; + write_req.schema_hash = 270068372; + write_req.txn_id = 20002; + write_req.partition_id = 30002; + write_req.load_id = load_id; + write_req.tuple_desc = tuple_desc; + write_req.slots = &(tuple_desc->slots()); + write_req.is_high_priority = false; + write_req.table_schema_param = &param; DeltaWriter* delta_writer = nullptr; std::unique_ptr profile; profile = std::make_unique("MemTableMemoryLimiterTest"); diff --git a/be/test/olap/remote_rowset_gc_test.cpp b/be/test/olap/remote_rowset_gc_test.cpp index cf10c89722..675f0ba578 100644 --- a/be/test/olap/remote_rowset_gc_test.cpp +++ b/be/test/olap/remote_rowset_gc_test.cpp @@ -188,9 +188,16 @@ TEST_F(RemoteRowsetGcTest, normal) { PUniqueId load_id; load_id.set_hi(0); load_id.set_lo(0); - WriteRequest write_req = { - 10005, 270068377, 20003, 30003, load_id, tuple_desc, &(tuple_desc->slots()), - false, &param}; + WriteRequest write_req; + write_req.tablet_id = 10005; + write_req.schema_hash = 270068377; + write_req.txn_id = 20003; + write_req.partition_id = 30003; + write_req.load_id = load_id; + write_req.tuple_desc = tuple_desc; + write_req.slots = &(tuple_desc->slots()); + write_req.is_high_priority = false; +
write_req.table_schema_param = &param; std::unique_ptr profile; profile = std::make_unique("LoadChannels"); DeltaWriter* delta_writer = nullptr; diff --git a/be/test/olap/tablet_cooldown_test.cpp b/be/test/olap/tablet_cooldown_test.cpp index 6cf0582c10..847c115a3b 100644 --- a/be/test/olap/tablet_cooldown_test.cpp +++ b/be/test/olap/tablet_cooldown_test.cpp @@ -356,15 +356,16 @@ void createTablet(StorageEngine* engine, TabletSharedPtr* tablet, int64_t replic load_id.set_hi(0); load_id.set_lo(0); - WriteRequest write_req = {tablet_id, - schema_hash, - txn_id, - partition_id, - load_id, - tuple_desc, - &(tuple_desc->slots()), - false, - &param}; + WriteRequest write_req; + write_req.tablet_id = tablet_id; + write_req.schema_hash = schema_hash; + write_req.txn_id = txn_id; + write_req.partition_id = partition_id; + write_req.load_id = load_id; + write_req.tuple_desc = tuple_desc; + write_req.slots = &(tuple_desc->slots()); + write_req.is_high_priority = false; + write_req.table_schema_param = &param; DeltaWriter* delta_writer = nullptr; std::unique_ptr profile;