[improvement](merge-on-write) Optimize publish when there are missing versions (#28012)

1. Do not retry publishing on the BE when there are too many missing versions;
just add the publish to the async publish tasks.
2. To reduce memory consumption, clean up the tasks when there are too many
async publish tasks.
This commit is contained in:
Xin Liao
2023-12-13 16:59:25 +08:00
committed by GitHub
parent dee89d2c4a
commit e6e8632167
7 changed files with 200 additions and 63 deletions

View File

@ -1458,6 +1458,11 @@ void PublishVersionWorkerPool::publish_version_callback(const TAgentTaskRequest&
}
if (status.is<PUBLISH_VERSION_NOT_CONTINUOUS>()) {
// There are too many missing versions, so the tablet has already been added
// to the async publish task; no need to retry here.
if (discontinuous_version_tablets.empty()) {
break;
}
LOG_EVERY_SECOND(INFO) << "wait for previous publish version task to be done, "
<< "transaction_id: " << publish_version_req.transaction_id;

View File

@ -1084,6 +1084,9 @@ DEFINE_mInt64(LZ4_HC_compression_level, "9");
DEFINE_mBool(enable_merge_on_write_correctness_check, "true");
// rowid conversion correctness check when compaction for mow table
DEFINE_mBool(enable_rowid_conversion_correctness_check, "false");
// When the number of missing versions is more than this value, do not directly
// retry the publish and handle it through async publish.
DEFINE_mInt32(mow_publish_max_discontinuous_version_num, "20");
// The secure path with user files, used in the `local` table function.
DEFINE_mString(user_files_secure_path, "${DORIS_HOME}");

View File

@ -1149,6 +1149,9 @@ DECLARE_mInt64(variant_threshold_rows_to_estimate_sparse_column);
DECLARE_mBool(enable_merge_on_write_correctness_check);
// rowid conversion correctness check when compaction for mow table
DECLARE_mBool(enable_rowid_conversion_correctness_check);
// When the number of missing versions is more than this value, do not directly
// retry the publish and handle it through async publish.
DECLARE_mInt32(mow_publish_max_discontinuous_version_num);
// The secure path with user files, used in the `local` table function.
DECLARE_mString(user_files_secure_path);

View File

@ -32,6 +32,7 @@
#include <mutex>
#include <ostream>
#include <random>
#include <shared_mutex>
#include <string>
#include <type_traits>
#include <unordered_set>
@ -225,7 +226,7 @@ Status StorageEngine::start_bg_threads() {
.build(&_tablet_publish_txn_thread_pool));
RETURN_IF_ERROR(Thread::create(
"StorageEngine", "aync_publish_version_thread",
"StorageEngine", "async_publish_version_thread",
[this]() { this->_async_publish_callback(); }, &_async_publish_thread));
LOG(INFO) << "async publish thread started";
@ -1189,6 +1190,20 @@ void StorageEngine::add_async_publish_task(int64_t partition_id, int64_t tablet_
int64_t publish_version, int64_t transaction_id,
bool is_recovery) {
if (!is_recovery) {
bool exists = false;
{
std::shared_lock<std::shared_mutex> rlock(_async_publish_lock);
if (auto tablet_iter = _async_publish_tasks.find(tablet_id);
tablet_iter != _async_publish_tasks.end()) {
if (auto iter = tablet_iter->second.find(publish_version);
iter != tablet_iter->second.end()) {
exists = true;
}
}
}
if (exists) {
return;
}
TabletSharedPtr tablet = tablet_manager()->get_tablet(tablet_id);
if (tablet == nullptr) {
LOG(INFO) << "tablet may be dropped when add async publish task, tablet_id: "
@ -1205,12 +1220,12 @@ void StorageEngine::add_async_publish_task(int64_t partition_id, int64_t tablet_
LOG(INFO) << "add pending publish task, tablet_id: " << tablet_id
<< " version: " << publish_version << " txn_id:" << transaction_id
<< " is_recovery: " << is_recovery;
std::lock_guard<std::mutex> lock(_async_publish_mutex);
std::unique_lock<std::shared_mutex> wlock(_async_publish_lock);
_async_publish_tasks[tablet_id][publish_version] = {transaction_id, partition_id};
}
int64_t StorageEngine::get_pending_publish_min_version(int64_t tablet_id) {
std::lock_guard<std::mutex> lock(_async_publish_mutex);
std::shared_lock<std::shared_mutex> rlock(_async_publish_lock);
auto iter = _async_publish_tasks.find(tablet_id);
if (iter == _async_publish_tasks.end()) {
return INT64_MAX;
@ -1221,58 +1236,67 @@ int64_t StorageEngine::get_pending_publish_min_version(int64_t tablet_id) {
return iter->second.begin()->first;
}
void StorageEngine::_process_async_publish() {
// tablet, publish_version
std::vector<std::pair<TabletSharedPtr, int64_t>> need_removed_tasks;
{
std::unique_lock<std::shared_mutex> wlock(_async_publish_lock);
for (auto tablet_iter = _async_publish_tasks.begin();
tablet_iter != _async_publish_tasks.end();) {
if (tablet_iter->second.empty()) {
tablet_iter = _async_publish_tasks.erase(tablet_iter);
continue;
}
int64_t tablet_id = tablet_iter->first;
TabletSharedPtr tablet = tablet_manager()->get_tablet(tablet_id);
if (!tablet) {
LOG(WARNING) << "tablet does not exist when async publush, tablet_id: "
<< tablet_id;
tablet_iter = _async_publish_tasks.erase(tablet_iter);
continue;
}
auto task_iter = tablet_iter->second.begin();
int64_t version = task_iter->first;
int64_t transaction_id = task_iter->second.first;
int64_t partition_id = task_iter->second.second;
int64_t max_version = tablet->max_version().second;
if (version <= max_version) {
need_removed_tasks.emplace_back(tablet, version);
tablet_iter->second.erase(task_iter);
tablet_iter++;
continue;
}
if (version != max_version + 1) {
// Keep only the most recent versions
while (tablet_iter->second.size() > config::max_tablet_version_num) {
need_removed_tasks.emplace_back(tablet, version);
task_iter = tablet_iter->second.erase(task_iter);
version = task_iter->first;
}
tablet_iter++;
continue;
}
auto async_publish_task = std::make_shared<AsyncTabletPublishTask>(
tablet, partition_id, transaction_id, version);
static_cast<void>(_tablet_publish_txn_thread_pool->submit_func(
[=]() { async_publish_task->handle(); }));
tablet_iter->second.erase(task_iter);
need_removed_tasks.emplace_back(tablet, version);
tablet_iter++;
}
}
for (auto& [tablet, publish_version] : need_removed_tasks) {
static_cast<void>(TabletMetaManager::remove_pending_publish_info(
tablet->data_dir(), tablet->tablet_id(), publish_version));
}
}
// Body of the async publish background thread: keeps looping for as long as
// the 30 ms wait on _stop_background_threads_latch returns false, handling the
// pending async publish tasks on every iteration.
// NOTE(review): as rendered here the loop contains both an inline scan guarded
// by _async_publish_mutex and a trailing call to _process_async_publish(). The
// inline scan looks like the pre-change body that was moved into
// _process_async_publish() — confirm that only the call remains in the file.
void StorageEngine::_async_publish_callback() {
while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(30))) {
// tablet, publish_version
std::vector<std::pair<TabletSharedPtr, int64_t>> need_removed_tasks;
{
std::lock_guard<std::mutex> lock(_async_publish_mutex);
for (auto tablet_iter = _async_publish_tasks.begin();
tablet_iter != _async_publish_tasks.end();) {
// Drop tablets that have no pending task left.
if (tablet_iter->second.empty()) {
tablet_iter = _async_publish_tasks.erase(tablet_iter);
continue;
}
int64_t tablet_id = tablet_iter->first;
TabletSharedPtr tablet = tablet_manager()->get_tablet(tablet_id);
// The tablet is gone: discard everything queued for it.
if (!tablet) {
LOG(WARNING) << "tablet does not exist when async publush, tablet_id: "
<< tablet_id;
tablet_iter = _async_publish_tasks.erase(tablet_iter);
continue;
}
// Only the lowest pending version of each tablet is looked at per pass.
auto task_iter = tablet_iter->second.begin();
int64_t version = task_iter->first;
int64_t transaction_id = task_iter->second.first;
int64_t partition_id = task_iter->second.second;
int64_t max_version = tablet->max_version().second;
// Version is not above the tablet's max version: just forget the task.
if (version <= max_version) {
need_removed_tasks.emplace_back(tablet, version);
tablet_iter->second.erase(task_iter);
tablet_iter++;
continue;
}
// Still a gap before this version: leave the task queued.
if (version != max_version + 1) {
tablet_iter++;
continue;
}
// version == max_version + 1: hand the publish to the thread pool.
auto async_publish_task = std::make_shared<AsyncTabletPublishTask>(
tablet, partition_id, transaction_id, version);
static_cast<void>(
StorageEngine::instance()->tablet_publish_txn_thread_pool()->submit_func(
[=]() { async_publish_task->handle(); }));
tablet_iter->second.erase(task_iter);
need_removed_tasks.emplace_back(tablet, version);
tablet_iter++;
}
}
// Remove the pending publish info of the handled tasks after the lock scope.
for (auto& [tablet, publish_version] : need_removed_tasks) {
static_cast<void>(TabletMetaManager::remove_pending_publish_info(
tablet->data_dir(), tablet->tablet_id(), publish_version));
}
_process_async_publish();
}
}

View File

@ -30,6 +30,7 @@
#include <memory>
#include <mutex>
#include <set>
#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <unordered_set>
@ -333,6 +334,8 @@ private:
void _async_publish_callback();
void _process_async_publish();
Status _persist_broken_paths();
private:
@ -487,7 +490,7 @@ private:
std::map<int64_t, std::map<int64_t, std::pair<int64_t, int64_t>>> _async_publish_tasks;
// async publish for discontinuous versions of merge_on_write table
scoped_refptr<Thread> _async_publish_thread;
std::mutex _async_publish_mutex;
std::shared_mutex _async_publish_lock;
bool _clear_segment_cache = false;

View File

@ -191,8 +191,17 @@ Status EnginePublishVersionTask::execute() {
}
auto handle_version_not_continuous = [&]() {
add_error_tablet_id(tablet_info.tablet_id);
_discontinuous_version_tablets->emplace_back(
partition_id, tablet_info.tablet_id, version.first);
// When there are too many missing versions, do not directly retry the
// publish and handle it through async publish.
if (max_version + config::mow_publish_max_discontinuous_version_num <
version.first) {
StorageEngine::instance()->add_async_publish_task(
partition_id, tablet_info.tablet_id, version.first,
_publish_version_req.transaction_id, false);
} else {
_discontinuous_version_tablets->emplace_back(
partition_id, tablet_info.tablet_id, version.first);
}
res = Status::Error<PUBLISH_VERSION_NOT_CONTINUOUS>(
"check_version_exist failed");
int64_t missed_version = max_version + 1;

View File

@ -21,17 +21,16 @@
#include <gmock/gmock-matchers.h>
#include <gtest/gtest-message.h>
#include <gtest/gtest-test-part.h>
#include <gtest/gtest.h>
#include <filesystem>
#include "common/status.h"
#include "gtest/gtest_pred_impl.h"
#include "testutil/test_util.h"
using ::testing::_;
using ::testing::Return;
using ::testing::SetArgPointee;
using std::string;
#include "io/fs/local_file_system.h"
#include "olap/data_dir.h"
#include "olap/tablet_manager.h"
#include "util/threadpool.h"
namespace doris {
using namespace config;
@ -39,14 +38,29 @@ using namespace config;
// Test fixture that builds a StorageEngine backed by a scratch directory and
// registers it with ExecEnv for the duration of each test.
// NOTE(review): two TearDown() definitions are visible below; the empty one
// looks like the pre-change line of this diff — confirm only one remains.
class StorageEngineTest : public testing::Test {
public:
// Recreates the scratch directory (plus its "meta" sub-directory), creates a
// DataDir on it and a fresh StorageEngine, then installs the engine in ExecEnv.
virtual void SetUp() {
_engine_data_path = "./be/test/olap/test_data/converter_test_data/tmp";
EXPECT_TRUE(
io::global_local_filesystem()->delete_and_create_directory(_engine_data_path).ok());
EXPECT_TRUE(
io::global_local_filesystem()->create_directory(_engine_data_path + "/meta").ok());
// NOTE(review): the second argument is presumably the capacity in bytes — confirm.
_data_dir.reset(new DataDir(_engine_data_path, 100000000));
// The init status is deliberately ignored here.
static_cast<void>(_data_dir->init());
EngineOptions options;
options.backend_uid = UniqueId::gen_uid();
_storage_engine.reset(new StorageEngine(options));
ExecEnv::GetInstance()->set_storage_engine(_storage_engine.get());
}
virtual void TearDown() {}
// Deletes the scratch directory and clears the engine pointer held by ExecEnv.
virtual void TearDown() {
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_engine_data_path).ok());
ExecEnv::GetInstance()->set_storage_engine(nullptr);
}
// Engine under test, recreated for every test case.
std::unique_ptr<StorageEngine> _storage_engine;
// Scratch directory used as the engine's data path.
std::string _engine_data_path;
// Data dir handed to create_tablet() by the tests.
std::unique_ptr<DataDir> _data_dir;
};
TEST_F(StorageEngineTest, TestBrokenDisk) {
@ -86,4 +100,80 @@ TEST_F(StorageEngineTest, TestBrokenDisk) {
}
}
// Exercises the async publish queue on a single tablet created at version 10:
//   1. tasks that need no waiting are drained one per _process_async_publish();
//   2. a backlog larger than config::max_tablet_version_num is trimmed to the
//      most recent versions;
//   3. tasks of a dropped tablet are discarded on the next pass.
TEST_F(StorageEngineTest, TestAsyncPublish) {
    auto st = ThreadPoolBuilder("TabletPublishTxnThreadPool")
                      .set_min_threads(config::tablet_publish_txn_max_thread)
                      .set_max_threads(config::tablet_publish_txn_max_thread)
                      .build(&_storage_engine->tablet_publish_txn_thread_pool());
    // _process_async_publish() submits to this pool, so abort if it is missing.
    ASSERT_EQ(st, Status::OK());

    int64_t partition_id = 1;
    int64_t tablet_id = 111;

    // Minimal one-column schema, just enough to create a tablet.
    TColumnType col_type;
    col_type.__set_type(TPrimitiveType::SMALLINT);
    TColumn col1;
    col1.__set_column_name("col1");
    col1.__set_column_type(col_type);
    col1.__set_is_key(true);
    std::vector<TColumn> cols;
    cols.push_back(col1);
    TTabletSchema tablet_schema;
    tablet_schema.__set_short_key_column_count(1);
    tablet_schema.__set_schema_hash(3333);
    tablet_schema.__set_keys_type(TKeysType::AGG_KEYS);
    tablet_schema.__set_storage_type(TStorageType::COLUMN);
    tablet_schema.__set_columns(cols);
    TCreateTabletReq create_tablet_req;
    create_tablet_req.__set_tablet_schema(tablet_schema);
    create_tablet_req.__set_tablet_id(tablet_id);
    create_tablet_req.__set_version(10);
    std::vector<DataDir*> data_dirs;
    data_dirs.push_back(_data_dir.get());
    RuntimeProfile profile("CreateTablet");
    st = _storage_engine->tablet_manager()->create_tablet(create_tablet_req, data_dirs, &profile);
    // The tablet is dereferenced below; do not continue without it.
    ASSERT_EQ(st, Status::OK());
    TabletSharedPtr tablet = _storage_engine->tablet_manager()->get_tablet(tablet_id);
    ASSERT_TRUE(tablet != nullptr);
    EXPECT_EQ(tablet->max_version().second, 10);

    // Phase 1: versions 5..11 against max version 10. Versions 5..10 are not
    // above the max version and version 11 is max + 1, so every pass consumes
    // exactly one task.
    for (int64_t i = 5; i < 12; ++i) {
        _storage_engine->add_async_publish_task(partition_id, tablet_id, i, i, false);
    }
    EXPECT_EQ(_storage_engine->_async_publish_tasks[tablet_id].size(), 7);
    EXPECT_EQ(_storage_engine->get_pending_publish_min_version(tablet_id), 5);
    for (int64_t i = 1; i < 8; ++i) {
        _storage_engine->_process_async_publish();
        EXPECT_EQ(_storage_engine->_async_publish_tasks[tablet_id].size(), 7 - i);
    }
    // One more pass erases the now-empty per-tablet entry.
    _storage_engine->_process_async_publish();
    EXPECT_EQ(_storage_engine->_async_publish_tasks.size(), 0);

    // Phase 2: queue more discontinuous versions than max_tablet_version_num.
    for (int64_t i = 100; i < config::max_tablet_version_num + 120; ++i) {
        _storage_engine->add_async_publish_task(partition_id, tablet_id, i, i, false);
    }
    EXPECT_EQ(_storage_engine->_async_publish_tasks[tablet_id].size(),
              config::max_tablet_version_num + 20);
    // Versions 100..119 are already queued, so only 90..99 add new entries.
    for (int64_t i = 90; i < 120; ++i) {
        _storage_engine->add_async_publish_task(partition_id, tablet_id, i, i, false);
    }
    EXPECT_EQ(_storage_engine->_async_publish_tasks[tablet_id].size(),
              config::max_tablet_version_num + 30);
    EXPECT_EQ(_storage_engine->get_pending_publish_min_version(tablet_id), 90);
    // The 30 lowest versions (90..119) are trimmed in a single pass.
    _storage_engine->_process_async_publish();
    EXPECT_EQ(_storage_engine->_async_publish_tasks[tablet_id].size(),
              config::max_tablet_version_num);
    EXPECT_EQ(_storage_engine->get_pending_publish_min_version(tablet_id), 120);

    // Phase 3: dropping the tablet leaves the queue untouched until the next
    // pass, which then discards every task of the missing tablet.
    st = _storage_engine->tablet_manager()->drop_tablet(tablet_id, 0, false);
    EXPECT_EQ(st, Status::OK());
    EXPECT_EQ(_storage_engine->_async_publish_tasks[tablet_id].size(),
              config::max_tablet_version_num);
    _storage_engine->_process_async_publish();
    EXPECT_EQ(_storage_engine->_async_publish_tasks.size(), 0);
}
} // namespace doris