From 2084d8bdf36e9ec6bafd060c3d615b456937db24 Mon Sep 17 00:00:00 2001
From: Compilation Success <106100979+compiletheworld@users.noreply.github.com>
Date: Tue, 12 Jul 2022 16:34:42 +0800
Subject: [PATCH] [feature-wip](unique-key-merge-on-write) Add delete bitmap for DSIP-018 (#10548)

Add delete bitmap for DSIP-018: Support Merge-On-Write implementation for UNIQUE KEY data model
---
 be/src/olap/tablet_meta.cpp                   | 134 +++++++++++++++++-
 be/src/olap/tablet_meta.h                     | 113 +++++++++++++++
 be/test/olap/tablet_meta_test.cpp             |  79 +++++++++++
 .../olap/test_data/header_without_inc_rs.txt  |   3 +-
 gensrc/proto/olap_file.proto                  |   9 ++
 5 files changed, 335 insertions(+), 3 deletions(-)

diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index bd3161fd30..3a0490c370 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -48,7 +48,8 @@ Status TabletMeta::create(const TCreateTabletReq& request, const TabletUid& tabl
     return Status::OK();
 }
 
-TabletMeta::TabletMeta() : _tablet_uid(0, 0), _schema(new TabletSchema) {}
+TabletMeta::TabletMeta()
+        : _tablet_uid(0, 0), _schema(new TabletSchema), _delete_bitmap(new DeleteBitmap()) {}
 
 TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id,
                        int64_t replica_id, int32_t schema_hash, uint64_t shard_id,
@@ -57,7 +58,7 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id
                        TabletUid tablet_uid, TTabletType::type tabletType,
                        TStorageMedium::type t_storage_medium, const std::string& storage_name,
                        TCompressionType::type compression_type, const std::string& storage_policy)
