branch-2.1: [fix](binlog) Acquire migration lock before ingesting binlog #50663 (#50709)

cherry pick from #50663
This commit is contained in:
walter
2025-05-09 11:16:18 +08:00
committed by GitHub
parent 0347e8a3c6
commit 9422c973af

View File

@ -1035,13 +1035,18 @@ void BackendService::ingest_binlog(TIngestBinlogResult& result,
PUniqueId p_load_id;
p_load_id.set_hi(load_id.hi);
p_load_id.set_lo(load_id.lo);
auto status = StorageEngine::instance()->txn_manager()->prepare_txn(
partition_id, *local_tablet, txn_id, p_load_id, is_ingrest);
if (!status.ok()) {
LOG(WARNING) << "prepare txn failed. txn_id=" << txn_id
<< ", status=" << status.to_string();
status.to_thrift(&tstatus);
return;
{
// See RowsetBuilder::prepare_txn for details
std::shared_lock base_migration_lock(local_tablet->get_migration_lock());
auto status = StorageEngine::instance()->txn_manager()->prepare_txn(
partition_id, *local_tablet, txn_id, p_load_id, is_ingrest);
if (!status.ok()) {
LOG(WARNING) << "prepare txn failed. txn_id=" << txn_id
<< ", status=" << status.to_string();
status.to_thrift(&tstatus);
return;
}
}
bool is_async = (_ingest_binlog_workers != nullptr);
@ -1062,7 +1067,7 @@ void BackendService::ingest_binlog(TIngestBinlogResult& result,
};
if (is_async) {
status = _ingest_binlog_workers->submit_func(std::move(ingest_binlog_func));
auto status = _ingest_binlog_workers->submit_func(std::move(ingest_binlog_func));
if (!status.ok()) {
status.to_thrift(&tstatus);
return;