diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index c9f109560b..2e35faac45 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -1458,6 +1458,11 @@ void PublishVersionWorkerPool::publish_version_callback(const TAgentTaskRequest& } if (status.is()) { + // there are too many missing versions, it has been added to async + // publish task, so no need to retry here. + if (discontinuous_version_tablets.empty()) { + break; + } LOG_EVERY_SECOND(INFO) << "wait for previous publish version task to be done, " << "transaction_id: " << publish_version_req.transaction_id; diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 7dbafcc62b..1be582fcc7 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1084,6 +1084,9 @@ DEFINE_mInt64(LZ4_HC_compression_level, "9"); DEFINE_mBool(enable_merge_on_write_correctness_check, "true"); // rowid conversion correctness check when compaction for mow table DEFINE_mBool(enable_rowid_conversion_correctness_check, "false"); +// When the number of missing versions is more than this value, do not directly +// retry the publish and handle it through async publish. +DEFINE_mInt32(mow_publish_max_discontinuous_version_num, "20"); // The secure path with user files, used in the `local` table function. DEFINE_mString(user_files_secure_path, "${DORIS_HOME}"); diff --git a/be/src/common/config.h b/be/src/common/config.h index f4f2845098..069049fe70 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1149,6 +1149,9 @@ DECLARE_mInt64(variant_threshold_rows_to_estimate_sparse_column); DECLARE_mBool(enable_merge_on_write_correctness_check); // rowid conversion correctness check when compaction for mow table DECLARE_mBool(enable_rowid_conversion_correctness_check); +// When the number of missing versions is more than this value, do not directly +// retry the publish and handle it through async publish. 
+DECLARE_mInt32(mow_publish_max_discontinuous_version_num); // The secure path with user files, used in the `local` table function. DECLARE_mString(user_files_secure_path); diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 881cc56a70..b4bd310592 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -32,6 +32,7 @@ #include #include #include +#include #include #include #include @@ -225,7 +226,7 @@ Status StorageEngine::start_bg_threads() { .build(&_tablet_publish_txn_thread_pool)); RETURN_IF_ERROR(Thread::create( - "StorageEngine", "aync_publish_version_thread", + "StorageEngine", "async_publish_version_thread", [this]() { this->_async_publish_callback(); }, &_async_publish_thread)); LOG(INFO) << "async publish thread started"; @@ -1189,6 +1190,20 @@ void StorageEngine::add_async_publish_task(int64_t partition_id, int64_t tablet_ int64_t publish_version, int64_t transaction_id, bool is_recovery) { if (!is_recovery) { + bool exists = false; + { + std::shared_lock rlock(_async_publish_lock); + if (auto tablet_iter = _async_publish_tasks.find(tablet_id); + tablet_iter != _async_publish_tasks.end()) { + if (auto iter = tablet_iter->second.find(publish_version); + iter != tablet_iter->second.end()) { + exists = true; + } + } + } + if (exists) { + return; + } TabletSharedPtr tablet = tablet_manager()->get_tablet(tablet_id); if (tablet == nullptr) { LOG(INFO) << "tablet may be dropped when add async publish task, tablet_id: " @@ -1205,12 +1220,12 @@ void StorageEngine::add_async_publish_task(int64_t partition_id, int64_t tablet_ LOG(INFO) << "add pending publish task, tablet_id: " << tablet_id << " version: " << publish_version << " txn_id:" << transaction_id << " is_recovery: " << is_recovery; - std::lock_guard lock(_async_publish_mutex); + std::unique_lock wlock(_async_publish_lock); _async_publish_tasks[tablet_id][publish_version] = {transaction_id, partition_id}; } int64_t 
StorageEngine::get_pending_publish_min_version(int64_t tablet_id) { - std::lock_guard lock(_async_publish_mutex); + std::shared_lock rlock(_async_publish_lock); auto iter = _async_publish_tasks.find(tablet_id); if (iter == _async_publish_tasks.end()) { return INT64_MAX; @@ -1221,58 +1236,67 @@ int64_t StorageEngine::get_pending_publish_min_version(int64_t tablet_id) { return iter->second.begin()->first; } +void StorageEngine::_process_async_publish() { + // tablet, publish_version + std::vector> need_removed_tasks; + { + std::unique_lock wlock(_async_publish_lock); + for (auto tablet_iter = _async_publish_tasks.begin(); + tablet_iter != _async_publish_tasks.end();) { + if (tablet_iter->second.empty()) { + tablet_iter = _async_publish_tasks.erase(tablet_iter); + continue; + } + int64_t tablet_id = tablet_iter->first; + TabletSharedPtr tablet = tablet_manager()->get_tablet(tablet_id); + if (!tablet) { + LOG(WARNING) << "tablet does not exist when async publush, tablet_id: " + << tablet_id; + tablet_iter = _async_publish_tasks.erase(tablet_iter); + continue; + } + + auto task_iter = tablet_iter->second.begin(); + int64_t version = task_iter->first; + int64_t transaction_id = task_iter->second.first; + int64_t partition_id = task_iter->second.second; + int64_t max_version = tablet->max_version().second; + + if (version <= max_version) { + need_removed_tasks.emplace_back(tablet, version); + tablet_iter->second.erase(task_iter); + tablet_iter++; + continue; + } + if (version != max_version + 1) { + // Keep only the most recent versions + while (tablet_iter->second.size() > config::max_tablet_version_num) { + need_removed_tasks.emplace_back(tablet, version); + task_iter = tablet_iter->second.erase(task_iter); + version = task_iter->first; + } + tablet_iter++; + continue; + } + + auto async_publish_task = std::make_shared( + tablet, partition_id, transaction_id, version); + static_cast(_tablet_publish_txn_thread_pool->submit_func( + [=]() { async_publish_task->handle(); 
})); + tablet_iter->second.erase(task_iter); + need_removed_tasks.emplace_back(tablet, version); + tablet_iter++; + } + } + for (auto& [tablet, publish_version] : need_removed_tasks) { + static_cast(TabletMetaManager::remove_pending_publish_info( + tablet->data_dir(), tablet->tablet_id(), publish_version)); + } +} + void StorageEngine::_async_publish_callback() { while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(30))) { - // tablet, publish_version - std::vector> need_removed_tasks; - { - std::lock_guard lock(_async_publish_mutex); - for (auto tablet_iter = _async_publish_tasks.begin(); - tablet_iter != _async_publish_tasks.end();) { - if (tablet_iter->second.empty()) { - tablet_iter = _async_publish_tasks.erase(tablet_iter); - continue; - } - int64_t tablet_id = tablet_iter->first; - TabletSharedPtr tablet = tablet_manager()->get_tablet(tablet_id); - if (!tablet) { - LOG(WARNING) << "tablet does not exist when async publush, tablet_id: " - << tablet_id; - tablet_iter = _async_publish_tasks.erase(tablet_iter); - continue; - } - - auto task_iter = tablet_iter->second.begin(); - int64_t version = task_iter->first; - int64_t transaction_id = task_iter->second.first; - int64_t partition_id = task_iter->second.second; - int64_t max_version = tablet->max_version().second; - - if (version <= max_version) { - need_removed_tasks.emplace_back(tablet, version); - tablet_iter->second.erase(task_iter); - tablet_iter++; - continue; - } - if (version != max_version + 1) { - tablet_iter++; - continue; - } - - auto async_publish_task = std::make_shared( - tablet, partition_id, transaction_id, version); - static_cast( - StorageEngine::instance()->tablet_publish_txn_thread_pool()->submit_func( - [=]() { async_publish_task->handle(); })); - tablet_iter->second.erase(task_iter); - need_removed_tasks.emplace_back(tablet, version); - tablet_iter++; - } - } - for (auto& [tablet, publish_version] : need_removed_tasks) { - 
static_cast(TabletMetaManager::remove_pending_publish_info( - tablet->data_dir(), tablet->tablet_id(), publish_version)); - } + _process_async_publish(); } } diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 679bcc68ce..4cb803a10a 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -30,6 +30,7 @@ #include #include #include +#include #include #include #include @@ -333,6 +334,8 @@ private: void _async_publish_callback(); + void _process_async_publish(); + Status _persist_broken_paths(); private: @@ -487,7 +490,7 @@ private: std::map>> _async_publish_tasks; // aync publish for discontinuous versions of merge_on_write table scoped_refptr _async_publish_thread; - std::mutex _async_publish_mutex; + std::shared_mutex _async_publish_lock; bool _clear_segment_cache = false; diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp index 9c76a2a3e9..5e3254f128 100644 --- a/be/src/olap/task/engine_publish_version_task.cpp +++ b/be/src/olap/task/engine_publish_version_task.cpp @@ -191,8 +191,17 @@ Status EnginePublishVersionTask::execute() { } auto handle_version_not_continuous = [&]() { add_error_tablet_id(tablet_info.tablet_id); - _discontinuous_version_tablets->emplace_back( - partition_id, tablet_info.tablet_id, version.first); + // When there are too many missing versions, do not directly retry the + // publish and handle it through async publish. 
+ if (max_version + config::mow_publish_max_discontinuous_version_num < + version.first) { + StorageEngine::instance()->add_async_publish_task( + partition_id, tablet_info.tablet_id, version.first, + _publish_version_req.transaction_id, false); + } else { + _discontinuous_version_tablets->emplace_back( + partition_id, tablet_info.tablet_id, version.first); + } res = Status::Error( "check_version_exist failed"); int64_t missed_version = max_version + 1; diff --git a/be/test/olap/storage_engine_test.cpp b/be/test/olap/storage_engine_test.cpp index ebf572ef5f..eb1fdc4e8c 100644 --- a/be/test/olap/storage_engine_test.cpp +++ b/be/test/olap/storage_engine_test.cpp @@ -21,17 +21,16 @@ #include #include #include +#include #include #include "common/status.h" #include "gtest/gtest_pred_impl.h" -#include "testutil/test_util.h" - -using ::testing::_; -using ::testing::Return; -using ::testing::SetArgPointee; -using std::string; +#include "io/fs/local_file_system.h" +#include "olap/data_dir.h" +#include "olap/tablet_manager.h" +#include "util/threadpool.h" namespace doris { using namespace config; @@ -39,14 +38,29 @@ using namespace config; class StorageEngineTest : public testing::Test { public: virtual void SetUp() { + _engine_data_path = "./be/test/olap/test_data/converter_test_data/tmp"; + EXPECT_TRUE( + io::global_local_filesystem()->delete_and_create_directory(_engine_data_path).ok()); + EXPECT_TRUE( + io::global_local_filesystem()->create_directory(_engine_data_path + "/meta").ok()); + _data_dir.reset(new DataDir(_engine_data_path, 100000000)); + static_cast(_data_dir->init()); + EngineOptions options; + options.backend_uid = UniqueId::gen_uid(); _storage_engine.reset(new StorageEngine(options)); + ExecEnv::GetInstance()->set_storage_engine(_storage_engine.get()); } - virtual void TearDown() {} + virtual void TearDown() { + EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_engine_data_path).ok()); + ExecEnv::GetInstance()->set_storage_engine(nullptr); + } 
std::unique_ptr _storage_engine; + std::string _engine_data_path; + std::unique_ptr _data_dir; }; TEST_F(StorageEngineTest, TestBrokenDisk) { @@ -86,4 +100,80 @@ TEST_F(StorageEngineTest, TestBrokenDisk) { } } +TEST_F(StorageEngineTest, TestAsyncPublish) { + auto st = ThreadPoolBuilder("TabletPublishTxnThreadPool") + .set_min_threads(config::tablet_publish_txn_max_thread) + .set_max_threads(config::tablet_publish_txn_max_thread) + .build(&_storage_engine->tablet_publish_txn_thread_pool()); + EXPECT_EQ(st, Status::OK()); + + int64_t partition_id = 1; + int64_t tablet_id = 111; + + TColumnType col_type; + col_type.__set_type(TPrimitiveType::SMALLINT); + TColumn col1; + col1.__set_column_name("col1"); + col1.__set_column_type(col_type); + col1.__set_is_key(true); + std::vector cols; + cols.push_back(col1); + TTabletSchema tablet_schema; + tablet_schema.__set_short_key_column_count(1); + tablet_schema.__set_schema_hash(3333); + tablet_schema.__set_keys_type(TKeysType::AGG_KEYS); + tablet_schema.__set_storage_type(TStorageType::COLUMN); + tablet_schema.__set_columns(cols); + TCreateTabletReq create_tablet_req; + create_tablet_req.__set_tablet_schema(tablet_schema); + create_tablet_req.__set_tablet_id(tablet_id); + create_tablet_req.__set_version(10); + + std::vector data_dirs; + data_dirs.push_back(_data_dir.get()); + RuntimeProfile profile("CreateTablet"); + st = _storage_engine->tablet_manager()->create_tablet(create_tablet_req, data_dirs, &profile); + EXPECT_EQ(st, Status::OK()); + TabletSharedPtr tablet = _storage_engine->tablet_manager()->get_tablet(tablet_id); + EXPECT_EQ(tablet->max_version().second, 10); + + for (int64_t i = 5; i < 12; ++i) { + _storage_engine->add_async_publish_task(partition_id, tablet_id, i, i, false); + } + EXPECT_EQ(_storage_engine->_async_publish_tasks[tablet_id].size(), 7); + EXPECT_EQ(_storage_engine->get_pending_publish_min_version(tablet_id), 5); + for (int64_t i = 1; i < 8; ++i) { + _storage_engine->_process_async_publish(); + 
EXPECT_EQ(_storage_engine->_async_publish_tasks[tablet_id].size(), 7 - i); + } + _storage_engine->_process_async_publish(); + EXPECT_EQ(_storage_engine->_async_publish_tasks.size(), 0); + + for (int64_t i = 100; i < config::max_tablet_version_num + 120; ++i) { + _storage_engine->add_async_publish_task(partition_id, tablet_id, i, i, false); + } + EXPECT_EQ(_storage_engine->_async_publish_tasks[tablet_id].size(), + config::max_tablet_version_num + 20); + + for (int64_t i = 90; i < 120; ++i) { + _storage_engine->add_async_publish_task(partition_id, tablet_id, i, i, false); + } + EXPECT_EQ(_storage_engine->_async_publish_tasks[tablet_id].size(), + config::max_tablet_version_num + 30); + EXPECT_EQ(_storage_engine->get_pending_publish_min_version(tablet_id), 90); + + _storage_engine->_process_async_publish(); + EXPECT_EQ(_storage_engine->_async_publish_tasks[tablet_id].size(), + config::max_tablet_version_num); + EXPECT_EQ(_storage_engine->get_pending_publish_min_version(tablet_id), 120); + + st = _storage_engine->tablet_manager()->drop_tablet(tablet_id, 0, false); + EXPECT_EQ(st, Status::OK()); + + EXPECT_EQ(_storage_engine->_async_publish_tasks[tablet_id].size(), + config::max_tablet_version_num); + _storage_engine->_process_async_publish(); + EXPECT_EQ(_storage_engine->_async_publish_tasks.size(), 0); +} + } // namespace doris