[fix](group commit) abort txn should use label if replay wal failed (#30219)

This commit is contained in:
meiyi
2024-01-22 20:33:00 +08:00
committed by yiguolei
parent 9e0c518aaf
commit 9c742d46a2
6 changed files with 44 additions and 35 deletions

View File

@ -198,9 +198,9 @@ size_t WalManager::get_wal_queue_size(int64_t table_id) {
return 0;
}
} else {
//table_id is -1 meaning get all table wal size
for (auto it = _wal_queues.begin(); it != _wal_queues.end(); it++) {
count += it->second.size();
// A table_id of -1 means: get the WAL queue size of all tables.
for (auto& [_, table_wals] : _wal_queues) {
count += table_wals.size();
}
}
return count;
@ -372,8 +372,8 @@ size_t WalManager::get_wal_table_size(int64_t table_id) {
void WalManager::_stop_relay_wal() {
std::lock_guard<std::shared_mutex> wrlock(_table_lock);
for (auto it = _table_map.begin(); it != _table_map.end(); it++) {
it->second->stop();
for (auto& [_, wal_table] : _table_map) {
wal_table->stop();
}
}

View File

@ -58,28 +58,27 @@ void WalTable::_pick_relay_wals() {
std::lock_guard<std::mutex> lock(_replay_wal_lock);
std::vector<std::string> need_replay_wals;
std::vector<std::string> need_erase_wals;
for (auto it = _replay_wal_map.begin(); it != _replay_wal_map.end(); it++) {
auto wal_info = it->second;
for (const auto& [wal_path, wal_info] : _replay_wal_map) {
if (wal_info->get_retry_num() >= config::group_commit_replay_wal_retry_num) {
LOG(WARNING) << "All replay wal failed, db=" << _db_id << ", table=" << _table_id
<< ", wal=" << it->first << ", retry_num=" << wal_info->get_retry_num();
auto st = _exec_env->wal_mgr()->rename_to_tmp_path(it->first, _table_id,
<< ", wal=" << wal_path << ", retry_num=" << wal_info->get_retry_num();
auto st = _exec_env->wal_mgr()->rename_to_tmp_path(wal_path, _table_id,
wal_info->get_wal_id());
if (!st.ok()) {
LOG(WARNING) << "rename " << it->first << " fail"
LOG(WARNING) << "rename " << wal_path << " fail"
<< ",st:" << st.to_string();
}
if (config::group_commit_wait_replay_wal_finish) {
auto notify_st = _exec_env->wal_mgr()->notify_relay_wal(it->second->get_wal_id());
auto notify_st = _exec_env->wal_mgr()->notify_relay_wal(wal_info->get_wal_id());
if (!notify_st.ok()) {
LOG(WARNING) << "notify wal " << it->second->get_wal_id() << " fail";
LOG(WARNING) << "notify wal " << wal_info->get_wal_id() << " fail";
}
}
need_erase_wals.push_back(it->first);
need_erase_wals.push_back(wal_path);
continue;
}
if (_need_replay(wal_info)) {
need_replay_wals.push_back(it->first);
need_replay_wals.push_back(wal_path);
}
}
for (const auto& wal : need_erase_wals) {
@ -168,13 +167,13 @@ bool WalTable::_need_replay(std::shared_ptr<WalInfo> wal_info) {
#endif
}
Status WalTable::_try_abort_txn(int64_t db_id, int64_t wal_id) {
Status WalTable::_try_abort_txn(int64_t db_id, std::string& label) {
TLoadTxnRollbackRequest request;
request.__set_auth_code(0); // this is a fake, fe not check it now
request.__set_db_id(db_id);
// TODO should we use label, because the replay wal use the same label and different wal_id
request.__set_txnId(wal_id);
std::string reason = "relay wal " + std::to_string(wal_id);
request.__set_label(label);
std::string reason = "relay wal with label " + label;
request.__set_reason(reason);
TLoadTxnRollbackResult result;
TNetworkAddress master_addr = _exec_env->master_info()->network_address;
@ -185,7 +184,7 @@ Status WalTable::_try_abort_txn(int64_t db_id, int64_t wal_id) {
},
10000L);
auto result_status = Status::create(result.status);
LOG(INFO) << "abort txn " << wal_id << ",st:" << st << ",result_status:" << result_status;
LOG(INFO) << "abort label " << label << ", st:" << st << ", result_status:" << result_status;
return result_status;
}
@ -196,9 +195,9 @@ Status WalTable::_replay_wal_internal(const std::string& wal) {
RETURN_IF_ERROR(_parse_wal_path(wal, wal_id, label));
#ifndef BE_TEST
if (!config::group_commit_wait_replay_wal_finish) {
auto st = _try_abort_txn(_db_id, wal_id);
auto st = _try_abort_txn(_db_id, label);
if (!st.ok()) {
LOG(WARNING) << "abort txn " << wal_id << " fail";
LOG(WARNING) << "failed to abort txn with label " << label;
}
}
#endif

View File

@ -47,7 +47,7 @@ private:
Status _replay_wal_internal(const std::string& wal);
Status _parse_wal_path(const std::string& wal, int64_t& wal_id, std::string& label);
Status _try_abort_txn(int64_t db_id, int64_t wal_id);
Status _try_abort_txn(int64_t db_id, std::string& label);
Status _get_column_info(int64_t db_id, int64_t tb_id,
std::map<int64_t, std::string>& column_info_map);

View File

@ -191,14 +191,14 @@ Status GroupCommitTable::get_first_block_load_queue(
std::unique_lock l(_lock);
for (int i = 0; i < 3; i++) {
bool is_schema_version_match = true;
for (auto it = _load_block_queues.begin(); it != _load_block_queues.end(); ++it) {
if (!it->second->need_commit()) {
if (base_schema_version == it->second->schema_version) {
if (it->second->add_load_id(load_id).ok()) {
load_block_queue = it->second;
for (const auto& [_, inner_block_queue] : _load_block_queues) {
if (!inner_block_queue->need_commit()) {
if (base_schema_version == inner_block_queue->schema_version) {
if (inner_block_queue->add_load_id(load_id).ok()) {
load_block_queue = inner_block_queue;
return Status::OK();
}
} else if (base_schema_version < it->second->schema_version) {
} else if (base_schema_version < inner_block_queue->schema_version) {
is_schema_version_match = false;
}
}

View File

@ -1663,15 +1663,23 @@ public class FrontendServiceImpl implements FrontendService.Iface {
throw new MetaNotFoundException("db " + request.getDb() + " does not exist");
}
long dbId = db.getId();
TransactionState transactionState = Env.getCurrentGlobalTransactionMgr()
.getTransactionState(dbId, request.getTxnId());
if (transactionState == null) {
throw new UserException("transaction [" + request.getTxnId() + "] not found");
if (request.getTxnId() != 0) { // txnId is required in thrift
TransactionState transactionState = Env.getCurrentGlobalTransactionMgr()
.getTransactionState(dbId, request.getTxnId());
if (transactionState == null) {
throw new UserException("transaction [" + request.getTxnId() + "] not found");
}
List<Table> tableList = db.getTablesOnIdOrderIfExist(transactionState.getTableIdList());
Env.getCurrentGlobalTransactionMgr().abortTransaction(dbId, request.getTxnId(),
request.isSetReason() ? request.getReason() : "system cancel",
TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()), tableList);
} else if (request.isSetLabel()) {
Env.getCurrentGlobalTransactionMgr()
.abortTransaction(db.getId(), request.getLabel(),
request.isSetReason() ? request.getReason() : "system cancel");
} else {
throw new UserException("must set txn_id or label");
}
List<Table> tableList = db.getTablesOnIdOrderIfExist(transactionState.getTableIdList());
Env.getCurrentGlobalTransactionMgr().abortTransaction(dbId, request.getTxnId(),
request.isSetReason() ? request.getReason() : "system cancel",
TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()), tableList);
}
@Override

View File

@ -816,6 +816,8 @@ struct TLoadTxnRollbackRequest {
11: optional string token
12: optional i64 db_id
13: optional list<string> tbls
14: optional string auth_code_uuid
15: optional string label
}
struct TLoadTxnRollbackResult {