-        : _tablet_uid(0, 0), _schema(new TabletSchema) {
+        : _tablet_uid(0, 0), _schema(new TabletSchema), _delete_bitmap(new DeleteBitmap()) {
     TabletMetaPB tablet_meta_pb;
     tablet_meta_pb.set_table_id(table_id);
     tablet_meta_pb.set_partition_id(partition_id);
@@ -456,6 +457,23 @@ void TabletMeta::init_from_pb(const TabletMetaPB& tablet_meta_pb) {
     _remote_storage_name = tablet_meta_pb.remote_storage_name();
     _storage_medium = tablet_meta_pb.storage_medium();
     _cooldown_resource = tablet_meta_pb.storage_policy();
+
+    if (tablet_meta_pb.has_delete_bitmap()) {
+        int rst_ids_size = tablet_meta_pb.delete_bitmap().rowset_ids_size();
+        int seg_ids_size = tablet_meta_pb.delete_bitmap().segment_ids_size();
+        int versions_size = tablet_meta_pb.delete_bitmap().versions_size();
+        int seg_maps_size = tablet_meta_pb.delete_bitmap().segment_delete_bitmaps_size();
+        CHECK(rst_ids_size == seg_ids_size && seg_ids_size == seg_maps_size &&
+              seg_maps_size == versions_size); // the four fields are parallel arrays
+        for (int i = 0; i < rst_ids_size; ++i) { // int: same type as the *_size() counts above
+            RowsetId rst_id;
+            rst_id.init(tablet_meta_pb.delete_bitmap().rowset_ids(i));
+            auto seg_id = tablet_meta_pb.delete_bitmap().segment_ids(i);
+            uint32_t ver = tablet_meta_pb.delete_bitmap().versions(i); // proto field is int64
+            auto bitmap = tablet_meta_pb.delete_bitmap().segment_delete_bitmaps(i).data();
+            delete_bitmap().delete_bitmap[{rst_id, seg_id, ver}] = roaring::Roaring::read(bitmap);
+        }
+    }
 }
 
 void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) {
@@ -505,6 +523,20 @@ void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) {
     tablet_meta_pb->set_remote_storage_name(_remote_storage_name);
     tablet_meta_pb->set_storage_medium(_storage_medium);
     tablet_meta_pb->set_storage_policy(_cooldown_resource);
+
+    {
+        std::shared_lock l(delete_bitmap().lock); // read lock for the whole serialization
+        DeleteBitmapPB* delete_bitmap_pb = tablet_meta_pb->mutable_delete_bitmap();
+        for (auto& [id, bitmap] : delete_bitmap().delete_bitmap) {
+            auto& [rowset_id, segment_id, ver] = id;
+            delete_bitmap_pb->add_rowset_ids(rowset_id.to_string());
+            delete_bitmap_pb->add_segment_ids(segment_id);
+            delete_bitmap_pb->add_versions(ver);
+            std::string bitmap_data(bitmap.getSizeInBytes(), '\0'); // pre-sized write buffer
+            bitmap.write(bitmap_data.data());
+            *(delete_bitmap_pb->add_segment_delete_bitmaps()) = std::move(bitmap_data);
+        }
+    }
 }
 
 uint32_t TabletMeta::mem_size() const {
@@ -729,4 +761,102 @@ bool operator!=(const TabletMeta& a,
const TabletMeta& b) {
     return !(a == b);
 }
 
+DeleteBitmap::DeleteBitmap() = default;
+
+// Copies the map only; `src.lock` is not taken here (snapshot() locks before it copies).
+DeleteBitmap::DeleteBitmap(const DeleteBitmap& src) : delete_bitmap(src.delete_bitmap) {}
+
+DeleteBitmap& DeleteBitmap::operator=(const DeleteBitmap& src) {
+    this->delete_bitmap = src.delete_bitmap; // map only, neither lock is taken
+    return *this;
+}
+
+DeleteBitmap::DeleteBitmap(DeleteBitmap&& src) : delete_bitmap(std::move(src.delete_bitmap)) {}
+
+DeleteBitmap& DeleteBitmap::operator=(DeleteBitmap&& src) {
+    this->delete_bitmap = std::move(src.delete_bitmap);
+    return *this;
+}
+
+DeleteBitmap DeleteBitmap::snapshot() const {
+    std::shared_lock read_guard(lock); // shared: concurrent readers are not blocked
+    return DeleteBitmap(*this);
+}
+
+void DeleteBitmap::add(const BitmapKey& bmk, uint32_t row_id) {
+    std::lock_guard write_guard(lock);
+    auto& segment_bitmap = delete_bitmap[bmk]; // operator[] creates an empty bitmap if absent
+    segment_bitmap.add(row_id);
+}
+
+int DeleteBitmap::remove(const BitmapKey& bmk, uint32_t row_id) {
+    std::lock_guard write_guard(lock);
+    const auto pos = delete_bitmap.find(bmk);
+    if (pos != delete_bitmap.end()) {
+        pos->second.remove(row_id);
+        return 0;
+    }
+    return -1; // no bitmap is stored under `bmk`
+}
+
+void DeleteBitmap::remove(const BitmapKey& start, const BitmapKey& end) {
+    std::lock_guard write_guard(lock);
+    // Ordered map: erase forward from the first key >= start until a key >= end is reached.
+    auto pos = delete_bitmap.lower_bound(start);
+    while (pos != delete_bitmap.end() && pos->first < end) {
+        pos = delete_bitmap.erase(pos);
+    }
+}
+
+bool DeleteBitmap::contains(const BitmapKey& bmk, uint32_t row_id) const {
+    std::shared_lock read_guard(lock);
+    const auto pos = delete_bitmap.find(bmk);
+    return pos == delete_bitmap.end() ? false : pos->second.contains(row_id);
+}
+
+int DeleteBitmap::set(const BitmapKey& bmk, const roaring::Roaring& segment_delete_bitmap) {
+    std::lock_guard write_guard(lock);
+    // insert_or_assign() yields true for a fresh insertion and false for an overwrite.
+    return delete_bitmap.insert_or_assign(bmk, segment_delete_bitmap).second;
+}
+
+int DeleteBitmap::get(const BitmapKey& bmk, roaring::Roaring* segment_delete_bitmap) const {
+    std::shared_lock read_guard(lock);
+    const auto pos = delete_bitmap.find(bmk);
+    if (pos == delete_bitmap.end()) {
+        return -1;
+    }
+    *segment_delete_bitmap = pos->second; // copied while the read lock is held
+    return 0;
+}
+
+const roaring::Roaring* DeleteBitmap::get(const BitmapKey& bmk) const {
+    std::shared_lock l(lock);
+    auto it = delete_bitmap.find(bmk);
+    if (it == delete_bitmap.end()) return nullptr;
+    return &(it->second); // address into the map, no longer guarded once `l` is released
+}
+
+void DeleteBitmap::subset(const BitmapKey& start, const BitmapKey& end,
+                          DeleteBitmap* subset_rowset_map) const {
+    // Copies entries with key in [start, end); `start` itself is included (lower_bound).
+    DCHECK(start < end);
+    std::shared_lock l(lock);
+    for (auto it = delete_bitmap.lower_bound(start); it != delete_bitmap.end(); ++it) {
+        auto& [k, bm] = *it;
+        if (k >= end) {
+            break;
+        }
+        subset_rowset_map->set(k, bm); // write-locks the target, so it must not be `this`
+    }
+}
+
+void DeleteBitmap::merge(const DeleteBitmap& other) {
+    std::lock_guard l(lock); // only `this` is locked; `other.lock` is not taken
+    for (auto& i : other.delete_bitmap) {
+        auto [j, succ] = this->delete_bitmap.insert(i);
+        if (!succ) j->second |= i.second; // key already present: fold in the other bitmap
+    }
+}
+
 } // namespace doris
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index e0010d8f41..6e558d996a 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -67,6 +67,7 @@ class RowsetMeta;
 class Rowset;
 class DataDir;
 class TabletMeta;
+class DeleteBitmap;
 using TabletMetaSharedPtr = std::shared_ptr<TabletMeta>;
 
 // Class encapsulates meta of tablet.
@@ -199,6 +200,8 @@ public:
         _cooldown_resource = std::move(resource);
     }
 
