From 9422c973af12fcfe60ebf60dfd8fc1deb9591486 Mon Sep 17 00:00:00 2001 From: walter Date: Fri, 9 May 2025 11:16:18 +0800 Subject: [PATCH] branch-2.1: [fix](binlog) Acquire migration lock before ingesting binlog #50663 (#50709) cherry pick from #50663 --- be/src/service/backend_service.cpp | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index 8d0326f0c3..07e5ef4104 100644 --- a/be/src/service/backend_service.cpp +++ b/be/src/service/backend_service.cpp @@ -1035,13 +1035,18 @@ void BackendService::ingest_binlog(TIngestBinlogResult& result, PUniqueId p_load_id; p_load_id.set_hi(load_id.hi); p_load_id.set_lo(load_id.lo); - auto status = StorageEngine::instance()->txn_manager()->prepare_txn( - partition_id, *local_tablet, txn_id, p_load_id, is_ingrest); - if (!status.ok()) { - LOG(WARNING) << "prepare txn failed. txn_id=" << txn_id - << ", status=" << status.to_string(); - status.to_thrift(&tstatus); - return; + + { + // See RowsetBuilder::prepare_txn for details + std::shared_lock base_migration_lock(local_tablet->get_migration_lock()); + auto status = StorageEngine::instance()->txn_manager()->prepare_txn( + partition_id, *local_tablet, txn_id, p_load_id, is_ingrest); + if (!status.ok()) { + LOG(WARNING) << "prepare txn failed. txn_id=" << txn_id + << ", status=" << status.to_string(); + status.to_thrift(&tstatus); + return; + } } bool is_async = (_ingest_binlog_workers != nullptr); @@ -1062,7 +1067,7 @@ void BackendService::ingest_binlog(TIngestBinlogResult& result, }; if (is_async) { - status = _ingest_binlog_workers->submit_func(std::move(ingest_binlog_func)); + auto status = _ingest_binlog_workers->submit_func(std::move(ingest_binlog_func)); if (!status.ok()) { status.to_thrift(&tstatus); return;