[fix](merge-on-write) fix duplicate key in schema change (#24782)
This commit is contained in:
@ -293,12 +293,6 @@ public:
|
||||
|
||||
bool check_rowset_segment();
|
||||
|
||||
bool start_publish() {
|
||||
bool expect = false;
|
||||
return _is_publish_running.compare_exchange_strong(expect, true);
|
||||
}
|
||||
void finish_publish() { _is_publish_running.store(false); }
|
||||
|
||||
[[nodiscard]] virtual Status add_to_binlog() { return Status::OK(); }
|
||||
|
||||
protected:
|
||||
@ -334,7 +328,6 @@ protected:
|
||||
// rowset state machine
|
||||
RowsetStateMachine _rowset_state_machine;
|
||||
std::atomic<uint64_t> _delayed_expired_timestamp = 0;
|
||||
std::atomic<bool> _is_publish_running {false};
|
||||
};
|
||||
|
||||
} // namespace doris
|
||||
|
||||
@ -3307,7 +3307,6 @@ Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset,
|
||||
std::vector<segment_v2::SegmentSharedPtr> segments;
|
||||
_load_rowset_segments(rowset, &segments);
|
||||
|
||||
std::lock_guard<std::mutex> rwlock(_rowset_update_lock);
|
||||
{
|
||||
std::shared_lock meta_rlock(_meta_lock);
|
||||
// tablet is under alter process. The delete bitmap will be calculated after conversion.
|
||||
|
||||
@ -24,6 +24,7 @@
|
||||
#include <chrono> // IWYU pragma: keep
|
||||
#include <map>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <ostream>
|
||||
#include <set>
|
||||
#include <shared_mutex>
|
||||
@ -270,14 +271,12 @@ TabletPublishTxnTask::TabletPublishTxnTask(EnginePublishVersionTask* engine_task
|
||||
}
|
||||
|
||||
void TabletPublishTxnTask::handle() {
|
||||
_stats.schedule_time_us = MonotonicMicros() - _stats.submit_time_us;
|
||||
if (!_rowset->start_publish()) {
|
||||
LOG(WARNING) << "publish is running. rowset_id=" << _rowset->rowset_id()
|
||||
<< ", tablet_id=" << _tablet->tablet_id() << ", txn_id=" << _transaction_id;
|
||||
_engine_publish_version_task->add_error_tablet_id(_tablet_info.tablet_id);
|
||||
return;
|
||||
std::unique_lock<std::mutex> rowset_update_lock(_tablet->get_rowset_update_lock(),
|
||||
std::defer_lock);
|
||||
if (_tablet->enable_unique_key_merge_on_write()) {
|
||||
rowset_update_lock.lock();
|
||||
}
|
||||
Defer defer {[&] { _rowset->finish_publish(); }};
|
||||
_stats.schedule_time_us = MonotonicMicros() - _stats.submit_time_us;
|
||||
auto publish_status = StorageEngine::instance()->txn_manager()->publish_txn(
|
||||
_partition_id, _tablet, _transaction_id, _version, &_stats);
|
||||
if (!publish_status.ok()) {
|
||||
@ -311,6 +310,7 @@ void TabletPublishTxnTask::handle() {
|
||||
}
|
||||
|
||||
void AsyncTabletPublishTask::handle() {
|
||||
std::lock_guard<std::mutex> wrlock(_tablet->get_rowset_update_lock());
|
||||
_stats.schedule_time_us = MonotonicMicros() - _stats.submit_time_us;
|
||||
std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
|
||||
StorageEngine::instance()->txn_manager()->get_txn_related_tablets(
|
||||
@ -320,12 +320,6 @@ void AsyncTabletPublishTask::handle() {
|
||||
return;
|
||||
}
|
||||
RowsetSharedPtr rowset = iter->second;
|
||||
if (!rowset->start_publish()) {
|
||||
LOG(WARNING) << "publish is running. rowset_id=" << rowset->rowset_id()
|
||||
<< ", tablet_id=" << _tablet->tablet_id() << ", txn_id=" << _transaction_id;
|
||||
return;
|
||||
}
|
||||
Defer defer {[&] { rowset->finish_publish(); }};
|
||||
Version version(_version, _version);
|
||||
auto publish_status = StorageEngine::instance()->txn_manager()->publish_txn(
|
||||
_partition_id, _tablet, _transaction_id, version, &_stats);
|
||||
|
||||
Reference in New Issue
Block a user