[feature-wip](unique-key-merge-on-write) update delete bitmap while publish version (#11195)

1.make version publish work in version order
2.update delete bitmap while publish version, load current version rowset
primary key and search in pre rowsets
3.speed up publish version task by parallel tablet publish task

Co-authored-by: yixiutt <yixiu@selectdb.com>
This commit is contained in:
yixiutt
2022-07-27 16:26:42 +08:00
committed by GitHub
parent 461a31b1f6
commit 01e108cb7b
15 changed files with 297 additions and 45 deletions

View File

@ -711,6 +711,13 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() {
res = _env->storage_engine()->execute_task(&engine_task);
if (res.ok()) {
break;
} else if (res.precise_code() == OLAP_ERR_PUBLISH_VERSION_NOT_CONTINUOUS) {
// version not continuous, put to queue and wait pre version publish
// task execute
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_tasks.push_back(agent_task_req);
_worker_thread_condition_variable.notify_one();
break;
} else {
LOG(WARNING) << "publish version error, retry. [transaction_id="
<< publish_version_req.transaction_id
@ -719,6 +726,9 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
if (res.precise_code() == OLAP_ERR_PUBLISH_VERSION_NOT_CONTINUOUS) {
continue;
}
TFinishTaskRequest finish_task_request;
if (!res) {

View File

@ -81,6 +81,8 @@ CONF_Int32(push_worker_count_normal_priority, "3");
CONF_Int32(push_worker_count_high_priority, "3");
// the count of thread to publish version
CONF_Int32(publish_version_worker_count, "8");
// the count of tablet thread to publish version
CONF_Int32(tablet_publish_txn_max_thread, "32");
// the count of thread to clear transaction task
CONF_Int32(clear_transaction_task_worker_count, "1");
// the count of thread to delete

View File

@ -231,7 +231,8 @@ namespace doris {
M(OLAP_ERR_ROWSET_READ_FAILED, -3111, "", true) \
M(OLAP_ERR_ROWSET_INVALID_STATE_TRANSITION, -3112, "", true) \
M(OLAP_ERR_STRING_OVERFLOW_IN_VEC_ENGINE, -3113, "", true) \
M(OLAP_ERR_ROWSET_ADD_MIGRATION_V2, -3114, "", true)
M(OLAP_ERR_ROWSET_ADD_MIGRATION_V2, -3114, "", true) \
M(OLAP_ERR_PUBLISH_VERSION_NOT_CONTINUOUS, -3115, "", false)
enum ErrorCode {
#define M(NAME, ERRORCODE, DESC, STACKTRACEENABLED) NAME = ERRORCODE,

View File

@ -137,6 +137,12 @@ Status StorageEngine::start_bg_threads() {
&_cooldown_tasks_producer_thread));
LOG(INFO) << "cooldown tasks producer thread started";
// add tablet publish version thread pool
ThreadPoolBuilder("TabletPublishTxnThreadPool")
.set_min_threads(config::tablet_publish_txn_max_thread)
.set_max_threads(config::tablet_publish_txn_max_thread)
.build(&_tablet_publish_txn_thread_pool);
LOG(INFO) << "all storage engine's background threads are started.";
return Status::OK();
}

View File

@ -78,7 +78,7 @@ Status Segment::new_iterator(const Schema& schema, const StorageReadOptions& rea
}
}
RETURN_IF_ERROR(_load_index());
RETURN_IF_ERROR(load_index());
iter->reset(new SegmentIterator(this->shared_from_this(), schema));
iter->get()->init(read_options);
return Status::OK();
@ -134,7 +134,7 @@ Status Segment::_parse_footer() {
return Status::OK();
}
Status Segment::_load_index() {
Status Segment::load_index() {
return _load_index_once.call([this] {
// read and parse short key index page
PageReadOptions opts;
@ -225,7 +225,7 @@ Status Segment::new_bitmap_index_iterator(const TabletColumn& tablet_column,
}
Status Segment::lookup_row_key(const Slice& key, RowLocation* row_location) {
RETURN_IF_ERROR(_load_index());
RETURN_IF_ERROR(load_index());
DCHECK(_pk_index_reader != nullptr);
if (!_pk_index_reader->check_present(key)) {
return Status::NotFound("Can't find key in the segment");

View File

@ -92,6 +92,8 @@ public:
// only used by UT
const SegmentFooterPB& footer() const { return _footer; }
Status load_index();
private:
DISALLOW_COPY_AND_ASSIGN(Segment);
Segment(uint32_t segment_id, const TabletSchema* tablet_schema);
@ -99,9 +101,6 @@ private:
Status _open();
Status _parse_footer();
Status _create_column_readers();
// Load and decode short key index.
// May be called multiple times, subsequent calls will no op.
Status _load_index();
private:
friend class SegmentIterator;

View File

@ -192,6 +192,10 @@ public:
Status submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type);
Status submit_quick_compaction_task(TabletSharedPtr tablet);
std::unique_ptr<ThreadPool>& tablet_publish_txn_thread_pool() {
return _tablet_publish_txn_thread_pool;
}
private:
// Instance should be inited from `static open()`
// MUST NOT be called in other circumstances.
@ -382,6 +386,8 @@ private:
std::unique_ptr<ThreadPool> _base_compaction_thread_pool;
std::unique_ptr<ThreadPool> _cumu_compaction_thread_pool;
std::unique_ptr<ThreadPool> _tablet_publish_txn_thread_pool;
std::unique_ptr<ThreadPool> _tablet_meta_checkpoint_thread_pool;
CompactionPermitLimiter _permit_limiter;

View File

@ -438,6 +438,11 @@ inline void Tablet::set_cumulative_layer_point(int64_t new_point) {
}
inline bool Tablet::enable_unique_key_merge_on_write() const {
#ifdef BE_TEST
if (_tablet_meta == nullptr) {
return false;
}
#endif
return _tablet_meta->enable_unique_key_merge_on_write();
}

View File

@ -17,6 +17,8 @@
#include "olap/task/engine_publish_version_task.h"
#include <util/defer_op.h>
#include <map>
#include "olap/data_dir.h"
@ -34,13 +36,38 @@ EnginePublishVersionTask::EnginePublishVersionTask(TPublishVersionRequest& publi
_error_tablet_ids(error_tablet_ids),
_succ_tablet_ids(succ_tablet_ids) {}
void EnginePublishVersionTask::add_error_tablet_id(int64_t tablet_id) {
std::lock_guard<std::mutex> lck(_tablet_ids_mutex);
_error_tablet_ids->push_back(tablet_id);
}
void EnginePublishVersionTask::add_succ_tablet_id(int64_t tablet_id) {
std::lock_guard<std::mutex> lck(_tablet_ids_mutex);
_succ_tablet_ids->push_back(tablet_id);
}
void EnginePublishVersionTask::wait() {
std::unique_lock<std::mutex> lock(_tablet_finish_sleep_mutex);
_tablet_finish_sleep_cond.wait_for(lock, std::chrono::milliseconds(10));
}
void EnginePublishVersionTask::notify() {
std::unique_lock<std::mutex> lock(_tablet_finish_sleep_mutex);
_tablet_finish_sleep_cond.notify_one();
}
Status EnginePublishVersionTask::finish() {
Status res = Status::OK();
int64_t transaction_id = _publish_version_req.transaction_id;
VLOG_NOTICE << "begin to process publish version. transaction_id=" << transaction_id;
// each partition
bool meet_version_not_continuous = false;
std::atomic<int64_t> total_task_num(0);
for (auto& par_ver_info : _publish_version_req.partition_version_infos) {
if (meet_version_not_continuous) {
break;
}
int64_t partition_id = par_ver_info.partition_id;
// get all partition related tablets and check whether the tablet have the related version
std::set<TabletInfo> partition_related_tablet_infos;
@ -60,7 +87,9 @@ Status EnginePublishVersionTask::finish() {
// each tablet
for (auto& tablet_rs : tablet_related_rs) {
Status publish_status = Status::OK();
if (meet_version_not_continuous) {
break;
}
TabletInfo tablet_info = tablet_rs.first;
RowsetSharedPtr rowset = tablet_rs.second;
VLOG_CRITICAL << "begin to publish version on tablet. "
@ -86,39 +115,45 @@ Status EnginePublishVersionTask::finish() {
res = Status::OLAPInternalError(OLAP_ERR_PUSH_TABLE_NOT_EXIST);
continue;
}
publish_status = StorageEngine::instance()->txn_manager()->publish_txn(
partition_id, tablet, transaction_id, version);
if (publish_status != Status::OK()) {
LOG(WARNING) << "failed to publish version. rowset_id=" << rowset->rowset_id()
<< ", tablet_id=" << tablet_info.tablet_id
<< ", txn_id=" << transaction_id;
_error_tablet_ids->push_back(tablet_info.tablet_id);
res = publish_status;
Version max_version = tablet->max_version();
// in uniq key model with merge-on-write, we should see all
// previous version when update delete bitmap, so add a check
// here and wait pre version publish or lock timeout
if (tablet->keys_type() == KeysType::UNIQUE_KEYS &&
tablet->enable_unique_key_merge_on_write() &&
version.first != max_version.second + 1) {
LOG(INFO) << "uniq key with merge-on-write version not continuous, current max "
"version="
<< max_version.second << ", publish_version=" << version.first
<< " tablet_id=" << tablet->tablet_id();
meet_version_not_continuous = true;
res = Status::OLAPInternalError(OLAP_ERR_PUBLISH_VERSION_NOT_CONTINUOUS);
continue;
}
// add visible rowset to tablet
publish_status = tablet->add_inc_rowset(rowset);
if (publish_status != Status::OK() &&
publish_status.precise_code() != OLAP_ERR_PUSH_VERSION_ALREADY_EXIST) {
LOG(WARNING) << "fail to add visible rowset to tablet. rowset_id="
<< rowset->rowset_id() << ", tablet_id=" << tablet_info.tablet_id
<< ", txn_id=" << transaction_id << ", res=" << publish_status;
_error_tablet_ids->push_back(tablet_info.tablet_id);
res = publish_status;
continue;
}
if (_succ_tablet_ids != nullptr) {
_succ_tablet_ids->push_back(tablet_info.tablet_id);
}
partition_related_tablet_infos.erase(tablet_info);
VLOG_NOTICE << "publish version successfully on tablet. tablet=" << tablet->full_name()
<< ", transaction_id=" << transaction_id << ", version=" << version.first
<< ", res=" << publish_status;
total_task_num.fetch_add(1);
auto tablet_publish_txn_ptr = std::make_shared<TabletPublishTxnTask>(
this, tablet, rowset, partition_id, transaction_id, version, tablet_info,
&total_task_num);
auto submit_st =
StorageEngine::instance()->tablet_publish_txn_thread_pool()->submit_func(
[=]() { tablet_publish_txn_ptr->handle(); });
CHECK(submit_st.ok());
}
}
// wait for all publish txn finished
while (total_task_num.load() != 0) {
wait();
}
// check if the related tablet remained all have the version
// check if the related tablet remained all have the version
for (auto& par_ver_info : _publish_version_req.partition_version_infos) {
int64_t partition_id = par_ver_info.partition_id;
// get all partition related tablets and check whether the tablet have the related version
std::set<TabletInfo> partition_related_tablet_infos;
StorageEngine::instance()->tablet_manager()->get_partition_related_tablets(
partition_id, &partition_related_tablet_infos);
Version version(par_ver_info.version, par_ver_info.version);
for (auto& tablet_info : partition_related_tablet_infos) {
// has to use strict mode to check if check all tablets
if (!_publish_version_req.strict_mode) {
@ -127,11 +162,11 @@ Status EnginePublishVersionTask::finish() {
TabletSharedPtr tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(tablet_info.tablet_id);
if (tablet == nullptr) {
_error_tablet_ids->push_back(tablet_info.tablet_id);
add_error_tablet_id(tablet_info.tablet_id);
} else {
// check if the version exist, if not exist, then set publish failed
if (!tablet->check_version_exist(version)) {
_error_tablet_ids->push_back(tablet_info.tablet_id);
add_error_tablet_id(tablet_info.tablet_id);
}
}
}
@ -143,4 +178,51 @@ Status EnginePublishVersionTask::finish() {
return res;
}
TabletPublishTxnTask::TabletPublishTxnTask(EnginePublishVersionTask* engine_task,
TabletSharedPtr tablet, RowsetSharedPtr rowset,
int64_t partition_id, int64_t transaction_id,
Version version, const TabletInfo& tablet_info,
std::atomic<int64_t>* total_task_num)
: _engine_publish_version_task(engine_task),
_tablet(tablet),
_rowset(rowset),
_partition_id(partition_id),
_transaction_id(transaction_id),
_version(version),
_tablet_info(tablet_info),
_total_task_num(total_task_num) {}
void TabletPublishTxnTask::handle() {
Defer defer {[&] {
if (_total_task_num->fetch_sub(1) == 1) {
_engine_publish_version_task->notify();
}
}};
auto publish_status = StorageEngine::instance()->txn_manager()->publish_txn(
_partition_id, _tablet, _transaction_id, _version);
if (publish_status != Status::OK()) {
LOG(WARNING) << "failed to publish version. rowset_id=" << _rowset->rowset_id()
<< ", tablet_id=" << _tablet_info.tablet_id << ", txn_id=" << _transaction_id;
_engine_publish_version_task->add_error_tablet_id(_tablet_info.tablet_id);
return;
}
// add visible rowset to tablet
publish_status = _tablet->add_inc_rowset(_rowset);
if (publish_status != Status::OK() &&
publish_status.precise_code() != OLAP_ERR_PUSH_VERSION_ALREADY_EXIST) {
LOG(WARNING) << "fail to add visible rowset to tablet. rowset_id=" << _rowset->rowset_id()
<< ", tablet_id=" << _tablet_info.tablet_id << ", txn_id=" << _transaction_id
<< ", res=" << publish_status;
_engine_publish_version_task->add_error_tablet_id(_tablet_info.tablet_id);
return;
}
_engine_publish_version_task->add_succ_tablet_id(_tablet_info.tablet_id);
VLOG_NOTICE << "publish version successfully on tablet. tablet=" << _tablet->full_name()
<< ", transaction_id=" << _transaction_id << ", version=" << _version.first
<< ", res=" << publish_status;
return;
}
} // namespace doris

View File

@ -24,6 +24,30 @@
namespace doris {
class EnginePublishVersionTask;
class TabletPublishTxnTask {
public:
TabletPublishTxnTask(EnginePublishVersionTask* engine_task, TabletSharedPtr tablet,
RowsetSharedPtr rowset, int64_t partition_id, int64_t transaction_id,
Version version, const TabletInfo& tablet_info,
std::atomic<int64_t>* total_task_num);
~TabletPublishTxnTask() {}
void handle();
private:
EnginePublishVersionTask* _engine_publish_version_task;
TabletSharedPtr _tablet;
RowsetSharedPtr _rowset;
int64_t _partition_id;
int64_t _transaction_id;
Version _version;
TabletInfo _tablet_info;
std::atomic<int64_t>* _total_task_num;
};
class EnginePublishVersionTask : public EngineTask {
public:
EnginePublishVersionTask(TPublishVersionRequest& publish_version_req,
@ -33,10 +57,20 @@ public:
virtual Status finish() override;
void add_error_tablet_id(int64_t tablet_id);
void add_succ_tablet_id(int64_t tablet_id);
void notify();
void wait();
private:
const TPublishVersionRequest& _publish_version_req;
std::mutex _tablet_ids_mutex;
vector<TTabletId>* _error_tablet_ids;
vector<TTabletId>* _succ_tablet_ids;
std::mutex _tablet_finish_sleep_mutex;
std::condition_variable _tablet_finish_sleep_cond;
};
} // namespace doris

View File

@ -44,6 +44,7 @@
#include "olap/tablet_meta.h"
#include "olap/tablet_meta_manager.h"
#include "olap/utils.h"
#include "rowset/beta_rowset.h"
#include "util/doris_metrics.h"
#include "util/pretty_printer.h"
#include "util/time.h"
@ -308,8 +309,109 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id,
_clear_txn_partition_map_unlocked(transaction_id, partition_id);
}
}
}
auto tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id);
#ifdef BE_TEST
if (tablet == nullptr) {
return Status::OK();
}
#endif
// Check if have to build extra delete bitmap for table of UNIQUE_KEY model
if (!tablet->enable_unique_key_merge_on_write() ||
tablet->tablet_meta()->preferred_rowset_type() != RowsetTypePB::BETA_ROWSET ||
rowset_ptr->keys_type() != KeysType::UNIQUE_KEYS) {
return Status::OK();
}
CHECK(version.first == version.second) << "impossible: " << version;
// For each key in current set, check if it overwrites any previously
// written keys
OlapStopWatch watch;
std::vector<segment_v2::SegmentSharedPtr> segments;
std::vector<segment_v2::SegmentSharedPtr> pre_segments;
auto beta_rowset = reinterpret_cast<BetaRowset*>(rowset_ptr.get());
Status st = beta_rowset->load_segments(&segments);
if (!st.ok()) return st;
// lock tablet meta to modify delete bitmap
std::lock_guard<std::shared_mutex> meta_wrlock(tablet->get_header_lock());
for (auto& seg : segments) {
seg->load_index(); // We need index blocks to iterate
auto pk_idx = seg->get_primary_key_index();
int cnt = 0;
int total = pk_idx->num_rows();
int32_t remaining = total;
bool exact_match = false;
std::string last_key;
int batch_size = 1024;
MemPool pool;
while (remaining > 0) {
std::unique_ptr<segment_v2::IndexedColumnIterator> iter;
RETURN_IF_ERROR(pk_idx->new_iterator(&iter));
size_t num_to_read = std::min(batch_size, remaining);
std::unique_ptr<ColumnVectorBatch> cvb;
RETURN_IF_ERROR(ColumnVectorBatch::create(num_to_read, false, pk_idx->type_info(),
nullptr, &cvb));
ColumnBlock block(cvb.get(), &pool);
ColumnBlockView column_block_view(&block);
Slice last_key_slice(last_key);
RETURN_IF_ERROR(iter->seek_at_or_after(&last_key_slice, &exact_match));
size_t num_read = num_to_read;
RETURN_IF_ERROR(iter->next_batch(&num_read, &column_block_view));
DCHECK(num_to_read == num_read);
last_key = (reinterpret_cast<const Slice*>(cvb->cell_ptr(num_read - 1)))->to_string();
// exclude last_key, last_key will be read in next batch.
if (num_read == batch_size && num_read != remaining) {
num_read -= 1;
}
for (size_t i = 0; i < num_read; i++) {
const Slice* key = reinterpret_cast<const Slice*>(cvb->cell_ptr(i));
// first check if exist in pre segment
bool find = _check_pk_in_pre_segments(pre_segments, *key, tablet, version);
if (find) {
cnt++;
continue;
}
RowLocation loc;
st = tablet->lookup_row_key(*key, &loc, version.first - 1);
CHECK(st.ok() || st.is_not_found());
if (st.is_not_found()) continue;
++cnt;
// TODO: we can just set a bitmap onece we are done while iteration
tablet->tablet_meta()->delete_bitmap().add(
{loc.rowset_id, loc.segment_id, version.first}, loc.row_id);
}
remaining -= num_read;
}
LOG(INFO) << "construct delete bitmap tablet: " << tablet->tablet_id()
<< " rowset: " << beta_rowset->rowset_id() << " segment: " << seg->id()
<< " version: " << version << " delete: " << cnt << "/" << total;
pre_segments.emplace_back(seg);
}
tablet->save_meta();
LOG(INFO) << "finished to update delete bitmap, tablet: " << tablet->tablet_id()
<< " version: " << version << ", elapse(us): " << watch.get_elapse_time_us();
return Status::OK();
}
bool TxnManager::_check_pk_in_pre_segments(
const std::vector<segment_v2::SegmentSharedPtr>& pre_segments, const Slice& key,
TabletSharedPtr tablet, const Version& version) {
for (auto it = pre_segments.rbegin(); it != pre_segments.rend(); ++it) {
RowLocation loc;
auto st = (*it)->lookup_row_key(key, &loc);
CHECK(st.ok() || st.is_not_found());
if (st.is_not_found()) {
continue;
}
tablet->tablet_meta()->delete_bitmap().add({loc.rowset_id, loc.segment_id, version.first},
loc.row_id);
return true;
}
return false;
}
// txn could be rollbacked if it does not have related rowset

View File

@ -42,6 +42,7 @@
#include "olap/options.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/rowset/segment_v2/segment.h"
#include "olap/tablet.h"
#include "util/time.h"
@ -172,6 +173,10 @@ private:
void _insert_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id);
void _clear_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id);
bool _check_pk_in_pre_segments(const std::vector<segment_v2::SegmentSharedPtr>& pre_segments,
const Slice& key, TabletSharedPtr tablet,
const Version& version);
private:
const int32_t _txn_map_shard_size;

View File

@ -207,8 +207,8 @@ TEST_F(TestEngineStorageMigrationTask, write_and_migration) {
for (auto& tablet_rs : tablet_related_rs) {
RowsetSharedPtr rowset = tablet_rs.second;
res = k_engine->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id,
write_req.tablet_id, write_req.schema_hash,
tablet_rs.first.tablet_uid, version);
tablet->tablet_id(), tablet->schema_hash(),
tablet->tablet_uid(), version);
EXPECT_EQ(Status::OK(), res);
res = tablet->add_inc_rowset(rowset);
EXPECT_EQ(Status::OK(), res);

View File

@ -201,8 +201,8 @@ TEST_F(TabletCloneTest, convert_rowset_ids_has_file_in_s3) {
RowsetSharedPtr rowset = tablet_rs.second;
rowset->rowset_meta()->set_resource_id(kResourceId);
st = k_engine->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id,
write_req.tablet_id, write_req.schema_hash,
tablet_rs.first.tablet_uid, version);
tablet->tablet_id(), tablet->schema_hash(),
tablet->tablet_uid(), version);
ASSERT_EQ(Status::OK(), st);
st = tablet->add_inc_rowset(rowset);
ASSERT_EQ(Status::OK(), st);

View File

@ -200,8 +200,8 @@ TEST_F(TabletCooldownTest, normal) {
for (auto& tablet_rs : tablet_related_rs) {
RowsetSharedPtr rowset = tablet_rs.second;
st = k_engine->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id,
write_req.tablet_id, write_req.schema_hash,
tablet_rs.first.tablet_uid, version);
tablet->tablet_id(), tablet->schema_hash(),
tablet->tablet_uid(), version);
ASSERT_EQ(Status::OK(), st);
st = tablet->add_inc_rowset(rowset);
ASSERT_EQ(Status::OK(), st);