+    DeleteBitmap& delete_bitmap() { return *_delete_bitmap; }
+
 private:
     Status _save_meta(DataDir* data_dir);
     void _init_column_from_tcolumn(uint32_t unique_id, const TColumn& tcolumn, ColumnPB* column);
@@ -239,9 +242,119 @@ private:
     // FIXME(cyx): Currently `cooldown_resource` is equivalent to `storage_policy`.
     io::ResourceId _cooldown_resource;
 
+    std::unique_ptr<DeleteBitmap> _delete_bitmap;
+
     mutable std::shared_mutex _meta_lock;
 };
 
+/**
+ * Wraps multiple bitmaps for recording rows (row id) that are deleted or
+ * overwritten.
+ *
+ * RowsetId and SegmentId locate the segment; Version here is a single
+ * uint32_t recording the "version" of the load that caused the delete or
+ * overwrite.
+ *
+ * The start and end version of a load are the same, so it is ok and
+ * straightforward to use a single uint32_t.
+ *
+ * e.g.
+ * There is a key "key1" in rowset id 1, version [1,1], segment id 1, row id 1.
+ * A new load also contains "key1", the rowset id 2, version [2,2], segment id 1
+ * the delete bitmap will be `{1,1,2} -> 1`, which means the "row id 1" in
+ * "rowset id 1, segment id 1" is deleted/overwritten by some loads at "version 2"
+ */
+class DeleteBitmap {
+public:
+    mutable std::shared_mutex lock; // guards `delete_bitmap`; copy/move operations do not take it
+    using SegmentId = uint32_t;
+    using Version = uint32_t;
+    using BitmapKey = std::tuple<RowsetId, SegmentId, Version>;
+    std::map<BitmapKey, roaring::Roaring> delete_bitmap; // Ordered map
+
+    DeleteBitmap();
+
+    /**
+     * Copy c-tor for making delete bitmap snapshot on read path
+     */
+    DeleteBitmap(const DeleteBitmap& r);
+    DeleteBitmap& operator=(const DeleteBitmap& r);
+    /**
+     * Move c-tor for making delete bitmap snapshot on read path
+     */
+    DeleteBitmap(DeleteBitmap&& r);
+    DeleteBitmap& operator=(DeleteBitmap&& r);
+
+    /**
+     * Makes a snapshot of the delete bitmap, the read lock is acquired in this
+     * process
+     */
+    DeleteBitmap snapshot() const;
+
+    /**
+     * Marks the specific row deleted
+     */
+    void add(const BitmapKey& bmk, uint32_t row_id);
+
+    /**
+     * Clears the deletion mark of the specific row
+     *
+     * @return non-zero if the associated delete bitmap does not exist
+     */
+    int remove(const BitmapKey& bmk, uint32_t row_id);
+
+    /**
+     * Clears bitmaps in range [lower_key, upper_key)
+     */
+    void remove(const BitmapKey& lower_key, const BitmapKey& upper_key);
+
+    /**
+     * Checks if the given row is marked deleted
+     *
+     * @return true if marked deleted
+     */
+    bool contains(const BitmapKey& bmk, uint32_t row_id) const;
+
+    /**
+     * Sets the bitmap of a specific segment, it may be an insertion or a replacement
+     *
+     * @return 1 if the insertion took place, 0 if the assignment took place
+     */
+    int set(const BitmapKey& bmk, const roaring::Roaring& segment_delete_bitmap);
+
+    /**
+     * Gets a copy of the specific delete bitmap stored under bmk
+     *
+     * @param segment_delete_bitmap output param
+     * @return non-zero if the associated delete bitmap does not exist
+     */
+    int get(const BitmapKey& bmk, roaring::Roaring* segment_delete_bitmap) const;
+
+    /**
+     * Gets a pointer to a specific delete bitmap, DO NOT use this function on a
+     * mutable DeleteBitmap object
+     * @return nullptr if the given bitmap does not exist
+     */
+    const roaring::Roaring* get(const BitmapKey& bmk) const;
+
+    /**
+     * Gets subset of delete_bitmap with given range [start, end)
+     *
+     * @param start start
+     * @param end end
+     * @param subset_delete_map output param
+     */
+    void subset(const BitmapKey& start, const BitmapKey& end,
+                DeleteBitmap* subset_delete_map) const;
+
+    /**
+     * Merges the given delete bitmap into *this
+     *
+     * @param other
+     */
+    void merge(const DeleteBitmap& other);
+};
+
 static const std::string SEQUENCE_COL = "__DORIS_SEQUENCE_COL__";
 
 inline TabletUid TabletMeta::tablet_uid() const {
diff --git a/be/test/olap/tablet_meta_test.cpp b/be/test/olap/tablet_meta_test.cpp
index 67f9f95fa4..4f2b7f7afd 100644
--- a/be/test/olap/tablet_meta_test.cpp
+++ b/be/test/olap/tablet_meta_test.cpp
@@ -42,4 +42,83 @@ TEST(TabletMetaTest, SaveAndParse) {
     EXPECT_EQ(old_tablet_meta, new_tablet_meta);
 }
 
+TEST(TabletMetaTest, TestDeleteBitmap) {
+    std::unique_ptr<DeleteBitmap> dbmp(new DeleteBitmap());
+    auto gen1 = [&dbmp](int64_t max_rst_id, uint32_t max_seg_id, uint32_t max_row) {
+        for (int64_t i = 0; i < max_rst_id; ++i) {
+            for (uint32_t j = 0; j < max_seg_id; ++j) {
+                for (uint32_t k = 0; k < max_row; ++k) {
+                    dbmp->add({RowsetId {2, 0, 1, i}, j, 0}, k);
+                }
+            }
+        }
+    };
+    gen1(10, 20, 1000);
+    dbmp->add({RowsetId {2, 0, 1, 2}, 2, 0}, 2); // redundant
+    {
+        roaring::Roaring d;
+        dbmp->get({RowsetId {2, 0, 1, 2}, 0, 0}, &d);
+        EXPECT_EQ(d.cardinality(), 1000);
+        d -= *dbmp->get({RowsetId {2, 0, 1, 2}, 0, 0});
+        EXPECT_EQ(d.cardinality(), 0);
+    }
+
+    // Add version 1 and 2
+    dbmp->add({RowsetId {2, 0, 1, 1}, 1, 1}, 1100);
+    dbmp->add({RowsetId {2, 0, 1, 1}, 1, 1}, 1101);
+    dbmp->add({RowsetId {2, 0, 1, 1}, 1, 1}, 1102);
+    dbmp->add({RowsetId {2, 0, 1, 1}, 1, 1}, 1103);
+    dbmp->add({RowsetId {2, 0, 1, 1}, 1, 2}, 1104);
+
+    ASSERT_EQ(dbmp->delete_bitmap.size(), 10 * 20 + 2);
+
+    { // Bitmap of certain versions only get their own row ids
+        auto bm = dbmp->get({RowsetId {2, 0, 1, 1}, 1, 2});
+        ASSERT_EQ(bm->cardinality(), 1);
+        ASSERT_FALSE(bm->contains(999));
+        ASSERT_FALSE(bm->contains(1100));
+        ASSERT_TRUE(bm->contains(1104));
+    }
+
+    {
+        // test remove
+        // Nothing removed
+        dbmp->remove({RowsetId {2, 0, 1, 1}, 0, 0}, {RowsetId {2, 0, 1, 1}, 0, 0});
+        ASSERT_EQ(dbmp->delete_bitmap.size(), 10 * 20 + 2);
+        dbmp->remove({RowsetId {2, 0, 1, 100}, 0, 0}, {RowsetId {2, 0, 1, 100}, 50000, 0});
+        ASSERT_EQ(dbmp->delete_bitmap.size(), 10 * 20 + 2);
+
+        // Remove all seg of rowset {2,0,1,0}
+        dbmp->remove({RowsetId {2, 0, 1, 0}, 0, 0}, {RowsetId {2, 0, 1, 0}, 5000, 0});
+        ASSERT_EQ(dbmp->delete_bitmap.size(), 9 * 20 + 2);
+        // Remove all rowset {2,0,1,8} to {2,0,1,9}
+        dbmp->remove({RowsetId {2, 0, 1, 8}, 0, 0}, {RowsetId {2, 0, 1, 9}, 5000, 0});
+        ASSERT_EQ(dbmp->delete_bitmap.size(), 7 * 20 + 2);
+    }
+
+    {
+        DeleteBitmap db_upper;
+        dbmp->subset({RowsetId {2, 0, 1, 1}, 1, 0}, {RowsetId {2, 0, 1, 1}, 1000000, 0}, &db_upper);
+        roaring::Roaring d;
+        ASSERT_EQ(db_upper.get({RowsetId {2, 0, 1, 1}, 1, 1}, &d), 0);
+        ASSERT_EQ(d.cardinality(), 4);
+        ASSERT_EQ(db_upper.get({RowsetId {2, 0, 1, 1}, 1, 2}, &d), 0);
+        ASSERT_EQ(d.cardinality(), 1);
+        ASSERT_EQ(db_upper.delete_bitmap.size(), 21); // seg 1 (ver 0,1,2) + seg 2..19 (ver 0)
+    }
+
+    {
+        auto old_size = dbmp->delete_bitmap.size();
+        // test merge
+        DeleteBitmap other;
+        other.add({RowsetId {2, 0, 1, 1}, 1, 1}, 1100);
+        dbmp->merge(other);
+        ASSERT_EQ(dbmp->delete_bitmap.size(), old_size);
+        other.add({RowsetId {2, 0, 1, 1}, 1001, 1}, 1100);
+        other.add({RowsetId {2, 0, 1, 1}, 1002, 1}, 1100);
+        dbmp->merge(other);
+        ASSERT_EQ(dbmp->delete_bitmap.size(), old_size + 2);
+    }
+}
+
 } // namespace doris
