From 7e82e7651a368c47fafa6607b585d77e35c5df19 Mon Sep 17 00:00:00 2001 From: deardeng <565620795@qq.com> Date: Thu, 16 Nov 2023 11:50:06 +0800 Subject: [PATCH] [Improve](txn) Add some fuzzy test stub in txn (#26712) --- .../olap/task/engine_publish_version_task.cpp | 13 ++++ be/src/olap/txn_manager.cpp | 61 +++++++++++++++++-- .../org/apache/doris/alter/AlterJobV2.java | 23 +++++++ 3 files changed, 92 insertions(+), 5 deletions(-) diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp index 1d59efcfee..24096d1b69 100644 --- a/be/src/olap/task/engine_publish_version_task.cpp +++ b/be/src/olap/task/engine_publish_version_task.cpp @@ -39,6 +39,7 @@ #include "olap/txn_manager.h" #include "olap/utils.h" #include "util/bvar_helper.h" +#include "util/debug_points.h" #include "util/threadpool.h" namespace doris { @@ -91,6 +92,18 @@ Status EnginePublishVersionTask::finish() { int64_t transaction_id = _publish_version_req.transaction_id; OlapStopWatch watch; VLOG_NOTICE << "begin to process publish version. 
transaction_id=" << transaction_id; + DBUG_EXECUTE_IF("EnginePublishVersionTask.finish.random", { + if (rand() % 100 < (100 * dp->param("percent", 0.5))) { + LOG_WARNING("EnginePublishVersionTask.finish.random random failed"); + return Status::InternalError("debug engine publish version task random failed"); + } + }); + DBUG_EXECUTE_IF("EnginePublishVersionTask.finish.wait", { + if (auto wait = dp->param("duration", 0); wait > 0) { + LOG_WARNING("EnginePublishVersionTask.finish.wait wait").tag("wait ms", wait); + std::this_thread::sleep_for(std::chrono::milliseconds(wait)); + } + }); std::unique_ptr token = StorageEngine::instance()->tablet_publish_txn_thread_pool()->new_token( ThreadPool::ExecutionMode::CONCURRENT); diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp index 2d5ce73b86..1ed6f74eb8 100644 --- a/be/src/olap/txn_manager.cpp +++ b/be/src/olap/txn_manager.cpp @@ -139,6 +139,19 @@ Status TxnManager::prepare_txn(TPartitionId partition_id, TTransactionId transac std::lock_guard txn_wrlock(_get_txn_map_lock(transaction_id)); txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); + DBUG_EXECUTE_IF("TxnManager.prepare_txn.random_failed", { + if (rand() % 100 < (100 * dp->param("percent", 0.5))) { + LOG_WARNING("TxnManager.prepare_txn.random_failed random failed"); + return Status::InternalError("debug prepare txn random failed"); + } + }); + DBUG_EXECUTE_IF("TxnManager.prepare_txn.wait", { + if (auto wait = dp->param("duration", 0); wait > 0) { + LOG_WARNING("TxnManager.prepare_txn.wait").tag("wait ms", wait); + std::this_thread::sleep_for(std::chrono::milliseconds(wait)); + } + }); + /// Step 1: check if the transaction is already exist do { auto iter = txn_tablet_map.find(key); @@ -296,11 +309,18 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id, key.first, key.second, tablet_info.to_string()); } - DBUG_EXECUTE_IF( - "TxnManager.commit_txn_random_failed", - if (rand() % 100 < (100 * 
dp->param("percent", 0.5))) { - return Status::InternalError("debug commit txn random failed"); - }); + DBUG_EXECUTE_IF("TxnManager.commit_txn.random_failed", { + if (rand() % 100 < (100 * dp->param("percent", 0.5))) { + LOG_WARNING("TxnManager.commit_txn.random_failed"); + return Status::InternalError("debug commit txn random failed"); + } + }); + DBUG_EXECUTE_IF("TxnManager.commit_txn.wait", { + if (auto wait = dp->param("duration", 0); wait > 0) { + LOG_WARNING("TxnManager.commit_txn.wait").tag("wait ms", wait); + std::this_thread::sleep_for(std::chrono::milliseconds(wait)); + } + }); std::lock_guard txn_lock(_get_txn_lock(transaction_id)); // this while loop just run only once, just for if break @@ -356,6 +376,12 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id, if (!is_recovery) { Status save_status = RowsetMetaManager::save(meta, tablet_uid, rowset_ptr->rowset_id(), rowset_ptr->rowset_meta()->get_rowset_pb()); + DBUG_EXECUTE_IF("TxnManager.RowsetMetaManager.save_wait", { + if (auto wait = dp->param("duration", 0); wait > 0) { + LOG_WARNING("TxnManager.RowsetMetaManager.save_wait").tag("wait ms", wait); + std::this_thread::sleep_for(std::chrono::milliseconds(wait)); + } + }); if (!save_status.ok()) { return Status::Error( "save committed rowset failed. 
when commit txn rowset_id: {}, tablet id: {}, " @@ -430,6 +456,18 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, "tablet={}", partition_id, transaction_id, tablet_info.to_string()); } + DBUG_EXECUTE_IF("TxnManager.publish_txn.random_failed_before_save_rs_meta", { + if (rand() % 100 < (100 * dp->param("percent", 0.5))) { + LOG_WARNING("TxnManager.publish_txn.random_failed_before_save_rs_meta"); + return Status::InternalError("debug publish txn before save rs meta random failed"); + } + }); + DBUG_EXECUTE_IF("TxnManager.publish_txn.wait_before_save_rs_meta", { + if (auto wait = dp->param("duration", 0); wait > 0) { + LOG_WARNING("TxnManager.publish_txn.wait_before_save_rs_meta").tag("wait ms", wait); + std::this_thread::sleep_for(std::chrono::milliseconds(wait)); + } + }); /// Step 2: make rowset visible // save meta need access disk, it maybe very slow, so that it is not in global txn lock @@ -437,6 +475,19 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, // TODO(ygl): rowset is already set version here, memory is changed, if save failed // it maybe a fatal error rowset->make_visible(version); + + DBUG_EXECUTE_IF("TxnManager.publish_txn.random_failed_after_save_rs_meta", { + if (rand() % 100 < (100 * dp->param("percent", 0.5))) { + LOG_WARNING("TxnManager.publish_txn.random_failed_after_save_rs_meta"); + return Status::InternalError("debug publish txn after save rs meta random failed"); + } + }); + DBUG_EXECUTE_IF("TxnManager.publish_txn.wait_after_save_rs_meta", { + if (auto wait = dp->param("duration", 0); wait > 0) { + LOG_WARNING("TxnManager.publish_txn.wait_after_save_rs_meta").tag("wait ms", wait); + std::this_thread::sleep_for(std::chrono::milliseconds(wait)); + } + }); // update delete_bitmap if (tablet_txn_info->unique_key_merge_on_write) { std::unique_ptr rowset_writer; diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java index c1984d31d4..a5b3c867ee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java @@ -26,6 +26,7 @@ import org.apache.doris.common.Config; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.persist.gson.GsonUtils; import com.google.gson.annotations.SerializedName; @@ -164,6 +165,22 @@ public abstract class AlterJobV2 implements Writable { this.finishedTimeMs = finishedTimeMs; } + // /api/debug_point/add/{name}?value=100 + // Sleeps for the number of milliseconds returned by DebugPointUtil.getDebugParamOrDefault(name, 0) when it is positive; otherwise returns immediately. + private void stateWait(final String name) { + long waitTimeMs = DebugPointUtil.getDebugParamOrDefault(name, 0); + if (waitTimeMs > 0) { + try { + LOG.info("debug point {} wait {} ms", name, waitTimeMs); + Thread.sleep(waitTimeMs); + } catch (InterruptedException e) { + LOG.warn(name, e); + // Thread.sleep clears the interrupt status when it throws; restore it so the caller can still observe the interruption. + Thread.currentThread().interrupt(); + } + } + } + /** * The keyword 'synchronized' only protects 2 methods: * run() and cancel() @@ -180,15 +197,24 @@ public abstract class AlterJobV2 implements Writable { return; } + // /api/debug_point/add/FE.STOP_ALTER_JOB_RUN + if (DebugPointUtil.isEnable("FE.STOP_ALTER_JOB_RUN")) { + LOG.info("debug point FE.STOP_ALTER_JOB_RUN, schema change schedule stopped"); + return; + } + try { switch (jobState) { case PENDING: + stateWait("FE.ALTER_JOB_V2_PENDING"); runPendingJob(); break; case WAITING_TXN: + stateWait("FE.ALTER_JOB_V2_WAITING_TXN"); runWaitingTxnJob(); break; case RUNNING: + stateWait("FE.ALTER_JOB_V2_RUNNING"); runRunningJob(); break; default: