[improvement](log) add txn log (#28875)
This commit is contained in:
@ -94,13 +94,16 @@ Status EnginePublishVersionTask::execute() {
|
||||
VLOG_NOTICE << "begin to process publish version. transaction_id=" << transaction_id;
|
||||
DBUG_EXECUTE_IF("EnginePublishVersionTask.finish.random", {
|
||||
if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
|
||||
LOG_WARNING("EnginePublishVersionTask.finish.random random failed");
|
||||
LOG_WARNING("EnginePublishVersionTask.finish.random random failed")
|
||||
.tag("txn_id", transaction_id);
|
||||
return Status::InternalError("debug engine publish version task random failed");
|
||||
}
|
||||
});
|
||||
DBUG_EXECUTE_IF("EnginePublishVersionTask.finish.wait", {
|
||||
if (auto wait = dp->param<int>("duration", 0); wait > 0) {
|
||||
LOG_WARNING("EnginePublishVersionTask.finish.wait wait").tag("wait ms", wait);
|
||||
LOG_WARNING("EnginePublishVersionTask.finish.wait wait")
|
||||
.tag("txn_id", transaction_id)
|
||||
.tag("wait ms", wait);
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(wait));
|
||||
}
|
||||
});
|
||||
@ -204,7 +207,9 @@ Status EnginePublishVersionTask::execute() {
|
||||
partition_id, tablet_info.tablet_id, version.first);
|
||||
}
|
||||
res = Status::Error<PUBLISH_VERSION_NOT_CONTINUOUS>(
|
||||
"check_version_exist failed");
|
||||
"version not continuous for mow, tablet_id={}, "
|
||||
"tablet_max_version={}, txn_version={}",
|
||||
tablet_info.tablet_id, max_version, version.first);
|
||||
int64_t missed_version = max_version + 1;
|
||||
int64_t missed_txn_id =
|
||||
StorageEngine::instance()->txn_manager()->get_txn_by_tablet_version(
|
||||
|
||||
@ -142,13 +142,18 @@ Status TxnManager::prepare_txn(TPartitionId partition_id, TTransactionId transac
|
||||
|
||||
DBUG_EXECUTE_IF("TxnManager.prepare_txn.random_failed", {
|
||||
if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
|
||||
LOG_WARNING("TxnManager.prepare_txn.random_failed random failed");
|
||||
LOG_WARNING("TxnManager.prepare_txn.random_failed random failed")
|
||||
.tag("txn_id", transaction_id)
|
||||
.tag("tablet_id", tablet_id);
|
||||
return Status::InternalError("debug prepare txn random failed");
|
||||
}
|
||||
});
|
||||
DBUG_EXECUTE_IF("TxnManager.prepare_txn.wait", {
|
||||
if (auto wait = dp->param<int>("duration", 0); wait > 0) {
|
||||
LOG_WARNING("TxnManager.prepare_txn.wait").tag("wait ms", wait);
|
||||
LOG_WARNING("TxnManager.prepare_txn.wait")
|
||||
.tag("txn_id", transaction_id)
|
||||
.tag("tablet_id", tablet_id)
|
||||
.tag("wait ms", wait);
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(wait));
|
||||
}
|
||||
});
|
||||
@ -313,13 +318,18 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id,
|
||||
|
||||
DBUG_EXECUTE_IF("TxnManager.commit_txn.random_failed", {
|
||||
if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
|
||||
LOG_WARNING("TxnManager.commit_txn.random_failed");
|
||||
LOG_WARNING("TxnManager.commit_txn.random_failed")
|
||||
.tag("txn_id", transaction_id)
|
||||
.tag("tablet_id", tablet_id);
|
||||
return Status::InternalError("debug commit txn random failed");
|
||||
}
|
||||
});
|
||||
DBUG_EXECUTE_IF("TxnManager.commit_txn.wait", {
|
||||
if (auto wait = dp->param<int>("duration", 0); wait > 0) {
|
||||
LOG_WARNING("TxnManager.commit_txn.wait").tag("wait ms", wait);
|
||||
LOG_WARNING("TxnManager.commit_txn.wait")
|
||||
.tag("txn_id", transaction_id)
|
||||
.tag("tablet_id", tablet_id)
|
||||
.tag("wait ms", wait);
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(wait));
|
||||
}
|
||||
});
|
||||
@ -388,7 +398,10 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id,
|
||||
rowset_ptr->rowset_meta()->get_rowset_pb(), false);
|
||||
DBUG_EXECUTE_IF("TxnManager.RowsetMetaManager.save_wait", {
|
||||
if (auto wait = dp->param<int>("duration", 0); wait > 0) {
|
||||
LOG_WARNING("TxnManager.RowsetMetaManager.save_wait").tag("wait ms", wait);
|
||||
LOG_WARNING("TxnManager.RowsetMetaManager.save_wait")
|
||||
.tag("txn_id", transaction_id)
|
||||
.tag("tablet_id", tablet_id)
|
||||
.tag("wait ms", wait);
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(wait));
|
||||
}
|
||||
});
|
||||
@ -466,13 +479,18 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id,
|
||||
}
|
||||
DBUG_EXECUTE_IF("TxnManager.publish_txn.random_failed_before_save_rs_meta", {
|
||||
if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
|
||||
LOG_WARNING("TxnManager.publish_txn.random_failed_before_save_rs_meta");
|
||||
LOG_WARNING("TxnManager.publish_txn.random_failed_before_save_rs_meta")
|
||||
.tag("txn_id", transaction_id)
|
||||
.tag("tablet_id", tablet_id);
|
||||
return Status::InternalError("debug publish txn before save rs meta random failed");
|
||||
}
|
||||
});
|
||||
DBUG_EXECUTE_IF("TxnManager.publish_txn.wait_before_save_rs_meta", {
|
||||
if (auto wait = dp->param<int>("duration", 0); wait > 0) {
|
||||
LOG_WARNING("TxnManager.publish_txn.wait_before_save_rs_meta").tag("wait ms", wait);
|
||||
LOG_WARNING("TxnManager.publish_txn.wait_before_save_rs_meta")
|
||||
.tag("txn_id", transaction_id)
|
||||
.tag("tablet_id", tablet_id)
|
||||
.tag("wait ms", wait);
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(wait));
|
||||
}
|
||||
});
|
||||
@ -486,13 +504,18 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id,
|
||||
|
||||
DBUG_EXECUTE_IF("TxnManager.publish_txn.random_failed_after_save_rs_meta", {
|
||||
if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
|
||||
LOG_WARNING("TxnManager.publish_txn.random_failed_after_save_rs_meta");
|
||||
LOG_WARNING("TxnManager.publish_txn.random_failed_after_save_rs_meta")
|
||||
.tag("txn_id", transaction_id)
|
||||
.tag("tablet_id", tablet_id);
|
||||
return Status::InternalError("debug publish txn after save rs meta random failed");
|
||||
}
|
||||
});
|
||||
DBUG_EXECUTE_IF("TxnManager.publish_txn.wait_after_save_rs_meta", {
|
||||
if (auto wait = dp->param<int>("duration", 0); wait > 0) {
|
||||
LOG_WARNING("TxnManager.publish_txn.wait_after_save_rs_meta").tag("wait ms", wait);
|
||||
LOG_WARNING("TxnManager.publish_txn.wait_after_save_rs_meta")
|
||||
.tag("txn_id", transaction_id)
|
||||
.tag("tablet_id", tablet_id)
|
||||
.tag("wait ms", wait);
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(wait));
|
||||
}
|
||||
});
|
||||
|
||||
@ -589,9 +589,11 @@ public class DatabaseTransactionMgr {
|
||||
tabletVersionFailedReplicas);
|
||||
|
||||
String errMsg = String.format("Failed to commit txn %s, cause tablet %s succ replica num %s"
|
||||
+ " < load required replica num %s. table %s, partition %s, this tablet detail: %s",
|
||||
+ " < load required replica num %s. table %s, partition: [ id=%s, commit version %s"
|
||||
+ ", visible version %s ], this tablet detail: %s",
|
||||
transactionId, tablet.getId(), successReplicaNum, loadRequiredReplicaNum, tableId,
|
||||
partition.getId(), writeDetail);
|
||||
partition.getId(), partition.getCommittedVersion(), partition.getVisibleVersion(),
|
||||
writeDetail);
|
||||
LOG.info(errMsg);
|
||||
|
||||
throw new TabletQuorumFailedException(transactionId, errMsg);
|
||||
@ -746,7 +748,7 @@ public class DatabaseTransactionMgr {
|
||||
}
|
||||
|
||||
// update nextVersion because of the failure of persistent transaction resulting in error version
|
||||
updateCatalogAfterCommitted(transactionState, db);
|
||||
updateCatalogAfterCommitted(transactionState, db, false);
|
||||
LOG.info("transaction:[{}] successfully committed", transactionState);
|
||||
}
|
||||
|
||||
@ -1128,9 +1130,9 @@ public class DatabaseTransactionMgr {
|
||||
tabletWriteFailedReplicas, tabletVersionFailedReplicas);
|
||||
logs.add(String.format("publish version quorum succ for transaction %s on tablet %s"
|
||||
+ " with version %s, and has failed replicas, load require replica num %s. "
|
||||
+ "table %s, partition %s, tablet detail: %s",
|
||||
transactionState, tablet.getId(), newVersion,
|
||||
loadRequiredReplicaNum, tableId, partitionId, writeDetail));
|
||||
+ "table %s, partition: [ id=%s, commit version=%s ], tablet detail: %s",
|
||||
transactionState, tablet.getId(), newVersion, loadRequiredReplicaNum, tableId,
|
||||
partitionId, partition.getCommittedVersion(), writeDetail));
|
||||
}
|
||||
continue;
|
||||
}
|
||||
@ -1178,7 +1180,8 @@ public class DatabaseTransactionMgr {
|
||||
if (needLog) {
|
||||
transactionState.setLastPublishLogTime(now);
|
||||
for (String log : logs) {
|
||||
LOG.info("{}. publish times {}", log, transactionState.getPublishCount());
|
||||
LOG.info("{}. publish times {}, whole txn publish result {}",
|
||||
log, transactionState.getPublishCount(), publishResult.name());
|
||||
}
|
||||
}
|
||||
|
||||
@ -1793,8 +1796,10 @@ public class DatabaseTransactionMgr {
|
||||
}
|
||||
}
|
||||
|
||||
private void updateCatalogAfterCommitted(TransactionState transactionState, Database db) {
|
||||
private void updateCatalogAfterCommitted(TransactionState transactionState, Database db, boolean isReplay) {
|
||||
Set<Long> errorReplicaIds = transactionState.getErrorReplicas();
|
||||
List<Replica> tabletSuccReplicas = Lists.newArrayList();
|
||||
List<Replica> tabletFailedReplicas = Lists.newArrayList();
|
||||
for (TableCommitInfo tableCommitInfo : transactionState.getIdToTableCommitInfos().values()) {
|
||||
long tableId = tableCommitInfo.getTableId();
|
||||
OlapTable table = (OlapTable) db.getTableNullable(tableId);
|
||||
@ -1817,13 +1822,32 @@ public class DatabaseTransactionMgr {
|
||||
for (MaterializedIndex index : allIndices) {
|
||||
List<Tablet> tablets = index.getTablets();
|
||||
for (Tablet tablet : tablets) {
|
||||
tabletFailedReplicas.clear();
|
||||
tabletSuccReplicas.clear();
|
||||
for (Replica replica : tablet.getReplicas()) {
|
||||
if (errorReplicaIds.contains(replica.getId())) {
|
||||
// TODO(cmy): do we need to update last failed version here?
|
||||
// because in updateCatalogAfterVisible, it will be updated again.
|
||||
replica.updateLastFailedVersion(partitionCommitInfo.getVersion());
|
||||
tabletFailedReplicas.add(replica);
|
||||
} else {
|
||||
tabletSuccReplicas.add(replica);
|
||||
}
|
||||
}
|
||||
if (!isReplay && !tabletFailedReplicas.isEmpty()) {
|
||||
LOG.info("some replicas load data failed for committed txn {} on version {}, table {}, "
|
||||
+ "partition {}, tablet {}, {} replicas load data succ: {}, {} replicas load "
|
||||
+ "data fail: {}",
|
||||
transactionState.getTransactionId(), partitionCommitInfo.getVersion(),
|
||||
tableId, partitionId, tablet.getId(), tabletSuccReplicas.size(),
|
||||
Joiner.on(", ").join(tabletSuccReplicas.stream()
|
||||
.map(replica -> replica.toStringSimple(true))
|
||||
.collect(Collectors.toList())),
|
||||
tabletFailedReplicas.size(),
|
||||
Joiner.on(", ").join(tabletFailedReplicas.stream()
|
||||
.map(replica -> replica.toStringSimple(true))
|
||||
.collect(Collectors.toList())));
|
||||
}
|
||||
}
|
||||
}
|
||||
partition.setNextVersion(partition.getNextVersion() + 1);
|
||||
@ -2036,7 +2060,7 @@ public class DatabaseTransactionMgr {
|
||||
// set transaction status will call txn state change listener
|
||||
transactionState.replaySetTransactionStatus();
|
||||
if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED) {
|
||||
updateCatalogAfterCommitted(transactionState, db);
|
||||
updateCatalogAfterCommitted(transactionState, db, true);
|
||||
} else if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
|
||||
updateCatalogAfterVisible(transactionState, db);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user