diff --git
a/be/test/olap/test_data/header_without_inc_rs.txt b/be/test/olap/test_data/header_without_inc_rs.txt
index bde8d7f4d5..61fc42e419 100644
--- a/be/test/olap/test_data/header_without_inc_rs.txt
+++ b/be/test/olap/test_data/header_without_inc_rs.txt
@@ -146,5 +146,6 @@
     "storage_medium": "HDD",
     "remote_storage_name": "",
     "replica_id": 0,
-    "storage_policy": ""
+    "storage_policy": "",
+    "delete_bitmap": {}
 }
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index 4385d5803d..a9ba857c7e 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -278,6 +278,7 @@ message TabletMetaPB {
     optional string remote_storage_name = 20;
     optional int64 replica_id = 21 [default = 0];
     optional string storage_policy = 22;
+    optional DeleteBitmapPB delete_bitmap = 23;
 }
 
 message OLAPIndexHeaderMessage {
@@ -298,3 +299,11 @@ message OLAPDataHeaderMessage {
 message OLAPRawDeltaHeaderMessage {
     required int32 schema_hash = 2;
 }
+
+message DeleteBitmapPB { // four parallel arrays; TabletMeta::init_from_pb CHECKs equal sizes
+    repeated string rowset_ids = 1;  // RowsetId::to_string() of entry i
+    repeated uint32 segment_ids = 2; // segment id of entry i
+    repeated int64 versions = 3;     // version of entry i; read back into a uint32_t
+    // Serialized roaring bitmap of entry i, keyed by {rowset_ids[i], segment_ids[i], versions[i]}
+    repeated bytes segment_delete_bitmaps = 4;
+}