[Fix](MOW) Fix load data publish timeout when enable unique key MOW (#20720)
This commit is contained in:
@ -438,13 +438,7 @@ Status DeltaWriter::close_wait(const PSlaveTabletNodes& slave_tablet_nodes,
|
||||
LOG(WARNING) << "fail to build rowset";
|
||||
return Status::Error<MEM_ALLOC_FAILED>();
|
||||
}
|
||||
Status res = _storage_engine->txn_manager()->commit_txn(_req.partition_id, _tablet, _req.txn_id,
|
||||
_req.load_id, _cur_rowset, false);
|
||||
if (!res && !res.is<PUSH_TRANSACTION_ALREADY_EXIST>()) {
|
||||
LOG(WARNING) << "Failed to commit txn: " << _req.txn_id
|
||||
<< " for rowset: " << _cur_rowset->rowset_id();
|
||||
return res;
|
||||
}
|
||||
|
||||
if (_tablet->enable_unique_key_merge_on_write()) {
|
||||
auto beta_rowset = reinterpret_cast<BetaRowset*>(_cur_rowset.get());
|
||||
std::vector<segment_v2::SegmentSharedPtr> segments;
|
||||
@ -458,7 +452,20 @@ Status DeltaWriter::close_wait(const PSlaveTabletNodes& slave_tablet_nodes,
|
||||
// calculate delete bitmap between segments
|
||||
RETURN_IF_ERROR(_tablet->calc_delete_bitmap_between_segments(_cur_rowset, segments,
|
||||
_delete_bitmap));
|
||||
RETURN_IF_ERROR(_tablet->commit_phase_update_delete_bitmap(
|
||||
_cur_rowset, _rowset_ids, _delete_bitmap, _tablet->max_version().second,
|
||||
segments, _rowset_writer.get()));
|
||||
}
|
||||
}
|
||||
Status res = _storage_engine->txn_manager()->commit_txn(_req.partition_id, _tablet, _req.txn_id,
|
||||
_req.load_id, _cur_rowset, false);
|
||||
|
||||
if (!res && !res.is<PUSH_TRANSACTION_ALREADY_EXIST>()) {
|
||||
LOG(WARNING) << "Failed to commit txn: " << _req.txn_id
|
||||
<< " for rowset: " << _cur_rowset->rowset_id();
|
||||
return res;
|
||||
}
|
||||
if (_tablet->enable_unique_key_merge_on_write()) {
|
||||
_storage_engine->txn_manager()->set_txn_related_delete_bitmap(
|
||||
_req.partition_id, _req.txn_id, _tablet->tablet_id(), _tablet->schema_hash(),
|
||||
_tablet->tablet_uid(), true, _delete_bitmap, _rowset_ids);
|
||||
|
||||
@ -3134,10 +3134,33 @@ Status Tablet::update_delete_bitmap_without_lock(const RowsetSharedPtr& rowset)
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset, const TabletTxnInfo* load_info,
|
||||
RowsetWriter* rowset_writer) {
|
||||
DeleteBitmapPtr delete_bitmap = load_info->delete_bitmap;
|
||||
const RowsetIdUnorderedSet& pre_rowset_ids = load_info->rowset_ids;
|
||||
Status Tablet::commit_phase_update_delete_bitmap(
|
||||
const RowsetSharedPtr& rowset, const RowsetIdUnorderedSet& pre_rowset_ids,
|
||||
DeleteBitmapPtr delete_bitmap, const int64_t& cur_version,
|
||||
const std::vector<segment_v2::SegmentSharedPtr>& segments, RowsetWriter* rowset_writer) {
|
||||
RowsetIdUnorderedSet cur_rowset_ids;
|
||||
RowsetIdUnorderedSet rowset_ids_to_add;
|
||||
RowsetIdUnorderedSet rowset_ids_to_del;
|
||||
|
||||
std::shared_lock meta_rlock(_meta_lock);
|
||||
cur_rowset_ids = all_rs_id(cur_version);
|
||||
_rowset_ids_difference(cur_rowset_ids, pre_rowset_ids, &rowset_ids_to_add, &rowset_ids_to_del);
|
||||
if (!rowset_ids_to_add.empty() || !rowset_ids_to_del.empty()) {
|
||||
LOG(INFO) << "rowset_ids_to_add: " << rowset_ids_to_add.size()
|
||||
<< ", rowset_ids_to_del: " << rowset_ids_to_del.size();
|
||||
}
|
||||
for (const auto& to_del : rowset_ids_to_del) {
|
||||
delete_bitmap->remove({to_del, 0, 0}, {to_del, UINT32_MAX, INT64_MAX});
|
||||
}
|
||||
|
||||
RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, &rowset_ids_to_add, delete_bitmap,
|
||||
cur_version, rowset_writer));
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset,
|
||||
const RowsetIdUnorderedSet& pre_rowset_ids,
|
||||
DeleteBitmapPtr delete_bitmap, RowsetWriter* rowset_writer) {
|
||||
RowsetIdUnorderedSet cur_rowset_ids;
|
||||
RowsetIdUnorderedSet rowset_ids_to_add;
|
||||
RowsetIdUnorderedSet rowset_ids_to_del;
|
||||
|
||||
@ -447,7 +447,15 @@ public:
|
||||
|
||||
Status update_delete_bitmap_without_lock(const RowsetSharedPtr& rowset);
|
||||
|
||||
Status update_delete_bitmap(const RowsetSharedPtr& rowset, const TabletTxnInfo* load_info,
|
||||
Status commit_phase_update_delete_bitmap(
|
||||
const RowsetSharedPtr& rowset, const RowsetIdUnorderedSet& pre_rowset_ids,
|
||||
DeleteBitmapPtr delete_bitmap, const int64_t& cur_version,
|
||||
const std::vector<segment_v2::SegmentSharedPtr>& segments,
|
||||
RowsetWriter* rowset_writer = nullptr);
|
||||
|
||||
Status update_delete_bitmap(const RowsetSharedPtr& rowset,
|
||||
const RowsetIdUnorderedSet& pre_rowset_ids,
|
||||
DeleteBitmapPtr delete_bitmap,
|
||||
RowsetWriter* rowset_writer = nullptr);
|
||||
void calc_compaction_output_rowset_delete_bitmap(
|
||||
const std::vector<RowsetSharedPtr>& input_rowsets,
|
||||
|
||||
@ -368,8 +368,9 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id,
|
||||
std::unique_ptr<RowsetWriter> rowset_writer;
|
||||
_create_transient_rowset_writer(tablet, rowset, &rowset_writer);
|
||||
|
||||
RETURN_IF_ERROR(
|
||||
tablet->update_delete_bitmap(rowset, &tablet_txn_info, rowset_writer.get()));
|
||||
RETURN_IF_ERROR(tablet->update_delete_bitmap(rowset, tablet_txn_info.rowset_ids,
|
||||
tablet_txn_info.delete_bitmap,
|
||||
rowset_writer.get()));
|
||||
if (rowset->tablet_schema()->is_partial_update()) {
|
||||
// build rowset writer and merge transient rowset
|
||||
RETURN_IF_ERROR(rowset_writer->flush());
|
||||
|
||||
Reference in New Issue
Block a user