From 9c742d46a231fb5e6af490a77d29a0d30e951a7e Mon Sep 17 00:00:00 2001 From: meiyi Date: Mon, 22 Jan 2024 20:33:00 +0800 Subject: [PATCH] [fix](group commit) abort txn should use label if replay wal failed (#30219) --- be/src/olap/wal/wal_manager.cpp | 10 +++---- be/src/olap/wal/wal_table.cpp | 29 +++++++++---------- be/src/olap/wal/wal_table.h | 2 +- be/src/runtime/group_commit_mgr.cpp | 12 ++++---- .../doris/service/FrontendServiceImpl.java | 24 ++++++++++----- gensrc/thrift/FrontendService.thrift | 2 ++ 6 files changed, 44 insertions(+), 35 deletions(-) diff --git a/be/src/olap/wal/wal_manager.cpp b/be/src/olap/wal/wal_manager.cpp index 621a1aa080..45a9b58ff2 100644 --- a/be/src/olap/wal/wal_manager.cpp +++ b/be/src/olap/wal/wal_manager.cpp @@ -198,9 +198,9 @@ size_t WalManager::get_wal_queue_size(int64_t table_id) { return 0; } } else { - //table_id is -1 meaning get all table wal size - for (auto it = _wal_queues.begin(); it != _wal_queues.end(); it++) { - count += it->second.size(); + // table_id is -1 meaning get all table wal size + for (auto& [_, table_wals] : _wal_queues) { + count += table_wals.size(); } } return count; @@ -372,8 +372,8 @@ size_t WalManager::get_wal_table_size(int64_t table_id) { void WalManager::_stop_relay_wal() { std::lock_guard wrlock(_table_lock); - for (auto it = _table_map.begin(); it != _table_map.end(); it++) { - it->second->stop(); + for (auto& [_, wal_table] : _table_map) { + wal_table->stop(); } } diff --git a/be/src/olap/wal/wal_table.cpp b/be/src/olap/wal/wal_table.cpp index a6d7a4054c..8adc9cf5f5 100644 --- a/be/src/olap/wal/wal_table.cpp +++ b/be/src/olap/wal/wal_table.cpp @@ -58,28 +58,27 @@ void WalTable::_pick_relay_wals() { std::lock_guard lock(_replay_wal_lock); std::vector need_replay_wals; std::vector need_erase_wals; - for (auto it = _replay_wal_map.begin(); it != _replay_wal_map.end(); it++) { - auto wal_info = it->second; + for (const auto& [wal_path, wal_info] : _replay_wal_map) { if (wal_info->get_retry_num() >= config::group_commit_replay_wal_retry_num) { LOG(WARNING) << "All replay wal failed, db=" << _db_id << ", table=" << _table_id - << ", wal=" << it->first << ", retry_num=" << wal_info->get_retry_num(); - auto st = _exec_env->wal_mgr()->rename_to_tmp_path(it->first, _table_id, + << ", wal=" << wal_path << ", retry_num=" << wal_info->get_retry_num(); + auto st = _exec_env->wal_mgr()->rename_to_tmp_path(wal_path, _table_id, wal_info->get_wal_id()); if (!st.ok()) { - LOG(WARNING) << "rename " << it->first << " fail" + LOG(WARNING) << "rename " << wal_path << " fail" << ",st:" << st.to_string(); } if (config::group_commit_wait_replay_wal_finish) { - auto notify_st = _exec_env->wal_mgr()->notify_relay_wal(it->second->get_wal_id()); + auto notify_st = _exec_env->wal_mgr()->notify_relay_wal(wal_info->get_wal_id()); if (!notify_st.ok()) { - LOG(WARNING) << "notify wal " << it->second->get_wal_id() << " fail"; + LOG(WARNING) << "notify wal " << wal_info->get_wal_id() << " fail"; } } - need_erase_wals.push_back(it->first); + need_erase_wals.push_back(wal_path); continue; } if (_need_replay(wal_info)) { - need_replay_wals.push_back(it->first); + need_replay_wals.push_back(wal_path); } } for (const auto& wal : need_erase_wals) { @@ -168,13 +167,13 @@ bool WalTable::_need_replay(std::shared_ptr wal_info) { #endif } -Status WalTable::_try_abort_txn(int64_t db_id, int64_t wal_id) { +Status WalTable::_try_abort_txn(int64_t db_id, std::string& label) { TLoadTxnRollbackRequest request; request.__set_auth_code(0); // this is a fake, fe not check it now request.__set_db_id(db_id); // TODO should we use label, because the replay wal use the same label and different wal_id - request.__set_txnId(wal_id); - std::string reason = "relay wal " + std::to_string(wal_id); + request.__set_label(label); + std::string reason = "relay wal with label " + label; request.__set_reason(reason); TLoadTxnRollbackResult result; TNetworkAddress master_addr = _exec_env->master_info()->network_address; @@ -185,7 +184,7 @@ Status WalTable::_try_abort_txn(int64_t db_id, int64_t wal_id) { }, 10000L); auto result_status = Status::create(result.status); - LOG(INFO) << "abort txn " << wal_id << ",st:" << st << ",result_status:" << result_status; + LOG(INFO) << "abort label " << label << ", st:" << st << ", result_status:" << result_status; return result_status; } @@ -196,9 +195,9 @@ Status WalTable::_replay_wal_internal(const std::string& wal) { RETURN_IF_ERROR(_parse_wal_path(wal, wal_id, label)); #ifndef BE_TEST if (!config::group_commit_wait_replay_wal_finish) { - auto st = _try_abort_txn(_db_id, wal_id); + auto st = _try_abort_txn(_db_id, label); if (!st.ok()) { - LOG(WARNING) << "abort txn " << wal_id << " fail"; + LOG(WARNING) << "failed to abort txn with label " << label; } } #endif diff --git a/be/src/olap/wal/wal_table.h b/be/src/olap/wal/wal_table.h index 07287d8f7e..9b1ead87a2 100644 --- a/be/src/olap/wal/wal_table.h +++ b/be/src/olap/wal/wal_table.h @@ -47,7 +47,7 @@ private: Status _replay_wal_internal(const std::string& wal); Status _parse_wal_path(const std::string& wal, int64_t& wal_id, std::string& label); - Status _try_abort_txn(int64_t db_id, int64_t wal_id); + Status _try_abort_txn(int64_t db_id, std::string& label); Status _get_column_info(int64_t db_id, int64_t tb_id, std::map& column_info_map); diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 3931306cd6..d057dd92b9 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -191,14 +191,14 @@ Status GroupCommitTable::get_first_block_load_queue( std::unique_lock l(_lock); for (int i = 0; i < 3; i++) { bool is_schema_version_match = true; - for (auto it = _load_block_queues.begin(); it != _load_block_queues.end(); ++it) { - if (!it->second->need_commit()) { - if (base_schema_version == it->second->schema_version) { - if (it->second->add_load_id(load_id).ok()) { - load_block_queue = it->second; + for (const auto& [_, inner_block_queue] : _load_block_queues) { + if (!inner_block_queue->need_commit()) { + if (base_schema_version == inner_block_queue->schema_version) { + if (inner_block_queue->add_load_id(load_id).ok()) { + load_block_queue = inner_block_queue; return Status::OK(); } - } else if (base_schema_version < it->second->schema_version) { + } else if (base_schema_version < inner_block_queue->schema_version) { is_schema_version_match = false; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 1a1ceb799c..85753a7330 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -1663,15 +1663,23 @@ public class FrontendServiceImpl implements FrontendService.Iface { throw new MetaNotFoundException("db " + request.getDb() + " does not exist"); } long dbId = db.getId(); - TransactionState transactionState = Env.getCurrentGlobalTransactionMgr() - .getTransactionState(dbId, request.getTxnId()); - if (transactionState == null) { - throw new UserException("transaction [" + request.getTxnId() + "] not found"); + if (request.getTxnId() != 0) { // txnId is required in thrift + TransactionState transactionState = Env.getCurrentGlobalTransactionMgr() + .getTransactionState(dbId, request.getTxnId()); + if (transactionState == null) { + throw new UserException("transaction [" + request.getTxnId() + "] not found"); + } + List tableList = db.getTablesOnIdOrderIfExist(transactionState.getTableIdList()); + Env.getCurrentGlobalTransactionMgr().abortTransaction(dbId, request.getTxnId(), + request.isSetReason() ? request.getReason() : "system cancel", + TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()), tableList); + } else if (request.isSetLabel()) { + Env.getCurrentGlobalTransactionMgr() + .abortTransaction(db.getId(), request.getLabel(), + request.isSetReason() ? request.getReason() : "system cancel"); + } else { + throw new UserException("must set txn_id or label"); } - List
tableList = db.getTablesOnIdOrderIfExist(transactionState.getTableIdList()); - Env.getCurrentGlobalTransactionMgr().abortTransaction(dbId, request.getTxnId(), - request.isSetReason() ? request.getReason() : "system cancel", - TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()), tableList); } @Override diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 7d1c8c62ae..98c876afbf 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -816,6 +816,8 @@ struct TLoadTxnRollbackRequest { 11: optional string token 12: optional i64 db_id 13: optional list tbls + 14: optional string auth_code_uuid + 15: optional string label } struct TLoadTxnRollbackResult {