From 01e108cb7b0f5085a63a535f8cf571eb293d6d2b Mon Sep 17 00:00:00 2001 From: yixiutt <102007456+yixiutt@users.noreply.github.com> Date: Wed, 27 Jul 2022 16:26:42 +0800 Subject: [PATCH] [feature-wip](unique-key-merge-on-write) update delete bitmap while publish version (#11195) 1.make version publish work in version order 2.update delete bitmap while publish version, load current version rowset primary key and search in pre rowsets 3.speed up publish version task by parallel tablet publish task Co-authored-by: yixiutt --- be/src/agent/task_worker_pool.cpp | 10 ++ be/src/common/config.h | 2 + be/src/common/status.h | 3 +- be/src/olap/olap_server.cpp | 6 + be/src/olap/rowset/segment_v2/segment.cpp | 6 +- be/src/olap/rowset/segment_v2/segment.h | 5 +- be/src/olap/storage_engine.h | 6 + be/src/olap/tablet.h | 5 + .../olap/task/engine_publish_version_task.cpp | 146 ++++++++++++++---- .../olap/task/engine_publish_version_task.h | 34 ++++ be/src/olap/txn_manager.cpp | 102 ++++++++++++ be/src/olap/txn_manager.h | 5 + .../engine_storage_migration_task_test.cpp | 4 +- be/test/olap/tablet_clone_test.cpp | 4 +- be/test/olap/tablet_cooldown_test.cpp | 4 +- 15 files changed, 297 insertions(+), 45 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 1380331e7a..40681ce17d 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -711,6 +711,13 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() { res = _env->storage_engine()->execute_task(&engine_task); if (res.ok()) { break; + } else if (res.precise_code() == OLAP_ERR_PUBLISH_VERSION_NOT_CONTINUOUS) { + // version not continuous, put to queue and wait pre version publish + // task execute + std::unique_lock worker_thread_lock(_worker_thread_lock); + _tasks.push_back(agent_task_req); + _worker_thread_condition_variable.notify_one(); + break; } else { LOG(WARNING) << "publish version error, retry. [transaction_id=" << publish_version_req.transaction_id @@ -719,6 +726,9 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() { std::this_thread::sleep_for(std::chrono::seconds(1)); } } + if (res.precise_code() == OLAP_ERR_PUBLISH_VERSION_NOT_CONTINUOUS) { + continue; + } TFinishTaskRequest finish_task_request; if (!res) { diff --git a/be/src/common/config.h b/be/src/common/config.h index ee8698bcb5..9891df2f18 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -81,6 +81,8 @@ CONF_Int32(push_worker_count_normal_priority, "3"); CONF_Int32(push_worker_count_high_priority, "3"); // the count of thread to publish version CONF_Int32(publish_version_worker_count, "8"); +// the count of tablet thread to publish version +CONF_Int32(tablet_publish_txn_max_thread, "32"); // the count of thread to clear transaction task CONF_Int32(clear_transaction_task_worker_count, "1"); // the count of thread to delete diff --git a/be/src/common/status.h b/be/src/common/status.h index 830d417a7c..f441ccd759 100644 --- a/be/src/common/status.h +++ b/be/src/common/status.h @@ -231,7 +231,8 @@ namespace doris { M(OLAP_ERR_ROWSET_READ_FAILED, -3111, "", true) \ M(OLAP_ERR_ROWSET_INVALID_STATE_TRANSITION, -3112, "", true) \ M(OLAP_ERR_STRING_OVERFLOW_IN_VEC_ENGINE, -3113, "", true) \ - M(OLAP_ERR_ROWSET_ADD_MIGRATION_V2, -3114, "", true) + M(OLAP_ERR_ROWSET_ADD_MIGRATION_V2, -3114, "", true) \ + M(OLAP_ERR_PUBLISH_VERSION_NOT_CONTINUOUS, -3115, "", false) enum ErrorCode { #define M(NAME, ERRORCODE, DESC, STACKTRACEENABLED) NAME = ERRORCODE, diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index fbb1811705..28e9241985 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -137,6 +137,12 @@ Status StorageEngine::start_bg_threads() { &_cooldown_tasks_producer_thread)); LOG(INFO) << "cooldown tasks producer thread started"; + // add tablet publish version thread pool + ThreadPoolBuilder("TabletPublishTxnThreadPool") + .set_min_threads(config::tablet_publish_txn_max_thread) + .set_max_threads(config::tablet_publish_txn_max_thread) + .build(&_tablet_publish_txn_thread_pool); + LOG(INFO) << "all storage engine's background threads are started."; return Status::OK(); } diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index 09c29979bf..6b84cd9566 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -78,7 +78,7 @@ Status Segment::new_iterator(const Schema& schema, const StorageReadOptions& rea } } - RETURN_IF_ERROR(_load_index()); + RETURN_IF_ERROR(load_index()); iter->reset(new SegmentIterator(this->shared_from_this(), schema)); iter->get()->init(read_options); return Status::OK(); @@ -134,7 +134,7 @@ Status Segment::_parse_footer() { return Status::OK(); } -Status Segment::_load_index() { +Status Segment::load_index() { return _load_index_once.call([this] { // read and parse short key index page PageReadOptions opts; @@ -225,7 +225,7 @@ Status Segment::new_bitmap_index_iterator(const TabletColumn& tablet_column, } Status Segment::lookup_row_key(const Slice& key, RowLocation* row_location) { - RETURN_IF_ERROR(_load_index()); + RETURN_IF_ERROR(load_index()); DCHECK(_pk_index_reader != nullptr); if (!_pk_index_reader->check_present(key)) { return Status::NotFound("Can't find key in the segment"); diff --git a/be/src/olap/rowset/segment_v2/segment.h b/be/src/olap/rowset/segment_v2/segment.h index e8cb0c3081..5323ace873 100644 --- a/be/src/olap/rowset/segment_v2/segment.h +++ b/be/src/olap/rowset/segment_v2/segment.h @@ -92,6 +92,8 @@ public: // only used by UT const SegmentFooterPB& footer() const { return _footer; } + Status load_index(); + private: DISALLOW_COPY_AND_ASSIGN(Segment); Segment(uint32_t segment_id, const TabletSchema* tablet_schema); @@ -99,9 +101,6 @@ private: Status _open(); Status _parse_footer(); Status _create_column_readers(); - // Load and decode short key index. - // May be called multiple times, subsequent calls will no op. - Status _load_index(); private: friend class SegmentIterator; diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 1b6b28b4c4..df018e27d3 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -192,6 +192,10 @@ public: Status submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type); Status submit_quick_compaction_task(TabletSharedPtr tablet); + std::unique_ptr& tablet_publish_txn_thread_pool() { + return _tablet_publish_txn_thread_pool; + } + private: // Instance should be inited from `static open()` // MUST NOT be called in other circumstances. @@ -382,6 +386,8 @@ private: std::unique_ptr _base_compaction_thread_pool; std::unique_ptr _cumu_compaction_thread_pool; + std::unique_ptr _tablet_publish_txn_thread_pool; + std::unique_ptr _tablet_meta_checkpoint_thread_pool; CompactionPermitLimiter _permit_limiter; diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 74aa3a8c8f..08e5fecd15 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -438,6 +438,11 @@ inline void Tablet::set_cumulative_layer_point(int64_t new_point) { } inline bool Tablet::enable_unique_key_merge_on_write() const { +#ifdef BE_TEST + if (_tablet_meta == nullptr) { + return false; + } +#endif return _tablet_meta->enable_unique_key_merge_on_write(); } diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp index 5c7397c8ad..177f9f89d0 100644 --- a/be/src/olap/task/engine_publish_version_task.cpp +++ b/be/src/olap/task/engine_publish_version_task.cpp @@ -17,6 +17,8 @@ #include "olap/task/engine_publish_version_task.h" +#include + #include #include "olap/data_dir.h" @@ -34,13 +36,38 @@ EnginePublishVersionTask::EnginePublishVersionTask(TPublishVersionRequest& publi _error_tablet_ids(error_tablet_ids), _succ_tablet_ids(succ_tablet_ids) {} +void EnginePublishVersionTask::add_error_tablet_id(int64_t tablet_id) { + std::lock_guard lck(_tablet_ids_mutex); + _error_tablet_ids->push_back(tablet_id); +} + +void EnginePublishVersionTask::add_succ_tablet_id(int64_t tablet_id) { + std::lock_guard lck(_tablet_ids_mutex); + _succ_tablet_ids->push_back(tablet_id); +} + +void EnginePublishVersionTask::wait() { + std::unique_lock lock(_tablet_finish_sleep_mutex); + _tablet_finish_sleep_cond.wait_for(lock, std::chrono::milliseconds(10)); +} + +void EnginePublishVersionTask::notify() { + std::unique_lock lock(_tablet_finish_sleep_mutex); + _tablet_finish_sleep_cond.notify_one(); +} + Status EnginePublishVersionTask::finish() { Status res = Status::OK(); int64_t transaction_id = _publish_version_req.transaction_id; VLOG_NOTICE << "begin to process publish version. transaction_id=" << transaction_id; // each partition + bool meet_version_not_continuous = false; + std::atomic total_task_num(0); for (auto& par_ver_info : _publish_version_req.partition_version_infos) { + if (meet_version_not_continuous) { + break; + } int64_t partition_id = par_ver_info.partition_id; // get all partition related tablets and check whether the tablet have the related version std::set partition_related_tablet_infos; @@ -60,7 +87,9 @@ Status EnginePublishVersionTask::finish() { // each tablet for (auto& tablet_rs : tablet_related_rs) { - Status publish_status = Status::OK(); + if (meet_version_not_continuous) { + break; + } TabletInfo tablet_info = tablet_rs.first; RowsetSharedPtr rowset = tablet_rs.second; VLOG_CRITICAL << "begin to publish version on tablet. " @@ -86,39 +115,45 @@ Status EnginePublishVersionTask::finish() { res = Status::OLAPInternalError(OLAP_ERR_PUSH_TABLE_NOT_EXIST); continue; } - - publish_status = StorageEngine::instance()->txn_manager()->publish_txn( - partition_id, tablet, transaction_id, version); - if (publish_status != Status::OK()) { - LOG(WARNING) << "failed to publish version. rowset_id=" << rowset->rowset_id() - << ", tablet_id=" << tablet_info.tablet_id - << ", txn_id=" << transaction_id; - _error_tablet_ids->push_back(tablet_info.tablet_id); - res = publish_status; + Version max_version = tablet->max_version(); + // in uniq key model with merge-on-write, we should see all + // previous version when update delete bitmap, so add a check + // here and wait pre version publish or lock timeout + if (tablet->keys_type() == KeysType::UNIQUE_KEYS && + tablet->enable_unique_key_merge_on_write() && + version.first != max_version.second + 1) { + LOG(INFO) << "uniq key with merge-on-write version not continuous, current max " + "version=" + << max_version.second << ", publish_version=" << version.first + << " tablet_id=" << tablet->tablet_id(); + meet_version_not_continuous = true; + res = Status::OLAPInternalError(OLAP_ERR_PUBLISH_VERSION_NOT_CONTINUOUS); continue; } - - // add visible rowset to tablet - publish_status = tablet->add_inc_rowset(rowset); - if (publish_status != Status::OK() && - publish_status.precise_code() != OLAP_ERR_PUSH_VERSION_ALREADY_EXIST) { - LOG(WARNING) << "fail to add visible rowset to tablet. rowset_id=" - << rowset->rowset_id() << ", tablet_id=" << tablet_info.tablet_id - << ", txn_id=" << transaction_id << ", res=" << publish_status; - _error_tablet_ids->push_back(tablet_info.tablet_id); - res = publish_status; - continue; - } - if (_succ_tablet_ids != nullptr) { - _succ_tablet_ids->push_back(tablet_info.tablet_id); - } - partition_related_tablet_infos.erase(tablet_info); - VLOG_NOTICE << "publish version successfully on tablet. tablet=" << tablet->full_name() - << ", transaction_id=" << transaction_id << ", version=" << version.first - << ", res=" << publish_status; + total_task_num.fetch_add(1); + auto tablet_publish_txn_ptr = std::make_shared( + this, tablet, rowset, partition_id, transaction_id, version, tablet_info, + &total_task_num); + auto submit_st = + StorageEngine::instance()->tablet_publish_txn_thread_pool()->submit_func( + [=]() { tablet_publish_txn_ptr->handle(); }); + CHECK(submit_st.ok()); } + } + // wait for all publish txn finished + while (total_task_num.load() != 0) { + wait(); + } - // check if the related tablet remained all have the version + // check if the related tablet remained all have the version + for (auto& par_ver_info : _publish_version_req.partition_version_infos) { + int64_t partition_id = par_ver_info.partition_id; + // get all partition related tablets and check whether the tablet have the related version + std::set partition_related_tablet_infos; + StorageEngine::instance()->tablet_manager()->get_partition_related_tablets( + partition_id, &partition_related_tablet_infos); + + Version version(par_ver_info.version, par_ver_info.version); for (auto& tablet_info : partition_related_tablet_infos) { // has to use strict mode to check if check all tablets if (!_publish_version_req.strict_mode) { @@ -127,11 +162,11 @@ Status EnginePublishVersionTask::finish() { TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_info.tablet_id); if (tablet == nullptr) { - _error_tablet_ids->push_back(tablet_info.tablet_id); + add_error_tablet_id(tablet_info.tablet_id); } else { // check if the version exist, if not exist, then set publish failed if (!tablet->check_version_exist(version)) { - _error_tablet_ids->push_back(tablet_info.tablet_id); + add_error_tablet_id(tablet_info.tablet_id); } } } @@ -143,4 +178,51 @@ Status EnginePublishVersionTask::finish() { return res; } +TabletPublishTxnTask::TabletPublishTxnTask(EnginePublishVersionTask* engine_task, + TabletSharedPtr tablet, RowsetSharedPtr rowset, + int64_t partition_id, int64_t transaction_id, + Version version, const TabletInfo& tablet_info, + std::atomic* total_task_num) + : _engine_publish_version_task(engine_task), + _tablet(tablet), + _rowset(rowset), + _partition_id(partition_id), + _transaction_id(transaction_id), + _version(version), + _tablet_info(tablet_info), + _total_task_num(total_task_num) {} + +void TabletPublishTxnTask::handle() { + Defer defer {[&] { + if (_total_task_num->fetch_sub(1) == 1) { + _engine_publish_version_task->notify(); + } + }}; + auto publish_status = StorageEngine::instance()->txn_manager()->publish_txn( + _partition_id, _tablet, _transaction_id, _version); + if (publish_status != Status::OK()) { + LOG(WARNING) << "failed to publish version. rowset_id=" << _rowset->rowset_id() + << ", tablet_id=" << _tablet_info.tablet_id << ", txn_id=" << _transaction_id; + _engine_publish_version_task->add_error_tablet_id(_tablet_info.tablet_id); + return; + } + + // add visible rowset to tablet + publish_status = _tablet->add_inc_rowset(_rowset); + if (publish_status != Status::OK() && + publish_status.precise_code() != OLAP_ERR_PUSH_VERSION_ALREADY_EXIST) { + LOG(WARNING) << "fail to add visible rowset to tablet. rowset_id=" << _rowset->rowset_id() + << ", tablet_id=" << _tablet_info.tablet_id << ", txn_id=" << _transaction_id + << ", res=" << publish_status; + _engine_publish_version_task->add_error_tablet_id(_tablet_info.tablet_id); + return; + } + _engine_publish_version_task->add_succ_tablet_id(_tablet_info.tablet_id); + VLOG_NOTICE << "publish version successfully on tablet. tablet=" << _tablet->full_name() + << ", transaction_id=" << _transaction_id << ", version=" << _version.first + << ", res=" << publish_status; + + return; +} + } // namespace doris diff --git a/be/src/olap/task/engine_publish_version_task.h b/be/src/olap/task/engine_publish_version_task.h index 4086f466d3..959584d116 100644 --- a/be/src/olap/task/engine_publish_version_task.h +++ b/be/src/olap/task/engine_publish_version_task.h @@ -24,6 +24,30 @@ namespace doris { +class EnginePublishVersionTask; +class TabletPublishTxnTask { +public: + TabletPublishTxnTask(EnginePublishVersionTask* engine_task, TabletSharedPtr tablet, + RowsetSharedPtr rowset, int64_t partition_id, int64_t transaction_id, + Version version, const TabletInfo& tablet_info, + std::atomic* total_task_num); + ~TabletPublishTxnTask() {} + + void handle(); + +private: + EnginePublishVersionTask* _engine_publish_version_task; + + TabletSharedPtr _tablet; + RowsetSharedPtr _rowset; + int64_t _partition_id; + int64_t _transaction_id; + Version _version; + TabletInfo _tablet_info; + + std::atomic* _total_task_num; +}; + class EnginePublishVersionTask : public EngineTask { public: EnginePublishVersionTask(TPublishVersionRequest& publish_version_req, @@ -33,10 +57,20 @@ public: virtual Status finish() override; + void add_error_tablet_id(int64_t tablet_id); + void add_succ_tablet_id(int64_t tablet_id); + + void notify(); + void wait(); + private: const TPublishVersionRequest& _publish_version_req; + std::mutex _tablet_ids_mutex; vector* _error_tablet_ids; vector* _succ_tablet_ids; + + std::mutex _tablet_finish_sleep_mutex; + std::condition_variable _tablet_finish_sleep_cond; }; } // namespace doris diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp index 0cf09c0876..68d3c57a98 100644 --- a/be/src/olap/txn_manager.cpp +++ b/be/src/olap/txn_manager.cpp @@ -44,6 +44,7 @@ #include "olap/tablet_meta.h" #include "olap/tablet_meta_manager.h" #include "olap/utils.h" +#include "rowset/beta_rowset.h" #include "util/doris_metrics.h" #include "util/pretty_printer.h" #include "util/time.h" @@ -308,8 +309,109 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, _clear_txn_partition_map_unlocked(transaction_id, partition_id); } } + } + auto tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id); +#ifdef BE_TEST + if (tablet == nullptr) { return Status::OK(); } +#endif + // Check if have to build extra delete bitmap for table of UNIQUE_KEY model + if (!tablet->enable_unique_key_merge_on_write() || + tablet->tablet_meta()->preferred_rowset_type() != RowsetTypePB::BETA_ROWSET || + rowset_ptr->keys_type() != KeysType::UNIQUE_KEYS) { + return Status::OK(); + } + CHECK(version.first == version.second) << "impossible: " << version; + + // For each key in current set, check if it overwrites any previously + // written keys + OlapStopWatch watch; + std::vector segments; + std::vector pre_segments; + auto beta_rowset = reinterpret_cast(rowset_ptr.get()); + Status st = beta_rowset->load_segments(&segments); + if (!st.ok()) return st; + // lock tablet meta to modify delete bitmap + std::lock_guard meta_wrlock(tablet->get_header_lock()); + for (auto& seg : segments) { + seg->load_index(); // We need index blocks to iterate + auto pk_idx = seg->get_primary_key_index(); + int cnt = 0; + int total = pk_idx->num_rows(); + int32_t remaining = total; + bool exact_match = false; + std::string last_key; + int batch_size = 1024; + MemPool pool; + while (remaining > 0) { + std::unique_ptr iter; + RETURN_IF_ERROR(pk_idx->new_iterator(&iter)); + + size_t num_to_read = std::min(batch_size, remaining); + std::unique_ptr cvb; + RETURN_IF_ERROR(ColumnVectorBatch::create(num_to_read, false, pk_idx->type_info(), + nullptr, &cvb)); + ColumnBlock block(cvb.get(), &pool); + ColumnBlockView column_block_view(&block); + Slice last_key_slice(last_key); + RETURN_IF_ERROR(iter->seek_at_or_after(&last_key_slice, &exact_match)); + + size_t num_read = num_to_read; + RETURN_IF_ERROR(iter->next_batch(&num_read, &column_block_view)); + DCHECK(num_to_read == num_read); + last_key = (reinterpret_cast(cvb->cell_ptr(num_read - 1)))->to_string(); + + // exclude last_key, last_key will be read in next batch. + if (num_read == batch_size && num_read != remaining) { + num_read -= 1; + } + for (size_t i = 0; i < num_read; i++) { + const Slice* key = reinterpret_cast(cvb->cell_ptr(i)); + // first check if exist in pre segment + bool find = _check_pk_in_pre_segments(pre_segments, *key, tablet, version); + if (find) { + cnt++; + continue; + } + RowLocation loc; + st = tablet->lookup_row_key(*key, &loc, version.first - 1); + CHECK(st.ok() || st.is_not_found()); + if (st.is_not_found()) continue; + ++cnt; + // TODO: we can just set a bitmap onece we are done while iteration + tablet->tablet_meta()->delete_bitmap().add( + {loc.rowset_id, loc.segment_id, version.first}, loc.row_id); + } + remaining -= num_read; + } + + LOG(INFO) << "construct delete bitmap tablet: " << tablet->tablet_id() + << " rowset: " << beta_rowset->rowset_id() << " segment: " << seg->id() + << " version: " << version << " delete: " << cnt << "/" << total; + pre_segments.emplace_back(seg); + } + tablet->save_meta(); + LOG(INFO) << "finished to update delete bitmap, tablet: " << tablet->tablet_id() + << " version: " << version << ", elapse(us): " << watch.get_elapse_time_us(); + return Status::OK(); +} + +bool TxnManager::_check_pk_in_pre_segments( + const std::vector& pre_segments, const Slice& key, + TabletSharedPtr tablet, const Version& version) { + for (auto it = pre_segments.rbegin(); it != pre_segments.rend(); ++it) { + RowLocation loc; + auto st = (*it)->lookup_row_key(key, &loc); + CHECK(st.ok() || st.is_not_found()); + if (st.is_not_found()) { + continue; + } + tablet->tablet_meta()->delete_bitmap().add({loc.rowset_id, loc.segment_id, version.first}, + loc.row_id); + return true; + } + return false; } // txn could be rollbacked if it does not have related rowset diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h index 2dd6dfb753..a755f5dc79 100644 --- a/be/src/olap/txn_manager.h +++ b/be/src/olap/txn_manager.h @@ -42,6 +42,7 @@ #include "olap/options.h" #include "olap/rowset/rowset.h" #include "olap/rowset/rowset_meta.h" +#include "olap/rowset/segment_v2/segment.h" #include "olap/tablet.h" #include "util/time.h" @@ -172,6 +173,10 @@ private: void _insert_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id); void _clear_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id); + bool _check_pk_in_pre_segments(const std::vector& pre_segments, + const Slice& key, TabletSharedPtr tablet, + const Version& version); + private: const int32_t _txn_map_shard_size; diff --git a/be/test/olap/engine_storage_migration_task_test.cpp b/be/test/olap/engine_storage_migration_task_test.cpp index 927b257d43..b5b3b4d988 100644 --- a/be/test/olap/engine_storage_migration_task_test.cpp +++ b/be/test/olap/engine_storage_migration_task_test.cpp @@ -207,8 +207,8 @@ TEST_F(TestEngineStorageMigrationTask, write_and_migration) { for (auto& tablet_rs : tablet_related_rs) { RowsetSharedPtr rowset = tablet_rs.second; res = k_engine->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id, - write_req.tablet_id, write_req.schema_hash, - tablet_rs.first.tablet_uid, version); + tablet->tablet_id(), tablet->schema_hash(), + tablet->tablet_uid(), version); EXPECT_EQ(Status::OK(), res); res = tablet->add_inc_rowset(rowset); EXPECT_EQ(Status::OK(), res); diff --git a/be/test/olap/tablet_clone_test.cpp b/be/test/olap/tablet_clone_test.cpp index 135ed47c77..51124f87cf 100644 --- a/be/test/olap/tablet_clone_test.cpp +++ b/be/test/olap/tablet_clone_test.cpp @@ -201,8 +201,8 @@ TEST_F(TabletCloneTest, convert_rowset_ids_has_file_in_s3) { RowsetSharedPtr rowset = tablet_rs.second; rowset->rowset_meta()->set_resource_id(kResourceId); st = k_engine->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id, - write_req.tablet_id, write_req.schema_hash, - tablet_rs.first.tablet_uid, version); + tablet->tablet_id(), tablet->schema_hash(), + tablet->tablet_uid(), version); ASSERT_EQ(Status::OK(), st); st = tablet->add_inc_rowset(rowset); ASSERT_EQ(Status::OK(), st); diff --git a/be/test/olap/tablet_cooldown_test.cpp b/be/test/olap/tablet_cooldown_test.cpp index a7308062dd..f20eccd376 100644 --- a/be/test/olap/tablet_cooldown_test.cpp +++ b/be/test/olap/tablet_cooldown_test.cpp @@ -200,8 +200,8 @@ TEST_F(TabletCooldownTest, normal) { for (auto& tablet_rs : tablet_related_rs) { RowsetSharedPtr rowset = tablet_rs.second; st = k_engine->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id, - write_req.tablet_id, write_req.schema_hash, - tablet_rs.first.tablet_uid, version); + tablet->tablet_id(), tablet->schema_hash(), + tablet->tablet_uid(), version); ASSERT_EQ(Status::OK(), st); st = tablet->add_inc_rowset(rowset); ASSERT_EQ(Status::OK(), st);