From e6e86321671f4534cc5aecc774e1ffb39eb06a53 Mon Sep 17 00:00:00 2001 From: Xin Liao Date: Wed, 13 Dec 2023 16:59:25 +0800 Subject: [PATCH] [improvement](merge-on-write) Optimize publish when there are missing versions (#28012) 1. Do not retry publishing on BE: when there are too many missing versions, just add the task to async publish. 2. To reduce memory consumption, clean up the tasks when there are too many async publish tasks. --- be/src/agent/task_worker_pool.cpp | 5 + be/src/common/config.cpp | 3 + be/src/common/config.h | 3 + be/src/olap/olap_server.cpp | 130 +++++++++++------- be/src/olap/storage_engine.h | 5 +- .../olap/task/engine_publish_version_task.cpp | 13 +- be/test/olap/storage_engine_test.cpp | 104 +++++++++++++- 7 files changed, 200 insertions(+), 63 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index c9f109560b..2e35faac45 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -1458,6 +1458,11 @@ void PublishVersionWorkerPool::publish_version_callback(const TAgentTaskRequest& } if (status.is()) { + // there are too many missing versions, it has been added to async + // publish task, so no need to retry here. + if (discontinuous_version_tablets.empty()) { + break; + } LOG_EVERY_SECOND(INFO) << "wait for previous publish version task to be done, " << "transaction_id: " << publish_version_req.transaction_id; diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 7dbafcc62b..1be582fcc7 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1084,6 +1084,9 @@ DEFINE_mInt64(LZ4_HC_compression_level, "9"); DEFINE_mBool(enable_merge_on_write_correctness_check, "true"); // rowid conversion correctness check when compaction for mow table DEFINE_mBool(enable_rowid_conversion_correctness_check, "false"); +// When the number of missing versions is more than this value, do not directly +// retry the publish and handle it through async publish. 
+DEFINE_mInt32(mow_publish_max_discontinuous_version_num, "20"); // The secure path with user files, used in the `local` table function. DEFINE_mString(user_files_secure_path, "${DORIS_HOME}"); diff --git a/be/src/common/config.h b/be/src/common/config.h index f4f2845098..069049fe70 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1149,6 +1149,9 @@ DECLARE_mInt64(variant_threshold_rows_to_estimate_sparse_column); DECLARE_mBool(enable_merge_on_write_correctness_check); // rowid conversion correctness check when compaction for mow table DECLARE_mBool(enable_rowid_conversion_correctness_check); +// When the number of missing versions is more than this value, do not directly +// retry the publish and handle it through async publish. +DECLARE_mInt32(mow_publish_max_discontinuous_version_num); // The secure path with user files, used in the `local` table function. DECLARE_mString(user_files_secure_path); diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 881cc56a70..b4bd310592 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -32,6 +32,7 @@ #include #include #include +#include #include #include #include @@ -225,7 +226,7 @@ Status StorageEngine::start_bg_threads() { .build(&_tablet_publish_txn_thread_pool)); RETURN_IF_ERROR(Thread::create( - "StorageEngine", "aync_publish_version_thread", + "StorageEngine", "async_publish_version_thread", [this]() { this->_async_publish_callback(); }, &_async_publish_thread)); LOG(INFO) << "async publish thread started"; @@ -1189,6 +1190,20 @@ void StorageEngine::add_async_publish_task(int64_t partition_id, int64_t tablet_ int64_t publish_version, int64_t transaction_id, bool is_recovery) { if (!is_recovery) { + bool exists = false; + { + std::shared_lock rlock(_async_publish_lock); + if (auto tablet_iter = _async_publish_tasks.find(tablet_id); + tablet_iter != _async_publish_tasks.end()) { + if (auto iter = tablet_iter->second.find(publish_version); + iter != 
tablet_iter->second.end()) { + exists = true; + } + } + } + if (exists) { + return; + } TabletSharedPtr tablet = tablet_manager()->get_tablet(tablet_id); if (tablet == nullptr) { LOG(INFO) << "tablet may be dropped when add async publish task, tablet_id: " @@ -1205,12 +1220,12 @@ void StorageEngine::add_async_publish_task(int64_t partition_id, int64_t tablet_ LOG(INFO) << "add pending publish task, tablet_id: " << tablet_id << " version: " << publish_version << " txn_id:" << transaction_id << " is_recovery: " << is_recovery; - std::lock_guard lock(_async_publish_mutex); + std::unique_lock wlock(_async_publish_lock); _async_publish_tasks[tablet_id][publish_version] = {transaction_id, partition_id}; } int64_t StorageEngine::get_pending_publish_min_version(int64_t tablet_id) { - std::lock_guard lock(_async_publish_mutex); + std::shared_lock rlock(_async_publish_lock); auto iter = _async_publish_tasks.find(tablet_id); if (iter == _async_publish_tasks.end()) { return INT64_MAX; @@ -1221,58 +1236,67 @@ int64_t StorageEngine::get_pending_publish_min_version(int64_t tablet_id) { return iter->second.begin()->first; } +void StorageEngine::_process_async_publish() { + // tablet, publish_version + std::vector> need_removed_tasks; + { + std::unique_lock wlock(_async_publish_lock); + for (auto tablet_iter = _async_publish_tasks.begin(); + tablet_iter != _async_publish_tasks.end();) { + if (tablet_iter->second.empty()) { + tablet_iter = _async_publish_tasks.erase(tablet_iter); + continue; + } + int64_t tablet_id = tablet_iter->first; + TabletSharedPtr tablet = tablet_manager()->get_tablet(tablet_id); + if (!tablet) { + LOG(WARNING) << "tablet does not exist when async publush, tablet_id: " + << tablet_id; + tablet_iter = _async_publish_tasks.erase(tablet_iter); + continue; + } + + auto task_iter = tablet_iter->second.begin(); + int64_t version = task_iter->first; + int64_t transaction_id = task_iter->second.first; + int64_t partition_id = task_iter->second.second; + int64_t 
max_version = tablet->max_version().second; + + if (version <= max_version) { + need_removed_tasks.emplace_back(tablet, version); + tablet_iter->second.erase(task_iter); + tablet_iter++; + continue; + } + if (version != max_version + 1) { + // Keep only the most recent versions + while (tablet_iter->second.size() > config::max_tablet_version_num) { + need_removed_tasks.emplace_back(tablet, version); + task_iter = tablet_iter->second.erase(task_iter); + version = task_iter->first; + } + tablet_iter++; + continue; + } + + auto async_publish_task = std::make_shared( + tablet, partition_id, transaction_id, version); + static_cast(_tablet_publish_txn_thread_pool->submit_func( + [=]() { async_publish_task->handle(); })); + tablet_iter->second.erase(task_iter); + need_removed_tasks.emplace_back(tablet, version); + tablet_iter++; + } + } + for (auto& [tablet, publish_version] : need_removed_tasks) { + static_cast(TabletMetaManager::remove_pending_publish_info( + tablet->data_dir(), tablet->tablet_id(), publish_version)); + } +} + void StorageEngine::_async_publish_callback() { while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(30))) { - // tablet, publish_version - std::vector> need_removed_tasks; - { - std::lock_guard lock(_async_publish_mutex); - for (auto tablet_iter = _async_publish_tasks.begin(); - tablet_iter != _async_publish_tasks.end();) { - if (tablet_iter->second.empty()) { - tablet_iter = _async_publish_tasks.erase(tablet_iter); - continue; - } - int64_t tablet_id = tablet_iter->first; - TabletSharedPtr tablet = tablet_manager()->get_tablet(tablet_id); - if (!tablet) { - LOG(WARNING) << "tablet does not exist when async publush, tablet_id: " - << tablet_id; - tablet_iter = _async_publish_tasks.erase(tablet_iter); - continue; - } - - auto task_iter = tablet_iter->second.begin(); - int64_t version = task_iter->first; - int64_t transaction_id = task_iter->second.first; - int64_t partition_id = task_iter->second.second; - int64_t max_version 
= tablet->max_version().second; - - if (version <= max_version) { - need_removed_tasks.emplace_back(tablet, version); - tablet_iter->second.erase(task_iter); - tablet_iter++; - continue; - } - if (version != max_version + 1) { - tablet_iter++; - continue; - } - - auto async_publish_task = std::make_shared( - tablet, partition_id, transaction_id, version); - static_cast( - StorageEngine::instance()->tablet_publish_txn_thread_pool()->submit_func( - [=]() { async_publish_task->handle(); })); - tablet_iter->second.erase(task_iter); - need_removed_tasks.emplace_back(tablet, version); - tablet_iter++; - } - } - for (auto& [tablet, publish_version] : need_removed_tasks) { - static_cast(TabletMetaManager::remove_pending_publish_info( - tablet->data_dir(), tablet->tablet_id(), publish_version)); - } + _process_async_publish(); } } diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 679bcc68ce..4cb803a10a 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -30,6 +30,7 @@ #include #include #include +#include #include #include #include @@ -333,6 +334,8 @@ private: void _async_publish_callback(); + void _process_async_publish(); + Status _persist_broken_paths(); private: @@ -487,7 +490,7 @@ private: std::map>> _async_publish_tasks; // aync publish for discontinuous versions of merge_on_write table scoped_refptr _async_publish_thread; - std::mutex _async_publish_mutex; + std::shared_mutex _async_publish_lock; bool _clear_segment_cache = false; diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp index 9c76a2a3e9..5e3254f128 100644 --- a/be/src/olap/task/engine_publish_version_task.cpp +++ b/be/src/olap/task/engine_publish_version_task.cpp @@ -191,8 +191,17 @@ Status EnginePublishVersionTask::execute() { } auto handle_version_not_continuous = [&]() { add_error_tablet_id(tablet_info.tablet_id); - _discontinuous_version_tablets->emplace_back( - partition_id, 
tablet_info.tablet_id, version.first); + // When there are too many missing versions, do not directly retry the + // publish and handle it through async publish. + if (max_version + config::mow_publish_max_discontinuous_version_num < + version.first) { + StorageEngine::instance()->add_async_publish_task( + partition_id, tablet_info.tablet_id, version.first, + _publish_version_req.transaction_id, false); + } else { + _discontinuous_version_tablets->emplace_back( + partition_id, tablet_info.tablet_id, version.first); + } res = Status::Error( "check_version_exist failed"); int64_t missed_version = max_version + 1; diff --git a/be/test/olap/storage_engine_test.cpp b/be/test/olap/storage_engine_test.cpp index ebf572ef5f..eb1fdc4e8c 100644 --- a/be/test/olap/storage_engine_test.cpp +++ b/be/test/olap/storage_engine_test.cpp @@ -21,17 +21,16 @@ #include #include #include +#include #include #include "common/status.h" #include "gtest/gtest_pred_impl.h" -#include "testutil/test_util.h" - -using ::testing::_; -using ::testing::Return; -using ::testing::SetArgPointee; -using std::string; +#include "io/fs/local_file_system.h" +#include "olap/data_dir.h" +#include "olap/tablet_manager.h" +#include "util/threadpool.h" namespace doris { using namespace config; @@ -39,14 +38,29 @@ using namespace config; class StorageEngineTest : public testing::Test { public: virtual void SetUp() { + _engine_data_path = "./be/test/olap/test_data/converter_test_data/tmp"; + EXPECT_TRUE( + io::global_local_filesystem()->delete_and_create_directory(_engine_data_path).ok()); + EXPECT_TRUE( + io::global_local_filesystem()->create_directory(_engine_data_path + "/meta").ok()); + _data_dir.reset(new DataDir(_engine_data_path, 100000000)); + static_cast(_data_dir->init()); + EngineOptions options; + options.backend_uid = UniqueId::gen_uid(); _storage_engine.reset(new StorageEngine(options)); + ExecEnv::GetInstance()->set_storage_engine(_storage_engine.get()); } - virtual void TearDown() {} + virtual void 
TearDown() { + EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_engine_data_path).ok()); + ExecEnv::GetInstance()->set_storage_engine(nullptr); + } std::unique_ptr _storage_engine; + std::string _engine_data_path; + std::unique_ptr _data_dir; }; TEST_F(StorageEngineTest, TestBrokenDisk) { @@ -86,4 +100,80 @@ TEST_F(StorageEngineTest, TestBrokenDisk) { } } +TEST_F(StorageEngineTest, TestAsyncPublish) { + auto st = ThreadPoolBuilder("TabletPublishTxnThreadPool") + .set_min_threads(config::tablet_publish_txn_max_thread) + .set_max_threads(config::tablet_publish_txn_max_thread) + .build(&_storage_engine->tablet_publish_txn_thread_pool()); + EXPECT_EQ(st, Status::OK()); + + int64_t partition_id = 1; + int64_t tablet_id = 111; + + TColumnType col_type; + col_type.__set_type(TPrimitiveType::SMALLINT); + TColumn col1; + col1.__set_column_name("col1"); + col1.__set_column_type(col_type); + col1.__set_is_key(true); + std::vector cols; + cols.push_back(col1); + TTabletSchema tablet_schema; + tablet_schema.__set_short_key_column_count(1); + tablet_schema.__set_schema_hash(3333); + tablet_schema.__set_keys_type(TKeysType::AGG_KEYS); + tablet_schema.__set_storage_type(TStorageType::COLUMN); + tablet_schema.__set_columns(cols); + TCreateTabletReq create_tablet_req; + create_tablet_req.__set_tablet_schema(tablet_schema); + create_tablet_req.__set_tablet_id(tablet_id); + create_tablet_req.__set_version(10); + + std::vector data_dirs; + data_dirs.push_back(_data_dir.get()); + RuntimeProfile profile("CreateTablet"); + st = _storage_engine->tablet_manager()->create_tablet(create_tablet_req, data_dirs, &profile); + EXPECT_EQ(st, Status::OK()); + TabletSharedPtr tablet = _storage_engine->tablet_manager()->get_tablet(tablet_id); + EXPECT_EQ(tablet->max_version().second, 10); + + for (int64_t i = 5; i < 12; ++i) { + _storage_engine->add_async_publish_task(partition_id, tablet_id, i, i, false); + } + EXPECT_EQ(_storage_engine->_async_publish_tasks[tablet_id].size(), 7); + 
EXPECT_EQ(_storage_engine->get_pending_publish_min_version(tablet_id), 5); + for (int64_t i = 1; i < 8; ++i) { + _storage_engine->_process_async_publish(); + EXPECT_EQ(_storage_engine->_async_publish_tasks[tablet_id].size(), 7 - i); + } + _storage_engine->_process_async_publish(); + EXPECT_EQ(_storage_engine->_async_publish_tasks.size(), 0); + + for (int64_t i = 100; i < config::max_tablet_version_num + 120; ++i) { + _storage_engine->add_async_publish_task(partition_id, tablet_id, i, i, false); + } + EXPECT_EQ(_storage_engine->_async_publish_tasks[tablet_id].size(), + config::max_tablet_version_num + 20); + + for (int64_t i = 90; i < 120; ++i) { + _storage_engine->add_async_publish_task(partition_id, tablet_id, i, i, false); + } + EXPECT_EQ(_storage_engine->_async_publish_tasks[tablet_id].size(), + config::max_tablet_version_num + 30); + EXPECT_EQ(_storage_engine->get_pending_publish_min_version(tablet_id), 90); + + _storage_engine->_process_async_publish(); + EXPECT_EQ(_storage_engine->_async_publish_tasks[tablet_id].size(), + config::max_tablet_version_num); + EXPECT_EQ(_storage_engine->get_pending_publish_min_version(tablet_id), 120); + + st = _storage_engine->tablet_manager()->drop_tablet(tablet_id, 0, false); + EXPECT_EQ(st, Status::OK()); + + EXPECT_EQ(_storage_engine->_async_publish_tasks[tablet_id].size(), + config::max_tablet_version_num); + _storage_engine->_process_async_publish(); + EXPECT_EQ(_storage_engine->_async_publish_tasks.size(), 0); +} + } // namespace doris