diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 1380331e7a..40681ce17d 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -711,6 +711,13 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() { res = _env->storage_engine()->execute_task(&engine_task); if (res.ok()) { break; + } else if (res.precise_code() == OLAP_ERR_PUBLISH_VERSION_NOT_CONTINUOUS) { + // version not continuous, put to queue and wait pre version publish + // task execute + std::unique_lock worker_thread_lock(_worker_thread_lock); + _tasks.push_back(agent_task_req); + _worker_thread_condition_variable.notify_one(); + break; } else { LOG(WARNING) << "publish version error, retry. [transaction_id=" << publish_version_req.transaction_id @@ -719,6 +726,9 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() { std::this_thread::sleep_for(std::chrono::seconds(1)); } } + if (res.precise_code() == OLAP_ERR_PUBLISH_VERSION_NOT_CONTINUOUS) { + continue; + } TFinishTaskRequest finish_task_request; if (!res) { diff --git a/be/src/common/config.h b/be/src/common/config.h index ee8698bcb5..9891df2f18 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -81,6 +81,8 @@ CONF_Int32(push_worker_count_normal_priority, "3"); CONF_Int32(push_worker_count_high_priority, "3"); // the count of thread to publish version CONF_Int32(publish_version_worker_count, "8"); +// the count of tablet thread to publish version +CONF_Int32(tablet_publish_txn_max_thread, "32"); // the count of thread to clear transaction task CONF_Int32(clear_transaction_task_worker_count, "1"); // the count of thread to delete diff --git a/be/src/common/status.h b/be/src/common/status.h index 830d417a7c..f441ccd759 100644 --- a/be/src/common/status.h +++ b/be/src/common/status.h @@ -231,7 +231,8 @@ namespace doris { M(OLAP_ERR_ROWSET_READ_FAILED, -3111, "", true) \ M(OLAP_ERR_ROWSET_INVALID_STATE_TRANSITION, -3112, "", true) \ M(OLAP_ERR_STRING_OVERFLOW_IN_VEC_ENGINE, -3113, "", true) \ - M(OLAP_ERR_ROWSET_ADD_MIGRATION_V2, -3114, "", true) + M(OLAP_ERR_ROWSET_ADD_MIGRATION_V2, -3114, "", true) \ + M(OLAP_ERR_PUBLISH_VERSION_NOT_CONTINUOUS, -3115, "", false) enum ErrorCode { #define M(NAME, ERRORCODE, DESC, STACKTRACEENABLED) NAME = ERRORCODE, diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index fbb1811705..28e9241985 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -137,6 +137,12 @@ Status StorageEngine::start_bg_threads() { &_cooldown_tasks_producer_thread)); LOG(INFO) << "cooldown tasks producer thread started"; + // add tablet publish version thread pool + ThreadPoolBuilder("TabletPublishTxnThreadPool") + .set_min_threads(config::tablet_publish_txn_max_thread) + .set_max_threads(config::tablet_publish_txn_max_thread) + .build(&_tablet_publish_txn_thread_pool); + LOG(INFO) << "all storage engine's background threads are started."; return Status::OK(); } diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index 09c29979bf..6b84cd9566 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -78,7 +78,7 @@ Status Segment::new_iterator(const Schema& schema, const StorageReadOptions& rea } } - RETURN_IF_ERROR(_load_index()); + RETURN_IF_ERROR(load_index()); iter->reset(new SegmentIterator(this->shared_from_this(), schema)); iter->get()->init(read_options); return Status::OK(); @@ -134,7 +134,7 @@ Status Segment::_parse_footer() { return Status::OK(); } -Status Segment::_load_index() { +Status Segment::load_index() { return _load_index_once.call([this] { // read and parse short key index page PageReadOptions opts; @@ -225,7 +225,7 @@ Status Segment::new_bitmap_index_iterator(const TabletColumn& tablet_column, } Status Segment::lookup_row_key(const Slice& key, RowLocation* row_location) { - RETURN_IF_ERROR(_load_index()); + RETURN_IF_ERROR(load_index()); DCHECK(_pk_index_reader != nullptr); if (!_pk_index_reader->check_present(key)) { return Status::NotFound("Can't find key in the segment"); diff --git a/be/src/olap/rowset/segment_v2/segment.h b/be/src/olap/rowset/segment_v2/segment.h index e8cb0c3081..5323ace873 100644 --- a/be/src/olap/rowset/segment_v2/segment.h +++ b/be/src/olap/rowset/segment_v2/segment.h @@ -92,6 +92,8 @@ public: // only used by UT const SegmentFooterPB& footer() const { return _footer; } + Status load_index(); + private: DISALLOW_COPY_AND_ASSIGN(Segment); Segment(uint32_t segment_id, const TabletSchema* tablet_schema); @@ -99,9 +101,6 @@ private: Status _open(); Status _parse_footer(); Status _create_column_readers(); - // Load and decode short key index. - // May be called multiple times, subsequent calls will no op. - Status _load_index(); private: friend class SegmentIterator; diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 1b6b28b4c4..df018e27d3 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -192,6 +192,10 @@ public: Status submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type); Status submit_quick_compaction_task(TabletSharedPtr tablet); + std::unique_ptr& tablet_publish_txn_thread_pool() { + return _tablet_publish_txn_thread_pool; + } + private: // Instance should be inited from `static open()` // MUST NOT be called in other circumstances. @@ -382,6 +386,8 @@ private: std::unique_ptr _base_compaction_thread_pool; std::unique_ptr _cumu_compaction_thread_pool; + std::unique_ptr _tablet_publish_txn_thread_pool; + std::unique_ptr _tablet_meta_checkpoint_thread_pool; CompactionPermitLimiter _permit_limiter; diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 74aa3a8c8f..08e5fecd15 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -438,6 +438,11 @@ inline void Tablet::set_cumulative_layer_point(int64_t new_point) { } inline bool Tablet::enable_unique_key_merge_on_write() const { +#ifdef BE_TEST + if (_tablet_meta == nullptr) { + return false; + } +#endif return _tablet_meta->enable_unique_key_merge_on_write(); } diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp index 5c7397c8ad..177f9f89d0 100644 --- a/be/src/olap/task/engine_publish_version_task.cpp +++ b/be/src/olap/task/engine_publish_version_task.cpp @@ -17,6 +17,8 @@ #include "olap/task/engine_publish_version_task.h" +#include + #include #include "olap/data_dir.h" @@ -34,13 +36,38 @@ EnginePublishVersionTask::EnginePublishVersionTask(TPublishVersionRequest& publi _error_tablet_ids(error_tablet_ids), _succ_tablet_ids(succ_tablet_ids) {} +void EnginePublishVersionTask::add_error_tablet_id(int64_t tablet_id) { + std::lock_guard lck(_tablet_ids_mutex); + _error_tablet_ids->push_back(tablet_id); +} + +void EnginePublishVersionTask::add_succ_tablet_id(int64_t tablet_id) { + std::lock_guard lck(_tablet_ids_mutex); + _succ_tablet_ids->push_back(tablet_id); +} + +void EnginePublishVersionTask::wait() { + std::unique_lock lock(_tablet_finish_sleep_mutex); + _tablet_finish_sleep_cond.wait_for(lock, std::chrono::milliseconds(10)); +} + +void EnginePublishVersionTask::notify() { + std::unique_lock lock(_tablet_finish_sleep_mutex); + _tablet_finish_sleep_cond.notify_one(); +} + Status EnginePublishVersionTask::finish() { Status res = Status::OK(); int64_t transaction_id = _publish_version_req.transaction_id; VLOG_NOTICE << "begin to process publish version. transaction_id=" << transaction_id; // each partition + bool meet_version_not_continuous = false; + std::atomic total_task_num(0); for (auto& par_ver_info : _publish_version_req.partition_version_infos) { + if (meet_version_not_continuous) { + break; + } int64_t partition_id = par_ver_info.partition_id; // get all partition related tablets and check whether the tablet have the related version std::set partition_related_tablet_infos; @@ -60,7 +87,9 @@ Status EnginePublishVersionTask::finish() { // each tablet for (auto& tablet_rs : tablet_related_rs) { - Status publish_status = Status::OK(); + if (meet_version_not_continuous) { + break; + } TabletInfo tablet_info = tablet_rs.first; RowsetSharedPtr rowset = tablet_rs.second; VLOG_CRITICAL << "begin to publish version on tablet. " @@ -86,39 +115,45 @@ Status EnginePublishVersionTask::finish() { res = Status::OLAPInternalError(OLAP_ERR_PUSH_TABLE_NOT_EXIST); continue; } - - publish_status = StorageEngine::instance()->txn_manager()->publish_txn( - partition_id, tablet, transaction_id, version); - if (publish_status != Status::OK()) { - LOG(WARNING) << "failed to publish version. rowset_id=" << rowset->rowset_id() - << ", tablet_id=" << tablet_info.tablet_id - << ", txn_id=" << transaction_id; - _error_tablet_ids->push_back(tablet_info.tablet_id); - res = publish_status; + Version max_version = tablet->max_version(); + // in uniq key model with merge-on-write, we should see all + // previous version when update delete bitmap, so add a check + // here and wait pre version publish or lock timeout + if (tablet->keys_type() == KeysType::UNIQUE_KEYS && + tablet->enable_unique_key_merge_on_write() && + version.first != max_version.second + 1) { + LOG(INFO) << "uniq key with merge-on-write version not continuous, current max " + "version=" + << max_version.second << ", publish_version=" << version.first + << " tablet_id=" << tablet->tablet_id(); + meet_version_not_continuous = true; + res = Status::OLAPInternalError(OLAP_ERR_PUBLISH_VERSION_NOT_CONTINUOUS); continue; } - - // add visible rowset to tablet - publish_status = tablet->add_inc_rowset(rowset); - if (publish_status != Status::OK() && - publish_status.precise_code() != OLAP_ERR_PUSH_VERSION_ALREADY_EXIST) { - LOG(WARNING) << "fail to add visible rowset to tablet. rowset_id=" - << rowset->rowset_id() << ", tablet_id=" << tablet_info.tablet_id - << ", txn_id=" << transaction_id << ", res=" << publish_status; - _error_tablet_ids->push_back(tablet_info.tablet_id); - res = publish_status; - continue; - } - if (_succ_tablet_ids != nullptr) { - _succ_tablet_ids->push_back(tablet_info.tablet_id); - } - partition_related_tablet_infos.erase(tablet_info); - VLOG_NOTICE << "publish version successfully on tablet. tablet=" << tablet->full_name() - << ", transaction_id=" << transaction_id << ", version=" << version.first - << ", res=" << publish_status; + total_task_num.fetch_add(1); + auto tablet_publish_txn_ptr = std::make_shared( + this, tablet, rowset, partition_id, transaction_id, version, tablet_info, + &total_task_num); + auto submit_st = + StorageEngine::instance()->tablet_publish_txn_thread_pool()->submit_func( + [=]() { tablet_publish_txn_ptr->handle(); }); + CHECK(submit_st.ok()); } + } + // wait for all publish txn finished + while (total_task_num.load() != 0) { + wait(); + } - // check if the related tablet remained all have the version + // check if the related tablet remained all have the version + for (auto& par_ver_info : _publish_version_req.partition_version_infos) { + int64_t partition_id = par_ver_info.partition_id; + // get all partition related tablets and check whether the tablet have the related version + std::set partition_related_tablet_infos; + StorageEngine::instance()->tablet_manager()->get_partition_related_tablets( + partition_id, &partition_related_tablet_infos); + + Version version(par_ver_info.version, par_ver_info.version); for (auto& tablet_info : partition_related_tablet_infos) { // has to use strict mode to check if check all tablets if (!_publish_version_req.strict_mode) { @@ -127,11 +162,11 @@ Status EnginePublishVersionTask::finish() { TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_info.tablet_id); if (tablet == nullptr) { - _error_tablet_ids->push_back(tablet_info.tablet_id); + add_error_tablet_id(tablet_info.tablet_id); } else { // check if the version exist, if not exist, then set publish failed if (!tablet->check_version_exist(version)) { - _error_tablet_ids->push_back(tablet_info.tablet_id); + add_error_tablet_id(tablet_info.tablet_id); } } } @@ -143,4 +178,51 @@ Status EnginePublishVersionTask::finish() { return res; } +TabletPublishTxnTask::TabletPublishTxnTask(EnginePublishVersionTask* engine_task, + TabletSharedPtr tablet, RowsetSharedPtr rowset, + int64_t partition_id, int64_t transaction_id, + Version version, const TabletInfo& tablet_info, + std::atomic* total_task_num) + : _engine_publish_version_task(engine_task), + _tablet(tablet), + _rowset(rowset), + _partition_id(partition_id), + _transaction_id(transaction_id), + _version(version), + _tablet_info(tablet_info), + _total_task_num(total_task_num) {} + +void TabletPublishTxnTask::handle() { + Defer defer {[&] { + if (_total_task_num->fetch_sub(1) == 1) { + _engine_publish_version_task->notify(); + } + }}; + auto publish_status = StorageEngine::instance()->txn_manager()->publish_txn( + _partition_id, _tablet, _transaction_id, _version); + if (publish_status != Status::OK()) { + LOG(WARNING) << "failed to publish version. rowset_id=" << _rowset->rowset_id() + << ", tablet_id=" << _tablet_info.tablet_id << ", txn_id=" << _transaction_id; + _engine_publish_version_task->add_error_tablet_id(_tablet_info.tablet_id); + return; + } + + // add visible rowset to tablet + publish_status = _tablet->add_inc_rowset(_rowset); + if (publish_status != Status::OK() && + publish_status.precise_code() != OLAP_ERR_PUSH_VERSION_ALREADY_EXIST) { + LOG(WARNING) << "fail to add visible rowset to tablet. rowset_id=" << _rowset->rowset_id() + << ", tablet_id=" << _tablet_info.tablet_id << ", txn_id=" << _transaction_id + << ", res=" << publish_status; + _engine_publish_version_task->add_error_tablet_id(_tablet_info.tablet_id); + return; + } + _engine_publish_version_task->add_succ_tablet_id(_tablet_info.tablet_id); + VLOG_NOTICE << "publish version successfully on tablet. tablet=" << _tablet->full_name() + << ", transaction_id=" << _transaction_id << ", version=" << _version.first + << ", res=" << publish_status; + + return; +} + } // namespace doris diff --git a/be/src/olap/task/engine_publish_version_task.h b/be/src/olap/task/engine_publish_version_task.h index 4086f466d3..959584d116 100644 --- a/be/src/olap/task/engine_publish_version_task.h +++ b/be/src/olap/task/engine_publish_version_task.h @@ -24,6 +24,30 @@ namespace doris { +class EnginePublishVersionTask; +class TabletPublishTxnTask { +public: + TabletPublishTxnTask(EnginePublishVersionTask* engine_task, TabletSharedPtr tablet, + RowsetSharedPtr rowset, int64_t partition_id, int64_t transaction_id, + Version version, const TabletInfo& tablet_info, + std::atomic* total_task_num); + ~TabletPublishTxnTask() {} + + void handle(); + +private: + EnginePublishVersionTask* _engine_publish_version_task; + + TabletSharedPtr _tablet; + RowsetSharedPtr _rowset; + int64_t _partition_id; + int64_t _transaction_id; + Version _version; + TabletInfo _tablet_info; + + std::atomic* _total_task_num; +}; + class EnginePublishVersionTask : public EngineTask { public: EnginePublishVersionTask(TPublishVersionRequest& publish_version_req, @@ -33,10 +57,20 @@ public: virtual Status finish() override; + void add_error_tablet_id(int64_t tablet_id); + void add_succ_tablet_id(int64_t tablet_id); + + void notify(); + void wait(); + private: const TPublishVersionRequest& _publish_version_req; + std::mutex _tablet_ids_mutex; vector* _error_tablet_ids; vector* _succ_tablet_ids; + + std::mutex _tablet_finish_sleep_mutex; + std::condition_variable _tablet_finish_sleep_cond; }; } // namespace doris diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp index 0cf09c0876..68d3c57a98 100644 --- a/be/src/olap/txn_manager.cpp +++ b/be/src/olap/txn_manager.cpp @@ -44,6 +44,7 @@ #include "olap/tablet_meta.h" #include "olap/tablet_meta_manager.h" #include "olap/utils.h" +#include "rowset/beta_rowset.h" #include "util/doris_metrics.h" #include "util/pretty_printer.h" #include "util/time.h" @@ -308,8 +309,109 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, _clear_txn_partition_map_unlocked(transaction_id, partition_id); } } + } + auto tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id); +#ifdef BE_TEST + if (tablet == nullptr) { return Status::OK(); } +#endif + // Check if have to build extra delete bitmap for table of UNIQUE_KEY model + if (!tablet->enable_unique_key_merge_on_write() || + tablet->tablet_meta()->preferred_rowset_type() != RowsetTypePB::BETA_ROWSET || + rowset_ptr->keys_type() != KeysType::UNIQUE_KEYS) { + return Status::OK(); + } + CHECK(version.first == version.second) << "impossible: " << version; + + // For each key in current set, check if it overwrites any previously + // written keys + OlapStopWatch watch; + std::vector segments; + std::vector pre_segments; + auto beta_rowset = reinterpret_cast(rowset_ptr.get()); + Status st = beta_rowset->load_segments(&segments); + if (!st.ok()) return st; + // lock tablet meta to modify delete bitmap + std::lock_guard meta_wrlock(tablet->get_header_lock()); + for (auto& seg : segments) { + seg->load_index(); // We need index blocks to iterate + auto pk_idx = seg->get_primary_key_index(); + int cnt = 0; + int total = pk_idx->num_rows(); + int32_t remaining = total; + bool exact_match = false; + std::string last_key; + int batch_size = 1024; + MemPool pool; + while (remaining > 0) { + std::unique_ptr iter; + RETURN_IF_ERROR(pk_idx->new_iterator(&iter)); + + size_t num_to_read = std::min(batch_size, remaining); + std::unique_ptr cvb; + RETURN_IF_ERROR(ColumnVectorBatch::create(num_to_read, false, pk_idx->type_info(), + nullptr, &cvb)); + ColumnBlock block(cvb.get(), &pool); + ColumnBlockView column_block_view(&block); + Slice last_key_slice(last_key); + RETURN_IF_ERROR(iter->seek_at_or_after(&last_key_slice, &exact_match)); + + size_t num_read = num_to_read; + RETURN_IF_ERROR(iter->next_batch(&num_read, &column_block_view)); + DCHECK(num_to_read == num_read); + last_key = (reinterpret_cast(cvb->cell_ptr(num_read - 1)))->to_string(); + + // exclude last_key, last_key will be read in next batch. + if (num_read == batch_size && num_read != remaining) { + num_read -= 1; + } + for (size_t i = 0; i < num_read; i++) { + const Slice* key = reinterpret_cast(cvb->cell_ptr(i)); + // first check if exist in pre segment + bool find = _check_pk_in_pre_segments(pre_segments, *key, tablet, version); + if (find) { + cnt++; + continue; + } + RowLocation loc; + st = tablet->lookup_row_key(*key, &loc, version.first - 1); + CHECK(st.ok() || st.is_not_found()); + if (st.is_not_found()) continue; + ++cnt; + // TODO: we can just set a bitmap onece we are done while iteration + tablet->tablet_meta()->delete_bitmap().add( + {loc.rowset_id, loc.segment_id, version.first}, loc.row_id); + } + remaining -= num_read; + } + + LOG(INFO) << "construct delete bitmap tablet: " << tablet->tablet_id() + << " rowset: " << beta_rowset->rowset_id() << " segment: " << seg->id() + << " version: " << version << " delete: " << cnt << "/" << total; + pre_segments.emplace_back(seg); + } + tablet->save_meta(); + LOG(INFO) << "finished to update delete bitmap, tablet: " << tablet->tablet_id() + << " version: " << version << ", elapse(us): " << watch.get_elapse_time_us(); + return Status::OK(); +} + +bool TxnManager::_check_pk_in_pre_segments( + const std::vector& pre_segments, const Slice& key, + TabletSharedPtr tablet, const Version& version) { + for (auto it = pre_segments.rbegin(); it != pre_segments.rend(); ++it) { + RowLocation loc; + auto st = (*it)->lookup_row_key(key, &loc); + CHECK(st.ok() || st.is_not_found()); + if (st.is_not_found()) { + continue; + } + tablet->tablet_meta()->delete_bitmap().add({loc.rowset_id, loc.segment_id, version.first}, + loc.row_id); + return true; + } + return false; } // txn could be rollbacked if it does not have related rowset diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h index 2dd6dfb753..a755f5dc79 100644 --- a/be/src/olap/txn_manager.h +++ b/be/src/olap/txn_manager.h @@ -42,6 +42,7 @@ #include "olap/options.h" #include "olap/rowset/rowset.h" #include "olap/rowset/rowset_meta.h" +#include "olap/rowset/segment_v2/segment.h" #include "olap/tablet.h" #include "util/time.h" @@ -172,6 +173,10 @@ private: void _insert_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id); void _clear_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id); + bool _check_pk_in_pre_segments(const std::vector& pre_segments, + const Slice& key, TabletSharedPtr tablet, + const Version& version); + private: const int32_t _txn_map_shard_size; diff --git a/be/test/olap/engine_storage_migration_task_test.cpp b/be/test/olap/engine_storage_migration_task_test.cpp index 927b257d43..b5b3b4d988 100644 --- a/be/test/olap/engine_storage_migration_task_test.cpp +++ b/be/test/olap/engine_storage_migration_task_test.cpp @@ -207,8 +207,8 @@ TEST_F(TestEngineStorageMigrationTask, write_and_migration) { for (auto& tablet_rs : tablet_related_rs) { RowsetSharedPtr rowset = tablet_rs.second; res = k_engine->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id, - write_req.tablet_id, write_req.schema_hash, - tablet_rs.first.tablet_uid, version); + tablet->tablet_id(), tablet->schema_hash(), + tablet->tablet_uid(), version); EXPECT_EQ(Status::OK(), res); res = tablet->add_inc_rowset(rowset); EXPECT_EQ(Status::OK(), res); diff --git a/be/test/olap/tablet_clone_test.cpp b/be/test/olap/tablet_clone_test.cpp index 135ed47c77..51124f87cf 100644 --- a/be/test/olap/tablet_clone_test.cpp +++ b/be/test/olap/tablet_clone_test.cpp @@ -201,8 +201,8 @@ TEST_F(TabletCloneTest, convert_rowset_ids_has_file_in_s3) { RowsetSharedPtr rowset = tablet_rs.second; rowset->rowset_meta()->set_resource_id(kResourceId); st = k_engine->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id, - write_req.tablet_id, write_req.schema_hash, - tablet_rs.first.tablet_uid, version); + tablet->tablet_id(), tablet->schema_hash(), + tablet->tablet_uid(), version); ASSERT_EQ(Status::OK(), st); st = tablet->add_inc_rowset(rowset); ASSERT_EQ(Status::OK(), st); diff --git a/be/test/olap/tablet_cooldown_test.cpp b/be/test/olap/tablet_cooldown_test.cpp index a7308062dd..f20eccd376 100644 --- a/be/test/olap/tablet_cooldown_test.cpp +++ b/be/test/olap/tablet_cooldown_test.cpp @@ -200,8 +200,8 @@ TEST_F(TabletCooldownTest, normal) { for (auto& tablet_rs : tablet_related_rs) { RowsetSharedPtr rowset = tablet_rs.second; st = k_engine->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id, - write_req.tablet_id, write_req.schema_hash, - tablet_rs.first.tablet_uid, version); + tablet->tablet_id(), tablet->schema_hash(), + tablet->tablet_uid(), version); ASSERT_EQ(Status::OK(), st); st = tablet->add_inc_rowset(rowset); ASSERT_EQ(Status::OK(), st);