diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index fdf2ed36ec..bdeb7ab1e7 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -256,17 +256,23 @@ Status Compaction::construct_input_rowset_readers() { Status Compaction::modify_rowsets() { std::vector output_rowsets; output_rowsets.push_back(_output_rowset); - std::lock_guard wrlock(_tablet->get_header_lock()); + { + std::lock_guard wrlock_(_tablet->get_rowset_update_lock()); + std::lock_guard wrlock(_tablet->get_header_lock()); - // update dst rowset delete bitmap - if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && - _tablet->enable_unique_key_merge_on_write()) { - _tablet->tablet_meta()->update_delete_bitmap(_input_rowsets, _output_rs_writer->version(), - _rowid_conversion); + // update dst rowset delete bitmap + if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && + _tablet->enable_unique_key_merge_on_write()) { + _tablet->tablet_meta()->update_delete_bitmap( + _input_rowsets, _output_rs_writer->version(), _rowid_conversion); + } + + RETURN_NOT_OK(_tablet->modify_rowsets(output_rowsets, _input_rowsets, true)); + } + { + std::shared_lock rlock(_tablet->get_header_lock()); + _tablet->save_meta(); } - - RETURN_NOT_OK(_tablet->modify_rowsets(output_rowsets, _input_rowsets, true)); - _tablet->save_meta(); return Status::OK(); } diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 7e34771c94..0f2e776919 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -282,7 +282,7 @@ Status DeltaWriter::wait_flush() { void DeltaWriter::_reset_mem_table() { if (_tablet->enable_unique_key_merge_on_write()) { - _delete_bitmap.reset(new DeleteBitmap(-1)); + _delete_bitmap.reset(new DeleteBitmap(_tablet->tablet_id())); } _mem_table.reset(new MemTable(_tablet, _schema.get(), _tablet_schema.get(), _req.slots, _req.tuple_desc, _rowset_writer.get(), _delete_bitmap, diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 95b9277be3..629c9eb863 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -400,20 +400,21 @@ bool MemTable::need_to_agg() { Status MemTable::_generate_delete_bitmap() { // generate delete bitmap, build a tmp rowset and load recent segment - if (_tablet->enable_unique_key_merge_on_write()) { - auto rowset = _rowset_writer->build_tmp(); - auto beta_rowset = reinterpret_cast(rowset.get()); - std::vector segments; - segment_v2::SegmentSharedPtr segment; - if (beta_rowset->num_segments() == 0) { - return Status::OK(); - } - RETURN_IF_ERROR(beta_rowset->load_segment(beta_rowset->num_segments() - 1, &segment)); - segments.push_back(segment); - std::lock_guard meta_wrlock(_tablet->get_header_lock()); - RETURN_IF_ERROR(_tablet->calc_delete_bitmap(beta_rowset->rowset_id(), segments, - &_rowset_ids, _delete_bitmap)); + if (!_tablet->enable_unique_key_merge_on_write()) { + return Status::OK(); } + auto rowset = _rowset_writer->build_tmp(); + auto beta_rowset = reinterpret_cast(rowset.get()); + std::vector segments; + segment_v2::SegmentSharedPtr segment; + if (beta_rowset->num_segments() == 0) { + return Status::OK(); + } + RETURN_IF_ERROR(beta_rowset->load_segment(beta_rowset->num_segments() - 1, &segment)); + segments.push_back(segment); + std::shared_lock meta_rlock(_tablet->get_header_lock()); + RETURN_IF_ERROR(_tablet->calc_delete_bitmap(beta_rowset->rowset_id(), segments, &_rowset_ids, + _delete_bitmap)); return Status::OK(); } diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 2a99ea8a57..d3971937b3 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -2064,7 +2064,8 @@ Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset, DeleteBitmapP std::vector segments; _load_rowset_segments(rowset, &segments); - std::lock_guard meta_wrlock(_meta_lock); + std::lock_guard rwlock(_rowset_update_lock); + std::shared_lock meta_rlock(_meta_lock); cur_rowset_ids = all_rs_id(); _rowset_ids_difference(cur_rowset_ids, pre_rowset_ids, &rowset_ids_to_add, &rowset_ids_to_del); if (!rowset_ids_to_add.empty() || !rowset_ids_to_del.empty()) { @@ -2079,7 +2080,9 @@ Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset, DeleteBitmapP delete_bitmap, true)); } - // update version + // update version without write lock, compaction and publish_txn + // will update delete bitmap, handle compaction with _delete_bitmap_lock + // and publish_txn runs sequencial so no need to lock here for (auto iter = delete_bitmap->delete_bitmap.begin(); iter != delete_bitmap->delete_bitmap.end(); ++iter) { int ret = _tablet_meta->delete_bitmap().set( diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index c0c8cab7d4..51cf455c58 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -165,6 +165,7 @@ public: // meta lock std::shared_mutex& get_header_lock() { return _meta_lock; } + std::mutex& get_rowset_update_lock() { return _rowset_update_lock; } std::mutex& get_push_lock() { return _ingest_lock; } std::mutex& get_base_compaction_lock() { return _base_compaction_lock; } std::mutex& get_cumulative_compaction_lock() { return _cumulative_compaction_lock; } @@ -392,6 +393,13 @@ private: // TODO(lingbin): There is a _meta_lock TabletMeta too, there should be a comment to // explain how these two locks work together. mutable std::shared_mutex _meta_lock; + + // In unique key table with MoW, we should guarantee that only one + // writer can update rowset and delete bitmap at the same time. + // We use a separate lock rather than _meta_lock, to avoid blocking read queries + // during publish_txn, which might take hundreds of milliseconds + mutable std::mutex _rowset_update_lock; + // After version 0.13, all newly created rowsets are saved in _rs_version_map. // And if rowset being compacted, the old rowsetis will be saved in _stale_rs_version_map; std::unordered_map _rs_version_map; diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index 786fe2674e..1689c0a356 100644 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -536,11 +536,17 @@ void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) { tablet_meta_pb->set_storage_policy(_storage_policy); tablet_meta_pb->set_enable_unique_key_merge_on_write(_enable_unique_key_merge_on_write); - { - std::shared_lock l(delete_bitmap().lock); + if (_enable_unique_key_merge_on_write) { + std::set rs_ids; + for (const auto& rowset : _rs_metas) { + rs_ids.insert(rowset->rowset_id()); + } DeleteBitmapPB* delete_bitmap_pb = tablet_meta_pb->mutable_delete_bitmap(); - for (auto& [id, bitmap] : delete_bitmap().delete_bitmap) { + for (auto& [id, bitmap] : delete_bitmap().snapshot().delete_bitmap) { auto& [rowset_id, segment_id, ver] = id; + if (rs_ids.count(rowset_id) == 0) { + continue; + } delete_bitmap_pb->add_rowset_ids(rowset_id.to_string()); delete_bitmap_pb->add_segment_ids(segment_id); delete_bitmap_pb->add_versions(ver); diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index 1bf4da2cbd..9cea34b248 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -404,7 +404,7 @@ public: static std::once_flag once; std::call_once(once, [size_in_bytes] { auto tmp = new ShardedLRUCache("DeleteBitmap AggCache", size_in_bytes, - LRUCacheType::SIZE, 2048); + LRUCacheType::SIZE, 256); AggCache::s_repr.store(tmp, std::memory_order_release); }); diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp index be4a2e6abc..4c05b8c59d 100644 --- a/be/src/olap/task/engine_publish_version_task.cpp +++ b/be/src/olap/task/engine_publish_version_task.cpp @@ -126,11 +126,11 @@ Status EnginePublishVersionTask::finish() { max_version = tablet->max_version(); } if (version.first != max_version.second + 1) { - LOG(INFO) << "uniq key with merge-on-write version not continuous, current " - "max " - "version=" - << max_version.second << ", publish_version=" << version.first - << " tablet_id=" << tablet->tablet_id(); + VLOG_NOTICE << "uniq key with merge-on-write version not continuous, current " + "max " + "version=" + << max_version.second << ", publish_version=" << version.first + << " tablet_id=" << tablet->tablet_id(); meet_version_not_continuous = true; res = Status::OLAPInternalError(OLAP_ERR_PUBLISH_VERSION_NOT_CONTINUOUS); continue; @@ -180,7 +180,7 @@ Status EnginePublishVersionTask::finish() { LOG(INFO) << "finish to publish version on transaction." << "transaction_id=" << transaction_id - << ", error_tablet_size=" << _error_tablet_ids->size(); + << ", error_tablet_size=" << _error_tablet_ids->size() << ", res=" << res.to_string(); return res; } diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp index efb4185f46..3ef4f40084 100644 --- a/be/src/olap/txn_manager.cpp +++ b/be/src/olap/txn_manager.cpp @@ -309,6 +309,20 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, // save meta need access disk, it maybe very slow, so that it is not in global txn lock // it is under a single txn lock if (rowset_ptr != nullptr) { + // update delete_bitmap + { + if (load_info != nullptr && load_info->unique_key_merge_on_write) { + auto tablet = + StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id); + if (tablet == nullptr) { + return Status::OK(); + } + RETURN_IF_ERROR(tablet->update_delete_bitmap( + rowset_ptr, load_info->delete_bitmap, load_info->rowset_ids)); + std::shared_lock rlock(tablet->get_header_lock()); + tablet->save_meta(); + } + } // TODO(ygl): rowset is already set version here, memory is changed, if save failed // it maybe a fatal error rowset_ptr->make_visible(version); @@ -325,21 +339,6 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, return Status::OLAPInternalError(OLAP_ERR_TRANSACTION_NOT_EXIST); } } - // update delete_bitmap - { - auto tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id); -#ifdef BE_TEST - if (tablet == nullptr) { - return Status::OK(); - } -#endif - if (load_info != nullptr && load_info->unique_key_merge_on_write) { - RETURN_IF_ERROR(tablet->update_delete_bitmap(rowset_ptr, load_info->delete_bitmap, - load_info->rowset_ids)); - std::lock_guard wrlock(tablet->get_header_lock()); - tablet->save_meta(); - } - } { std::unique_lock txn_lock(_get_txn_lock(transaction_id)); std::lock_guard wrlock(_get_txn_map_lock(transaction_id)); diff --git a/be/test/olap/test_data/header_without_inc_rs.txt b/be/test/olap/test_data/header_without_inc_rs.txt index fc2a5cb1e6..44c191d3fc 100644 --- a/be/test/olap/test_data/header_without_inc_rs.txt +++ b/be/test/olap/test_data/header_without_inc_rs.txt @@ -147,6 +147,5 @@ "tablet_type": "TABLET_TYPE_DISK", "replica_id": 0, "storage_policy": "", - "delete_bitmap": {}, "enable_unique_key_merge_on_write": false }