diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 919af5cdf1..07e24e2f3f 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -438,13 +438,7 @@ Status DeltaWriter::close_wait(const PSlaveTabletNodes& slave_tablet_nodes, LOG(WARNING) << "fail to build rowset"; return Status::Error(); } - Status res = _storage_engine->txn_manager()->commit_txn(_req.partition_id, _tablet, _req.txn_id, - _req.load_id, _cur_rowset, false); - if (!res && !res.is()) { - LOG(WARNING) << "Failed to commit txn: " << _req.txn_id - << " for rowset: " << _cur_rowset->rowset_id(); - return res; - } + if (_tablet->enable_unique_key_merge_on_write()) { auto beta_rowset = reinterpret_cast(_cur_rowset.get()); std::vector segments; @@ -458,7 +452,20 @@ Status DeltaWriter::close_wait(const PSlaveTabletNodes& slave_tablet_nodes, // calculate delete bitmap between segments RETURN_IF_ERROR(_tablet->calc_delete_bitmap_between_segments(_cur_rowset, segments, _delete_bitmap)); + RETURN_IF_ERROR(_tablet->commit_phase_update_delete_bitmap( + _cur_rowset, _rowset_ids, _delete_bitmap, _tablet->max_version().second, + segments, _rowset_writer.get())); } + } + Status res = _storage_engine->txn_manager()->commit_txn(_req.partition_id, _tablet, _req.txn_id, + _req.load_id, _cur_rowset, false); + + if (!res && !res.is()) { + LOG(WARNING) << "Failed to commit txn: " << _req.txn_id + << " for rowset: " << _cur_rowset->rowset_id(); + return res; + } + if (_tablet->enable_unique_key_merge_on_write()) { _storage_engine->txn_manager()->set_txn_related_delete_bitmap( _req.partition_id, _req.txn_id, _tablet->tablet_id(), _tablet->schema_hash(), _tablet->tablet_uid(), true, _delete_bitmap, _rowset_ids); diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 011ea8e9b6..36f8e0076e 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -3134,10 +3134,33 @@ Status Tablet::update_delete_bitmap_without_lock(const RowsetSharedPtr& rowset) return Status::OK(); } -Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset, const TabletTxnInfo* load_info, - RowsetWriter* rowset_writer) { - DeleteBitmapPtr delete_bitmap = load_info->delete_bitmap; - const RowsetIdUnorderedSet& pre_rowset_ids = load_info->rowset_ids; +Status Tablet::commit_phase_update_delete_bitmap( + const RowsetSharedPtr& rowset, const RowsetIdUnorderedSet& pre_rowset_ids, + DeleteBitmapPtr delete_bitmap, const int64_t& cur_version, + const std::vector& segments, RowsetWriter* rowset_writer) { + RowsetIdUnorderedSet cur_rowset_ids; + RowsetIdUnorderedSet rowset_ids_to_add; + RowsetIdUnorderedSet rowset_ids_to_del; + + std::shared_lock meta_rlock(_meta_lock); + cur_rowset_ids = all_rs_id(cur_version); + _rowset_ids_difference(cur_rowset_ids, pre_rowset_ids, &rowset_ids_to_add, &rowset_ids_to_del); + if (!rowset_ids_to_add.empty() || !rowset_ids_to_del.empty()) { + LOG(INFO) << "rowset_ids_to_add: " << rowset_ids_to_add.size() + << ", rowset_ids_to_del: " << rowset_ids_to_del.size(); + } + for (const auto& to_del : rowset_ids_to_del) { + delete_bitmap->remove({to_del, 0, 0}, {to_del, UINT32_MAX, INT64_MAX}); + } + + RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, &rowset_ids_to_add, delete_bitmap, + cur_version, rowset_writer)); + return Status::OK(); +} + +Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset, + const RowsetIdUnorderedSet& pre_rowset_ids, + DeleteBitmapPtr delete_bitmap, RowsetWriter* rowset_writer) { RowsetIdUnorderedSet cur_rowset_ids; RowsetIdUnorderedSet rowset_ids_to_add; RowsetIdUnorderedSet rowset_ids_to_del; diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index cd4e21fb79..55a3b676b9 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -447,7 +447,15 @@ public: Status update_delete_bitmap_without_lock(const RowsetSharedPtr& rowset); - Status update_delete_bitmap(const RowsetSharedPtr& rowset, const TabletTxnInfo* load_info, + Status commit_phase_update_delete_bitmap( + const RowsetSharedPtr& rowset, const RowsetIdUnorderedSet& pre_rowset_ids, + DeleteBitmapPtr delete_bitmap, const int64_t& cur_version, + const std::vector& segments, + RowsetWriter* rowset_writer = nullptr); + + Status update_delete_bitmap(const RowsetSharedPtr& rowset, + const RowsetIdUnorderedSet& pre_rowset_ids, + DeleteBitmapPtr delete_bitmap, RowsetWriter* rowset_writer = nullptr); void calc_compaction_output_rowset_delete_bitmap( const std::vector& input_rowsets, diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp index 1d7d19fb1c..f7e56fd785 100644 --- a/be/src/olap/txn_manager.cpp +++ b/be/src/olap/txn_manager.cpp @@ -368,8 +368,9 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, std::unique_ptr rowset_writer; _create_transient_rowset_writer(tablet, rowset, &rowset_writer); - RETURN_IF_ERROR( - tablet->update_delete_bitmap(rowset, &tablet_txn_info, rowset_writer.get())); + RETURN_IF_ERROR(tablet->update_delete_bitmap(rowset, tablet_txn_info.rowset_ids, + tablet_txn_info.delete_bitmap, + rowset_writer.get())); if (rowset->tablet_schema()->is_partial_update()) { // build rowset writer and merge transient rowset RETURN_IF_ERROR(rowset_writer->flush());